You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/05 21:06:16 UTC
[kafka] branch 1.0 updated: KAFKA-6748: double check before scheduling a new task after the punctuate call (#4827)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.0 by this push:
new c029aac KAFKA-6748: double check before scheduling a new task after the punctuate call (#4827)
c029aac is described below
commit c029aacdc0c624abd3e471952d756d07e1041470
Author: fredfp <fr...@gmail.com>
AuthorDate: Thu Apr 5 20:17:40 2018 +0200
KAFKA-6748: double check before scheduling a new task after the punctuate call (#4827)
After the punctuate() call, double-check the schedule's cancelled flag before adding its next occurrence to the queue, since the punctuator itself may cancel the schedule during that call.
Reviewers: Guozhang Wang <wa...@gmail.com>, John Roesler <jo...@confluent.io>
---
.../processor/internals/PunctuationQueue.java | 5 ++-
.../processor/internals/PunctuationQueueTest.java | 37 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
index 80eda6c..354c602 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PunctuationQueue.java
@@ -52,7 +52,10 @@ public class PunctuationQueue {
if (!sched.isCancelled()) {
processorNodePunctuator.punctuate(sched.node(), timestamp, type, sched.punctuator());
- pq.add(sched.next(timestamp));
+ // sched can be cancelled from within the punctuator
+ if (!sched.isCancelled()) {
+ pq.add(sched.next(timestamp));
+ }
punctuated = true;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
index 09c7a0a..e799688 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PunctuationQueueTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
@@ -126,6 +127,42 @@ public class PunctuationQueueTest {
assertEquals(4, processor.punctuatedAt.size());
}
+ @Test
+ public void testPunctuationIntervalCancelFromPunctuator() {
+ final TestProcessor processor = new TestProcessor();
+ final ProcessorNode<String, String> node = new ProcessorNode<>("test", processor, null);
+ final PunctuationQueue queue = new PunctuationQueue();
+ final Punctuator punctuator = new Punctuator() {
+ @Override
+ public void punctuate(long timestamp) {
+ node.processor().punctuate(timestamp);
+ }
+ };
+
+ final PunctuationSchedule sched = new PunctuationSchedule(node, 0L, 100L, punctuator);
+ final long now = sched.timestamp - 100L;
+
+ final Cancellable cancellable = queue.schedule(sched);
+
+ ProcessorNodePunctuator processorNodePunctuator = new ProcessorNodePunctuator() {
+ @Override
+ public void punctuate(ProcessorNode node, long time, PunctuationType type, Punctuator punctuator) {
+ punctuator.punctuate(time);
+ // simulate scheduler cancelled from within punctuator
+ cancellable.cancel();
+ }
+ };
+
+ queue.mayPunctuate(now, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(0, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 100L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(1, processor.punctuatedAt.size());
+
+ queue.mayPunctuate(now + 200L, PunctuationType.STREAM_TIME, processorNodePunctuator);
+ assertEquals(1, processor.punctuatedAt.size());
+ }
+
private static class TestProcessor extends AbstractProcessor<String, String> {
public final ArrayList<Long> punctuatedAt = new ArrayList<>();
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.