Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/08 22:53:15 UTC

[GitHub] [kafka] vvcephei opened a new pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

vvcephei opened a new pull request #11581:
URL: https://github.com/apache/kafka/pull/11581


   * Fill in the Position response in the IQv2 result.
   * Enforce PositionBound in IQv2 queries.
   * Update integration testing approach to leverage consistent queries.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #11581:
URL: https://github.com/apache/kafka/pull/11581


   





[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766221104



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1764,20 +1764,18 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
             );
         }
         final StateQueryResult<R> result = new StateQueryResult<>();
+        final Set<Integer> handledPartitions = new HashSet<>();

Review comment:
       Oh, sorry about that. That was an artifact from a recent refactor. But now that I'm looking at it, I realize we don't need a separate set for tracking this, since we can use the result's partition set itself.
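The point about reusing the result's partition set can be sketched without a separate `Set<Integer>`. This is a minimal sketch under stated assumptions. `PartitionResults` and `HandledPartitionsSketch` are hypothetical, simplified stand-ins and are not Kafka's `StateQueryResult` or `KafkaStreams.query`. The sketch shows that the key set of the per-partition result map already records which partitions were handled.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for StateQueryResult: only the per-partition map matters here.
class PartitionResults {
    private final Map<Integer, String> partitionResults = new LinkedHashMap<>();

    void addResult(final int partition, final String result) {
        partitionResults.put(partition, result);
    }

    Map<Integer, String> getPartitionResults() {
        return partitionResults;
    }
}

class HandledPartitionsSketch {
    // Queries every requested partition that is hosted locally, then derives the
    // unhandled partitions from the result's own key set (no extra tracking Set).
    static Set<Integer> unhandled(final Set<Integer> requested,
                                  final Set<Integer> hostedLocally,
                                  final PartitionResults result) {
        for (final int partition : hostedLocally) {
            if (requested.contains(partition)) {
                result.addResult(partition, "value-from-partition-" + partition);
            }
        }
        final Set<Integer> missing = new HashSet<>(requested);
        missing.removeAll(result.getPartitionResults().keySet());
        return missing;
    }
}
```

In the real code, a requested partition that is not handled locally may be reported differently, for example as a failure result. The sketch only illustrates the bookkeeping.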







[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r767028987



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -62,4 +71,28 @@ public static void updatePosition(
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(

Review comment:
       I was hesitant to add anything extra to the Position class, but there's no particular reason for it.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -125,22 +128,81 @@
         final long deadline = start + DEFAULT_TIMEOUT;
 
         do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
             final StateQueryResult<R> result = kafkaStreams.query(request);
-            if (result.getPartitionResults().keySet().containsAll(partitions)
-                || result.getGlobalResult() != null) {
+            if (result.getPartitionResults().keySet().containsAll(partitions)) {
                 return result;
             } else {
-                try {
-                    Thread.sleep(100L);
-                } catch (final InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
+                sleep(100L);
             }
         } while (System.currentTimeMillis() < deadline);
 
         throw new TimeoutException("The query never returned the desired partitions");
     }
 
+    /**
+     * Repeatedly runs the query until the response is valid and then return the response.
+     * <p>
+     * Validity in this case means that the response position is up to the specified bound.
+     * <p>
+     * Once position bounding is generally supported, we should migrate tests to wait on the
+     * expected response position.
+     */
+    public static <R> StateQueryResult<R> iqv2WaitForResult(

Review comment:
       Thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());

Review comment:
       I'm not sure I follow. We always did set the context; we just did it (incorrectly) between the `wrapped().put` and forwarding downstream. Why should we do the `wrapped().put` with the wrong context and then forward with the right context?
   
   Taking your example sequence, the correct offset for record `A` is 0.
   The old behavior was that we would do `wrapped().put(A)` with offset 2 and then forward `A` with offset 0.
   The new behavior is that we do `wrapped().put(A)` with offset 0 and then forward `A` with offset 0.
   
   There's no scenario in which we would forward A with offset 2.
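The offset sequence described above (A at offset 0 is evicted while C at offset 2 is being processed) can be simulated in a few lines. This is a hypothetical sketch. `RecordContext` and `DirtyEntry` are simplified stand-ins for `ProcessorRecordContext` and `ThreadCache.DirtyEntry`, and none of these classes are Kafka Streams types. The sketch records which offset the lower store sees and which offset is forwarded under each ordering.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; none of these types are Kafka Streams classes.
class EvictionOrderSketch {
    // Stand-in for ProcessorRecordContext: only the source offset is modeled.
    static final class RecordContext {
        final long offset;

        RecordContext(final long offset) {
            this.offset = offset;
        }
    }

    // Stand-in for a dirty cache entry, which remembers the context it was written with.
    static final class DirtyEntry {
        final String key;
        final RecordContext context;

        DirtyEntry(final String key, final RecordContext context) {
            this.key = key;
            this.context = context;
        }
    }

    private RecordContext current;                       // context of the record being processed
    final List<String> storeWrites = new ArrayList<>();  // "key@offset" seen by the lower store
    final List<String> forwarded = new ArrayList<>();    // "key@offset" seen downstream

    EvictionOrderSketch(final RecordContext processing) {
        this.current = processing;
    }

    // Old ordering: the lower store sees the context of the record that triggered eviction.
    void evictOldOrder(final DirtyEntry entry) {
        storeWrites.add(entry.key + "@" + current.offset);
        final RecordContext saved = current;
        current = entry.context;
        forwarded.add(entry.key + "@" + current.offset);
        current = saved;
    }

    // New ordering: install the entry's own context first, then write and forward.
    void evictNewOrder(final DirtyEntry entry) {
        final RecordContext saved = current;
        current = entry.context;
        storeWrites.add(entry.key + "@" + current.offset);
        forwarded.add(entry.key + "@" + current.offset);
        current = saved;
    }
}
```

Suppose C (offset 2) is being processed and A (offset 0) is evicted. `evictOldOrder` writes `A@2` to the store but forwards `A@0`. `evictNewOrder` writes and forwards `A@0`.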

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -97,11 +97,15 @@ public void addResult(final int partition, final QueryResult<R> r) {
      * prior observations.
      */
     public Position getPosition() {
-        Position position = Position.emptyPosition();
-        for (final QueryResult<R> r : partitionResults.values()) {
-            position = position.merge(r.getPosition());
+        if (globalResult != null) {

Review comment:
       That's correct.







[GitHub] [kafka] vpapavas commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766731204



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -125,22 +128,81 @@
         final long deadline = start + DEFAULT_TIMEOUT;
 
         do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
             final StateQueryResult<R> result = kafkaStreams.query(request);
-            if (result.getPartitionResults().keySet().containsAll(partitions)
-                || result.getGlobalResult() != null) {
+            if (result.getPartitionResults().keySet().containsAll(partitions)) {
                 return result;
             } else {
-                try {
-                    Thread.sleep(100L);
-                } catch (final InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
+                sleep(100L);
             }
         } while (System.currentTimeMillis() < deadline);
 
         throw new TimeoutException("The query never returned the desired partitions");
     }
 
+    /**
+     * Repeatedly runs the query until the response is valid and then return the response.
+     * <p>
+     * Validity in this case means that the response position is up to the specified bound.
+     * <p>
+     * Once position bounding is generally supported, we should migrate tests to wait on the
+     * expected response position.
+     */
+    public static <R> StateQueryResult<R> iqv2WaitForResult(

Review comment:
       Nice!







[GitHub] [kafka] guozhangwang commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766899186



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());

Review comment:
       I think there's some rationale for not setting the processor context for the evicted record. For example, say we first put record A (offset 0) into the cache, then record B (offset 1), then record C (offset 2). The third insert causes the cache to evict A, which is then written to the underlying store and also forwarded downstream. If we set the metadata to offset `2` for record `A`, that may not be correct. I admit that if we do not set it, the context would be either some other context like offset `1` or even just null. Still, forwarding `A` with offset `2` does not seem appropriate, since ideally we would forward `A` with offset `0`.







[GitHub] [kafka] vpapavas commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vpapavas commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766726224



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -62,4 +71,28 @@ public static void updatePosition(
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(

Review comment:
       Should we add this as a method in `Position`?







[GitHub] [kafka] patrickstuedi commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r765750361



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -62,4 +71,32 @@ public static void updatePosition(
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        if (positionBound.isUnbounded()) {

Review comment:
       I know we've discussed this, but just for the record :-), I believe the PositionBound type could be dropped, since unbounded is basically an empty Position.

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -62,4 +71,32 @@ public static void updatePosition(
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        if (positionBound.isUnbounded()) {

Review comment:
       I still think this entire check should be a method on Position, as in `Position::dominates(PositionBound)`.
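The suggested `Position::dominates` could look roughly like the following. This is a hedged sketch. `SketchPosition` is a hypothetical class keyed as topic -> partition -> offset. It is not Kafka's `Position`, and the method is not the PR's `isPermitted`. For one store partition, the check requires the position to have reached every offset the bound names for that partition. An empty bound therefore permits everything, which matches the earlier "unbounded is basically an empty Position" remark.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplified position: topic -> (partition -> offset). Not Kafka's Position.
class SketchPosition {
    private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();

    SketchPosition withComponent(final String topic, final int partition, final long offset) {
        offsets.computeIfAbsent(topic, t -> new HashMap<>()).put(partition, offset);
        return this;
    }

    // True if, for the given partition, this position has reached every offset the bound
    // requires. Topics or partitions the bound does not mention impose no constraint.
    boolean dominates(final SketchPosition bound, final int partition) {
        for (final Map.Entry<String, Map<Integer, Long>> topicBound : bound.offsets.entrySet()) {
            final Long required = topicBound.getValue().get(partition);
            if (required == null) {
                continue; // the bound does not constrain this partition of this topic
            }
            final Long seen = offsets.getOrDefault(topicBound.getKey(), Map.of()).get(partition);
            if (seen == null || seen < required) {
                return false; // never saw this partition, or we are behind the bound
            }
        }
        return true;
    }
}
```

Putting the check on the position type keeps the comparison logic next to the data it compares. The counter-argument in this thread is about keeping a distinct bound type in the public API.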







[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r767030936



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());

Review comment:
       I'm not sure I follow. We always did set the context; we just did it (incorrectly) between the `wrapped().put` and forwarding downstream. Why should we do the `wrapped().put` with the wrong context and then forward with the right context?
   
   Taking your example sequence, the correct offset for record `A` is 0.
   The old behavior was that we would do `wrapped().put(A)` with offset 2 and then forward `A` with offset 0.
   The new behavior is that we do `wrapped().put(A)` with offset 0 and then forward `A` with offset 0.
   
   There's no scenario in which we would forward A with offset 2.







[GitHub] [kafka] guozhangwang commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766893813



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -97,11 +97,15 @@ public void addResult(final int partition, final QueryResult<R> r) {
      * prior observations.
      */
     public Position getPosition() {
-        Position position = Position.emptyPosition();
-        for (final QueryResult<R> r : partitionResults.values()) {
-            position = position.merge(r.getPosition());
+        if (globalResult != null) {

Review comment:
       Okay, but just to clarify: at least for now, we do not expect the `if` block to ever trigger, right?







[GitHub] [kafka] guozhangwang commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766163659



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -62,4 +71,32 @@ public static void updatePosition(
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        if (positionBound.isUnbounded()) {
+            return true;
+        } else {
+            final Position bound = positionBound.position();
+            for (final String topic : bound.getTopics()) {
+                final Map<Integer, Long> partitionBounds = bound.getBound(topic);
+                final Map<Integer, Long> seenPartitionBounds = position.getBound(topic);

Review comment:
       qq: why do we name this method of Position `getBound` when it seems to just retrieve the current positions of the topic, not really a bound?
   
   Also, similarly, why name this local variable `seenPartitionBounds` if the returned values are not really "bounds"? Should it just be `currentOffsets` or something?

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());

Review comment:
       Why reorder the steps here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1764,20 +1764,18 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
             );
         }
         final StateQueryResult<R> result = new StateQueryResult<>();
+        final Set<Integer> handledPartitions = new HashSet<>();

Review comment:
       Why move it out of the else block's scope?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -125,22 +128,81 @@
         final long deadline = start + DEFAULT_TIMEOUT;
 
         do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
             final StateQueryResult<R> result = kafkaStreams.query(request);
-            if (result.getPartitionResults().keySet().containsAll(partitions)
-                || result.getGlobalResult() != null) {
+            if (result.getPartitionResults().keySet().containsAll(partitions)) {
                 return result;
             } else {
-                try {
-                    Thread.sleep(100L);
-                } catch (final InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
+                sleep(100L);
             }
         } while (System.currentTimeMillis() < deadline);
 
         throw new TimeoutException("The query never returned the desired partitions");
     }
 
+    /**
+     * Repeatedly runs the query until the response is valid and then return the response.
+     * <p>
+     * Validity in this case means that the response position is up to the specified bound.
+     * <p>
+     * Once position bounding is generally supported, we should migrate tests to wait on the
+     * expected response position.
+     */
+    public static <R> StateQueryResult<R> iqv2WaitForResult(
+        final KafkaStreams kafkaStreams,
+        final StateQueryRequest<R> request) {
+
+        final long start = System.currentTimeMillis();
+        final long deadline = start + DEFAULT_TIMEOUT;
+
+        StateQueryResult<R> result;
+        do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
+
+            result = kafkaStreams.query(request);
+            final LinkedList<QueryResult<R>> allResults = getAllResults(result);
+
+            if (allResults.isEmpty()) {

Review comment:
       Just curious: when would the result actually be empty? I thought that even if the tasks were not initialized, we would still return the `NOT_PRESENT` error instead of an empty result.
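The `iqv2WaitForResult` loop above follows a generic poll-until-ready pattern. The helper below is a hypothetical sketch of that pattern, not the test utility itself. `PollUntilReady` and `waitFor` are illustrative names. Plain Java exceptions stand in for JUnit's `fail` and the `sleep` helper used in the diff. An empty result is simply one case the readiness predicate can reject.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper illustrating the retry loop; not Kafka's IntegrationTestUtils.
class PollUntilReady {
    // Re-runs `query` until `isReady` accepts its result or the deadline passes.
    static <T> T waitFor(final Supplier<T> query,
                         final Predicate<T> isReady,
                         final long timeoutMs,
                         final long pollMs) throws TimeoutException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            if (Thread.currentThread().isInterrupted()) {
                throw new IllegalStateException("Test was interrupted.");
            }
            final T result = query.get();
            if (isReady.test(result)) {
                return result;
            }
            try {
                Thread.sleep(pollMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
                throw new IllegalStateException("Test was interrupted.", e);
            }
        } while (System.currentTimeMillis() < deadline);
        throw new TimeoutException("The query never returned a usable result");
    }
}
```

For example, the predicate `results -> !results.isEmpty()` keeps retrying while the query returns no per-partition results. Whether that case can actually occur is the open question in the comment above.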

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -97,11 +97,15 @@ public void addResult(final int partition, final QueryResult<R> r) {
      * prior observations.
      */
     public Position getPosition() {
-        Position position = Position.emptyPosition();
-        for (final QueryResult<R> r : partitionResults.values()) {
-            position = position.merge(r.getPosition());
+        if (globalResult != null) {

Review comment:
       Not sure I understand this: in this PR we've effectively turned off the only caller of `result.setGlobalResult(r);`, so this branch should never be hit anymore, right?
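The `getPosition()` hunk has this shape: use the global result's position if there is one, otherwise merge the per-partition positions. The sketch below illustrates that shape with plain maps (topic -> partition -> offset). It assumes that merging keeps the larger offset for each topic-partition. It is not Kafka's `Position.merge` or `StateQueryResult`, and the method names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; positions are plain maps, not Kafka's Position class.
class MergePositionsSketch {
    // Combines positions, keeping the larger offset when a topic-partition appears twice.
    static Map<String, Map<Integer, Long>> merge(final List<Map<String, Map<Integer, Long>>> positions) {
        final Map<String, Map<Integer, Long>> merged = new HashMap<>();
        for (final Map<String, Map<Integer, Long>> position : positions) {
            for (final Map.Entry<String, Map<Integer, Long>> topic : position.entrySet()) {
                final Map<Integer, Long> target = merged.computeIfAbsent(topic.getKey(), t -> new HashMap<>());
                for (final Map.Entry<Integer, Long> partition : topic.getValue().entrySet()) {
                    target.merge(partition.getKey(), partition.getValue(), Long::max);
                }
            }
        }
        return merged;
    }

    // Same branching as the diff: a global result's position wins; otherwise merge partitions.
    static Map<String, Map<Integer, Long>> resultPosition(
            final Map<String, Map<Integer, Long>> globalPosition, // null when there is no global result
            final List<Map<String, Map<Integer, Long>>> partitionPositions) {
        return globalPosition != null ? globalPosition : merge(partitionPositions);
    }
}
```

If no caller sets a global result, as suggested above, only the merge branch is exercised.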







[GitHub] [kafka] patrickstuedi commented on pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#issuecomment-989876770


   Looks good, thanks John!





[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766220034



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -62,4 +71,32 @@ public static void updatePosition(
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        if (positionBound.isUnbounded()) {

Review comment:
       Ok, so the situation is a little different now than when I originally added a separate PositionBound class. At that point, we also needed a bound representing "latest" (i.e., that the query is executed on an active running task), for which there's no sensible Position representation.
   
   My mind was still half in that world last time we spoke about this point. Now, I can see that the only special position bound is the "unbounded" one, which is the same thing semantically as an empty position.
   
   I just tried out a change to completely get rid of the PositionBound class, but I think it makes the API more confusing. As a user, it seems clearer to create an "unbounded" PositionBound than an "empty" Position. It makes sense if you think about it in terms of comparing vectors (an empty vector is "less than" all other vectors, so when it's used as a lower bound, it permits everything), but I don't want people to have to think that hard about it.
   
   Another option I considered is to add a `Position.unbounded()` factory, but that doesn't completely make sense either, since a Position is just a point in vector space. It doesn't bound anything by itself, though it can be used as a bound.
   
   Plus, I think query-handling implementations, both for Streams and for custom user stores, are easier to keep track of if you have two types. You simply can't mix up which Position was supposed to be the bound.
   
   On balance, it still seems better to keep the separate PositionBound class. However, I did realize that the logic of PositionBound and the internal logic in Streams can be simplified with this observation that an unbounded position is equivalent to an empty position. I just added that commit to this PR.
   
   Thanks!
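The design described above keeps a separate bound type while treating "unbounded" as an empty position internally. A minimal sketch of that idea follows. `SketchBound` is hypothetical and is not Kafka's `PositionBound`. Positions are plain topic -> partition -> offset maps. Callers get a readable `unbounded()` factory, and `isUnbounded()` needs no special case because it just checks for an empty position.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical bound type; illustrates "unbounded == empty position", not Kafka's API.
final class SketchBound {
    private final Map<String, Map<Integer, Long>> position; // topic -> (partition -> offset)

    private SketchBound(final Map<String, Map<Integer, Long>> position) {
        this.position = Collections.unmodifiableMap(new HashMap<>(position));
    }

    // Reads more clearly at the call site than passing an "empty position".
    static SketchBound unbounded() {
        return new SketchBound(Collections.emptyMap());
    }

    static SketchBound at(final Map<String, Map<Integer, Long>> position) {
        return new SketchBound(position);
    }

    // No special flag: an empty position constrains nothing, so it is "unbounded".
    boolean isUnbounded() {
        return position.isEmpty();
    }

    Map<String, Map<Integer, Long>> position() {
        return position;
    }
}
```

Because a method can declare a `SketchBound` parameter separately from a plain position map, the two cannot be swapped by accident. This is the "two types" argument made above.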







[GitHub] [kafka] vvcephei commented on pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#issuecomment-991498463


   Thanks for the reviews, all!





[GitHub] [kafka] vvcephei commented on pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#issuecomment-991497134









[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766223374



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());

Review comment:
       This is actually important. Previously, we were not setting the record context before passing the cache-evicted record down to the lower store layers, so any store that relied on the record context (via the old ProcessorContext) was getting the wrong metadata.
   
   Apparently, this is the first Streams feature that actually relies on that metadata: we now use it to set the Position in the lower store, and if it isn't set, we get an error.
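To illustrate why the ordering matters, here is a minimal, self-contained sketch. It does not use the real Streams classes: `RecordContext`, `ContextHolder`, `PositionTrackingStore`, and `CacheFlushSketch` are hypothetical stand-ins for `ProcessorRecordContext`, the processor context, a lower store layer that updates its Position from the current record metadata (in the spirit of `StoreQueryUtils.updatePosition`), and the cache flush path. The try/finally restore is a choice made for the sketch, not necessarily how `CachingKeyValueStore` restores the context.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProcessorRecordContext: only topic, partition, offset.
final class RecordContext {
    final String topic;
    final int partition;
    final long offset;

    RecordContext(final String topic, final int partition, final long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical stand-in for the processor context holding the "current" record context.
final class ContextHolder {
    RecordContext current;
}

// Hypothetical lower store layer: every put records the current record's offset
// in its position (topic -> partition -> offset).
final class PositionTrackingStore {
    final Map<String, Map<Integer, Long>> position = new HashMap<>();
    final Map<String, String> data = new HashMap<>();
    private final ContextHolder context;

    PositionTrackingStore(final ContextHolder context) {
        this.context = context;
    }

    void put(final String key, final String value) {
        final RecordContext rc = context.current;
        if (rc == null) {
            throw new IllegalStateException("record context is not set");
        }
        data.put(key, value);
        position.computeIfAbsent(rc.topic, t -> new HashMap<>()).put(rc.partition, rc.offset);
    }
}

final class CacheFlushSketch {
    // Mirrors the fixed ordering: install the evicted entry's context first,
    // then write to the lower store, then restore the previously active context.
    static void flushEntry(final ContextHolder context,
                           final PositionTrackingStore wrapped,
                           final RecordContext entryContext,
                           final String key,
                           final String value) {
        final RecordContext previous = context.current;
        context.current = entryContext;
        try {
            wrapped.put(key, value);
        } finally {
            context.current = previous;
        }
    }
}
```

With the pre-fix order (calling `put` before installing the entry's context), the lower store would record the offset of whichever record happened to trigger the eviction, or fail outright if no context was set.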







[GitHub] [kafka] guozhangwang commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r767034399



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());

Review comment:
       You are right! This is a good fix.







[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766152319



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -62,4 +71,32 @@ public static void updatePosition(
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        if (positionBound.isUnbounded()) {

Review comment:
       Hmm, I think I finally see what you are talking about. Let me give it a shot...








[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r767031044



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -97,11 +97,15 @@ public void addResult(final int partition, final QueryResult<R> r) {
      * prior observations.
      */
     public Position getPosition() {
-        Position position = Position.emptyPosition();
-        for (final QueryResult<R> r : partitionResults.values()) {
-            position = position.merge(r.getPosition());
+        if (globalResult != null) {

Review comment:
       That's correct.







[GitHub] [kafka] vvcephei commented on pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#issuecomment-991497134


   The test failures are unrelated:
   ```
   
   Build / JDK 11 and Scala 2.13 / kafka.security.authorizer.AclAuthorizerWithZkSaslTest.testAclUpdateWithSessionExpiration() | 16 sec | 1
   
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets
   
   ```





[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766226822



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -125,22 +128,81 @@
         final long deadline = start + DEFAULT_TIMEOUT;
 
         do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
             final StateQueryResult<R> result = kafkaStreams.query(request);
-            if (result.getPartitionResults().keySet().containsAll(partitions)
-                || result.getGlobalResult() != null) {
+            if (result.getPartitionResults().keySet().containsAll(partitions)) {
                 return result;
             } else {
-                try {
-                    Thread.sleep(100L);
-                } catch (final InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
+                sleep(100L);
             }
         } while (System.currentTimeMillis() < deadline);
 
         throw new TimeoutException("The query never returned the desired partitions");
     }
 
+    /**
+     * Repeatedly runs the query until the response is valid and then return the response.
+     * <p>
+     * Validity in this case means that the response position is up to the specified bound.
+     * <p>
+     * Once position bounding is generally supported, we should migrate tests to wait on the
+     * expected response position.
+     */
+    public static <R> StateQueryResult<R> iqv2WaitForResult(
+        final KafkaStreams kafkaStreams,
+        final StateQueryRequest<R> request) {
+
+        final long start = System.currentTimeMillis();
+        final long deadline = start + DEFAULT_TIMEOUT;
+
+        StateQueryResult<R> result;
+        do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
+
+            result = kafkaStreams.query(request);
+            final LinkedList<QueryResult<R>> allResults = getAllResults(result);
+
+            if (allResults.isEmpty()) {

Review comment:
       You'll only get a `NOT_PRESENT` response if you specifically request a partition. The default is to return results for all locally present partitions. This check just encodes the assumption that, in an integration test, a caller of this method expects at least one result.
   
   It's worth noting, though, that a test expecting results for a specific set of partitions should specify those partitions in the query.
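As a rough sketch of the polling approach discussed here (not the actual `IntegrationTestUtils` code), the helper below re-runs a query until the result covers every requested partition or a deadline passes. The `Supplier<Map<Integer, R>>` standing in for `kafkaStreams.query(request)`, the class `QueryPolling`, and the method `waitForPartitions` are all hypothetical. Passing the expected partitions explicitly follows the advice above rather than relying on the default of all locally present partitions.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class QueryPolling {
    // Hypothetical helper: re-run the query until its per-partition results include
    // every requested partition. The Supplier stands in for kafkaStreams.query(request).
    static <R> Map<Integer, R> waitForPartitions(final Supplier<Map<Integer, R>> query,
                                                 final Set<Integer> partitions,
                                                 final long timeoutMs,
                                                 final long pollIntervalMs)
            throws InterruptedException, TimeoutException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedException("Test was interrupted.");
            }
            final Map<Integer, R> result = query.get();
            // An empty result counts as "not ready yet": a test calling this helper
            // presumably expects at least one partition result.
            if (!result.isEmpty() && result.keySet().containsAll(partitions)) {
                return result;
            }
            Thread.sleep(pollIntervalMs);
        } while (System.currentTimeMillis() < deadline);
        throw new TimeoutException("The query never returned the desired partitions");
    }
}
```

The non-empty check matters when `partitions` is empty: `containsAll` of an empty set is always true, so without it the helper would return an empty result immediately.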

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -62,4 +71,32 @@ public static void updatePosition(
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        if (positionBound.isUnbounded()) {
+            return true;
+        } else {
+            final Position bound = positionBound.position();
+            for (final String topic : bound.getTopics()) {
+                final Map<Integer, Long> partitionBounds = bound.getBound(topic);
+                final Map<Integer, Long> seenPartitionBounds = position.getBound(topic);

Review comment:
       Woah, you're absolutely right. I'll fix it.
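For readers following the bounding logic, here is a minimal, self-contained sketch of a per-partition bound check. It is written under assumptions: positions and bounds are modeled as plain `Map<String, Map<Integer, Long>>` (topic to partition to offset) instead of the real `Position`/`PositionBound` classes, an empty `Optional` stands in for an unbounded query, and only the queried `partition` is compared against the bound. `BoundCheck` is a hypothetical name; this illustrates the idea and is not the code that was merged.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

final class BoundCheck {
    // Hypothetical check: true if the observed position for `partition` has reached
    // the bound for every topic that the bound constrains on that partition.
    static boolean isPermitted(final Map<String, Map<Integer, Long>> position,
                               final Optional<Map<String, Map<Integer, Long>>> bound,
                               final int partition) {
        if (!bound.isPresent()) {
            return true; // unbounded: any position is acceptable
        }
        for (final Map.Entry<String, Map<Integer, Long>> topicBound : bound.get().entrySet()) {
            final Long required = topicBound.getValue().get(partition);
            if (required == null) {
                continue; // this topic is not bounded for the queried partition
            }
            final Long seen = position
                .getOrDefault(topicBound.getKey(), Collections.emptyMap())
                .get(partition);
            if (seen == null || seen < required) {
                return false; // never observed, or still behind the bound
            }
        }
        return true;
    }
}
```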







[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766221800



##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -97,11 +97,15 @@ public void addResult(final int partition, final QueryResult<R> r) {
      * prior observations.
      */
     public Position getPosition() {
-        Position position = Position.emptyPosition();
-        for (final QueryResult<R> r : partitionResults.values()) {
-            position = position.merge(r.getPosition());
+        if (globalResult != null) {

Review comment:
       That's correct, but since the old version of this method was incorrect, I figured I'd go ahead and fix it.
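A minimal sketch of the corrected aggregation, under stated assumptions: the diff above is cut off after `if (globalResult != null) {`, so this assumes the global result's position is returned directly when present, and that otherwise the per-partition positions are merged by keeping the larger offset for each topic-partition (our assumption about how `Position.merge` combines positions). Positions are modeled as plain maps, and `PositionSketch` with its methods is hypothetical.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

final class PositionSketch {
    // Hypothetical merge: copy `base`, then fold in `other`, keeping the larger
    // offset for each topic-partition.
    static Map<String, Map<Integer, Long>> merge(final Map<String, Map<Integer, Long>> base,
                                                 final Map<String, Map<Integer, Long>> other) {
        final Map<String, Map<Integer, Long>> merged = new HashMap<>();
        base.forEach((topic, parts) -> merged.put(topic, new HashMap<>(parts)));
        other.forEach((topic, parts) -> {
            final Map<Integer, Long> target = merged.computeIfAbsent(topic, t -> new HashMap<>());
            parts.forEach((partition, offset) -> target.merge(partition, offset, Long::max));
        });
        return merged;
    }

    // Assumed shape of the fixed getPosition(): a global result's position wins
    // when present; otherwise all per-partition positions are merged.
    static Map<String, Map<Integer, Long>> getPosition(
            final Map<String, Map<Integer, Long>> globalPosition,
            final Collection<Map<String, Map<Integer, Long>>> partitionPositions) {
        if (globalPosition != null) {
            return globalPosition;
        }
        Map<String, Map<Integer, Long>> position = new HashMap<>();
        for (final Map<String, Map<Integer, Long>> p : partitionPositions) {
            position = merge(position, p);
        }
        return position;
    }
}
```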







[GitHub] [kafka] patrickstuedi commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

Posted by GitBox <gi...@apache.org>.
patrickstuedi commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766603920



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -62,4 +71,32 @@ public static void updatePosition(
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        if (positionBound.isUnbounded()) {

Review comment:
       Sounds good, thanks for thinking about it.



