You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/12/11 07:04:24 UTC
[kafka] branch trunk updated: KAFKA-13522: add position tracking and bounding to IQv2 (#11581)
This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new acd1f9c KAFKA-13522: add position tracking and bounding to IQv2 (#11581)
acd1f9c is described below
commit acd1f9c5631ed2aec2d6ab238e6b81c1a9eb47a2
Author: John Roesler <vv...@users.noreply.github.com>
AuthorDate: Sat Dec 11 01:00:59 2021 -0600
KAFKA-13522: add position tracking and bounding to IQv2 (#11581)
* Fill in the Position response in the IQv2 result.
* Enforce PositionBound in IQv2 queries.
* Update integration testing approach to leverage consistent queries.
Reviewers: Patrick Stuedi <ps...@apache.org>, Vicky Papavasileiou <vp...@confluent.io>, Guozhang Wang <gu...@apache.org>
---
.../org/apache/kafka/streams/KafkaStreams.java | 42 ++++++----
.../org/apache/kafka/streams/query/Position.java | 18 +++--
.../apache/kafka/streams/query/PositionBound.java | 37 ++-------
.../kafka/streams/query/StateQueryResult.java | 12 ++-
.../state/internals/CachingKeyValueStore.java | 4 +-
.../state/internals/InMemoryKeyValueStore.java | 7 +-
.../state/internals/InMemorySessionStore.java | 9 ++-
.../state/internals/InMemoryWindowStore.java | 4 +-
.../streams/state/internals/MemoryLRUCache.java | 7 ++
.../state/internals/MemoryNavigableLRUCache.java | 4 +-
.../streams/state/internals/PositionSerde.java | 4 +-
.../state/internals/RocksDBSessionStore.java | 9 ++-
.../streams/state/internals/RocksDBStore.java | 17 ++--
.../state/internals/RocksDBWindowStore.java | 9 ++-
.../streams/state/internals/StoreQueryUtils.java | 39 ++++++++-
.../ConsistencyVectorIntegrationTest.java | 4 +-
.../streams/integration/IQv2IntegrationTest.java | 20 +++--
.../integration/IQv2StoreIntegrationTest.java | 92 ++++++++++++++++------
.../integration/utils/IntegrationTestUtils.java | 78 ++++++++++++++++--
.../kafka/streams/query/PositionBoundTest.java | 4 +-
.../apache/kafka/streams/query/PositionTest.java | 28 +++----
.../AbstractRocksDBSegmentedBytesStoreTest.java | 14 ++--
.../ChangeLoggingKeyValueBytesStoreTest.java | 4 +-
.../streams/state/internals/RocksDBStoreTest.java | 14 ++--
24 files changed, 327 insertions(+), 153 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index ecc1d01..a8f58d8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1771,17 +1771,14 @@ public class KafkaStreams implements AutoCloseable {
final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores();
if (globalStateStores.containsKey(storeName)) {
- final StateStore store = globalStateStores.get(storeName);
- final QueryResult<R> r =
- store.query(
- request.getQuery(),
- request.getPositionBound(),
- request.executionInfoEnabled()
- );
- result.setGlobalResult(r);
+ // See KAFKA-13523
+ result.setGlobalResult(
+ QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "Global stores do not yet support the KafkaStreams#query API. Use KafkaStreams#store instead."
+ )
+ );
} else {
- final Set<Integer> handledPartitions = new HashSet<>();
-
for (final StreamThread thread : threads) {
final Map<TaskId, Task> tasks = thread.allTasks();
for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
@@ -1818,20 +1815,31 @@ public class KafkaStreams implements AutoCloseable {
);
result.addResult(partition, r);
}
- }
- // optimization: if we have handled all the requested partitions,
- // we can return right away.
- handledPartitions.add(partition);
- if (!request.isAllPartitions()
- && handledPartitions.containsAll(request.getPartitions())) {
- return result;
+
+ // optimization: if we have handled all the requested partitions,
+ // we can return right away.
+ if (!request.isAllPartitions()
+ && result.getPartitionResults().keySet().containsAll(request.getPartitions())) {
+ return result;
+ }
}
}
}
}
}
+ if (!request.isAllPartitions()) {
+ for (final Integer partition : request.getPartitions()) {
+ if (!result.getPartitionResults().containsKey(partition)) {
+ result.addResult(partition, QueryResult.forFailure(
+ FailureReason.NOT_PRESENT,
+ "The requested partition was not present at the time of the query."
+ ));
+ }
+ }
+ }
+
return result;
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/Position.java b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
index 951866c..52ceb23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/Position.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/Position.java
@@ -93,8 +93,7 @@ public class Position {
}
/**
- * Create a new, structurally independent Position that is the result of merging two other
- * Positions.
+ * Merges the provided Position into the current instance.
* <p>
* If both Positions contain the same topic -> partition -> offset mapping, the resulting
* Position will contain a mapping with the larger of the two offsets.
@@ -103,12 +102,10 @@ public class Position {
if (other == null) {
return this;
} else {
- final ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> copy =
- deepCopy(position);
for (final Entry<String, ConcurrentHashMap<Integer, Long>> entry : other.position.entrySet()) {
final String topic = entry.getKey();
final Map<Integer, Long> partitionMap =
- copy.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
+ position.computeIfAbsent(topic, k -> new ConcurrentHashMap<>());
for (final Entry<Integer, Long> partitionOffset : entry.getValue().entrySet()) {
final Integer partition = partitionOffset.getKey();
final Long offset = partitionOffset.getValue();
@@ -118,7 +115,7 @@ public class Position {
}
}
}
- return new Position(copy);
+ return this;
}
}
@@ -132,8 +129,9 @@ public class Position {
/**
* Return the partition -> offset mapping for a specific topic.
*/
- public Map<Integer, Long> getBound(final String topic) {
- return Collections.unmodifiableMap(position.get(topic));
+ public Map<Integer, Long> getPartitionPositions(final String topic) {
+ final ConcurrentHashMap<Integer, Long> bound = position.get(topic);
+ return bound == null ? Collections.emptyMap() : Collections.unmodifiableMap(bound);
}
private static ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> deepCopy(
@@ -174,4 +172,8 @@ public class Position {
throw new UnsupportedOperationException(
"This mutable object is not suitable as a hash key");
}
+
+ public boolean isEmpty() {
+ return position.isEmpty();
+ }
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
index 46d9376..f5ec3c8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/PositionBound.java
@@ -30,63 +30,42 @@ import java.util.Objects;
public class PositionBound {
private final Position position;
- private final boolean unbounded;
- private PositionBound(final Position position, final boolean unbounded) {
- if (unbounded && position != null) {
- throw new IllegalArgumentException();
- } else if (position != null) {
- this.position = position.copy();
- this.unbounded = false;
- } else {
- this.position = null;
- this.unbounded = unbounded;
- }
+ private PositionBound(final Position position) {
+ this.position = position.copy();
}
/**
* Creates a new PositionBound representing "no bound"
*/
public static PositionBound unbounded() {
- return new PositionBound(null, true);
+ return new PositionBound(Position.emptyPosition());
}
/**
* Creates a new PositionBound representing a specific position.
*/
public static PositionBound at(final Position position) {
- return new PositionBound(position, false);
+ return new PositionBound(position);
}
/**
* Returns true iff this object specifies that there is no position bound.
*/
public boolean isUnbounded() {
- return unbounded;
+ return position.isEmpty();
}
/**
* Returns the specific position of this bound.
- *
- * @throws IllegalArgumentException if this is an "unbounded" position.
*/
public Position position() {
- if (unbounded) {
- throw new IllegalArgumentException(
- "Cannot get the position of an unbounded PositionBound."
- );
- } else {
- return position;
- }
+ return position;
}
@Override
public String toString() {
- if (isUnbounded()) {
- return "PositionBound{unbounded}";
- } else {
- return "PositionBound{position=" + position + '}';
- }
+ return "PositionBound{position=" + position + '}';
}
@Override
@@ -98,7 +77,7 @@ public class PositionBound {
return false;
}
final PositionBound that = (PositionBound) o;
- return unbounded == that.unbounded && Objects.equals(position, that.position);
+ return Objects.equals(position, that.position);
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
index 8b93bd6..c5eaa10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
+++ b/streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
@@ -97,11 +97,15 @@ public class StateQueryResult<R> {
* prior observations.
*/
public Position getPosition() {
- Position position = Position.emptyPosition();
- for (final QueryResult<R> r : partitionResults.values()) {
- position = position.merge(r.getPosition());
+ if (globalResult != null) {
+ return globalResult.getPosition();
+ } else {
+ final Position position = Position.emptyPosition();
+ for (final QueryResult<R> r : partitionResults.values()) {
+ position.merge(r.getPosition());
+ }
+ return position;
}
- return position;
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index df22e8c..66683ea 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -107,10 +107,10 @@ public class CachingKeyValueStore
// we can skip flushing to downstream as well as writing to underlying store
if (rawNewValue != null || rawOldValue != null) {
// we need to get the old values if needed, and then put to store, and then flush
- wrapped().put(entry.key(), entry.newValue());
-
final ProcessorRecordContext current = context.recordContext();
context.setRecordContext(entry.entry().context());
+ wrapped().put(entry.key(), entry.newValue());
+
try {
flushListener.apply(
new Record<>(
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 41977cf..5f47c3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -45,13 +45,12 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
private final String name;
private final NavigableMap<Bytes, byte[]> map = new TreeMap<>();
+ private final Position position = Position.emptyPosition();
private volatile boolean open = false;
private StateStoreContext context;
- private Position position;
public InMemoryKeyValueStore(final String name) {
this.name = name;
- this.position = Position.emptyPosition();
}
@Override
@@ -102,7 +101,9 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
query,
positionBound,
collectExecutionInfo,
- this
+ this,
+ position,
+ context.taskId().partition()
);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
index d09ab0f..cf351e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
@@ -317,7 +317,14 @@ public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {
@Override
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {
- return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+ return StoreQueryUtils.handleBasicQueries(
+ query,
+ positionBound,
+ collectExecutionInfo,
+ this,
+ position,
+ context.taskId().partition()
+ );
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 46f4973..1b49e9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -357,7 +357,9 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
query,
positionBound,
collectExecutionInfo,
- this
+ this,
+ position,
+ context.taskId().partition()
);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index 22f1215..c81b527 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -22,6 +22,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,6 +36,9 @@ import java.util.Objects;
*/
public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
+ protected StateStoreContext context;
+ protected Position position = Position.emptyPosition();
+
public interface EldestEntryRemovalListener {
void apply(Bytes key, byte[] value);
}
@@ -95,6 +99,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
put(Bytes.wrap(key), value);
restoring = false;
});
+ this.context = context;
}
@Override
@@ -122,6 +127,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
} else {
this.map.put(key, value);
}
+ StoreQueryUtils.updatePosition(position, context);
}
@Override
@@ -144,6 +150,7 @@ public class MemoryLRUCache implements KeyValueStore<Bytes, byte[]> {
@Override
public synchronized byte[] delete(final Bytes key) {
Objects.requireNonNull(key);
+ StoreQueryUtils.updatePosition(position, context);
return this.map.remove(key);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
index eda880e..dd2e8a7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -121,7 +121,9 @@ public class MemoryNavigableLRUCache extends MemoryLRUCache {
query,
positionBound,
collectExecutionInfo,
- this
+ this,
+ position,
+ context.taskId().partition()
);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/PositionSerde.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/PositionSerde.java
index 7628889..7458ac4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/PositionSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/PositionSerde.java
@@ -78,7 +78,7 @@ public final class PositionSerde {
arraySize += Integer.SIZE; // topic name length
arraySize += topicBytes.length; // topic name itself
- final Map<Integer, Long> partitionOffsets = position.getBound(topic);
+ final Map<Integer, Long> partitionOffsets = position.getPartitionPositions(topic);
arraySize += Integer.SIZE; // Number of PartitionOffset pairs
arraySize += (Integer.SIZE + Long.SIZE)
* partitionOffsets.size(); // partitionOffsets themselves
@@ -93,7 +93,7 @@ public final class PositionSerde {
buffer.put(topics[i]);
final String topic = entries.get(i);
- final Map<Integer, Long> partitionOffsets = position.getBound(topic);
+ final Map<Integer, Long> partitionOffsets = position.getPartitionPositions(topic);
buffer.putInt(partitionOffsets.size());
for (final Entry<Integer, Long> partitionOffset : partitionOffsets.entrySet()) {
buffer.putInt(partitionOffset.getKey());
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
index c171792..e6e2740 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStore.java
@@ -53,7 +53,14 @@ public class RocksDBSessionStore
@Override
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {
- return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+ return StoreQueryUtils.handleBasicQueries(
+ query,
+ positionBound,
+ collectExecutionInfo,
+ this,
+ position,
+ stateStoreContext.taskId().partition()
+ );
}
@Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 0ab9be9..4f07a15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -326,13 +326,18 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingS
}
@Override
- public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
- final boolean collectExecutionInfo) {
+ public <R> QueryResult<R> query(
+ final Query<R> query,
+ final PositionBound positionBound,
+ final boolean collectExecutionInfo) {
+
return StoreQueryUtils.handleBasicQueries(
- query,
- positionBound,
- collectExecutionInfo,
- this
+ query,
+ positionBound,
+ collectExecutionInfo,
+ this,
+ position,
+ context.taskId().partition()
);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index efcde31..40415c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -131,7 +131,14 @@ public class RocksDBWindowStore
@Override
public <R> QueryResult<R> query(final Query<R> query, final PositionBound positionBound,
final boolean collectExecutionInfo) {
- return StoreQueryUtils.handleBasicQueries(query, positionBound, collectExecutionInfo, this);
+ return StoreQueryUtils.handleBasicQueries(
+ query,
+ positionBound,
+ collectExecutionInfo,
+ this,
+ position,
+ stateStoreContext.taskId().partition()
+ );
}
private void maybeUpdateSeqnumForDups() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
index 6217877..2172b02 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
@@ -24,6 +24,8 @@ import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
+import java.util.Map;
+
public final class StoreQueryUtils {
// make this class uninstantiable
@@ -35,13 +37,19 @@ public final class StoreQueryUtils {
final Query<R> query,
final PositionBound positionBound,
final boolean collectExecutionInfo,
- final StateStore store) {
+ final StateStore store,
+ final Position position,
+ final int partition
+ ) {
final QueryResult<R> result;
final long start = collectExecutionInfo ? System.nanoTime() : -1L;
- // TODO: position tracking
if (query instanceof PingQuery) {
- result = (QueryResult<R>) QueryResult.forResult(true);
+ if (!isPermitted(position, positionBound, partition)) {
+ result = QueryResult.notUpToBound(position, positionBound, partition);
+ } else {
+ result = (QueryResult<R>) QueryResult.forResult(true);
+ }
} else {
result = QueryResult.forUnknownQueryType(query, store);
}
@@ -50,6 +58,7 @@ public final class StoreQueryUtils {
"Handled in " + store.getClass() + " in " + (System.nanoTime() - start) + "ns"
);
}
+ result.setPosition(position);
return result;
}
@@ -62,4 +71,28 @@ public final class StoreQueryUtils {
position.withComponent(meta.topic(), meta.partition(), meta.offset());
}
}
+
+ public static boolean isPermitted(
+ final Position position,
+ final PositionBound positionBound,
+ final int partition
+ ) {
+ final Position bound = positionBound.position();
+ for (final String topic : bound.getTopics()) {
+ final Map<Integer, Long> partitionBounds = bound.getPartitionPositions(topic);
+ final Map<Integer, Long> seenPartitionPositions = position.getPartitionPositions(topic);
+ if (!partitionBounds.containsKey(partition)) {
+ // this topic isn't bounded for our partition, so just skip over it.
+ } else {
+ if (!seenPartitionPositions.containsKey(partition)) {
+ // we haven't seen a partition that we have a bound for
+ return false;
+ } else if (seenPartitionPositions.get(partition) < partitionBounds.get(partition)) {
+ // our current position is behind the bound
+ return false;
+ }
+ }
+ }
+ return true;
+ }
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
index 8c09450..4ef35c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ConsistencyVectorIntegrationTest.java
@@ -144,8 +144,8 @@ public class ConsistencyVectorIntegrationTest {
for (final TestingRocksDBStore store : supplier.stores) {
if (store.getDbDir() != null) {
assertThat(store.getDbDir().toString().contains("/0_0/"), is(true));
- assertThat(store.getPosition().getBound(INPUT_TOPIC_NAME), notNullValue());
- assertThat(store.getPosition().getBound(INPUT_TOPIC_NAME), hasEntry(0, 99L));
+ assertThat(store.getPosition().getPartitionPositions(INPUT_TOPIC_NAME), notNullValue());
+ assertThat(store.getPosition().getPartitionPositions(INPUT_TOPIC_NAME), hasEntry(0, 99L));
count.incrementAndGet();
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
index 140b5ee..4b40a55 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2IntegrationTest.java
@@ -57,8 +57,10 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -97,6 +99,9 @@ public class IQv2IntegrationTest {
private KafkaStreams kafkaStreams;
+ @Rule
+ public TestName testName = new TestName();
+
@BeforeClass
public static void before()
throws InterruptedException, IOException, ExecutionException, TimeoutException {
@@ -221,7 +226,7 @@ public class IQv2IntegrationTest {
final Object lock = stateLock.get(streamThread);
// wait for the desired partitions to be assigned
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(
+ IntegrationTestUtils.iqv2WaitForPartitions(
kafkaStreams,
inStore(STORE_NAME).withQuery(query),
partitions
@@ -234,7 +239,7 @@ public class IQv2IntegrationTest {
stateField.set(streamThread, State.PARTITIONS_ASSIGNED);
final StateQueryResult<Boolean> result =
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(
+ IntegrationTestUtils.iqv2WaitForPartitions(
kafkaStreams,
request,
partitions
@@ -266,7 +271,7 @@ public class IQv2IntegrationTest {
kafkaStreams.start();
final StateQueryResult<Boolean> result =
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
}
@@ -280,7 +285,7 @@ public class IQv2IntegrationTest {
kafkaStreams.start();
final StateQueryResult<Boolean> result =
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+ IntegrationTestUtils.iqv2WaitForPartitions(kafkaStreams, request, partitions);
assertThat(result.getPartitionResults().keySet(), equalTo(partitions));
}
@@ -407,7 +412,7 @@ public class IQv2IntegrationTest {
kafkaStreams.start();
final StateQueryResult<Boolean> result =
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+ IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
final QueryResult<Boolean> queryResult = result.getPartitionResults().get(partition);
assertThat(queryResult.isFailure(), is(true));
@@ -419,8 +424,9 @@ public class IQv2IntegrationTest {
}
- private static Properties streamsConfiguration() {
- final String safeTestName = IQv2IntegrationTest.class.getName();
+ private Properties streamsConfiguration() {
+ final String safeTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
final Properties config = new Properties();
config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + safeTestName);
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
index 3a6adaf..308ab9c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
@@ -85,7 +86,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.matchesPattern;
import static org.hamcrest.Matchers.not;
-import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;
@Category({IntegrationTest.class})
@@ -248,6 +248,7 @@ public class IQv2StoreIntegrationTest {
public static void before()
throws InterruptedException, IOException, ExecutionException, TimeoutException {
CLUSTER.start();
+ CLUSTER.deleteAllTopicsAndWait(60 * 1000L);
final int partitions = 2;
CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);
@@ -295,8 +296,14 @@ public class IQv2StoreIntegrationTest {
@Before
public void beforeTest() {
- final StreamsBuilder builder = new StreamsBuilder();
final StoreSupplier<?> supplier = storeToTest.supplier();
+ final Properties streamsConfig = streamsConfiguration(
+ cache,
+ log,
+ storeToTest.name()
+ );
+
+ final StreamsBuilder builder = new StreamsBuilder();
if (supplier instanceof KeyValueBytesStoreSupplier) {
final Materialized<Integer, Integer, KeyValueStore<Bytes, byte[]>> materialized =
Materialized.as((KeyValueBytesStoreSupplier) supplier);
@@ -389,13 +396,10 @@ public class IQv2StoreIntegrationTest {
// Don't need to wait for running, since tests can use iqv2 to wait until they
// get a valid response.
+
kafkaStreams =
IntegrationTestUtils.getStartedStreams(
- streamsConfiguration(
- cache,
- log,
- supplier.getClass().getSimpleName()
- ),
+ streamsConfig,
builder,
true
);
@@ -414,10 +418,31 @@ public class IQv2StoreIntegrationTest {
@Test
public void verifyStore() {
- shouldRejectUnknownQuery();
- shouldHandlePingQuery();
- shouldCollectExecutionInfo();
- shouldCollectExecutionInfoUnderFailure();
+ if (storeToTest.global()) {
+ // See KAFKA-13523
+ globalShouldRejectAllQueries();
+ } else {
+ shouldRejectUnknownQuery();
+ shouldHandlePingQuery();
+ shouldCollectExecutionInfo();
+ shouldCollectExecutionInfoUnderFailure();
+ }
+ }
+
+ private void globalShouldRejectAllQueries() {
+ // See KAFKA-13523
+
+ final PingQuery query = new PingQuery();
+ final StateQueryRequest<Boolean> request = inStore(STORE_NAME).withQuery(query);
+
+ final StateQueryResult<Boolean> result = kafkaStreams.query(request);
+
+ assertThat(result.getGlobalResult().isFailure(), is(true));
+ assertThat(result.getGlobalResult().getFailureReason(),
+ is(FailureReason.UNKNOWN_QUERY_TYPE));
+ assertThat(result.getGlobalResult().getFailureMessage(),
+ is("Global stores do not yet support the KafkaStreams#query API."
+ + " Use KafkaStreams#store instead."));
}
public void shouldRejectUnknownQuery() {
@@ -427,7 +452,7 @@ public class IQv2StoreIntegrationTest {
final Set<Integer> partitions = mkSet(0, 1);
final StateQueryResult<Void> result =
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+ IntegrationTestUtils.iqv2WaitForPartitions(kafkaStreams, request, partitions);
makeAssertions(
partitions,
@@ -435,7 +460,6 @@ public class IQv2StoreIntegrationTest {
queryResult -> {
assertThat(queryResult.isFailure(), is(true));
assertThat(queryResult.isSuccess(), is(false));
- assertThat(queryResult.getPosition(), is(nullValue()));
assertThat(queryResult.getFailureReason(),
is(FailureReason.UNKNOWN_QUERY_TYPE));
assertThat(queryResult.getFailureMessage(),
@@ -456,12 +480,18 @@ public class IQv2StoreIntegrationTest {
public void shouldHandlePingQuery() {
final PingQuery query = new PingQuery();
- final StateQueryRequest<Boolean> request =
- inStore(STORE_NAME).withQuery(query);
final Set<Integer> partitions = mkSet(0, 1);
+ final StateQueryRequest<Boolean> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .withPartitions(partitions)
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<Boolean> result =
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+ IntegrationTestUtils.iqv2WaitForResult(
+ kafkaStreams,
+ request
+ );
makeAssertions(
partitions,
@@ -473,9 +503,6 @@ public class IQv2StoreIntegrationTest {
}
assertThat(queryResult.isSuccess(), is(true));
- // TODO: position not implemented
- assertThat(queryResult.getPosition(), is(nullValue()));
-
assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage);
@@ -483,17 +510,25 @@ public class IQv2StoreIntegrationTest {
assertThat(queryResult.getExecutionInfo(), is(empty()));
});
+ assertThat(result.getPosition(), is(INPUT_POSITION));
}
public void shouldCollectExecutionInfo() {
final PingQuery query = new PingQuery();
- final StateQueryRequest<Boolean> request =
- inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
final Set<Integer> partitions = mkSet(0, 1);
+ final StateQueryRequest<Boolean> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .enableExecutionInfo()
+ .withPartitions(partitions)
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<Boolean> result =
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+ IntegrationTestUtils.iqv2WaitForResult(
+ kafkaStreams,
+ request
+ );
makeAssertions(
partitions,
@@ -505,12 +540,19 @@ public class IQv2StoreIntegrationTest {
public void shouldCollectExecutionInfoUnderFailure() {
final UnknownQuery query = new UnknownQuery();
- final StateQueryRequest<Void> request =
- inStore(STORE_NAME).withQuery(query).enableExecutionInfo();
final Set<Integer> partitions = mkSet(0, 1);
+ final StateQueryRequest<Void> request =
+ inStore(STORE_NAME)
+ .withQuery(query)
+ .enableExecutionInfo()
+ .withPartitions(partitions)
+ .withPositionBound(PositionBound.at(INPUT_POSITION));
final StateQueryResult<Void> result =
- IntegrationTestUtils.iqv2WaitForPartitionsOrGlobal(kafkaStreams, request, partitions);
+ IntegrationTestUtils.iqv2WaitForResult(
+ kafkaStreams,
+ request
+ );
makeAssertions(
partitions,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 6cfc5d4..202fd1f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -51,6 +51,8 @@ import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentListener;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.QueryableStoreType;
@@ -91,6 +93,7 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import static org.apache.kafka.common.utils.Utils.sleep;
import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -116,7 +119,7 @@ public class IntegrationTestUtils {
* Once position bounding is generally supported, we should migrate tests to wait on the
* expected response position.
*/
- public static <R> StateQueryResult<R> iqv2WaitForPartitionsOrGlobal(
+ public static <R> StateQueryResult<R> iqv2WaitForPartitions(
final KafkaStreams kafkaStreams,
final StateQueryRequest<R> request,
final Set<Integer> partitions) {
@@ -125,22 +128,81 @@ public class IntegrationTestUtils {
final long deadline = start + DEFAULT_TIMEOUT;
do {
+ if (Thread.currentThread().isInterrupted()) {
+ fail("Test was interrupted.");
+ }
final StateQueryResult<R> result = kafkaStreams.query(request);
- if (result.getPartitionResults().keySet().containsAll(partitions)
- || result.getGlobalResult() != null) {
+ if (result.getPartitionResults().keySet().containsAll(partitions)) {
return result;
} else {
- try {
- Thread.sleep(100L);
- } catch (final InterruptedException e) {
- throw new RuntimeException(e);
- }
+ sleep(100L);
}
} while (System.currentTimeMillis() < deadline);
throw new TimeoutException("The query never returned the desired partitions");
}
+ /**
+ * Repeatedly runs the query until the response is valid and then return the response.
+ * <p>
+ * Validity in this case means that the response position is up to the specified bound.
+ * <p>
+ * Once position bounding is generally supported, we should migrate tests to wait on the
+ * expected response position.
+ */
+ public static <R> StateQueryResult<R> iqv2WaitForResult(
+ final KafkaStreams kafkaStreams,
+ final StateQueryRequest<R> request) {
+
+ final long start = System.currentTimeMillis();
+ final long deadline = start + DEFAULT_TIMEOUT;
+
+ StateQueryResult<R> result;
+ do {
+ if (Thread.currentThread().isInterrupted()) {
+ fail("Test was interrupted.");
+ }
+
+ result = kafkaStreams.query(request);
+ final LinkedList<QueryResult<R>> allResults = getAllResults(result);
+
+ if (allResults.isEmpty()) {
+ sleep(100L);
+ } else {
+ final boolean needToWait = allResults
+ .stream()
+ .anyMatch(IntegrationTestUtils::needToWait);
+ if (needToWait) {
+ sleep(100L);
+ } else {
+ return result;
+ }
+ }
+ } while (System.currentTimeMillis() < deadline);
+
+ throw new TimeoutException(
+ "The query never returned within the bound. Last result: "
+ + result
+ );
+ }
+
+ private static <R> LinkedList<QueryResult<R>> getAllResults(
+ final StateQueryResult<R> result) {
+ final LinkedList<QueryResult<R>> allResults =
+ new LinkedList<>(result.getPartitionResults().values());
+ if (result.getGlobalResult() != null) {
+ allResults.add(result.getGlobalResult());
+ }
+ return allResults;
+ }
+
+ private static <R> boolean needToWait(final QueryResult<R> queryResult) {
+ return queryResult.isFailure()
+ && (
+ FailureReason.NOT_UP_TO_BOUND.equals(queryResult.getFailureReason())
+ || FailureReason.NOT_PRESENT.equals(queryResult.getFailureReason()));
+ }
+
/*
* Records state transition for StreamThread
*/
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
index fd881bc..57aed4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionBoundTest.java
@@ -50,9 +50,9 @@ public class PositionBoundTest {
}
@Test
- public void unboundedShouldThrowOnPosition() {
+ public void unboundedShouldReturnEmptyPosition() {
final PositionBound bound = PositionBound.unbounded();
- assertThrows(IllegalArgumentException.class, bound::position);
+ assertThat(bound.position(), equalTo(Position.emptyPosition()));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
index ec29993..eafc28e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/query/PositionTest.java
@@ -47,14 +47,14 @@ public class PositionTest {
final Position position = Position.fromMap(map);
assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1")));
- assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+ assertThat(position.getPartitionPositions("topic"), equalTo(mkMap(mkEntry(0, 5L))));
// Should be a copy of the constructor map
map.get("topic1").put(99, 99L);
// so the position is still the original one
- assertThat(position.getBound("topic1"), equalTo(mkMap(
+ assertThat(position.getPartitionPositions("topic1"), equalTo(mkMap(
mkEntry(0, 5L),
mkEntry(7, 0L)
)));
@@ -85,13 +85,13 @@ public class PositionTest {
final Position merged = position.merge(position1);
assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
- assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 7L))));
- assertThat(merged.getBound("topic1"), equalTo(mkMap(
+ assertThat(merged.getPartitionPositions("topic"), equalTo(mkMap(mkEntry(0, 7L))));
+ assertThat(merged.getPartitionPositions("topic1"), equalTo(mkMap(
mkEntry(0, 5L),
mkEntry(7, 0L),
mkEntry(8, 1L)
)));
- assertThat(merged.getBound("topic2"), equalTo(mkMap(mkEntry(9, 5L))));
+ assertThat(merged.getPartitionPositions("topic2"), equalTo(mkMap(mkEntry(9, 5L))));
}
@Test
@@ -99,9 +99,9 @@ public class PositionTest {
final Position position = Position.emptyPosition();
position.withComponent("topic", 3, 5L);
position.withComponent("topic", 3, 4L);
- assertThat(position.getBound("topic").get(3), equalTo(5L));
+ assertThat(position.getPartitionPositions("topic").get(3), equalTo(5L));
position.withComponent("topic", 3, 6L);
- assertThat(position.getBound("topic").get(3), equalTo(6L));
+ assertThat(position.getPartitionPositions("topic").get(3), equalTo(6L));
}
@Test
@@ -123,21 +123,21 @@ public class PositionTest {
// copy has not changed
assertThat(copy.getTopics(), equalTo(mkSet("topic", "topic1")));
- assertThat(copy.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
- assertThat(copy.getBound("topic1"), equalTo(mkMap(
+ assertThat(copy.getPartitionPositions("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+ assertThat(copy.getPartitionPositions("topic1"), equalTo(mkMap(
mkEntry(0, 5L),
mkEntry(7, 0L)
)));
// original has changed
assertThat(position.getTopics(), equalTo(mkSet("topic", "topic1", "topic2")));
- assertThat(position.getBound("topic"), equalTo(mkMap(mkEntry(0, 6L))));
- assertThat(position.getBound("topic1"), equalTo(mkMap(
+ assertThat(position.getPartitionPositions("topic"), equalTo(mkMap(mkEntry(0, 6L))));
+ assertThat(position.getPartitionPositions("topic1"), equalTo(mkMap(
mkEntry(0, 5L),
mkEntry(7, 0L),
mkEntry(8, 1L)
)));
- assertThat(position.getBound("topic2"), equalTo(mkMap(mkEntry(2, 4L))));
+ assertThat(position.getPartitionPositions("topic2"), equalTo(mkMap(mkEntry(2, 4L))));
}
@Test
@@ -153,8 +153,8 @@ public class PositionTest {
final Position merged = position.merge(null);
assertThat(merged.getTopics(), equalTo(mkSet("topic", "topic1")));
- assertThat(merged.getBound("topic"), equalTo(mkMap(mkEntry(0, 5L))));
- assertThat(merged.getBound("topic1"), equalTo(mkMap(
+ assertThat(merged.getPartitionPositions("topic"), equalTo(mkMap(mkEntry(0, 5L))));
+ assertThat(merged.getPartitionPositions("topic1"), equalTo(mkMap(
mkEntry(0, 5L),
mkEntry(7, 0L)
)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
index c826991..32e3386 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStoreTest.java
@@ -592,8 +592,8 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(expected, results);
assertThat(bytesStore.getPosition(), Matchers.notNullValue());
- assertThat(bytesStore.getPosition().getBound(""), Matchers.notNullValue());
- assertThat(bytesStore.getPosition().getBound(""), hasEntry(0, 3L));
+ assertThat(bytesStore.getPosition().getPartitionPositions(""), Matchers.notNullValue());
+ assertThat(bytesStore.getPosition().getPartitionPositions(""), hasEntry(0, 3L));
}
@Test
@@ -629,10 +629,10 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(expected, results);
assertThat(bytesStore.getPosition(), Matchers.notNullValue());
- assertThat(bytesStore.getPosition().getBound("A"), Matchers.notNullValue());
- assertThat(bytesStore.getPosition().getBound("A"), hasEntry(0, 3L));
- assertThat(bytesStore.getPosition().getBound("B"), Matchers.notNullValue());
- assertThat(bytesStore.getPosition().getBound("B"), hasEntry(0, 2L));
+ assertThat(bytesStore.getPosition().getPartitionPositions("A"), Matchers.notNullValue());
+ assertThat(bytesStore.getPosition().getPartitionPositions("A"), hasEntry(0, 3L));
+ assertThat(bytesStore.getPosition().getPartitionPositions("B"), Matchers.notNullValue());
+ assertThat(bytesStore.getPosition().getPartitionPositions("B"), hasEntry(0, 2L));
}
@Test
@@ -665,7 +665,7 @@ public abstract class AbstractRocksDBSegmentedBytesStoreTest<S extends Segment>
final List<KeyValue<Windowed<String>, Long>> results = toList(bytesStore.all());
assertEquals(expected, results);
assertThat(bytesStore.getPosition(), Matchers.notNullValue());
- assertThat(bytesStore.getPosition().getBound("A"), hasEntry(0, 2L));
+ assertThat(bytesStore.getPosition().getPartitionPositions("A"), hasEntry(0, 2L));
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index b5e92f0..0d577e6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -270,8 +270,8 @@ public class ChangeLoggingKeyValueBytesStoreTest {
final Header vectorHeader = collector.collected().get(0).headers().lastHeader(ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY);
assertThat(vectorHeader, is(notNullValue()));
final Position position = PositionSerde.deserialize(ByteBuffer.wrap(vectorHeader.value()));
- assertThat(position.getBound(INPUT_TOPIC_NAME), is(notNullValue()));
- assertThat(position.getBound(INPUT_TOPIC_NAME), hasEntry(0, 100L));
+ assertThat(position.getPartitionPositions(INPUT_TOPIC_NAME), is(notNullValue()));
+ assertThat(position.getPartitionPositions(INPUT_TOPIC_NAME), hasEntry(0, 100L));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
index daa4572..3667c0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBStoreTest.java
@@ -1003,8 +1003,8 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
assertThat(rocksDBStore.getPosition(), Matchers.notNullValue());
- assertThat(rocksDBStore.getPosition().getBound(""), Matchers.notNullValue());
- assertThat(rocksDBStore.getPosition().getBound(""), hasEntry(0, 3L));
+ assertThat(rocksDBStore.getPosition().getPartitionPositions(""), Matchers.notNullValue());
+ assertThat(rocksDBStore.getPosition().getPartitionPositions(""), hasEntry(0, 3L));
}
@Test
@@ -1040,10 +1040,10 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "3")))));
assertThat(rocksDBStore.getPosition(), Matchers.notNullValue());
- assertThat(rocksDBStore.getPosition().getBound("A"), Matchers.notNullValue());
- assertThat(rocksDBStore.getPosition().getBound("A"), hasEntry(0, 3L));
- assertThat(rocksDBStore.getPosition().getBound("B"), Matchers.notNullValue());
- assertThat(rocksDBStore.getPosition().getBound("B"), hasEntry(0, 2L));
+ assertThat(rocksDBStore.getPosition().getPartitionPositions("A"), Matchers.notNullValue());
+ assertThat(rocksDBStore.getPosition().getPartitionPositions("A"), hasEntry(0, 3L));
+ assertThat(rocksDBStore.getPosition().getPartitionPositions("B"), Matchers.notNullValue());
+ assertThat(rocksDBStore.getPosition().getPartitionPositions("B"), hasEntry(0, 2L));
}
@Test
@@ -1067,7 +1067,7 @@ public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
rocksDBStore.get(new Bytes(stringSerializer.serialize(null, "1")))));
assertThat(rocksDBStore.getPosition(), Matchers.notNullValue());
- assertThat(rocksDBStore.getPosition().getBound("A"), hasEntry(0, 2L));
+ assertThat(rocksDBStore.getPosition().getPartitionPositions("A"), hasEntry(0, 2L));
}
@Test