Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/09 21:26:16 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

guozhangwang commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766163659



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##########
@@ -62,4 +71,32 @@ public static void updatePosition(
             position.withComponent(meta.topic(), meta.partition(), meta.offset());
         }
     }
+
+    public static boolean isPermitted(
+        final Position position,
+        final PositionBound positionBound,
+        final int partition
+    ) {
+        if (positionBound.isUnbounded()) {
+            return true;
+        } else {
+            final Position bound = positionBound.position();
+            for (final String topic : bound.getTopics()) {
+                final Map<Integer, Long> partitionBounds = bound.getBound(topic);
+                final Map<Integer, Long> seenPartitionBounds = position.getBound(topic);

Review comment:
       qq: why do we name this `Position` method `getBound` when it seems to just retrieve the current positions for the topic, not really a bound?

       Similarly, why name this local variable `seenPartitionBounds` if the returned values are not really "bounds"? Should it just be `currentOffsets` or something similar?
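To make the naming question concrete, here is a minimal sketch of the check `isPermitted` appears to perform, using plain maps (topic to partition to offset) in place of `Position` and `PositionBound`, and the name `currentOffsets` suggested above. The hunk is truncated after the two lookups, so the comparison rule is an assumption: a partition is permitted once the store's offset for it has reached the bound's offset for every bounded topic. `PositionBoundSketch` is a hypothetical class, not part of Kafka.

```java
import java.util.Map;

// Hypothetical stand-in for Position: topic -> (partition -> offset).
// Assumed rule: permitted when, for every topic named in the bound, the store's
// current offset for the given partition is at least the bound's offset.
final class PositionBoundSketch {

    static boolean isPermitted(
        final Map<String, Map<Integer, Long>> currentPosition,
        final Map<String, Map<Integer, Long>> bound, // null means "unbounded" in this sketch
        final int partition
    ) {
        if (bound == null) {
            return true; // unbounded: any position is acceptable
        }
        for (final Map.Entry<String, Map<Integer, Long>> topicBound : bound.entrySet()) {
            final Long requiredOffset = topicBound.getValue().get(partition);
            if (requiredOffset == null) {
                continue; // the bound says nothing about this partition of this topic
            }
            // These are offsets the store has already seen, not bounds.
            final Map<Integer, Long> currentOffsets =
                currentPosition.getOrDefault(topicBound.getKey(), Map.of());
            final Long seenOffset = currentOffsets.get(partition);
            if (seenOffset == null || seenOffset < requiredOffset) {
                return false; // the store has not caught up to the bound yet
            }
        }
        return true;
    }
}
```

With this naming, the bound side keeps the word "bound" and the store side reads as plain offsets, which is the distinction the comment is asking for.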

##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##########
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry,
             // we can skip flushing to downstream as well as writing to underlying store
             if (rawNewValue != null || rawOldValue != null) {
                 // we need to get the old values if needed, and then put to store, and then flush
-                wrapped().put(entry.key(), entry.newValue());
-
                 final ProcessorRecordContext current = context.recordContext();
                 context.setRecordContext(entry.entry().context());
+                wrapped().put(entry.key(), entry.newValue());

Review comment:
       Why reorder the steps here?
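One plausible reason for the reorder, which is an assumption and not stated in this thread: if the wrapped store now updates its position from the current record context (`updatePosition` in `StoreQueryUtils` appears to read topic, partition, and offset from record metadata), then `wrapped().put(...)` has to run after `context.setRecordContext(entry.entry().context())`. Otherwise the store would record the offset of whatever record is currently being processed rather than the one that produced the cached entry. The toy model below (all classes hypothetical) shows the difference.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model: the underlying store reads the *current* record
// context on put, so whichever context is installed at put time is what it records.
final class ContextOrderingSketch {

    // Stand-in for the processor context; only tracks the current record's offset.
    static final class Context {
        long currentOffset;
    }

    // Toy underlying store: every put records the offset from the current context.
    static final class TrackingStore {
        final List<Long> observedOffsets = new ArrayList<>();
        private final Context context;

        TrackingStore(final Context context) {
            this.context = context;
        }

        void put(final String key, final String value) {
            observedOffsets.add(context.currentOffset);
        }
    }

    // Flushes one dirty cache entry originally written at entryOffset while the live
    // context points at liveOffset. Returns the offset the underlying store observed.
    static long flush(final long liveOffset, final long entryOffset, final boolean contextFirst) {
        final Context context = new Context();
        context.currentOffset = liveOffset;
        final TrackingStore store = new TrackingStore(context);

        if (!contextFirst) {
            store.put("key", "value");       // old order: put under the live context
        }
        final long saved = context.currentOffset;
        context.currentOffset = entryOffset; // install the entry's own context
        if (contextFirst) {
            store.put("key", "value");       // new order: put under the entry's context
        }
        context.currentOffset = saved;       // restore the previously active context
        return store.observedOffsets.get(0);
    }
}
```

In this model, `flush(100L, 42L, true)` returns 42 (the entry's offset), while `flush(100L, 42L, false)` returns 100 (the live record's offset).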

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1764,20 +1764,18 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) {
             );
         }
         final StateQueryResult<R> result = new StateQueryResult<>();
+        final Set<Integer> handledPartitions = new HashSet<>();

Review comment:
       Why move it out of the else block's scope?

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##########
@@ -125,22 +128,81 @@
         final long deadline = start + DEFAULT_TIMEOUT;
 
         do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
             final StateQueryResult<R> result = kafkaStreams.query(request);
-            if (result.getPartitionResults().keySet().containsAll(partitions)
-                || result.getGlobalResult() != null) {
+            if (result.getPartitionResults().keySet().containsAll(partitions)) {
                 return result;
             } else {
-                try {
-                    Thread.sleep(100L);
-                } catch (final InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
+                sleep(100L);
             }
         } while (System.currentTimeMillis() < deadline);
 
         throw new TimeoutException("The query never returned the desired partitions");
     }
 
+    /**
+     * Repeatedly runs the query until the response is valid and then return the response.
+     * <p>
+     * Validity in this case means that the response position is up to the specified bound.
+     * <p>
+     * Once position bounding is generally supported, we should migrate tests to wait on the
+     * expected response position.
+     */
+    public static <R> StateQueryResult<R> iqv2WaitForResult(
+        final KafkaStreams kafkaStreams,
+        final StateQueryRequest<R> request) {
+
+        final long start = System.currentTimeMillis();
+        final long deadline = start + DEFAULT_TIMEOUT;
+
+        StateQueryResult<R> result;
+        do {
+            if (Thread.currentThread().isInterrupted()) {
+                fail("Test was interrupted.");
+            }
+
+            result = kafkaStreams.query(request);
+            final LinkedList<QueryResult<R>> allResults = getAllResults(result);
+
+            if (allResults.isEmpty()) {

Review comment:
       Just curious: when would the result actually be empty? I thought that even if the tasks were not initialized, we would still return a `NOT_PRESENT` error rather than an empty result.
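The retry loops in this test utility (`iqv2WaitForResult` and the partition-waiting loop above it) share one shape: check for interruption, run the query, return if the result is acceptable, otherwise sleep until a deadline. A minimal generic sketch of that shape follows. `PollingSketch` and `waitFor` are hypothetical names, and the sketch throws `IllegalStateException` instead of calling JUnit's `fail(...)` or throwing `TimeoutException`, so it has no test-framework dependency.

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical generic helper mirroring the loop structure of iqv2WaitForResult:
// re-run a query until its result is complete, failing on interrupt or deadline.
final class PollingSketch {

    static <T> T waitFor(
        final Supplier<T> query,        // e.g. () -> kafkaStreams.query(request)
        final Predicate<T> isComplete,  // e.g. "non-empty and covers all partitions"
        final long timeoutMs,
        final long pollIntervalMs
    ) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        do {
            if (Thread.currentThread().isInterrupted()) {
                throw new IllegalStateException("Interrupted while waiting for a complete result");
            }
            final T result = query.get();
            if (isComplete.test(result)) {
                return result;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status for callers
                throw new IllegalStateException("Interrupted while waiting for a complete result", e);
            }
        } while (System.currentTimeMillis() < deadline);
        throw new IllegalStateException("Result was not complete within " + timeoutMs + " ms");
    }
}
```

Whatever the answer to the question above, a completeness predicate that treats an empty result as "not ready yet" keeps such a loop correct whether or not the empty case can actually occur.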

##########
File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##########
@@ -97,11 +97,15 @@ public void addResult(final int partition, final QueryResult<R> r) {
      * prior observations.
      */
     public Position getPosition() {
-        Position position = Position.emptyPosition();
-        for (final QueryResult<R> r : partitionResults.values()) {
-            position = position.merge(r.getPosition());
+        if (globalResult != null) {

Review comment:
       Not sure I understand this: in this PR we've effectively disabled the only call to `result.setGlobalResult(r);`, so this branch should never be hit anymore, right?
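The hunk is truncated after `if (globalResult != null) {`, so the following is only a sketch of what `getPosition()` appears to compute, under two assumptions: the global branch returns the global result's position, and merging keeps the larger offset when two positions share a topic-partition (believed, not verified here, to match `Position.merge`). Plain maps stand in for `Position`, and `PositionMergeSketch` is a hypothetical class.

```java
import java.util.HashMap;
import java.util.Map;

final class PositionMergeSketch {

    // Merges two positions (topic -> partition -> offset), keeping the larger offset
    // when both contain the same topic-partition (assumed Position.merge semantics).
    static Map<String, Map<Integer, Long>> merge(
        final Map<String, Map<Integer, Long>> a,
        final Map<String, Map<Integer, Long>> b
    ) {
        final Map<String, Map<Integer, Long>> result = new HashMap<>();
        for (final Map<String, Map<Integer, Long>> source : java.util.List.of(a, b)) {
            for (final Map.Entry<String, Map<Integer, Long>> topic : source.entrySet()) {
                final Map<Integer, Long> offsets =
                    result.computeIfAbsent(topic.getKey(), t -> new HashMap<>());
                topic.getValue().forEach((partition, offset) -> offsets.merge(partition, offset, Long::max));
            }
        }
        return result;
    }

    // Position of a whole query result: the global result's position if one is present,
    // otherwise the merge of all per-partition positions.
    static Map<String, Map<Integer, Long>> resultPosition(
        final Map<String, Map<Integer, Long>> globalPosition,
        final Iterable<Map<String, Map<Integer, Long>>> partitionPositions
    ) {
        if (globalPosition != null) {
            return globalPosition;
        }
        Map<String, Map<Integer, Long>> position = new HashMap<>();
        for (final Map<String, Map<Integer, Long>> p : partitionPositions) {
            position = merge(position, p);
        }
        return position;
    }
}
```

If, as the comment suggests, nothing sets the global result anymore, the first branch of `resultPosition` is unreachable and the method reduces to the per-partition merge.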




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.