You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/11/21 20:42:24 UTC
samza git commit: SAMZA-1494; Flush operator state at end of stream
Repository: samza
Updated Branches:
refs/heads/master f2c14fa63 -> 7a2e19250
SAMZA-1494; Flush operator state at end of stream
- Propagate operator messages at endOfStream to all down-stream operators.
- Emit all pending windows when endOfStream is reached.
- Flush all state on endOfStream irrespective of auto-commit behavior.
Author: Jagadish <jv...@linkedin.com>
Reviewers: Xinyu Liu <xi...@linkedin.com>
Closes #366 from vjagadish1989/eos-flush
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7a2e1925
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7a2e1925
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7a2e1925
Branch: refs/heads/master
Commit: 7a2e19250f5a74f34eef1d53de5d52399850bc89
Parents: f2c14fa
Author: Jagadish <jv...@linkedin.com>
Authored: Tue Nov 21 12:42:17 2017 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue Nov 21 12:42:17 2017 -0800
----------------------------------------------------------------------
.../org/apache/samza/config/TaskConfigJava.java | 2 +-
.../samza/operators/impl/OperatorImpl.java | 13 +-
.../operators/impl/PartitionByOperatorImpl.java | 3 +-
.../operators/impl/WindowOperatorImpl.java | 13 +
.../org/apache/samza/config/TaskConfig.scala | 5 +
.../apache/samza/config/TestTaskConfigJava.java | 20 +
.../samza/operators/TestWindowOperator.java | 565 ----------------
.../operators/impl/TestWindowOperator.java | 677 +++++++++++++++++++
8 files changed, 728 insertions(+), 570 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index 0bf078e..04e15dc 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -116,7 +116,7 @@ public class TaskConfigJava extends MapConfig {
Set<SystemStream> allInputSS = new HashSet<>();
TaskConfig taskConfig = TaskConfig.Config2Task(this);
- allInputSS.addAll(JavaConverters.setAsJavaSetConverter(taskConfig.getInputStreams()).asJava());
+ allInputSS.addAll((Set<? extends SystemStream>) JavaConverters.setAsJavaSetConverter(taskConfig.getInputStreams()).asJava());
allInputSS.addAll(getBroadcastSystemStreams());
return Collections.unmodifiableSet(allInputSS);
http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 92a563a..862e5f9 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -266,6 +266,7 @@ public abstract class OperatorImpl<M, RM> {
if (eosStates.allEndOfStream()) {
// all inputs have been end-of-stream, shut down the task
LOG.info("All input streams have reached the end for task {}", taskName.getTaskName());
+ coordinator.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
}
}
@@ -279,7 +280,12 @@ public abstract class OperatorImpl<M, RM> {
*/
private final void onEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
if (inputStreams.stream().allMatch(input -> eosStates.isEndOfStream(input))) {
- handleEndOfStream(collector, coordinator);
+ Collection<RM> results = handleEndOfStream(collector, coordinator);
+
+ results.forEach(rm ->
+ this.registeredOperators.forEach(op ->
+ op.onMessage(rm, collector, coordinator)));
+
this.registeredOperators.forEach(op -> op.onEndOfStream(collector, coordinator));
}
}
@@ -291,9 +297,10 @@ public abstract class OperatorImpl<M, RM> {
* override this to actually propagate EOS over the wire.
* @param collector message collector
* @param coordinator task coordinator
+ * @return results to be emitted when this operator reaches end-of-stream
*/
- protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
- //Do nothing by default
+ protected Collection<RM> handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+ return Collections.emptyList();
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index 3811da8..424c10f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -91,8 +91,9 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
}
@Override
- protected void handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+ protected Collection<Void> handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
sendControlMessage(new EndOfStreamMessage(taskName), collector);
+ return Collections.emptyList();
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index 7031ff9..32406cb 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -53,9 +53,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -195,6 +197,17 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje
}
@Override
+ protected Collection<WindowPane<K, Object>> handleEndOfStream(MessageCollector collector, TaskCoordinator coordinator) {
+ List<WindowPane<K, Object>> results = new ArrayList<>();
+ Set<TriggerKey<K>> triggerKeys = new HashSet<>(triggers.keySet());
+ for(TriggerKey<K> triggerKey : triggerKeys) {
+ Optional<WindowPane<K, Object>> triggerResult = onTriggerFired(triggerKey, collector, coordinator);
+ triggerResult.ifPresent(results::add);
+ }
+ return results;
+ }
+
+ @Override
protected void handleClose() {
if (foldLeftFn != null) {
foldLeftFn.close();
http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 2ee0032..419e15b 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -137,4 +137,9 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
case Some(asyncCommit) => Some(asyncCommit.toBoolean)
case _ => None
}
+
+ def isAutoCommitEnabled() = getOption(TaskConfig.COMMIT_MS) match {
+ case Some(commitMs) => commitMs.toInt > 0
+ case _ => true
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
index 878ca01..baf2d4f 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestTaskConfigJava.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
+import com.google.common.collect.ImmutableMap;
import org.apache.samza.Partition;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.Test;
@@ -70,4 +71,23 @@ public class TestTaskConfigJava {
}
assertTrue(invalidFormatException);
}
+
+ @Test
+ public void testAutoCommitConfig() {
+ // positive values of commit.ms => autoCommit = true
+ Config config1 = new MapConfig(ImmutableMap.of("task.commit.ms", "1"));
+ assertTrue(new TaskConfig(config1).isAutoCommitEnabled());
+
+ // no value for commit.ms => autoCommit = true
+ Config config2 = new MapConfig(ImmutableMap.of());
+ assertTrue(new TaskConfig(config2).isAutoCommitEnabled());
+
+ // A zero value for commit.ms => autoCommit = false
+ Config config3 = new MapConfig(ImmutableMap.of("task.commit.ms", "0"));
+ assertFalse(new TaskConfig(config3).isAutoCommitEnabled());
+
+ // negative value for commit.ms => autoCommit = false
+ Config config4 = new MapConfig(ImmutableMap.of("task.commit.ms", "-1"));
+ assertFalse(new TaskConfig(config4).isAutoCommitEnabled());
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
deleted file mode 100644
index 8abfb92..0000000
--- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ /dev/null
@@ -1,565 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import junit.framework.Assert;
-import org.apache.samza.Partition;
-import org.apache.samza.application.StreamApplication;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.container.TaskContextImpl;
-import org.apache.samza.metrics.MetricsRegistryMap;
-import org.apache.samza.operators.impl.store.TestInMemoryStore;
-import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
-import org.apache.samza.operators.triggers.FiringType;
-import org.apache.samza.operators.triggers.Trigger;
-import org.apache.samza.operators.triggers.Triggers;
-import org.apache.samza.operators.windows.AccumulationMode;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.runtime.ApplicationRunner;
-import org.apache.samza.serializers.IntegerSerde;
-import org.apache.samza.serializers.KVSerde;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.IncomingMessageEnvelope;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.system.SystemStreamPartition;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.StreamOperatorTask;
-import org.apache.samza.task.TaskCoordinator;
-import org.apache.samza.testUtils.TestClock;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.function.Function;
-
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestWindowOperator {
- private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
- private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
- private Config config;
- private TaskContextImpl taskContext;
- private ApplicationRunner runner;
-
- @Before
- public void setup() throws Exception {
- config = mock(Config.class);
- when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
- when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
- taskContext = mock(TaskContextImpl.class);
- runner = mock(ApplicationRunner.class);
- Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
- Serde storeValSerde = new IntegerEnvelopeSerde();
-
- when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
- .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
- when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-
- when(taskContext.getStore("jobName-jobId-window-w1"))
- .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
- when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
- }
-
- @Test
- public void testTumblingWindowsDiscardingMode() throws Exception {
-
- StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
- Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
- List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-
- TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
- task.init(config, taskContext);
- MessageCollector messageCollector =
- envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
- integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
- testClock.advanceTime(Duration.ofSeconds(1));
-
- task.window(messageCollector, taskCoordinator);
- Assert.assertEquals(windowPanes.size(), 5);
- Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
- Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
-
- Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
- Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
-
- Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
- Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
-
- Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
- Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
-
- Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3));
- Assert.assertEquals((windowPanes.get(4).getMessage()).size(), 1);
- }
-
- @Test
- public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception {
-
- StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
- Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000)));
- List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-
- TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
- task.init(config, taskContext);
-
- MessageCollector messageCollector =
- envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
- Assert.assertEquals(windowPanes.size(), 0);
-
- integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
- Assert.assertEquals(windowPanes.size(), 0);
-
- testClock.advanceTime(Duration.ofSeconds(1));
- Assert.assertEquals(windowPanes.size(), 0);
-
- task.window(messageCollector, taskCoordinator);
- Assert.assertEquals(windowPanes.size(), 1);
- Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 9);
- }
-
- @Test
- public void testTumblingAggregatingWindowsDiscardingMode() throws Exception {
-
- when(taskContext.getStore("jobName-jobId-window-w1"))
- .thenReturn(new TestInMemoryStore<>(new TimeSeriesKeySerde(new IntegerSerde()), new IntegerSerde()));
-
- StreamApplication sgb = new AggregateTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
- Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
- List<WindowPane<Integer, Integer>> windowPanes = new ArrayList<>();
-
- TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
- task.init(config, taskContext);
- MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Integer>) envelope.getMessage());
- integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
- testClock.advanceTime(Duration.ofSeconds(1));
-
- task.window(messageCollector, taskCoordinator);
- Assert.assertEquals(windowPanes.size(), 5);
- Assert.assertEquals(windowPanes.get(0).getMessage(), new Integer(2));
- Assert.assertEquals(windowPanes.get(1).getMessage(), new Integer(2));
- Assert.assertEquals(windowPanes.get(2).getMessage(), new Integer(2));
- Assert.assertEquals(windowPanes.get(3).getMessage(), new Integer(2));
- Assert.assertEquals(windowPanes.get(4).getMessage(), new Integer(1));
- }
-
- @Test
- public void testTumblingWindowsAccumulatingMode() throws Exception {
- StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
- Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
- List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
- TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
- task.init(config, taskContext);
-
- MessageCollector messageCollector =
- envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
- integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
- testClock.advanceTime(Duration.ofSeconds(1));
- task.window(messageCollector, taskCoordinator);
-
- Assert.assertEquals(windowPanes.size(), 7);
- Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
- Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
-
- Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
- Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
-
- Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
- Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 4);
-
- Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
- Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 4);
- }
-
- @Test
- public void testSessionWindowsDiscardingMode() throws Exception {
- StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
- TestClock testClock = new TestClock();
- List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
- task.init(config, taskContext);
- MessageCollector messageCollector =
- envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- testClock.advanceTime(Duration.ofSeconds(1));
- task.window(messageCollector, taskCoordinator);
-
- Assert.assertEquals(windowPanes.size(), 1);
- Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
- Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
-
- task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
-
- testClock.advanceTime(Duration.ofSeconds(1));
- task.window(messageCollector, taskCoordinator);
-
- Assert.assertEquals(windowPanes.size(), 3);
- Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
- Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001");
- Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001");
- Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
- Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
- Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
-
- task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
-
- testClock.advanceTime(Duration.ofSeconds(1));
- task.window(messageCollector, taskCoordinator);
- Assert.assertEquals(windowPanes.size(), 4);
- Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
- Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001");
- Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
-
- }
-
- @Test
- public void testSessionWindowsAccumulatingMode() throws Exception {
- StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING,
- Duration.ofMillis(500));
- TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
- List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-
- MessageCollector messageCollector =
- envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
- task.init(config, taskContext);
-
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- testClock.advanceTime(Duration.ofSeconds(1));
-
- task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
-
- task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
-
- testClock.advanceTime(Duration.ofSeconds(1));
- task.window(messageCollector, taskCoordinator);
- Assert.assertEquals(windowPanes.size(), 2);
- Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
- Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
- Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
- Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
- Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 4);
- }
-
- @Test
- public void testCancellationOfOnceTrigger() throws Exception {
- StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
- Duration.ofSeconds(1), Triggers.count(2));
- TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
- task.init(config, taskContext);
-
- List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
- MessageCollector messageCollector =
- envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- Assert.assertEquals(windowPanes.size(), 1);
- Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
- Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
- Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY);
-
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-
- Assert.assertEquals(windowPanes.size(), 1);
-
- testClock.advanceTime(Duration.ofSeconds(1));
- task.window(messageCollector, taskCoordinator);
-
- Assert.assertEquals(windowPanes.size(), 2);
- Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
- Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
- Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
-
- task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
- testClock.advanceTime(Duration.ofSeconds(1));
- task.window(messageCollector, taskCoordinator);
-
- Assert.assertEquals(windowPanes.size(), 3);
- Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3));
- Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
- Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT);
- Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 1);
-
- }
-
- @Test
- public void testCancellationOfAnyTrigger() throws Exception {
- StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
- Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
- TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
- task.init(config, taskContext);
-
- List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
- MessageCollector messageCollector =
- envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- //assert that the count trigger fired
- Assert.assertEquals(windowPanes.size(), 1);
-
- //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger
- testClock.advanceTime(Duration.ofMillis(500));
-
- //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
- Assert.assertEquals(windowPanes.size(), 1);
-
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-
- //advance timer by 500 more millis to enable the default trigger
- testClock.advanceTime(Duration.ofMillis(500));
- task.window(messageCollector, taskCoordinator);
-
- //assert that the default trigger fired
- Assert.assertEquals(windowPanes.size(), 2);
- Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
- Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1));
- Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
- Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5);
-
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-
- //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
- testClock.advanceTime(Duration.ofMillis(500));
- task.window(messageCollector, taskCoordinator);
-
- Assert.assertEquals(windowPanes.size(), 3);
- Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY);
- Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
- Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
-
- //advance timer by > 500 millis to enable the default trigger
- testClock.advanceTime(Duration.ofMillis(900));
- task.window(messageCollector, taskCoordinator);
- Assert.assertEquals(windowPanes.size(), 4);
- Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT);
- Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1));
- Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000");
- }
-
- @Test
- public void testCancelationOfRepeatingNestedTriggers() throws Exception {
-
- StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
- Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
- List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
-
- MessageCollector messageCollector =
- envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
-
- TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
- task.init(config, taskContext);
-
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
-
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- //assert that the count trigger fired
- Assert.assertEquals(windowPanes.size(), 1);
-
- //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- testClock.advanceTime(Duration.ofMillis(500));
- //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
- task.window(messageCollector, taskCoordinator);
- Assert.assertEquals(windowPanes.size(), 2);
-
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- Assert.assertEquals(windowPanes.size(), 3);
-
- task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
- //advance timer by 500 more millis to enable the default trigger
- testClock.advanceTime(Duration.ofMillis(500));
- task.window(messageCollector, taskCoordinator);
- //assert that the default trigger fired
- Assert.assertEquals(windowPanes.size(), 4);
- }
-
- private class KeyedTumblingWindowStreamApplication implements StreamApplication {
-
- private final AccumulationMode mode;
- private final Duration duration;
- private final Trigger<IntegerEnvelope> earlyTrigger;
- private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
-
- KeyedTumblingWindowStreamApplication(AccumulationMode mode,
- Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
- this.mode = mode;
- this.duration = timeDuration;
- this.earlyTrigger = earlyTrigger;
- }
-
- @Override
- public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream =
- graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
- .map(kv -> new IntegerEnvelope(kv.getKey()));
- Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
- inStream
- .map(m -> m)
- .window(Windows.keyedTumblingWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde())
- .setEarlyTrigger(earlyTrigger)
- .setAccumulationMode(mode), "w1")
- .sink((message, messageCollector, taskCoordinator) -> {
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
- }
- }
-
- private class TumblingWindowStreamApplication implements StreamApplication {
-
- private final AccumulationMode mode;
- private final Duration duration;
- private final Trigger<IntegerEnvelope> earlyTrigger;
- private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
-
- TumblingWindowStreamApplication(AccumulationMode mode,
- Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
- this.mode = mode;
- this.duration = timeDuration;
- this.earlyTrigger = earlyTrigger;
- }
-
- @Override
- public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream =
- graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
- .map(kv -> new IntegerEnvelope(kv.getKey()));
- Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
- inStream
- .map(m -> m)
- .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde())
- .setEarlyTrigger(earlyTrigger)
- .setAccumulationMode(mode), "w1")
- .sink((message, messageCollector, taskCoordinator) -> {
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
- }
- }
-
- private class AggregateTumblingWindowStreamApplication implements StreamApplication {
-
- private final AccumulationMode mode;
- private final Duration duration;
- private final Trigger<IntegerEnvelope> earlyTrigger;
- private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
-
- AggregateTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration,
- Trigger<IntegerEnvelope> earlyTrigger) {
- this.mode = mode;
- this.duration = timeDuration;
- this.earlyTrigger = earlyTrigger;
- }
-
- @Override
- public void init(StreamGraph graph, Config config) {
- MessageStream<KV<Integer, Integer>> integers = graph.getInputStream("integers",
- KVSerde.of(new IntegerSerde(), new IntegerSerde()));
-
- integers
- .map(kv -> new IntegerEnvelope(kv.getKey()))
- .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(this.duration, () -> 0, (m, c) -> c + 1, new IntegerSerde())
- .setEarlyTrigger(earlyTrigger)
- .setAccumulationMode(mode), "w1")
- .sink((message, messageCollector, taskCoordinator) -> {
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
- }
- }
-
- private class KeyedSessionWindowStreamApplication implements StreamApplication {
-
- private final AccumulationMode mode;
- private final Duration duration;
- private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
-
- KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
- this.mode = mode;
- this.duration = duration;
- }
-
- @Override
- public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream =
- graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
- .map(kv -> new IntegerEnvelope(kv.getKey()));
- Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
-
- inStream
- .map(m -> m)
- .window(Windows.keyedSessionWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde())
- .setAccumulationMode(mode), "w1")
- .sink((message, messageCollector, taskCoordinator) -> {
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
- }
- }
-
- private class IntegerEnvelope extends IncomingMessageEnvelope {
-
- IntegerEnvelope(Integer key) {
- super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key);
- }
- }
-
- private class IntegerEnvelopeSerde implements Serde<IntegerEnvelope> {
- private final IntegerSerde intSerde = new IntegerSerde();
-
- @Override
- public byte[] toBytes(IntegerEnvelope object) {
- return intSerde.toBytes((Integer) object.getKey());
- }
-
- @Override
- public IntegerEnvelope fromBytes(byte[] bytes) {
- return new IntegerEnvelope(intSerde.fromBytes(bytes));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/7a2e1925/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
new file mode 100644
index 0000000..7d0c623
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -0,0 +1,677 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import junit.framework.Assert;
+import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.container.TaskContextImpl;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.impl.store.TestInMemoryStore;
+import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
+import org.apache.samza.operators.triggers.FiringType;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.triggers.Triggers;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.testUtils.TestClock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+public class TestWindowOperator {
+ private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
+ private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
+ private Config config;
+ private TaskContextImpl taskContext;
+ private ApplicationRunner runner;
+
+ @Before
+ public void setup() throws Exception {
+ config = mock(Config.class);
+ when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
+ when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
+ taskContext = mock(TaskContextImpl.class);
+ runner = mock(ApplicationRunner.class);
+ Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
+ Serde storeValSerde = new IntegerEnvelopeSerde();
+
+ when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
+ .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+ when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+
+ when(taskContext.getStore("jobName-jobId-window-w1"))
+ .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
+ when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
+ }
+
+ @Test
+ public void testTumblingWindowsDiscardingMode() throws Exception {
+
+ StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+ List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+ MessageCollector messageCollector =
+ envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+ integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
+ testClock.advanceTime(Duration.ofSeconds(1));
+
+ task.window(messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 5);
+ Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+ Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+
+ Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+ Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
+
+ Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+ Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
+
+ Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+ Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
+
+ Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3));
+ Assert.assertEquals((windowPanes.get(4).getMessage()).size(), 1);
+ }
+
+ @Test
+ public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception {
+
+ StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000)));
+ List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ MessageCollector messageCollector =
+ envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+ Assert.assertEquals(windowPanes.size(), 0);
+
+ integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
+ Assert.assertEquals(windowPanes.size(), 0);
+
+ testClock.advanceTime(Duration.ofSeconds(1));
+ Assert.assertEquals(windowPanes.size(), 0);
+
+ task.window(messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 1);
+ Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 9);
+ }
+
+ @Test
+ public void testTumblingAggregatingWindowsDiscardingMode() throws Exception {
+
+ when(taskContext.getStore("jobName-jobId-window-w1"))
+ .thenReturn(new TestInMemoryStore<>(new TimeSeriesKeySerde(new IntegerSerde()), new IntegerSerde()));
+
+ StreamApplication sgb = new AggregateTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+ List<WindowPane<Integer, Integer>> windowPanes = new ArrayList<>();
+
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+ MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Integer>) envelope.getMessage());
+ integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
+ testClock.advanceTime(Duration.ofSeconds(1));
+
+ task.window(messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 5);
+ Assert.assertEquals(windowPanes.get(0).getMessage(), new Integer(2));
+ Assert.assertEquals(windowPanes.get(1).getMessage(), new Integer(2));
+ Assert.assertEquals(windowPanes.get(2).getMessage(), new Integer(2));
+ Assert.assertEquals(windowPanes.get(3).getMessage(), new Integer(2));
+ Assert.assertEquals(windowPanes.get(4).getMessage(), new Integer(1));
+ }
+
+ @Test
+ public void testTumblingWindowsAccumulatingMode() throws Exception {
+ StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+ List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ MessageCollector messageCollector =
+ envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+ integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+
+ Assert.assertEquals(windowPanes.size(), 7);
+ Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+ Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+
+ Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+ Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
+
+ Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+ Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 4);
+
+ Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+ Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 4);
+ }
+
+ @Test
+ public void testSessionWindowsDiscardingMode() throws Exception {
+ StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+ TestClock testClock = new TestClock();
+ List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+ MessageCollector messageCollector =
+ envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+
+ Assert.assertEquals(windowPanes.size(), 1);
+ Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
+ Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+
+ task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
+
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+
+ Assert.assertEquals(windowPanes.size(), 3);
+ Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1");
+ Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001");
+ Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001");
+ Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+ Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 2);
+ Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 2);
+
+ task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 4);
+ Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2));
+ Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001");
+ Assert.assertEquals((windowPanes.get(3).getMessage()).size(), 2);
+ }
+
+ @Test
+ public void testSessionWindowsAccumulatingMode() throws Exception {
+ StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING,
+ Duration.ofMillis(500));
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+ MessageCollector messageCollector =
+ envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+ task.init(config, taskContext);
+
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ testClock.advanceTime(Duration.ofSeconds(1));
+
+ task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+
+ task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(2), messageCollector, taskCoordinator);
+
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 2);
+ Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+ Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+ Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2));
+ Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+ Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 4);
+ }
+
+ @Test
+ public void testCancellationOfOnceTrigger() throws Exception {
+ StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
+ Duration.ofSeconds(1), Triggers.count(2));
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+ MessageCollector messageCollector =
+ envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 1);
+ Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
+ Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1));
+ Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY);
+
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+ Assert.assertEquals(windowPanes.size(), 1);
+
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+
+ Assert.assertEquals(windowPanes.size(), 2);
+ Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0");
+ Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
+ Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
+
+ task.process(new IntegerEnvelope(3), messageCollector, taskCoordinator);
+ testClock.advanceTime(Duration.ofSeconds(1));
+ task.window(messageCollector, taskCoordinator);
+
+ Assert.assertEquals(windowPanes.size(), 3);
+ Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3));
+ Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
+ Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT);
+ Assert.assertEquals((windowPanes.get(2).getMessage()).size(), 1);
+
+ }
+
+ @Test
+ public void testCancellationOfAnyTrigger() throws Exception {
+ StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+ Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+ MessageCollector messageCollector =
+ envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ //assert that the count trigger fired
+ Assert.assertEquals(windowPanes.size(), 1);
+
+ //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger
+ testClock.advanceTime(Duration.ofMillis(500));
+
+ //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
+ Assert.assertEquals(windowPanes.size(), 1);
+
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+ //advance timer by 500 more millis to enable the default trigger
+ testClock.advanceTime(Duration.ofMillis(500));
+ task.window(messageCollector, taskCoordinator);
+
+ //assert that the default trigger fired
+ Assert.assertEquals(windowPanes.size(), 2);
+ Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT);
+ Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1));
+ Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0");
+ Assert.assertEquals((windowPanes.get(1).getMessage()).size(), 5);
+
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+ //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
+ testClock.advanceTime(Duration.ofMillis(500));
+ task.window(messageCollector, taskCoordinator);
+
+ Assert.assertEquals(windowPanes.size(), 3);
+ Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY);
+ Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1));
+ Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000");
+
+ //advance timer by > 500 millis to enable the default trigger
+ testClock.advanceTime(Duration.ofMillis(900));
+ task.window(messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 4);
+ Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT);
+ Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1));
+ Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000");
+ }
+
+ @Test
+ public void testCancelationOfRepeatingNestedTriggers() throws Exception {
+
+ StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+ Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
+ List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+ MessageCollector messageCollector =
+ envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ //assert that the count trigger fired
+ Assert.assertEquals(windowPanes.size(), 1);
+
+ //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ testClock.advanceTime(Duration.ofMillis(500));
+ //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
+ task.window(messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 2);
+
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 3);
+
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ //advance timer by 500 more millis to enable the default trigger
+ testClock.advanceTime(Duration.ofMillis(500));
+ task.window(messageCollector, taskCoordinator);
+ //assert that the default trigger fired
+ Assert.assertEquals(windowPanes.size(), 4);
+ }
+
+ @Test
+ public void testEndOfStreamFlushesWithEarlyTriggerFirings() throws Exception {
+ EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
+ "integers", new Partition(0))), Collections.emptyMap());
+
+ when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
+ when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
+ when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+
+ StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+ List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+
+ TestClock testClock = new TestClock();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ MessageCollector messageCollector =
+ envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+ Assert.assertEquals(windowPanes.size(), 0);
+
+ List<Integer> integerList = ImmutableList.of(1, 2, 1, 2, 1);
+ integerList.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
+
+ // early triggers should emit (1,2) and (1,2) in the same window.
+ Assert.assertEquals(windowPanes.size(), 2);
+
+ testClock.advanceTime(Duration.ofSeconds(1));
+ Assert.assertEquals(windowPanes.size(), 2);
+
+ final IncomingMessageEnvelope endOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(
+ new SystemStreamPartition("kafka", "integers", new Partition(0)));
+ task.process(endOfStream, messageCollector, taskCoordinator);
+
+ // end of stream flushes the last entry (1)
+ Assert.assertEquals(windowPanes.size(), 3);
+ Assert.assertEquals((windowPanes.get(0).getMessage()).size(), 2);
+ verify(taskCoordinator, times(1)).commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+ verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
+ }
+
+ @Test
+ public void testEndOfStreamFlushesWithDefaultTriggerFirings() throws Exception {
+ EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
+ "integers", new Partition(0))), Collections.emptyMap());
+
+ when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
+ when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
+ when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+
+ StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+ TestClock testClock = new TestClock();
+ List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ MessageCollector messageCollector =
+ envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ testClock.advanceTime(1000);
+ task.window(messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 1);
+
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+ final IncomingMessageEnvelope endOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(
+ new SystemStreamPartition("kafka", "integers", new Partition(0)));
+ task.process(endOfStream, messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 2);
+ Assert.assertEquals(windowPanes.get(0).getMessage().size(), 2);
+ verify(taskCoordinator, times(1)).commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+ verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
+ }
+
+ @Test
+ public void testEndOfStreamFlushesWithNoTriggerFirings() throws Exception {
+ EndOfStreamStates endOfStreamStates = new EndOfStreamStates(ImmutableSet.of(new SystemStreamPartition("kafka",
+ "integers", new Partition(0))), Collections.emptyMap());
+
+ when(taskContext.getTaskName()).thenReturn(new TaskName("task 1"));
+ when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
+ when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
+
+ StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+ TestClock testClock = new TestClock();
+ List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
+ StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ task.init(config, taskContext);
+
+ MessageCollector messageCollector =
+ envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
+
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+ task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
+
+ final IncomingMessageEnvelope endOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(
+ new SystemStreamPartition("kafka", "integers", new Partition(0)));
+ task.process(endOfStream, messageCollector, taskCoordinator);
+ Assert.assertEquals(windowPanes.size(), 1);
+ Assert.assertEquals(windowPanes.get(0).getMessage().size(), 4);
+ verify(taskCoordinator, times(1)).commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+ verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
+ }
+
+ private class KeyedTumblingWindowStreamApplication implements StreamApplication {
+
+ private final AccumulationMode mode;
+ private final Duration duration;
+ private final Trigger<IntegerEnvelope> earlyTrigger;
+ private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+
+ KeyedTumblingWindowStreamApplication(AccumulationMode mode,
+ Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
+ this.mode = mode;
+ this.duration = timeDuration;
+ this.earlyTrigger = earlyTrigger;
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ MessageStream<IntegerEnvelope> inStream =
+ graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+ .map(kv -> new IntegerEnvelope(kv.getKey()));
+ Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
+ inStream
+ .map(m -> m)
+ .window(Windows.keyedTumblingWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde())
+ .setEarlyTrigger(earlyTrigger)
+ .setAccumulationMode(mode), "w1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+ }
+ }
+
+ private class TumblingWindowStreamApplication implements StreamApplication {
+
+ private final AccumulationMode mode;
+ private final Duration duration;
+ private final Trigger<IntegerEnvelope> earlyTrigger;
+ private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+
+ TumblingWindowStreamApplication(AccumulationMode mode,
+ Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
+ this.mode = mode;
+ this.duration = timeDuration;
+ this.earlyTrigger = earlyTrigger;
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ MessageStream<IntegerEnvelope> inStream =
+ graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+ .map(kv -> new IntegerEnvelope(kv.getKey()));
+ Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
+ inStream
+ .map(m -> m)
+ .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde())
+ .setEarlyTrigger(earlyTrigger)
+ .setAccumulationMode(mode), "w1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+ }
+ }
+
+ private class AggregateTumblingWindowStreamApplication implements StreamApplication {
+
+ private final AccumulationMode mode;
+ private final Duration duration;
+ private final Trigger<IntegerEnvelope> earlyTrigger;
+ private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+
+ AggregateTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration,
+ Trigger<IntegerEnvelope> earlyTrigger) {
+ this.mode = mode;
+ this.duration = timeDuration;
+ this.earlyTrigger = earlyTrigger;
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ MessageStream<KV<Integer, Integer>> integers = graph.getInputStream("integers",
+ KVSerde.of(new IntegerSerde(), new IntegerSerde()));
+
+ integers
+ .map(kv -> new IntegerEnvelope(kv.getKey()))
+ .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(this.duration, () -> 0, (m, c) -> c + 1, new IntegerSerde())
+ .setEarlyTrigger(earlyTrigger)
+ .setAccumulationMode(mode), "w1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+ }
+ }
+
+ private class KeyedSessionWindowStreamApplication implements StreamApplication {
+
+ private final AccumulationMode mode;
+ private final Duration duration;
+ private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+
+ KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
+ this.mode = mode;
+ this.duration = duration;
+ }
+
+ @Override
+ public void init(StreamGraph graph, Config config) {
+ MessageStream<IntegerEnvelope> inStream =
+ graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+ .map(kv -> new IntegerEnvelope(kv.getKey()));
+ Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
+
+ inStream
+ .map(m -> m)
+ .window(Windows.keyedSessionWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde())
+ .setAccumulationMode(mode), "w1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+ }
+ }
+
+ private class IntegerEnvelope extends IncomingMessageEnvelope {
+
+ IntegerEnvelope(Integer key) {
+ super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key);
+ }
+ }
+
+ private class IntegerEnvelopeSerde implements Serde<IntegerEnvelope> {
+ private final IntegerSerde intSerde = new IntegerSerde();
+
+ @Override
+ public byte[] toBytes(IntegerEnvelope object) {
+ return intSerde.toBytes((Integer) object.getKey());
+ }
+
+ @Override
+ public IntegerEnvelope fromBytes(byte[] bytes) {
+ return new IntegerEnvelope(intSerde.fromBytes(bytes));
+ }
+ }
+}