You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/03/19 19:31:47 UTC
[1/2] samza git commit: SAMZA-1108: Implementation of Windows and
various kinds of Triggers
Repository: samza
Updated Branches:
refs/heads/master 05915bfc8 -> d399d6f3c
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java
new file mode 100644
index 0000000..8544efd
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link TimeSinceLastMessageTrigger}.
+ *
+ * <p> Each message received before the trigger has fired (re)schedules a callback {@code durationMs} after the
+ * message's arrival. If the previously scheduled callback is not yet due, it is canceled first. Once a callback
+ * runs, {@link #shouldFire()} returns {@code true} and further messages no longer schedule callbacks.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the key type of the {@link TriggerKey} and {@link TriggerScheduler} used to schedule callbacks
+ */
+public class TimeSinceLastMessageTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TimeSinceLastMessageTriggerImpl.class);
+ private final TimeSinceLastMessageTrigger<M> trigger;
+ private final long durationMs;
+ private final Clock clock;
+ private final TriggerKey<WK> triggerKey;
+ // Due time of the most recently scheduled callback; Long.MIN_VALUE until the first message arrives.
+ private long callbackTime = Long.MIN_VALUE;
+ private Cancellable cancellable = null;
+ // Set by the scheduled callback; once true, onMessage stops rescheduling.
+ private boolean shouldFire = false;
+
+ public TimeSinceLastMessageTriggerImpl(TimeSinceLastMessageTrigger<M> trigger, Clock clock, TriggerKey<WK> key) {
+ this.trigger = trigger;
+ this.durationMs = trigger.getDuration().toMillis();
+ this.clock = clock;
+ this.triggerKey = key;
+ }
+
+ @Override
+ public void onMessage(M message, TriggerScheduler<WK> context) {
+ if (!shouldFire) {
+ long currTime = clock.currentTimeMillis();
+
+ // Cancel the previous callback only while its due time is still in the future. If the due time has
+ // already passed, the quiet period has elapsed, so that callback is left in place rather than pushed back.
+ if (currTime < callbackTime && cancellable != null) {
+ cancellable.cancel();
+ }
+
+ callbackTime = currTime + durationMs;
+ Runnable runnable = () -> {
+ LOG.trace("Time since last message trigger fired");
+ shouldFire = true;
+ };
+
+ cancellable = context.scheduleCallback(runnable, callbackTime, triggerKey);
+ }
+ }
+
+ /**
+ * Cancels the most recently scheduled callback, if any.
+ */
+ @Override
+ public void cancel() {
+ if (cancellable != null) {
+ cancellable.cancel();
+ }
+ }
+
+ @Override
+ public boolean shouldFire() {
+ return shouldFire;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java
new file mode 100644
index 0000000..2454ce9
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link TimeTrigger}.
+ *
+ * <p> The first message schedules a single callback at the next multiple of the trigger's duration on the
+ * {@link Clock}'s timeline. Later messages do not reschedule it. Once the callback runs,
+ * {@link #shouldFire()} returns {@code true}.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the key type of the {@link TriggerKey} and {@link TriggerScheduler} used to schedule the callback
+ */
+public class TimeTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TimeTriggerImpl.class);
+
+ private final TimeTrigger<M> trigger;
+ private final TriggerKey<WK> triggerKey;
+ private final Clock clock;
+ private final long durationMs;
+ private Cancellable cancellable;
+ // Set by the scheduled callback.
+ private boolean shouldFire = false;
+
+ public TimeTriggerImpl(TimeTrigger<M> trigger, Clock clock, TriggerKey<WK> key) {
+ this.trigger = trigger;
+ this.durationMs = trigger.getDuration().toMillis();
+ this.clock = clock;
+ this.triggerKey = key;
+ }
+
+ @Override
+ public void onMessage(M message, TriggerScheduler<WK> context) {
+ // Only the first message schedules a callback; subsequent messages leave it untouched.
+ if (cancellable == null) {
+ long now = clock.currentTimeMillis();
+ // First multiple of durationMs strictly after 'now'. floorMod keeps the alignment correct
+ // even if the clock reports a negative value, where '%' would yield a negative remainder.
+ long callbackTime = now - Math.floorMod(now, durationMs) + durationMs;
+ cancellable = context.scheduleCallback(() -> {
+ LOG.trace("Time trigger fired");
+ shouldFire = true;
+ }, callbackTime, triggerKey);
+ }
+ }
+
+ /**
+ * Cancels the scheduled callback, if one was scheduled.
+ */
+ @Override
+ public void cancel() {
+ if (cancellable != null) {
+ cancellable.cancel();
+ }
+ }
+
+ @Override
+ public boolean shouldFire() {
+ return shouldFire;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
new file mode 100644
index 0000000..705cab7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+
+import org.apache.samza.operators.impl.TriggerScheduler;
+
+/**
+ * Implementation class for a {@link Trigger}. A {@link TriggerImpl} observes the messages added to its window
+ * and reports, via {@link #shouldFire()}, when its trigger condition has been satisfied.
+ *
+ * <p> When messages arrive in the {@code WindowOperatorImpl}, they are assigned to one or more windows. An
+ * instance of a {@link TriggerImpl} is created corresponding to each {@link Trigger} configured for a window. For every
+ * message added to the window, the {@code WindowOperatorImpl} invokes the {@link #onMessage} on its corresponding
+ * {@link TriggerImpl}s. A {@link TriggerImpl} instance is scoped to a window and its firing determines when results for
+ * its window are emitted.
+ *
+ * <p> {@link TriggerImpl}s can use the {@link TriggerScheduler} to schedule and cancel callbacks (for example,
+ * implementations of time-based triggers).
+ *
+ * <p> State management: The state maintained by {@link TriggerImpl}s is transient and is not durable across
+ * restarts. New instances of {@link TriggerImpl} are created on a restart.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the key type used with the {@link TriggerScheduler}
+ */
+public interface TriggerImpl<M, WK> {
+
+ /**
+ * Invoked when a message is added to the window corresponding to this {@link TriggerImpl}.
+ * @param message the incoming message
+ * @param context the {@link TriggerScheduler} to schedule and cancel callbacks
+ */
+ void onMessage(M message, TriggerScheduler<WK> context);
+
+ /**
+ * Returns {@code true} if the current state of the trigger indicates that its condition
+ * is satisfied and it is ready to fire.
+ * @return {@code true} if this trigger should fire
+ */
+ boolean shouldFire();
+
+ /**
+ * Invoked when the execution of this {@link TriggerImpl} is canceled by an upstream {@link TriggerImpl}.
+ * Implementations that scheduled callbacks through the {@link TriggerScheduler} typically cancel them here.
+ *
+ * No calls to {@link #onMessage(Object, TriggerScheduler)} or {@link #shouldFire()} will be made
+ * after this invocation.
+ */
+ void cancel();
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
new file mode 100644
index 0000000..f64a1db
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.util.Clock;
+
+/**
+ * Factory methods for instantiating {@link TriggerImpl}s from individual {@link Trigger}s.
+ */
+public class TriggerImpls {
+
+ /**
+ * Creates the {@link TriggerImpl} matching the concrete type of {@code trigger}.
+ *
+ * @param trigger the trigger to create an implementation for; must not be null
+ * @param clock the clock handed to the created implementation (not used for a {@link CountTrigger})
+ * @param triggerKey the key handed to the created implementation
+ * @param <M> the type of the incoming message
+ * @param <WK> the key type of {@code triggerKey}
+ * @return a new {@link TriggerImpl} for {@code trigger}
+ * @throws IllegalArgumentException if {@code trigger} is null
+ * @throws SamzaException if there is no implementation for the type of {@code trigger}
+ */
+ public static <M, WK> TriggerImpl<M, WK> createTriggerImpl(Trigger<M> trigger, Clock clock, TriggerKey<WK> triggerKey) {
+
+ if (trigger == null) {
+ throw new IllegalArgumentException("Trigger must not be null");
+ }
+
+ if (trigger instanceof CountTrigger) {
+ return new CountTriggerImpl<>((CountTrigger<M>) trigger, triggerKey);
+ } else if (trigger instanceof RepeatingTrigger) {
+ return new RepeatingTriggerImpl<>((RepeatingTrigger<M>) trigger, clock, triggerKey);
+ } else if (trigger instanceof AnyTrigger) {
+ return new AnyTriggerImpl<>((AnyTrigger<M>) trigger, clock, triggerKey);
+ } else if (trigger instanceof TimeSinceLastMessageTrigger) {
+ return new TimeSinceLastMessageTriggerImpl<>((TimeSinceLastMessageTrigger<M>) trigger, clock, triggerKey);
+ } else if (trigger instanceof TimeTrigger) {
+ return new TimeTriggerImpl<>((TimeTrigger<M>) trigger, clock, triggerKey);
+ } else if (trigger instanceof TimeSinceFirstMessageTrigger) {
+ return new TimeSinceFirstMessageTriggerImpl<>((TimeSinceFirstMessageTrigger<M>) trigger, clock, triggerKey);
+ }
+
+ throw new SamzaException("No implementation class defined for the trigger " + trigger.getClass().getCanonicalName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
index e5dab80..b8672c6 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.samza.operators.util;
import org.apache.samza.storage.kv.Entry;
@@ -30,24 +31,28 @@ import java.util.Map;
/**
* Implements a {@link KeyValueStore} using an in-memory Java Map.
- * @param <K> the type of the key in the store
- * @param <V> the type of the value in the store
+ * @param <K> the type of key
+ * @param <V> the type of value
+ *
+ * TODO: This class is a stop-gap until we implement persistent store creation from TaskContext.
*
- * TODO HIGH prateekm: Remove when we switch to an persistent implementation for KeyValueStore API.
*/
public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
- final Map<K, V> map = new LinkedHashMap<>();
+ private final Map<K, V> map = new LinkedHashMap<>();
@Override
public V get(K key) {
+ if (key == null) {
+ throw new NullPointerException("Null key provided");
+ }
return map.get(key);
}
@Override
public Map<K, V> getAll(List<K> keys) {
Map<K, V> values = new HashMap<>();
- for (K key: keys) {
+ for (K key : keys) {
values.put(key, map.get(key));
}
return values;
@@ -55,18 +60,24 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
@Override
public void put(K key, V value) {
+ if (key == null) {
+ throw new NullPointerException("Null key provided");
+ }
map.put(key, value);
}
@Override
public void putAll(List<Entry<K, V>> entries) {
- for (Entry<K, V> entry: entries) {
+ for (Entry<K, V> entry : entries) {
put(entry.getKey(), entry.getValue());
}
}
@Override
public void delete(K key) {
+ if (key == null) {
+ throw new NullPointerException("Null key provided");
+ }
map.remove(key);
}
@@ -119,4 +130,4 @@ public class InternalInMemoryStore<K, V> implements KeyValueStore<K, V> {
public void flush() {
//not applicable
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index 9ec8e5a..d4224c3 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -20,6 +20,8 @@ package org.apache.samza.task;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+
import org.apache.samza.config.Config;
import org.apache.samza.operators.ContextManager;
import org.apache.samza.operators.MessageStreamImpl;
@@ -30,6 +32,9 @@ import org.apache.samza.operators.impl.OperatorGraph;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.SystemClock;
/**
@@ -65,16 +70,27 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
/**
* A mapping from each {@link SystemStream} to the root node of its operator chain DAG.
*/
- private final OperatorGraph operatorGraph = new OperatorGraph();
+ private final OperatorGraph operatorGraph;
private final StreamApplication graphBuilder;
private final ApplicationRunner runner;
+ private final Clock clock;
+
private ContextManager contextManager;
+ private Set<SystemStreamPartition> systemStreamPartitions;
+
public StreamOperatorTask(StreamApplication graphBuilder, ApplicationRunner runner) {
+ this(graphBuilder, SystemClock.instance(), runner);
+ }
+
+ // purely for testing.
+ public StreamOperatorTask(StreamApplication graphBuilder, Clock clock, ApplicationRunner runner) {
this.graphBuilder = graphBuilder;
+ this.operatorGraph = new OperatorGraph(clock);
+ this.clock = clock;
this.runner = runner;
}
@@ -85,9 +101,10 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
this.graphBuilder.init(streamGraph, config);
// get the context manager of the {@link StreamGraph} and initialize the task-specific context
this.contextManager = streamGraph.getContextManager();
+ this.systemStreamPartitions = context.getSystemStreamPartitions();
Map<SystemStream, MessageStreamImpl> inputBySystemStream = new HashMap<>();
- context.getSystemStreamPartitions().forEach(ssp -> {
+ systemStreamPartitions.forEach(ssp -> {
if (!inputBySystemStream.containsKey(ssp.getSystemStream())) {
// create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream}
inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streamGraph.getInputStream(ssp.getSystemStream()));
@@ -103,8 +120,11 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
}
@Override
- public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
- this.operatorGraph.getAll().forEach(r -> r.onTimer(collector, coordinator));
+ public final void window(MessageCollector collector, TaskCoordinator coordinator) {
+ // Several partitions of one SystemStream may be assigned to this task, but init() maps each SystemStream
+ // to a single root in the operator graph; deduplicate so each root is ticked once per window() call.
+ systemStreamPartitions.stream()
+ .map(SystemStreamPartition::getSystemStream)
+ .distinct()
+ .forEach(systemStream -> this.operatorGraph.get(systemStream).onTick(collector, coordinator));
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index 543716a..6edf048 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -33,6 +33,7 @@ import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
+import java.util.function.Supplier;
/**
@@ -44,9 +45,10 @@ public class PageViewCounterExample implements StreamApplication {
MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+ Supplier<Integer> initialValue = () -> 0;
pageViewEvents.
- window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1).
+ window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), initialValue, (m, c) -> c + 1).
setEarlyTrigger(Triggers.repeat(Triggers.count(5))).
setAccumulationMode(AccumulationMode.DISCARDING)).
map(MyStreamOutput::new).
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index 729b26f..e222fe4 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -31,6 +31,7 @@ import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
+import java.util.function.Supplier;
/**
@@ -67,11 +68,12 @@ public class RepartitionExample implements StreamApplication {
MessageStream<PageViewEvent> pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>());
OutputStream<MyStreamOutput> pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>());
+ Supplier<Integer> initialValue = () -> 0;
pageViewEvents.
partitionBy(m -> m.getMessage().memberId).
window(Windows.<PageViewEvent, String, Integer>keyedTumblingWindow(
- msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)).
+ msg -> msg.getMessage().memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1)).
map(MyStreamOutput::new).
sendTo(pageViewPerMemberCounters);
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
index d988270..1c30a21 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java
@@ -21,13 +21,15 @@ package org.apache.samza.example;
import java.time.Duration;
import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.Supplier;
+
import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.data.InputMessageEnvelope;
import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.system.StreamSpec;
@@ -65,18 +67,20 @@ public class TestBroadcastExample extends TestExampleBase {
@Override
public void init(StreamGraph graph, Config config) {
- BiFunction<JsonMessageEnvelope, Integer, Integer> sumAggregator = (m, c) -> c + 1;
+ FoldLeftFunction<JsonMessageEnvelope, Integer> sumAggregator = (m, c) -> c + 1;
+ Supplier<Integer> initialValue = () -> 0;
+
inputs.keySet().forEach(entry -> {
MessageStream<JsonMessageEnvelope> inputStream = graph.<Object, Object, InputMessageEnvelope>createInStream(
new StreamSpec(entry.getSystem() + "-" + entry.getStream(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage);
- inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+ inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
.setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
- inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+ inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
.setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
- inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator)
+ inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator)
.setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10)))));
});
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
index 6896da5..c88df7c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java
@@ -25,13 +25,14 @@ import org.apache.samza.operators.data.InputMessageEnvelope;
import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStreamPartition;
import java.time.Duration;
-import java.util.function.BiFunction;
import java.util.Set;
+import java.util.function.Supplier;
/**
@@ -57,11 +58,12 @@ public class TestWindowExample extends TestExampleBase {
@Override
public void init(StreamGraph graph, Config config) {
- BiFunction<JsonMessageEnvelope, Integer, Integer> maxAggregator = (m, c) -> c + 1;
+ FoldLeftFunction<JsonMessageEnvelope, Integer> maxAggregator = (m, c) -> c + 1;
+ Supplier<Integer> initialValue = () -> 0;
inputs.keySet().forEach(source -> graph.<Object, Object, InputMessageEnvelope>createInStream(
new StreamSpec(source.getSystem() + "-" + source.getStream(), source.getStream(), source.getSystem()), null, null).
map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(),
- m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator)));
+ m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), initialValue, maxAggregator)));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 361972e..5722dbd 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -50,7 +50,12 @@ public class TestOperatorImpl {
TestOperatorImpl.this.curCollector = collector;
TestOperatorImpl.this.curCoordinator = coordinator;
}
- };
+ @Override
+ public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+
+ }
+
+ };
// verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext()
OperatorImpl mockSub = mock(OperatorImpl.class);
opImpl.registerNextOperator(mockSub);
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
index 088cb00..31f6f4a 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -38,6 +38,7 @@ import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.windows.internal.WindowType;
import org.apache.samza.task.TaskContext;
import org.junit.Before;
import org.junit.Test;
@@ -77,7 +78,7 @@ public class TestOperatorImpls {
public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
// get window operator
WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
- WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null);
+ WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING);
when(mockWnd.getWindow()).thenReturn(windowInternal);
MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class);
Config mockConfig = mock(Config.class);
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
index ae3d151..ec1d74c 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -25,18 +25,20 @@ import org.apache.samza.operators.TestMessageStreamImplUtil;
import org.apache.samza.operators.TestOutputMessageEnvelope;
import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.functions.PartialJoinFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.internal.WindowType;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -69,17 +71,17 @@ public class TestOperatorSpecs {
@Test
public void testGetWindowOperator() throws Exception {
Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
- BiFunction<TestMessageEnvelope, Integer, Integer> aggregator = (m, c) -> c + 1;
-
+ FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
+ Supplier<Integer> initialValue = () -> 0;
//instantiate a window using reflection
- WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null);
+ WindowInternal window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING);
StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
assertEquals(spec.getWindow(), window);
assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
- assertEquals(spec.getWindow().getFoldFunction(), aggregator);
+ assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator);
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
new file mode 100644
index 0000000..674a8f1
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.util.Clock;
+
+import java.time.Duration;
+
+/**
+ * A {@link Clock} for tests: time starts at 1 ms and only moves when {@code advanceTime} is called.
+ */
+public class TestClock implements Clock {
+  long currentTime = 1;
+
+  /** Moves the clock forward by {@code duration}, converted with {@link Duration#toMillis()}. */
+  public void advanceTime(Duration duration) {
+    currentTime = currentTime + duration.toMillis();
+  }
+
+  /** Moves the clock forward by {@code millis} milliseconds. */
+  public void advanceTime(long millis) {
+    currentTime = currentTime + millis;
+  }
+
+  @Override
+  public long currentTimeMillis() {
+    return currentTime;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
new file mode 100644
index 0000000..0d720dd
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamOperatorTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Exercises the window operator via {@link StreamOperatorTask} and a manually advanced {@link TestClock}:
+ * tumbling and session windows, both accumulation modes, and cancellation of once/any/repeating triggers.
+ */
+public class TestWindowOperator {
+  private final MessageCollector messageCollector = mock(MessageCollector.class);
+  private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
+  private final List<WindowPane<Integer, Collection<MessageEnvelope<Integer, Integer>>>> windowPanes = new ArrayList<>();
+  private final List<Integer> integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3);
+  private Config config;
+  private TaskContext taskContext;
+  private ApplicationRunner runner;
+
+  @Before
+  public void setup() throws Exception {
+    windowPanes.clear();
+
+    config = mock(Config.class);
+    taskContext = mock(TaskContext.class);
+    runner = mock(ApplicationRunner.class);
+    when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
+        .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
+
+  }
+
+  @Test
+  public void testTumblingWindowsDiscardingMode() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(5, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(2, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals(2, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals(2, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals(2, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(3), windowPanes.get(4).getKey().getKey());
+    Assert.assertEquals(1, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(4).getMessage()).size());
+  }
+
+  @Test
+  public void testTumblingWindowsAccumulatingMode() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator));
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(7, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(2, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals(2, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals(4, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size());
+
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals(4, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size());
+  }
+
+  @Test
+  public void testSessionWindowsDiscardingMode() throws Exception {
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(1, windowPanes.size());
+    Assert.assertEquals("1", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(3, windowPanes.size());
+    Assert.assertEquals("1", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals("1001", windowPanes.get(1).getKey().getPaneId());
+    Assert.assertEquals("1001", windowPanes.get(2).getKey().getPaneId());
+    Assert.assertEquals(2, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size());
+    Assert.assertEquals(2, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size());
+    Assert.assertEquals(2, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size());
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(4, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals("2001", windowPanes.get(3).getKey().getPaneId());
+    Assert.assertEquals(2, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(3).getMessage()).size());
+  }
+
+  @Test
+  public void testSessionWindowsAccumulatingMode() throws Exception {
+    StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofMillis(500));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator);
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(2, windowPanes.size());
+    Assert.assertEquals(2, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(0).getMessage()).size());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(Integer.valueOf(2), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals("1", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals(4, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size());
+  }
+
+  @Test
+  public void testCancelationOfOnceTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.count(2));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    Assert.assertEquals(1, windowPanes.size());
+    Assert.assertEquals("0", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(0).getKey().getKey());
+    Assert.assertEquals(FiringType.EARLY, windowPanes.get(0).getFiringType());
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    Assert.assertEquals(1, windowPanes.size());
+
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(2, windowPanes.size());
+    Assert.assertEquals("0", windowPanes.get(0).getKey().getPaneId());
+    Assert.assertEquals("0", windowPanes.get(1).getKey().getPaneId());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(1).getFiringType());
+
+    task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofSeconds(1));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(3, windowPanes.size());
+    Assert.assertEquals(Integer.valueOf(3), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals("1000", windowPanes.get(2).getKey().getPaneId());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(2).getFiringType());
+    Assert.assertEquals(1, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(2).getMessage()).size());
+
+  }
+
+  @Test
+  public void testCancelationOfAnyTrigger() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+        Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    //assert that the count trigger fired
+    Assert.assertEquals(1, windowPanes.size());
+
+    //advance the timer to the point where the inner timeSinceFirstMessage trigger would fire, and run pending timers
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+    //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger
+    Assert.assertEquals(1, windowPanes.size());
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    //assert that the default trigger fired
+    Assert.assertEquals(2, windowPanes.size());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(1).getFiringType());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(1).getKey().getKey());
+    Assert.assertEquals("0", windowPanes.get(1).getKey().getPaneId());
+    Assert.assertEquals(5, ((Collection<MessageEnvelope<Integer, Integer>>) windowPanes.get(1).getMessage()).size());
+
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+
+    //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+
+    Assert.assertEquals(3, windowPanes.size());
+    Assert.assertEquals(FiringType.EARLY, windowPanes.get(2).getFiringType());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(2).getKey().getKey());
+    Assert.assertEquals("1000", windowPanes.get(2).getKey().getPaneId());
+
+    //advance timer by > 500 millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(900));
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(4, windowPanes.size());
+    Assert.assertEquals(FiringType.DEFAULT, windowPanes.get(3).getFiringType());
+    Assert.assertEquals(Integer.valueOf(1), windowPanes.get(3).getKey().getKey());
+    Assert.assertEquals("1000", windowPanes.get(3).getKey().getPaneId());
+  }
+
+  @Test
+  public void testCancelationOfRepeatingNestedTriggers() throws Exception {
+    StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+        Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
+    TestClock testClock = new TestClock();
+    StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner);
+    task.init(config, taskContext);
+
+    task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator);
+
+    task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator);
+    //assert that the count trigger fired
+    Assert.assertEquals(1, windowPanes.size());
+
+    //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    testClock.advanceTime(Duration.ofMillis(500));
+    //one more pane: the timer re-armed for message 3 fires; the one cancelled by the count firing stays silent
+    task.window(messageCollector, taskCoordinator);
+    Assert.assertEquals(2, windowPanes.size());
+
+    task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator);
+    task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator);
+    Assert.assertEquals(3, windowPanes.size());
+
+    task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator);
+    //advance timer by 500 more millis to enable the default trigger
+    testClock.advanceTime(Duration.ofMillis(500));
+    task.window(messageCollector, taskCoordinator);
+    //assert that the default trigger fired
+    Assert.assertEquals(4, windowPanes.size());
+  }
+
+  private class KeyedTumblingWindowStreamApplication implements StreamApplication {
+
+    private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka");
+    private final AccumulationMode mode;
+    private final Duration duration;
+    private final Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger;
+
+    KeyedTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration, Trigger<MessageEnvelope<Integer, Integer>> earlyTrigger) {
+      this.mode = mode;
+      this.duration = timeDuration;
+      this.earlyTrigger = earlyTrigger;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null);
+      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
+      inStream
+        .map(m -> m)
+        .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger)
+          .setAccumulationMode(mode))
+        .map(m -> {
+            windowPanes.add(m);
+            return m;
+          });
+    }
+  }
+
+  private class KeyedSessionWindowStreamApplication implements StreamApplication {
+
+    private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka");
+    private final AccumulationMode mode;
+    private final Duration duration;
+
+    KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
+      this.mode = mode;
+      this.duration = duration;
+    }
+
+    @Override
+    public void init(StreamGraph graph, Config config) {
+      MessageStream<MessageEnvelope<Integer, Integer>> inStream = graph.createInStream(streamSpec, null, null);
+      Function<MessageEnvelope<Integer, Integer>, Integer> keyFn = m -> m.getKey();
+
+      inStream
+        .map(m -> m)
+        .window(Windows.keyedSessionWindow(keyFn, duration)
+          .setAccumulationMode(mode))
+        .map(m -> {
+            windowPanes.add(m);
+            return m;
+          });
+    }
+  }
+
+  private class IntegerMessageEnvelope extends IncomingMessageEnvelope {
+    IntegerMessageEnvelope(int key, int msg) {
+      super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg);
+    }
+  }
+}
[2/2] samza git commit: SAMZA-1108: Implementation of Windows and
various kinds of Triggers
Posted by ja...@apache.org.
SAMZA-1108: Implementation of Windows and various kinds of Triggers
* Implemented various triggers and the orchestration logic of the window operator.
* Implemented wire-up of window and the flow of messages through various trigger implementations.
* Implementations for count, time, timeSinceFirst, timeSinceLast, Any, Repeating triggers.
Author: vjagadish1989 <jv...@linkedin.com>
Reviewers: Yi Pan (Data Infrastructure) <ni...@gmail.com>, Prateek Maheshwari <pm...@linkedin.com>, Chris Pettitt <cp...@linkedin.com>
Closes #66 from vjagadish1989/window-impl
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/d399d6f3
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/d399d6f3
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/d399d6f3
Branch: refs/heads/master
Commit: d399d6f3ca0fa20afcfad5864ec7f8550aa7a00f
Parents: 05915bf
Author: vjagadish1989 <jv...@linkedin.com>
Authored: Sun Mar 19 12:25:40 2017 -0700
Committer: vjagadish1989 <jv...@linkedin.com>
Committed: Sun Mar 19 12:31:21 2017 -0700
----------------------------------------------------------------------
checkstyle/checkstyle.xml | 7 +-
.../operators/functions/FilterFunction.java | 2 +-
.../operators/functions/FoldLeftFunction.java | 36 ++
.../samza/operators/functions/MapFunction.java | 5 +-
.../samza/operators/triggers/AnyTrigger.java | 9 +-
.../samza/operators/triggers/CountTrigger.java | 2 +-
.../samza/operators/triggers/FiringType.java | 29 ++
.../operators/triggers/RepeatingTrigger.java | 4 +
.../triggers/TimeSinceFirstMessageTrigger.java | 2 +-
.../triggers/TimeSinceLastMessageTrigger.java | 3 +-
.../samza/operators/triggers/TimeTrigger.java | 2 +-
.../samza/operators/windows/WindowKey.java | 36 +-
.../samza/operators/windows/WindowPane.java | 14 +-
.../apache/samza/operators/windows/Windows.java | 153 ++------
.../windows/internal/WindowInternal.java | 65 +++-
.../operators/windows/internal/WindowType.java | 24 ++
.../samza/operators/windows/TestWindowPane.java | 3 +-
.../apache/samza/operators/StreamGraphImpl.java | 1 +
.../org/apache/samza/operators/WindowState.java | 44 +++
.../samza/operators/impl/OperatorGraph.java | 16 +-
.../samza/operators/impl/OperatorImpl.java | 30 +-
.../operators/impl/PartialJoinOperatorImpl.java | 1 -
.../apache/samza/operators/impl/TriggerKey.java | 73 ++++
.../samza/operators/impl/TriggerScheduler.java | 120 ++++++
.../operators/impl/WindowOperatorImpl.java | 290 +++++++++++++-
.../operators/spec/WindowOperatorSpec.java | 11 +-
.../operators/triggers/AnyTriggerImpl.java | 80 ++++
.../samza/operators/triggers/Cancellable.java | 34 ++
.../operators/triggers/CountTriggerImpl.java | 61 +++
.../triggers/RepeatingTriggerImpl.java | 67 ++++
.../TimeSinceFirstMessageTriggerImpl.java | 71 ++++
.../TimeSinceLastMessageTriggerImpl.java | 79 ++++
.../operators/triggers/TimeTriggerImpl.java | 71 ++++
.../samza/operators/triggers/TriggerImpl.java | 66 ++++
.../samza/operators/triggers/TriggerImpls.java | 53 +++
.../operators/util/InternalInMemoryStore.java | 25 +-
.../apache/samza/task/StreamOperatorTask.java | 28 +-
.../samza/example/PageViewCounterExample.java | 4 +-
.../samza/example/RepartitionExample.java | 4 +-
.../samza/example/TestBroadcastExample.java | 14 +-
.../apache/samza/example/TestWindowExample.java | 8 +-
.../samza/operators/impl/TestOperatorImpl.java | 7 +-
.../samza/operators/impl/TestOperatorImpls.java | 3 +-
.../samza/operators/spec/TestOperatorSpecs.java | 12 +-
.../samza/operators/triggers/TestClock.java | 45 +++
.../operators/triggers/TestWindowOperator.java | 389 +++++++++++++++++++
46 files changed, 1885 insertions(+), 218 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index 0999fd7..775d674 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -20,7 +20,9 @@
-->
<module name="Checker">
<property name="localeLanguage" value="en"/>
-
+ <!-- allow suppression for specific files -->
+ <module name="SuppressionCommentFilter"/>
+
<module name="FileTabCharacter"/>
<!-- header: use one star only -->
@@ -32,6 +34,7 @@
<!-- code cleanup -->
<module name="UnusedImports"/>
+ <module name="FileContentsHolder"/>
<module name="RedundantImport"/>
<module name="IllegalImport" />
<module name="EqualsHashCode"/>
@@ -62,8 +65,8 @@
<!-- whitespace -->
<module name="GenericWhitespace"/>
<module name="NoWhitespaceBefore"/>
- <module name="WhitespaceAfter" />
<module name="NoWhitespaceAfter"/>
+ <module name="WhitespaceAfter" />
<module name="WhitespaceAround">
<property name="allowEmptyConstructors" value="true"/>
<property name="allowEmptyMethods" value="true"/>
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
index 58479d6..143bae0 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java
@@ -31,7 +31,7 @@ public interface FilterFunction<M> extends InitableFunction {
/**
* Returns a boolean indicating whether this message should be retained or filtered out.
- * @param message the input message to be checked
+ * @param message the input message to be checked. This object should not be mutated.
* @return true if {@code message} should be retained
*/
boolean apply(M message);
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
new file mode 100644
index 0000000..58e88fd
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.functions;
+
+/**
+ * A fold function that incrementally combines and aggregates values for a window.
+ * @param <M> type of the messages added to the window
+ * @param <WV> type of the aggregated window value
+ */
+@FunctionalInterface
+public interface FoldLeftFunction<M, WV> extends InitableFunction {
+  /**
+   * Incrementally combine and aggregate values for the window; guaranteed to be invoked for every message added.
+   * @param message the incoming message that is added to the window. This object should not be mutated.
+   * @param oldValue the previously aggregated window value
+   * @return the new aggregated window value
+   */
+  WV apply(M message, WV oldValue);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index 05a554f..b09fb99 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -31,8 +31,9 @@ import org.apache.samza.annotation.InterfaceStability;
public interface MapFunction<M, OM> extends InitableFunction {
/**
- * Transforms the provided message into another message
- * @param message the input message to be transformed
+ * Transforms the provided message into another message.
+ *
+ * @param message the input message to be transformed. This object should not be mutated.
* @return the transformed message
*/
OM apply(M message);
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
index 6e134df..f52b57b 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java
@@ -23,16 +23,15 @@ import java.util.List;
/**
* A {@link Trigger} fires as soon as any of its individual triggers has fired.
*/
-public class AnyTrigger<M> implements Trigger {
+public class AnyTrigger<M> implements Trigger<M> {
- private final List<Trigger> triggers;
+ private final List<Trigger<M>> triggers;
- AnyTrigger(List<Trigger> triggers) {
+ AnyTrigger(List<Trigger<M>> triggers) {
this.triggers = triggers;
}
- public List<Trigger> getTriggers() {
+ public List<Trigger<M>> getTriggers() {
return triggers;
}
}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
index 1cf930c..dbae3a9 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java
@@ -22,7 +22,7 @@ package org.apache.samza.operators.triggers;
* A {@link Trigger} that fires when the number of messages in the {@link org.apache.samza.operators.windows.WindowPane}
* reaches the specified count.
*/
-public class CountTrigger<M> implements Trigger {
+public class CountTrigger<M> implements Trigger<M> {
private final long count;
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java
new file mode 100644
index 0000000..49d971d
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+/**
+ * The type of the {@link org.apache.samza.operators.triggers.Trigger} firing.
+ * Firings can be either early or late or default. Late triggers are not supported currently.
+ */
+public enum FiringType {
+ EARLY,
+ DEFAULT,
+ LATE
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
index 7f78eb8..166d0d9 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java
@@ -28,5 +28,9 @@ class RepeatingTrigger<M> implements Trigger<M> {
RepeatingTrigger(Trigger<M> trigger) {
this.trigger = trigger;
}
+
+ public Trigger<M> getTrigger() {
+ return trigger;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
index 4de60a2..94b7769 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java
@@ -26,7 +26,7 @@ import java.time.Duration;
* A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in
* the window pane.
*/
-public class TimeSinceFirstMessageTrigger<M> implements Trigger {
+public class TimeSinceFirstMessageTrigger<M> implements Trigger<M> {
private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
index 6b09625..2231fd4 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java
@@ -22,8 +22,9 @@ import java.time.Duration;
/*
* A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration.
+ * @param <M> the type of the incoming {@link MessageEnvelope}
*/
-public class TimeSinceLastMessageTrigger<M> implements Trigger {
+public class TimeSinceLastMessageTrigger<M> implements Trigger<M> {
private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
index c5875aa..d854d74 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java
@@ -23,7 +23,7 @@ import java.time.Duration;
/*
* A {@link Trigger} that fires after the specified duration in processing time.
*/
-public class TimeTrigger<M> implements Trigger {
+public class TimeTrigger<M> implements Trigger<M> {
private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
index 14bd5ab..bf52724 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java
@@ -26,14 +26,19 @@ package org.apache.samza.operators.windows;
*
*/
public class WindowKey<K> {
-
+ /**
+ * A (key,paneId) tuple uniquely identifies an emission from a window. For instance, in case of keyed-tumbling time windows,
+ * the key is provided by the keyExtractor function, and the paneId is the start of the time window boundary. In case
+ * of session windows, the key is provided by the keyExtractor function, and the paneId is the time at which the earliest
+ * message in the window arrived.
+ */
private final K key;
private final String paneId;
- public WindowKey(K key, String windowId) {
+ public WindowKey(K key, String paneId) {
this.key = key;
- this.paneId = windowId;
+ this.paneId = paneId;
}
public K getKey() {
@@ -52,4 +57,29 @@ public class WindowKey<K> {
}
return String.format("%s%s", wndKey, paneId);
}
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    WindowKey<?> windowKey = (WindowKey<?>) o;
+    // key may be null (e.g. Void-keyed, non-keyed windows), so compare it null-safely like paneId
+    if (key == null ? windowKey.key != null : !key.equals(windowKey.key)) return false;
+
+    if (paneId == null) {
+      return windowKey.paneId == null;
+    }
+
+    return paneId.equals(windowKey.paneId);
+  }
+
+  @Override
+  public int hashCode() {
+    // null-safe for the same reason as equals(): a null key hashes to 0
+    int result = key != null ? key.hashCode() : 0;
+    result = 31 * result + (paneId != null ? paneId.hashCode() : 0);
+    return result;
+  }
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
index 3b66bd1..3b19f8a 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java
@@ -18,6 +18,8 @@
*/
package org.apache.samza.operators.windows;
+import org.apache.samza.operators.triggers.FiringType;
+
/**
* Specifies the result emitted from a {@link Window}.
*
@@ -32,10 +34,16 @@ public final class WindowPane<K, V> {
private final AccumulationMode mode;
- WindowPane(WindowKey<K> key, V value, AccumulationMode mode) {
+ /**
+ * The type of the trigger that emitted this result. Results can be emitted from early, late or default triggers.
+ */
+ private final FiringType type;
+
+ public WindowPane(WindowKey<K> key, V value, AccumulationMode mode, FiringType type) {
this.key = key;
this.value = value;
this.mode = mode;
+ this.type = type;
}
public V getMessage() {
@@ -46,8 +54,8 @@ public final class WindowPane<K, V> {
return this.key;
}
- static public <K, M> WindowPane<K, M> of(WindowKey<K> key, M result) {
- return new WindowPane<>(key, result, AccumulationMode.DISCARDING);
+ public FiringType getFiringType() {
+ return type;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
index 73fb5c8..9192fc1 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java
@@ -20,15 +20,18 @@
package org.apache.samza.operators.windows;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.triggers.TimeTrigger;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.windows.internal.WindowType;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.function.Supplier;
/**
* APIs for creating different types of {@link Window}s.
@@ -84,6 +87,8 @@ import java.util.function.Function;
* and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window
* types.
*
+ * <p> Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of
+ * finer granularity are not supported.
*/
@InterfaceStability.Unstable
public final class Windows {
@@ -107,17 +112,18 @@ public final class Windows {
*
* @param keyFn the function to extract the window key from a message
* @param interval the duration in processing time
+ * @param initialValue the initial value to be used for aggregations
* @param foldFn the function to aggregate messages in the {@link WindowPane}
* @param <M> the type of the input message
* @param <WV> the type of the {@link WindowPane} output value
* @param <K> the type of the key in the {@link Window}
* @return the created {@link Window} function.
*/
- public static <M, K, WV> Window<M, K, WV>
- keyedTumblingWindow(Function<M, K> keyFn, Duration interval, BiFunction<M, WV, WV> foldFn) {
+ public static <M, K, WV> Window<M, K, WV> keyedTumblingWindow(Function<M, K> keyFn, Duration interval,
+ Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldFn) {
Trigger<M> defaultTrigger = new TimeTrigger<>(interval);
- return new WindowInternal<M, K, WV>(defaultTrigger, foldFn, keyFn, null);
+ return new WindowInternal<M, K, WV>(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.TUMBLING);
}
@@ -142,11 +148,10 @@ public final class Windows {
* @return the created {@link Window} function
*/
public static <M, K> Window<M, K, Collection<M>> keyedTumblingWindow(Function<M, K> keyFn, Duration interval) {
- BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
- c.add(m);
- return c;
- };
- return keyedTumblingWindow(keyFn, interval, aggregator);
+ FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
+
+ Supplier<Collection<M>> initialValue = () -> new ArrayList<>();
+ return keyedTumblingWindow(keyFn, interval, initialValue, aggregator);
}
/**
@@ -164,15 +169,16 @@ public final class Windows {
* </pre>
*
* @param duration the duration in processing time
+ * @param initialValue the initial value to be used for aggregations
* @param foldFn to aggregate messages in the {@link WindowPane}
* @param <M> the type of the input message
* @param <WV> the type of the {@link WindowPane} output value
* @return the created {@link Window} function
*/
- public static <M, WV> Window<M, Void, WV>
- tumblingWindow(Duration duration, BiFunction<M, WV, WV> foldFn) {
+ public static <M, WV> Window<M, Void, WV> tumblingWindow(Duration duration, Supplier<WV> initialValue,
+ FoldLeftFunction<M, WV> foldFn) {
Trigger<M> defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration));
- return new WindowInternal<>(defaultTrigger, foldFn, null, null);
+ return new WindowInternal<>(defaultTrigger, initialValue, foldFn, null, null, WindowType.TUMBLING);
}
/**
@@ -195,11 +201,10 @@ public final class Windows {
* @return the created {@link Window} function
*/
public static <M> Window<M, Void, Collection<M>> tumblingWindow(Duration duration) {
- BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
- c.add(m);
- return c;
- };
- return tumblingWindow(duration, aggregator);
+ FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
+
+ Supplier<Collection<M>> initialValue = () -> new ArrayList<>();
+ return tumblingWindow(duration, initialValue, aggregator);
}
/**
@@ -223,15 +228,17 @@ public final class Windows {
*
* @param keyFn the function to extract the window key from a message
* @param sessionGap the timeout gap for defining the session
+ * @param initialValue the initial value to be used for aggregations
* @param foldFn the function to aggregate messages in the {@link WindowPane}
* @param <M> the type of the input message
* @param <K> the type of the key in the {@link Window}
* @param <WV> the type of the output value in the {@link WindowPane}
* @return the created {@link Window} function
*/
- public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap, BiFunction<M, WV, WV> foldFn) {
+ public static <M, K, WV> Window<M, K, WV> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap,
+ Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldFn) {
Trigger<M> defaultTrigger = Triggers.timeSinceLastMessage(sessionGap);
- return new WindowInternal<>(defaultTrigger, foldFn, keyFn, null);
+ return new WindowInternal<>(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.SESSION);
}
/**
@@ -260,114 +267,18 @@ public final class Windows {
*/
public static <M, K> Window<M, K, Collection<M>> keyedSessionWindow(Function<M, K> keyFn, Duration sessionGap) {
- BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
- c.add(m);
- return c;
- };
- return keyedSessionWindow(keyFn, sessionGap, aggregator);
- }
-
+ FoldLeftFunction<M, Collection<M>> aggregator = createAggregator();
- /**
- * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a
- * default trigger. The triggering behavior must be specified by setting an early trigger.
- *
- * <p>The below example computes the maximum value over a count based window. The window emits {@link WindowPane}s when
- * there are either 50 messages in the window pane or when 10 seconds have passed since the first message in the pane.
- *
- * <pre> {@code
- * MessageStream<Long> stream = ...;
- * BiFunction<Long, Long, Long> maxAggregator = (m, c)-> Math.max(m, c);
- * MessageStream<WindowPane<Void, Long>> windowedStream = stream.window(Windows.globalWindow(maxAggregator)
- * .setEarlyTriggers(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
- * }
- * </pre>
- *
- * @param foldFn the function to aggregate messages in the {@link WindowPane}
- * @param <M> the type of message
- * @param <WV> type of the output value in the {@link WindowPane}
- * @return the created {@link Window} function.
- */
- public static <M, WV> Window<M, Void, WV> globalWindow(BiFunction<M, WV, WV> foldFn) {
- return new WindowInternal<>(null, foldFn, null, null);
+ Supplier<Collection<M>> initialValue = () -> new ArrayList<>();
+ return keyedSessionWindow(keyFn, sessionGap, initialValue, aggregator);
}
- /**
- * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a
- * default trigger. The triggering behavior must be specified by setting an early trigger.
- *
- * The below example groups the stream into count based windows that trigger every 50 messages or every 10 minutes.
- * <pre> {@code
- * MessageStream<Long> stream = ...;
- * MessageStream<WindowPane<Void, Collection<Long>> windowedStream = stream.window(Windows.globalWindow()
- * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
- * }
- * </pre>
- *
- * @param <M> the type of message
- * @return the created {@link Window} function.
- */
- public static <M> Window<M, Void, Collection<M>> globalWindow() {
- BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
- c.add(m);
- return c;
- };
- return globalWindow(aggregator);
- }
- /**
- * Returns a global {@link Window} that groups incoming messages using the provided keyFn.
- * The window does not have a default trigger. The triggering behavior must be specified by setting an early
- * trigger.
- *
- * <p> The below example groups the stream into count based windows. The window triggers every 50 messages or every
- * 10 minutes.
- *
- * <pre> {@code
- * MessageStream<UserClick> stream = ...;
- * BiFunction<UserClick, Long, Long> maxAggregator = (m, c)-> Math.max(parseLongField(m), c);
- * Function<UserClick, String> keyFn = ...;
- * MessageStream<WindowPane<String, Long>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator)
- * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
- * }
- * </pre>
- *
- * @param keyFn the function to extract the window key from a message
- * @param foldFn the function to aggregate messages in the {@link WindowPane}
- * @param <M> the type of message
- * @param <K> type of the key in the {@link Window}
- * @param <WV> the type of the output value in the {@link WindowPane}
- * @return the created {@link Window} function
- */
- public static <M, K, WV> Window<M, K, WV> keyedGlobalWindow(Function<M, K> keyFn, BiFunction<M, WV, WV> foldFn) {
- return new WindowInternal<M, K, WV>(null, foldFn, keyFn, null);
- }
-
- /**
- * Returns a global {@link Window} that groups incoming messages using the provided keyFn.
- * The window does not have a default trigger. The triggering behavior must be specified by setting an early trigger.
- *
- * <p> The below example groups the stream per-key into count based windows. The window triggers every 50 messages or
- * every 10 minutes.
- *
- * <pre> {@code
- * MessageStream<UserClick> stream = ...;
- * Function<UserClick, String> keyFn = ...;
- * MessageStream<WindowPane<String, Collection<UserClick>> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn)
- * .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
- * }
- * </pre>
- *
- * @param keyFn the function to extract the window key from a message
- * @param <M> the type of message
- * @param <K> the type of the key in the {@link Window}
- * @return the created {@link Window} function
- */
- public static <M, K> Window<M, K, Collection<M>> keyedGlobalWindow(Function<M, K> keyFn) {
- BiFunction<M, Collection<M>, Collection<M>> aggregator = (m, c) -> {
+ private static <M> FoldLeftFunction<M, Collection<M>> createAggregator() {
+ return (m, c) -> {
c.add(m);
return c;
};
- return keyedGlobalWindow(keyFn, aggregator);
}
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
index 9479eea..f6ac301 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java
@@ -18,12 +18,13 @@
*/
package org.apache.samza.operators.windows.internal;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.functions.FoldLeftFunction;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.Window;
-import java.util.function.BiFunction;
import java.util.function.Function;
+import java.util.function.Supplier;
/**
* Internal representation of a {@link Window}. This specifies default, early and late triggers for the {@link Window}
@@ -32,81 +33,105 @@ import java.util.function.Function;
* Note: This class is meant to be used internally by Samza, and is not to be instantiated by programmers.
*
* @param <M> the type of input message
- * @param <K> the type of key for the window
+ * @param <WK> the type of key for the window
* @param <WV> the type of aggregated value in the window output
*/
@InterfaceStability.Unstable
-public final class WindowInternal<M, K, WV> implements Window<M, K, WV> {
+public final class WindowInternal<M, WK, WV> implements Window<M, WK, WV> {
- private final Trigger defaultTrigger;
+ private final Trigger<M> defaultTrigger;
+
+ /**
+ * The supplier of initial value to be used for windowed aggregations
+ */
+ private final Supplier<WV> initializer;
/*
* The function that is applied each time a {@link MessageEnvelope} is added to this window.
*/
- private final BiFunction<M, WV, WV> foldFunction;
+ private final FoldLeftFunction<M, WV> foldLeftFunction;
/*
* The function that extracts the key from a {@link MessageEnvelope}
*/
- private final Function<M, K> keyExtractor;
+ private final Function<M, WK> keyExtractor;
/*
* The function that extracts the event time from a {@link MessageEnvelope}
*/
private final Function<M, Long> eventTimeExtractor;
- private Trigger earlyTrigger;
+ /**
+ * The type of this window. Tumbling and Session windows are supported for now.
+ */
+ private final WindowType windowType;
+
+ private Trigger<M> earlyTrigger;
- private Trigger lateTrigger;
+ private Trigger<M> lateTrigger;
private AccumulationMode mode;
- public WindowInternal(Trigger defaultTrigger, BiFunction<M, WV, WV> foldFunction, Function<M, K> keyExtractor, Function<M, Long> eventTimeExtractor) {
- this.foldFunction = foldFunction;
+ public WindowInternal(Trigger<M> defaultTrigger, Supplier<WV> initialValue, FoldLeftFunction<M, WV> foldLeftFunction, Function<M, WK> keyExtractor, Function<M, Long> eventTimeExtractor, WindowType windowType) {
+ this.defaultTrigger = defaultTrigger;
+ this.initializer = initialValue;
+ this.foldLeftFunction = foldLeftFunction;
this.eventTimeExtractor = eventTimeExtractor;
this.keyExtractor = keyExtractor;
- this.defaultTrigger = defaultTrigger;
+ this.windowType = windowType;
}
@Override
- public Window<M, K, WV> setEarlyTrigger(Trigger trigger) {
+ public Window<M, WK, WV> setEarlyTrigger(Trigger<M> trigger) {
this.earlyTrigger = trigger;
return this;
}
@Override
- public Window<M, K, WV> setLateTrigger(Trigger trigger) {
+ public Window<M, WK, WV> setLateTrigger(Trigger<M> trigger) {
this.lateTrigger = trigger;
return this;
}
@Override
- public Window<M, K, WV> setAccumulationMode(AccumulationMode mode) {
+ public Window<M, WK, WV> setAccumulationMode(AccumulationMode mode) {
this.mode = mode;
return this;
}
- public Trigger getDefaultTrigger() {
+ public Trigger<M> getDefaultTrigger() {
return defaultTrigger;
}
- public Trigger getEarlyTrigger() {
+ public Trigger<M> getEarlyTrigger() {
return earlyTrigger;
}
- public Trigger getLateTrigger() {
+ public Trigger<M> getLateTrigger() {
return lateTrigger;
}
- public BiFunction<M, WV, WV> getFoldFunction() {
- return foldFunction;
+ public Supplier<WV> getInitializer() {
+ return initializer;
}
- public Function<M, K> getKeyExtractor() {
+ public FoldLeftFunction<M, WV> getFoldLeftFunction() {
+ return foldLeftFunction;
+ }
+
+ public Function<M, WK> getKeyExtractor() {
return keyExtractor;
}
public Function<M, Long> getEventTimeExtractor() {
return eventTimeExtractor;
}
+
+ public WindowType getWindowType() {
+ return windowType;
+ }
+
+ public AccumulationMode getAccumulationMode() {
+ return mode;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java
new file mode 100644
index 0000000..409d56a
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows.internal;
+
+public enum WindowType {
+ TUMBLING, SESSION
+ //,SLIDING
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
index 54d0b2f..4184c9d 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.windows;
+import org.apache.samza.operators.triggers.FiringType;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -26,7 +27,7 @@ import static org.junit.Assert.assertEquals;
public class TestWindowPane {
@Test
public void testConstructor() {
- WindowPane<String, Integer> wndOutput = WindowPane.of(new WindowKey<>("testMsg", null), 10);
+ WindowPane<String, Integer> wndOutput = new WindowPane<>(new WindowKey<>("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY);
assertEquals(wndOutput.getKey().getKey(), "testMsg");
assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index f801097..1b36f76 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
+
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.operators.data.MessageEnvelope;
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/WindowState.java b/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
new file mode 100644
index 0000000..4e80862
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/WindowState.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+/**
+ * Wraps the value stored for a particular {@link org.apache.samza.operators.windows.WindowKey} with additional metadata.
+ */
+public class WindowState<WV> {
+
+ final WV wv;
+ /**
+ * Time of the first message in the window
+ */
+ final long earliestRecvTime;
+
+ public WindowState(WV wv, long earliestRecvTime) {
+ this.wv = wv;
+ this.earliestRecvTime = earliestRecvTime;
+ }
+
+ public WV getWindowValue() {
+ return wv;
+ }
+
+ public long getEarliestTimestamp() {
+ return earliestRecvTime;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
index 3efd5f5..ca8e34b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
@@ -27,6 +27,8 @@ import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.TaskContext;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.SystemClock;
import java.util.Collection;
import java.util.Collections;
@@ -52,6 +54,16 @@ public class OperatorGraph {
*/
private final Map<SystemStream, RootOperatorImpl> operatorGraph = new HashMap<>();
+ private final Clock clock;
+
+ public OperatorGraph(Clock clock) {
+ this.clock = clock;
+ }
+
+ public OperatorGraph() {
+ this(SystemClock.instance());
+ }
+
/**
* Initialize the whole DAG of {@link OperatorImpl}s, based on the input {@link MessageStreamImpl} from the {@link org.apache.samza.operators.StreamGraph}.
* This method will traverse each input {@link org.apache.samza.operators.MessageStream} in the {@code inputStreams} and
@@ -156,14 +168,14 @@ public class OperatorGraph {
* @param context the {@link TaskContext} required to instantiate operators
* @return the {@link OperatorImpl} implementation instance
*/
- private static <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) {
+ private <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) {
if (operatorSpec instanceof StreamOperatorSpec) {
StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
} else if (operatorSpec instanceof SinkOperatorSpec) {
return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
} else if (operatorSpec instanceof WindowOperatorSpec) {
- return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?>) operatorSpec, source, config, context);
+ return new WindowOperatorImpl((WindowOperatorSpec<M, ?, ?>) operatorSpec, clock);
} else if (operatorSpec instanceof PartialJoinOperatorSpec) {
return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 9983307..b9a606b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -52,41 +52,37 @@ public abstract class OperatorImpl<M, RM> {
public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator);
/**
- * Perform the actions required on a timer tick and call the downstream operators.
- *
- * Overriding implementations must call {@link #propagateTimer} to propagate the timer tick to registered
- * downstream operators correctly.
+ * Invoked at every tick. This method delegates to {@link #onTimer(MessageCollector, TaskCoordinator)} and then
+ * propagates the tick to all registered downstream operators via their {@code onTick}.
*
* @param collector the {@link MessageCollector} in the context
* @param coordinator the {@link TaskCoordinator} in the context
*/
- public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
- propagateTimer(collector, coordinator);
+ public final void onTick(MessageCollector collector, TaskCoordinator coordinator) {
+ onTimer(collector, coordinator);
+ nextOperators.forEach(sub -> sub.onTick(collector, coordinator));
}
/**
- * Helper method to propagate the output of this operator to all registered downstream operators.
- *
- * This method <b>must</b> be called from {@link #onNext} to propagate the operator output correctly.
+ * Invoked at every tick. Implementations must call {@link #propagateResult} to propagate any generated output
+ * to registered downstream operators.
*
- * @param outputMessage output message
* @param collector the {@link MessageCollector} in the context
* @param coordinator the {@link TaskCoordinator} in the context
*/
- void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) {
- nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator));
+ public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
}
/**
- * Helper method to propagate the timer tick to all registered downstream operators.
+ * Helper method to propagate the output of this operator to all registered downstream operators.
*
- * This method <b>must</b> be called from {@link #onTimer} to propagate the timer tick correctly.
+ * This method <b>must</b> be called from {@link #onNext} and {@link #onTimer}
+ * to propagate the operator output correctly.
*
+ * @param outputMessage output message
* @param collector the {@link MessageCollector} in the context
* @param coordinator the {@link TaskCoordinator} in the context
*/
- void propagateTimer(MessageCollector collector, TaskCoordinator coordinator) {
- nextOperators.forEach(sub -> sub.onTimer(collector, coordinator));
+ void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) {
+ nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator));
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
index f704f3f..b2948a3 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
@@ -93,7 +93,6 @@ class PartialJoinOperatorImpl<K, M, JM, RM> extends OperatorImpl<M, RM> {
thisState.deleteAll(keysToRemove);
LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, System.currentTimeMillis() - now);
- this.propagateTimer(collector, coordinator);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java
new file mode 100644
index 0000000..49fefc0
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.triggers.FiringType;
+import org.apache.samza.operators.windows.WindowKey;
+
+/**
+ * Uniquely identifies a trigger firing
+ */
+public class TriggerKey<WK> {
+ private final FiringType type;
+ private final WindowKey<WK> key;
+
+ public TriggerKey(FiringType type, WindowKey<WK> key) {
+ if (type == null) {
+ throw new IllegalArgumentException("Firing type cannot be null");
+ }
+
+ if (key == null) {
+ throw new IllegalArgumentException("WindowKey cannot be null");
+ }
+
+ this.type = type;
+ this.key = key;
+ }
+
+ /**
+ * Equality is determined by both the type, and the window key.
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TriggerKey<WK> that = (TriggerKey<WK>) o;
+ return type == that.type && key.equals(that.key);
+ }
+
+ /**
+ * Hashcode is computed by from the type, and the window key.
+ */
+ @Override
+ public int hashCode() {
+ int result = type.hashCode();
+ result = 31 * result + key.hashCode();
+ return result;
+ }
+
+ public WindowKey<WK> getKey() {
+ return key;
+ }
+
+ public FiringType getType() {
+ return type;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java
new file mode 100644
index 0000000..952d9f1
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.triggers.Cancellable;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+
+/**
+ * Allows to schedule and cancel callbacks for triggers.
+ */
+public class TriggerScheduler<WK> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TriggerScheduler.class);
+
+ private final PriorityQueue<TriggerCallbackState<WK>> pendingCallbacks;
+ private final Clock clock;
+
+ public TriggerScheduler(Clock clock) {
+ this.pendingCallbacks = new PriorityQueue<>();
+ this.clock = clock;
+ }
+
+ /**
+ * Schedule the provided runnable for execution at the specified duration.
+ * @param runnable the provided runnable to schedule.
+ * @param scheduledTimeMs time at which the runnable must be scheduled for execution
+ * @param triggerKey a key that uniquely identifies the corresponding trigger firing.
+ * @return a {@link Cancellable} that can be used to cancel the execution of this runnable.
+ */
+ public Cancellable scheduleCallback(Runnable runnable, long scheduledTimeMs, TriggerKey<WK> triggerKey) {
+ TriggerCallbackState<WK> timerState = new TriggerCallbackState(triggerKey, runnable, scheduledTimeMs);
+ pendingCallbacks.add(timerState);
+ LOG.trace("Scheduled a new callback: {} at {} for triggerKey {}", new Object[] {runnable, scheduledTimeMs, triggerKey});
+ return timerState;
+ }
+
+ /**
+ * Run all pending callbacks that are ready to be scheduled. A callback is defined as "ready" if it's scheduledTime
+ * is less than or equal to {@link Clock#currentTimeMillis()}
+ *
+ * @return the list of {@link TriggerKey}s corresponding to the callbacks that were run.
+ */
+ public List<TriggerKey<WK>> runPendingCallbacks() {
+ TriggerCallbackState<WK> state;
+ List<TriggerKey<WK>> keys = new ArrayList<>();
+ long now = clock.currentTimeMillis();
+
+ while ((state = pendingCallbacks.peek()) != null && state.getScheduledTimeMs() <= now) {
+ pendingCallbacks.remove();
+ state.getCallback().run();
+ TriggerKey<WK> key = state.getTriggerKey();
+ keys.add(key);
+ }
+ return keys;
+ }
+
+ /**
+ * State corresponding to pending timer callbacks scheduled by various triggers.
+ */
+ private class TriggerCallbackState<WK> implements Comparable<TriggerCallbackState<WK>>, Cancellable {
+
+ private final TriggerKey<WK> triggerKey;
+ private final Runnable callback;
+
+ // the time at which the callback should trigger
+ private final long scheduledTimeMs;
+
+ private TriggerCallbackState(TriggerKey<WK> triggerKey, Runnable callback, long scheduledTimeMs) {
+ this.triggerKey = triggerKey;
+ this.callback = callback;
+ this.scheduledTimeMs = scheduledTimeMs;
+ }
+
+ private Runnable getCallback() {
+ return callback;
+ }
+
+ private long getScheduledTimeMs() {
+ return scheduledTimeMs;
+ }
+
+ private TriggerKey<WK> getTriggerKey() {
+ return triggerKey;
+ }
+
+ @Override
+ public int compareTo(TriggerCallbackState<WK> other) {
+ return Long.compare(this.scheduledTimeMs, other.scheduledTimeMs);
+ }
+
+ @Override
+ public boolean cancel() {
+ LOG.trace("Cancelled a callback: {} at {} for triggerKey {}", new Object[] {callback, scheduledTimeMs, triggerKey});
+ return pendingCallbacks.remove(this);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index af00553..cd3b1bc 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -1,3 +1,5 @@
+// CHECKSTYLE:OFF
+// Turn off checkstyle for this class because of a checkstyle bug in handling nested typed collections
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,26 +20,300 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.triggers.RepeatingTriggerImpl;
+import org.apache.samza.operators.triggers.TimeTrigger;
+import org.apache.samza.operators.triggers.Trigger;
+import org.apache.samza.operators.triggers.TriggerImpl;
+import org.apache.samza.operators.triggers.TriggerImpls;
+import org.apache.samza.operators.triggers.FiringType;
+import org.apache.samza.operators.util.InternalInMemoryStore;
+import org.apache.samza.operators.windows.AccumulationMode;
+import org.apache.samza.operators.windows.WindowKey;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.windows.internal.WindowType;
+import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Implementation of a window operator that groups messages into finite windows for processing.
+ *
+ * This class implements the processing logic for various types of windows and triggers. It tracks and manages state for
+ * all open windows, the active triggers that correspond to each of the windows and the pending callbacks. It owns
+ * a {@link TriggerScheduler} that {@link TriggerImpl}s can use to schedule and cancel callbacks. It
+ * also orchestrates the flow of messages through the various {@link TriggerImpl}s.
+ *
+ * <p> An instance of a {@link TriggerImplHandler} is created corresponding to each {@link Trigger} configured for a
+ * particular window. For every message added to the window, this class looks up the corresponding {@link TriggerImplHandler}
+ * for the trigger and invokes {@link TriggerImplHandler#onMessage(TriggerKey, Object, MessageCollector, TaskCoordinator)}.
+ * The {@link TriggerImplHandler} maintains the {@link TriggerImpl} instance along with whether it has been canceled yet
+ * or not. Then, the {@link TriggerImplHandler} invokes onMessage on its underlying {@link TriggerImpl} instance. A
+ * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted. The
+ * {@link WindowOperatorImpl} checks if the trigger fired, and propagates the result of the firing to its downstream
+ * operators.
+ *
+ * <p> All time values (tumbling-window boundaries, the first-message timestamp of a window) are read from the injected
+ * {@link Clock} when a message is processed; no timestamp carried by the message itself is used.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the type of the key in this {@link org.apache.samza.operators.MessageStream}
+ * @param <WV> the type of the value in the emitted window pane
+ *
+ */
public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> {
+ private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorImpl.class);
+
private final WindowInternal<M, WK, WV> window;
+ // Window state keyed by WindowKey. NOTE(review): InternalInMemoryStore is presumably not changelog-backed, so open
+ // windows would not survive a restart — confirm.
+ private final KeyValueStore<WindowKey<WK>, WindowState<WV>> store = new InternalInMemoryStore<>();
+ // Scheduler handed to every TriggerImpl for timer callbacks; due callbacks are run from onTimer().
+ TriggerScheduler<WK> triggerScheduler ;
+
+ // The trigger handler corresponding to each {@link TriggerKey}. Entries are removed only when the window's DEFAULT
+ // trigger fires (see onTriggerFired); cancelled non-repeating EARLY handlers stay in the map until then.
+ private final Map<TriggerKey<WK>, TriggerImplHandler> triggers = new HashMap<>();
+ private final Clock clock;
- public WindowOperatorImpl(WindowOperatorSpec spec, MessageStreamImpl<M> source, Config config, TaskContext context) {
- // source, config, and context are used to initialize the window kv-store
- window = spec.getWindow();
+ /**
+ * Creates the operator for the window described by {@code spec}.
+ *
+ * @param spec the window operator spec; only its window definition is used
+ * @param clock source of the current time for window boundaries, first-message timestamps and trigger callbacks
+ */
+ public WindowOperatorImpl(WindowOperatorSpec<M, WK, WV> spec, Clock clock) {
+ this.clock = clock;
+ this.window = spec.getWindow();
+ this.triggerScheduler= new TriggerScheduler(clock);
}
+ /**
+ * Folds {@code message} into the state of its window, then feeds it to the window's EARLY and DEFAULT triggers (in
+ * that order). The state is written before any trigger runs, so a pane emitted because of this message includes it.
+ */
@Override
public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
+ LOG.trace("Processing message envelope: {}", message);
+ WindowKey<WK> storeKey = getStoreKey(message);
+ WindowState<WV> existingState = store.get(storeKey);
+ WindowState<WV> newState = applyFoldFunction(existingState, message);
+
+ LOG.trace("New window value: {}, earliest timestamp: {}", newState.getWindowValue(), newState.getEarliestTimestamp());
+ store.put(storeKey, newState);
+
+ if (window.getEarlyTrigger() != null) {
+ TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey);
+
+ getOrCreateTriggerImplWrapper(triggerKey, window.getEarlyTrigger())
+ .onMessage(triggerKey, message, collector, coordinator);
+ }
+
+ if (window.getDefaultTrigger() != null) {
+ TriggerKey<WK> triggerKey = new TriggerKey<>(FiringType.DEFAULT, storeKey);
+ getOrCreateTriggerImplWrapper(triggerKey, window.getDefaultTrigger())
+ .onMessage(triggerKey, message, collector, coordinator);
+ }
}
-}
+
+ /**
+ * Runs every scheduler callback that is due and re-evaluates the trigger each one belongs to. Callbacks whose trigger
+ * key is no longer present in {@code triggers} are still run by the scheduler but otherwise ignored here.
+ */
+ @Override
+ public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
+ List<TriggerKey<WK>> keys = triggerScheduler.runPendingCallbacks();
+
+ for (TriggerKey<WK> key : keys) {
+ TriggerImplHandler triggerImplHandler = triggers.get(key);
+ if (triggerImplHandler != null) {
+ triggerImplHandler.onTimer(key, collector, coordinator);
+ }
+ }
+
+ }
+
+ /**
+ * Get the key to be used for lookups in the store for this message.
+ *
+ * <p>The key part comes from the window's key extractor (null when there is none). For TUMBLING windows the pane id
+ * is the start of the current bucket whose length is the default trigger's duration, computed from the clock; for all
+ * other window types the pane id is null.
+ */
+ private WindowKey<WK> getStoreKey(M message) {
+ Function<M, WK> keyExtractor = window.getKeyExtractor();
+ WK key = null;
+
+ if (keyExtractor != null) {
+ key = keyExtractor.apply(message);
+ }
+
+ String paneId = null;
+
+ if (window.getWindowType() == WindowType.TUMBLING) {
+ // NOTE(review): assumes a tumbling window's default trigger is a TimeTrigger; any other trigger type would fail
+ // here with a ClassCastException.
+ long triggerDurationMs = ((TimeTrigger<M>) window.getDefaultTrigger()).getDuration().toMillis();
+ final long now = clock.currentTimeMillis();
+ // Round down to a multiple of the duration, i.e. the start of the current tumbling window.
+ Long windowBoundary = now - now % triggerDurationMs;
+ paneId = windowBoundary.toString();
+ }
+
+ return new WindowKey<>(key, paneId);
+ }
+
+ /**
+ * Applies the window's fold function to {@code message}. With no existing state the fold starts from the window's
+ * initializer and the current clock time becomes the window's earliest timestamp; otherwise both the value and the
+ * earliest timestamp are carried over from {@code existingState}.
+ */
+ private WindowState<WV> applyFoldFunction(WindowState<WV> existingState, M message) {
+ WV wv;
+ long earliestTimestamp;
+
+ if (existingState == null) {
+ LOG.trace("No existing state found for key");
+ wv = window.getInitializer().get();
+ earliestTimestamp = clock.currentTimeMillis();
+ } else {
+ wv = existingState.getWindowValue();
+ earliestTimestamp = existingState.getEarliestTimestamp();
+ }
+
+ WV newVal = window.getFoldLeftFunction().apply(message, wv);
+ WindowState<WV> newState = new WindowState(newVal, earliestTimestamp);
+
+ return newState;
+ }
+
+ /**
+ * Returns the handler registered for {@code triggerKey}, creating and registering a new {@link TriggerImpl} for
+ * {@code trigger} if none exists yet.
+ */
+ private TriggerImplHandler getOrCreateTriggerImplWrapper(TriggerKey<WK> triggerKey, Trigger<M> trigger) {
+ TriggerImplHandler wrapper = triggers.get(triggerKey);
+ if (wrapper != null) {
+ LOG.trace("Returning existing trigger wrapper for {}", triggerKey);
+ return wrapper;
+ }
+
+ LOG.trace("Creating a new trigger wrapper for {}", triggerKey);
+
+ TriggerImpl<M, WK> triggerImpl = TriggerImpls.createTriggerImpl(trigger, clock, triggerKey);
+ wrapper = new TriggerImplHandler(triggerKey, triggerImpl);
+ triggers.put(triggerKey, wrapper);
+
+ return wrapper;
+ }
+
+ /**
+ * Handles trigger firings, and propagates results to downstream operators.
+ *
+ * <p>Emits the current pane for the fired key's window (if it has state), clears that state in DISCARDING mode, and,
+ * when the DEFAULT trigger fired, cancels and removes both triggers of the window and deletes its state. A
+ * non-repeating EARLY trigger is cancelled but kept in {@code triggers}.
+ */
+ private void onTriggerFired(TriggerKey<WK> triggerKey, MessageCollector collector, TaskCoordinator coordinator) {
+ LOG.trace("Trigger key {} fired." , triggerKey);
+
+ TriggerImplHandler wrapper = triggers.get(triggerKey);
+ WindowKey<WK> windowKey = triggerKey.getKey();
+ WindowState<WV> state = store.get(windowKey);
+
+ if (state == null) {
+ // NOTE(review): this return also skips the DEFAULT-trigger cleanup below. In DISCARDING mode an EARLY firing
+ // stores null for the window, so a DEFAULT firing with no messages in between appears to leave this window's
+ // handlers in `triggers` — confirm whether that is intended.
+ LOG.trace("No state found for triggerKey: {}", triggerKey);
+ return;
+ }
+
+ WindowPane<WK, WV> paneOutput = computePaneOutput(triggerKey, state);
+ super.propagateResult(paneOutput, collector, coordinator);
+
+ // Handle accumulation modes.
+ // NOTE(review): DISCARDING stores an explicit null rather than deleting the entry. The next message then restarts
+ // from the initializer and applyFoldFunction records a new earliest timestamp, so a later SESSION pane id reflects
+ // the first message after this firing, not the first message of the session — confirm this is intended.
+ if (window.getAccumulationMode() == AccumulationMode.DISCARDING) {
+ LOG.trace("Clearing state for trigger key: {}", triggerKey);
+ store.put(windowKey, null);
+ }
+
+ // Cancel all early triggers too when the default trigger fires. Also, clean all state for the key.
+ // note: We don't handle late arrivals yet, So, all arrivals are either early or on-time.
+ if (triggerKey.getType() == FiringType.DEFAULT) {
+
+ LOG.trace("Default trigger fired. Canceling triggers for {}", triggerKey);
+
+ cancelTrigger(triggerKey, true);
+ cancelTrigger(new TriggerKey(FiringType.EARLY, triggerKey.getKey()), true);
+ WindowKey<WK> key = triggerKey.getKey();
+ store.delete(key);
+ }
+
+ // Cancel non-repeating early triggers. All early triggers should be removed from the "triggers" map only after the
+ // firing of their corresponding default trigger. Removing them prematurely (immediately after cancellation) would
+ // create a new {@link TriggerImplHandler} instance at a future invocation of getOrCreateTriggerImplWrapper().
+ // This would cause an already canceled trigger to fire again for the window.
+
+ if (triggerKey.getType() == FiringType.EARLY && !wrapper.isRepeating()) {
+ cancelTrigger(triggerKey, false);
+ }
+ }
+
+ /**
+ * Computes the pane output corresponding to a {@link TriggerKey} that fired.
+ */
+ private WindowPane<WK, WV> computePaneOutput(TriggerKey<WK> triggerKey, WindowState<WV> state) {
+ WindowKey<WK> windowKey = triggerKey.getKey();
+ WV windowVal = state.getWindowValue();
+
+ // For session windows, we will create a new window key by using the time of the first message in the window as
+ // the paneId.
+ if (window.getWindowType() == WindowType.SESSION) {
+ windowKey = new WindowKey<>(windowKey.getKey(), Long.toString(state.getEarliestTimestamp()));
+ }
+
+ // Make a defensive copy so that we are immune to further mutations on the collection. The copy is shallow and
+ // only applies to Collection values; elements and non-collection values are shared with the stored state.
+ if (windowVal instanceof Collection) {
+ windowVal = (WV) new ArrayList<>((Collection<WV>) windowVal);
+ }
+
+ WindowPane<WK, WV> paneOutput = new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType());
+ LOG.trace("Emitting pane output for trigger key {}", triggerKey);
+ return paneOutput;
+ }
+
+ /**
+ * Cancels the firing of the {@link TriggerImpl} identified by this {@link TriggerKey} and optionally removes it.
+ */
+ private void cancelTrigger(TriggerKey<WK> triggerKey, boolean shouldRemove) {
+ TriggerImplHandler triggerImplHandler = triggers.get(triggerKey);
+ if (triggerImplHandler != null) {
+ triggerImplHandler.cancel();
+ }
+ if (shouldRemove && triggerKey != null) {
+ triggers.remove(triggerKey);
+ }
+ }
+
+ /**
+ * State corresponding to a created {@link TriggerImpl} instance.
+ */
+ private class TriggerImplHandler {
+ // The {@link TriggerImpl} instance managed by this handler
+ private final TriggerImpl<M, WK> impl;
+ // Guard to ensure that we don't invoke onMessage or onTimer on already cancelled triggers
+ private boolean isCancelled = false;
+
+ // NOTE(review): `key` is not stored; the trigger key is passed to onMessage/onTimer on every call instead.
+ public TriggerImplHandler(TriggerKey<WK> key, TriggerImpl<M, WK> impl) {
+ this.impl = impl;
+ }
+
+ // Forwards the message to the TriggerImpl (unless cancelled) and handles a resulting firing.
+ public void onMessage(TriggerKey<WK> triggerKey, M message, MessageCollector collector, TaskCoordinator coordinator) {
+ if (!isCancelled) {
+ LOG.trace("Forwarding callbacks for {}", message);
+ impl.onMessage(message, triggerScheduler);
+
+ if (impl.shouldFire()) {
+ // A repeating trigger can fire multiple times, so clear its state to allow future firings.
+ if (impl instanceof RepeatingTriggerImpl) {
+ ((RepeatingTriggerImpl<M, WK>) impl).clear();
+ }
+ onTriggerFired(triggerKey, collector, coordinator);
+ }
+ }
+ }
+
+ // Re-evaluates the TriggerImpl after one of its scheduler callbacks ran. NOTE(review): unlike onMessage,
+ // impl.shouldFire() is evaluated before the isCancelled guard, so it is called even on cancelled triggers.
+ public void onTimer(TriggerKey<WK> key, MessageCollector collector, TaskCoordinator coordinator) {
+ if (impl.shouldFire() && !isCancelled) {
+ LOG.trace("Triggering timer triggers");
+
+ // A repeating trigger can fire multiple times, so clear the trigger to allow future firings.
+ if (impl instanceof RepeatingTriggerImpl) {
+ ((RepeatingTriggerImpl<M, WK>) impl).clear();
+ }
+ onTriggerFired(key, collector, coordinator);
+ }
+ }
+
+ // Cancels the underlying TriggerImpl and blocks further onMessage/onTimer firings through this handler.
+ public void cancel() {
+ impl.cancel();
+ isCancelled = true;
+ }
+
+ public boolean isRepeating() {
+ return this.impl instanceof RepeatingTriggerImpl;
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
index 46417ed..6d948d7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java
@@ -19,9 +19,11 @@
package org.apache.samza.operators.spec;
+import org.apache.samza.config.Config;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.TaskContext;
/**
@@ -54,11 +56,18 @@ public class WindowOperatorSpec<M, WK, WV> implements OperatorSpec<WindowPane<WK
}
@Override
+ public void init(Config config, TaskContext context) {
+ if (window.getFoldLeftFunction() != null) {
+ window.getFoldLeftFunction().init(config, context);
+ }
+ }
+
+ @Override
public MessageStreamImpl<WindowPane<WK, WV>> getNextStream() {
return this.outputStream;
}
- public WindowInternal getWindow() {
+ public WindowInternal<M, WK, WV> getWindow() {
return window;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java
new file mode 100644
index 0000000..a0aa384
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Implementation of an {@link AnyTrigger}: fires as soon as any one of its child triggers fires.
+ *
+ * <p>A {@link TriggerImpl} is created for each child trigger. When a child fires during {@link #onMessage}, all
+ * children are cancelled and dropped. Once {@link #shouldFire()} has returned true it keeps returning true.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the type of the window key
+ */
+public class AnyTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+  private final List<TriggerImpl<M, WK>> triggerImpls = new ArrayList<>();
+  private boolean shouldFire = false;
+
+  /**
+   * @param anyTrigger the trigger whose children are instantiated
+   * @param clock passed through to each child's implementation
+   * @param triggerKey passed through to each child's implementation
+   */
+  public AnyTriggerImpl(AnyTrigger<M> anyTrigger, Clock clock, TriggerKey<WK> triggerKey) {
+    for (Trigger<M> trigger : anyTrigger.getTriggers()) {
+      triggerImpls.add(TriggerImpls.createTriggerImpl(trigger, clock, triggerKey));
+    }
+  }
+
+  /**
+   * Forwards {@code message} to each child in order until one of them fires; later children do not see the message.
+   * If this trigger is (or already was) firing, all remaining children are cancelled.
+   */
+  @Override
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    for (TriggerImpl<M, WK> impl : triggerImpls) {
+      impl.onMessage(message, context);
+      if (impl.shouldFire()) {
+        shouldFire = true;
+        break;
+      }
+    }
+    if (shouldFire) {
+      cancel();
+    }
+  }
+
+  /**
+   * Cancels every child trigger and removes it, so subsequent calls operate on an empty list.
+   */
+  @Override
+  public void cancel() {
+    for (Iterator<TriggerImpl<M, WK>> it = triggerImpls.iterator(); it.hasNext(); ) {
+      TriggerImpl<M, WK> impl = it.next();
+      impl.cancel();
+      it.remove();
+    }
+  }
+
+  /**
+   * Returns true if any remaining child fires or if this trigger has already fired; the result is latched.
+   */
+  @Override
+  public boolean shouldFire() {
+    for (TriggerImpl<M, WK> impl : triggerImpls) {
+      if (impl.shouldFire()) {
+        shouldFire = true;
+        break;
+      }
+    }
+    return shouldFire;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java
new file mode 100644
index 0000000..ca0ba67
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+/**
+ * An operation that can be called off before it executes.
+ */
+public interface Cancellable {
+
+  /**
+   * Attempts to cancel this operation. Only an operation that has not been scheduled for execution yet can be
+   * prevented from running; one that is already in progress is neither interrupted nor cancelled.
+   *
+   * @return the result of the cancellation attempt
+   */
+  boolean cancel();
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/CountTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/CountTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/CountTriggerImpl.java
new file mode 100644
index 0000000..da1efda
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/CountTriggerImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link CountTrigger}. Counts the messages it is notified of and reports that it
+ * should fire once that number reaches the count configured on the {@link CountTrigger}. Once fired, it stays fired.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the type of the key held by the {@link TriggerKey}
+ */
+public class CountTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+  private static final Logger LOG = LoggerFactory.getLogger(CountTriggerImpl.class);
+
+  private final long triggerCount;
+  private final TriggerKey<WK> triggerKey;
+  private long currentCount;
+  private boolean shouldFire = false;
+
+  /**
+   * @param trigger the {@link CountTrigger} whose configured count this implementation enforces
+   * @param triggerKey the {@link TriggerKey} associated with this trigger
+   */
+  public CountTriggerImpl(CountTrigger<M> trigger, TriggerKey<WK> triggerKey) {
+    this.triggerCount = trigger.getCount();
+    this.currentCount = 0;
+    this.triggerKey = triggerKey;
+  }
+
+  /**
+   * Counts {@code message} and marks the trigger as ready to fire once the configured count is reached.
+   * Messages arriving after the trigger has fired are ignored.
+   */
+  @Override
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    if (shouldFire) {
+      // Already fired; the state is sticky, and skipping here keeps the trace log to a single entry.
+      return;
+    }
+    currentCount++;
+    // '>=' instead of '==' so a non-positive configured count fires on the first message rather than never.
+    if (currentCount >= triggerCount) {
+      LOG.trace("count trigger fired for {}", message);
+      shouldFire = true;
+    }
+  }
+
+  /**
+   * No-op: this trigger never schedules anything with the {@link TriggerScheduler}, so there is nothing to cancel.
+   */
+  @Override
+  public void cancel() {
+    //no-op
+  }
+
+  /**
+   * @return {@code true} once the configured number of messages has been seen
+   */
+  @Override
+  public boolean shouldFire() {
+    return shouldFire;
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java
new file mode 100644
index 0000000..39b9b40
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link RepeatingTrigger}. All calls are forwarded to a delegate built from the
+ * trigger wrapped by the {@link RepeatingTrigger}; {@link #clear()} cancels that delegate and swaps in a
+ * freshly created one.
+ */
+public class RepeatingTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+  private static final Logger LOG = LoggerFactory.getLogger(RepeatingTriggerImpl.class);
+
+  /** The trigger wrapped by the {@link RepeatingTrigger}; every delegate is created from it. */
+  private final Trigger<M> wrappedTrigger;
+  private final Clock clock;
+  private final TriggerKey<WK> triggerKey;
+
+  /** The implementation currently receiving calls; replaced on every {@link #clear()}. */
+  private TriggerImpl<M, WK> delegate;
+
+  public RepeatingTriggerImpl(RepeatingTrigger<M> repeatingTrigger, Clock clock, TriggerKey<WK> key) {
+    this.wrappedTrigger = repeatingTrigger.getTrigger();
+    this.clock = clock;
+    this.triggerKey = key;
+    this.delegate = newDelegate();
+  }
+
+  /** Builds a fresh implementation of the wrapped trigger bound to this instance's clock and key. */
+  private TriggerImpl<M, WK> newDelegate() {
+    return TriggerImpls.createTriggerImpl(wrappedTrigger, clock, triggerKey);
+  }
+
+  @Override
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    delegate.onMessage(message, context);
+  }
+
+  @Override
+  public void cancel() {
+    delegate.cancel();
+  }
+
+  /**
+   * Cancels the current delegate and replaces it with a new one, discarding any state it had accumulated.
+   */
+  public void clear() {
+    LOG.trace("Clearing state for repeating trigger");
+    delegate.cancel();
+    delegate = newDelegate();
+  }
+
+  @Override
+  public boolean shouldFire() {
+    return delegate.shouldFire();
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/d399d6f3/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java
new file mode 100644
index 0000000..32bf988
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators.triggers;
+
+import org.apache.samza.operators.impl.TriggerKey;
+import org.apache.samza.operators.impl.TriggerScheduler;
+import org.apache.samza.util.Clock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation class for a {@link TimeSinceFirstMessageTrigger}. On the first message it schedules a callback
+ * with the {@link TriggerScheduler} for the configured duration after the current {@link Clock} time; once that
+ * callback has run, {@link #shouldFire()} returns {@code true}.
+ *
+ * @param <M> the type of the incoming message
+ * @param <WK> the type of the key held by the {@link TriggerKey}
+ */
+public class TimeSinceFirstMessageTriggerImpl<M, WK> implements TriggerImpl<M, WK> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TimeSinceFirstMessageTriggerImpl.class);
+
+  private final TimeSinceFirstMessageTrigger<M> trigger;
+  private final Clock clock;
+  private final TriggerKey<WK> triggerKey;
+  /** Handle to the scheduled callback; {@code null} until the first message has been seen. */
+  private Cancellable cancellable;
+  // NOTE(review): written from the scheduled callback; confirm the scheduler runs callbacks on the same
+  // thread that calls shouldFire(), otherwise this needs to be volatile.
+  private boolean shouldFire = false;
+
+  public TimeSinceFirstMessageTriggerImpl(TimeSinceFirstMessageTrigger<M> trigger, Clock clock, TriggerKey<WK> key) {
+    this.trigger = trigger;
+    this.clock = clock;
+    this.triggerKey = key;
+  }
+
+  /**
+   * Schedules the firing callback when the first message arrives; later messages are ignored.
+   */
+  @Override
+  public void onMessage(M message, TriggerScheduler<WK> context) {
+    if (cancellable != null || shouldFire) {
+      return;
+    }
+    // Primitive long: no need to box the timestamp. Clock is read before the duration, as before.
+    long callbackTimeMs = clock.currentTimeMillis() + trigger.getDuration().toMillis();
+    cancellable = context.scheduleCallback(() -> {
+      LOG.trace("Time since first message trigger fired");
+      shouldFire = true;
+    }, callbackTimeMs, triggerKey);
+  }
+
+  /**
+   * Cancels the pending callback, if one has been scheduled.
+   */
+  @Override
+  public void cancel() {
+    if (cancellable != null) {
+      cancellable.cancel();
+    }
+  }
+
+  /**
+   * @return {@code true} once the scheduled callback has run
+   */
+  @Override
+  public boolean shouldFire() {
+    return shouldFire;
+  }
+}