You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/05 21:06:16 UTC

[kafka] branch 1.0 updated: KAFKA-6748: double check before scheduling a new task after the punctuate call (#4827)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new c029aac  KAFKA-6748: double check before scheduling a new task after the punctuate call (#4827)
c029aac is described below

commit c029aacdc0c624abd3e471952d756d07e1041470
Author: fredfp <fr...@gmail.com>
AuthorDate: Thu Apr 5 20:17:40 2018 +0200

    KAFKA-6748: double check before scheduling a new task after the punctuate call (#4827)
    
    After the punctuate() call returns, re-check whether the schedule has been cancelled before adding its next occurrence back to the queue, since the punctuator itself may cancel the schedule during the call.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>, John Roesler <jo...@confluent.io>
---
 .../processor/internals/PunctuationQueue.java      |  5 ++-
 .../processor/internals/PunctuationQueueTest.java  | 37 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
index 80eda6c..354c602 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -52,7 +52,10 @@ public class PunctuationQueue {
 
                 if (!sched.isCancelled()) {
                     processorNodePunctuator.punctuate(sched.node(), timestamp, type, sched.punctuator());
-                    pq.add(sched.next(timestamp));
+                    // sched can be cancelled from within the punctuator
+                    if (!sched.isCancelled()) {
+                        pq.add(sched.next(timestamp));
+                    }
                     punctuated = true;
                 }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index 09c7a0a..e799688 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -126,6 +127,42 @@ public class PunctuationQueueTest {
         assertEquals(4, processor.punctuatedAt.size());
     }
 
+    @Test
+    public void testPunctuationIntervalCancelFromPunctuator() {
+        final TestProcessor processor = new TestProcessor();
+        final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
+        final PunctuationQueue queue = new PunctuationQueue();
+        final Punctuator punctuator = new Punctuator() {
+            @Override
+            public void punctuate(long timestamp) {
+                node.processor().punctuate(timestamp);
+            }
+        };
+
+        final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator);
+        final long now = sched.timestamp - 100L;
+
+        final Cancellable cancellable = queue.schedule(sched);
+
+        ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
+            @Override
+            public void punctuate(ProcessorNode node, long time, PunctuationType type, Punctuator punctuator) {
+                punctuator.punctuate(time);
+                // simulate scheduler cancelled from within punctuator
+                cancellable.cancel();
+            }
+        };
+
+        queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(0, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+
+        queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+        assertEquals(1, processor.punctuatedAt.size());
+    }
+
     private static class TestProcessor extends AbstractProcessor<String, String> {
 
         public final ArrayList<Long> punctuatedAt = new ArrayList<>();

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.