You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2016/11/01 21:59:17 UTC
[3/4] samza git commit: SAMZA-1045: Move classes from operator/api
into samza-api
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
new file mode 100644
index 0000000..da813b1
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/task/StreamOperatorTask.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.task;
+
+import org.apache.samza.operators.MessageStreams.SystemMessageStream;
+
+import java.util.Collection;
+
+/**
+ * This interface defines the methods that user needs to implement via the operator programming APIs.
+ */
+public interface StreamOperatorTask {
+
+ /**
+ * Defines the method for users to initialize the operator chains consuming from all {@link SystemMessageStream}s.
+ * Users have to implement this function to define their transformation logic on each of the incoming
+ * {@link SystemMessageStream}.
+ *
+ * Note that each {@link SystemMessageStream} corresponds to an input {@link org.apache.samza.system.SystemStreamPartition}
+ *
+ * @param sources the collection of {@link SystemMessageStream}s that takes {@link org.apache.samza.operators.data.IncomingSystemMessage}
+ * from a {@link org.apache.samza.system.SystemStreamPartition}
+ */
+ void initOperators(Collection<SystemMessageStream> sources);
+
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
index 963ccf2..adb6264 100644
--- a/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
+++ b/samza-api/src/main/java/org/apache/samza/storage/StorageEngineFactory.java
@@ -47,12 +47,12 @@ public interface StorageEngineFactory<K, V> {
* @return The storage engine instance.
*/
public StorageEngine getStorageEngine(
- String storeName,
- File storeDir,
- Serde<K> keySerde,
- Serde<V> msgSerde,
- MessageCollector collector,
- MetricsRegistry registry,
- SystemStreamPartition changeLogSystemStreamPartition,
- SamzaContainerContext containerContext);
+ String storeName,
+ File storeDir,
+ Serde<K> keySerde,
+ Serde<V> msgSerde,
+ MessageCollector collector,
+ MetricsRegistry registry,
+ SystemStreamPartition changeLogSystemStreamPartition,
+ SamzaContainerContext containerContext);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
new file mode 100644
index 0000000..8c56287
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.Message;
+
+
+public class TestMessage implements Message<String, String> {
+
+ private final String key;
+ private final String value;
+ private final long timestamp;
+
+ TestMessage(String key, String value, long timestamp) {
+ this.key = key;
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ @Override public String getMessage() {
+ return this.value;
+ }
+
+ @Override public String getKey() {
+ return this.key;
+ }
+
+ @Override public long getTimestamp() {
+ return this.timestamp;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
new file mode 100644
index 0000000..4dbe233
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStream.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.internal.Operators.*;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestMessageStream {
+
+ @Test public void testMap() {
+ MessageStream<TestMessage> inputStream = new MessageStream<>();
+ Function<TestMessage, TestOutputMessage> xMap = m -> new TestOutputMessage(m.getKey(), m.getMessage().length() + 1, m.getTimestamp() + 2);
+ MessageStream<TestOutputMessage> outputStream = inputStream.map(xMap);
+ Collection<Operator> subs = inputStream.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestOutputMessage> mapOp = subs.iterator().next();
+ assertTrue(mapOp instanceof StreamOperator);
+ assertEquals(mapOp.getOutputStream(), outputStream);
+ // assert that the transformation function is what we defined above
+ TestMessage xTestMsg = mock(TestMessage.class);
+ when(xTestMsg.getKey()).thenReturn("test-msg-key");
+ when(xTestMsg.getMessage()).thenReturn("123456789");
+ when(xTestMsg.getTimestamp()).thenReturn(12345L);
+ Collection<TestOutputMessage> cOutputMsg = ((StreamOperator<TestMessage, TestOutputMessage>) mapOp).getFunction().apply(xTestMsg);
+ assertEquals(cOutputMsg.size(), 1);
+ TestOutputMessage outputMessage = cOutputMsg.iterator().next();
+ assertEquals(outputMessage.getKey(), xTestMsg.getKey());
+ assertEquals(outputMessage.getMessage(), Integer.valueOf(xTestMsg.getMessage().length() + 1));
+ assertEquals(outputMessage.getTimestamp(), xTestMsg.getTimestamp() + 2);
+ }
+
+ @Test public void testFlatMap() {
+ MessageStream<TestMessage> inputStream = new MessageStream<>();
+ Set<TestOutputMessage> flatOuts = new HashSet<TestOutputMessage>() { {
+ this.add(mock(TestOutputMessage.class));
+ this.add(mock(TestOutputMessage.class));
+ this.add(mock(TestOutputMessage.class));
+ } };
+ Function<TestMessage, Collection<TestOutputMessage>> xFlatMap = m -> flatOuts;
+ MessageStream<TestOutputMessage> outputStream = inputStream.flatMap(xFlatMap);
+ Collection<Operator> subs = inputStream.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestOutputMessage> flatMapOp = subs.iterator().next();
+ assertTrue(flatMapOp instanceof StreamOperator);
+ assertEquals(flatMapOp.getOutputStream(), outputStream);
+ // assert that the transformation function is what we defined above
+ assertEquals(((StreamOperator<TestMessage, TestOutputMessage>) flatMapOp).getFunction(), xFlatMap);
+ }
+
+ @Test public void testFilter() {
+ MessageStream<TestMessage> inputStream = new MessageStream<>();
+ Function<TestMessage, Boolean> xFilter = m -> m.getTimestamp() > 123456L;
+ MessageStream<TestMessage> outputStream = inputStream.filter(xFilter);
+ Collection<Operator> subs = inputStream.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> filterOp = subs.iterator().next();
+ assertTrue(filterOp instanceof StreamOperator);
+ assertEquals(filterOp.getOutputStream(), outputStream);
+ // assert that the transformation function is what we defined above
+ Function<TestMessage, Collection<TestMessage>> txfmFn = ((StreamOperator<TestMessage, TestMessage>) filterOp).getFunction();
+ TestMessage mockMsg = mock(TestMessage.class);
+ when(mockMsg.getTimestamp()).thenReturn(11111L);
+ Collection<TestMessage> output = txfmFn.apply(mockMsg);
+ assertTrue(output.isEmpty());
+ when(mockMsg.getTimestamp()).thenReturn(999999L);
+ output = txfmFn.apply(mockMsg);
+ assertEquals(output.size(), 1);
+ assertEquals(output.iterator().next(), mockMsg);
+ }
+
+ @Test public void testSink() {
+ MessageStream<TestMessage> inputStream = new MessageStream<>();
+ MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> xSink = (m, mc, tc) -> {
+ mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
+ tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+ };
+ inputStream.sink(xSink);
+ Collection<Operator> subs = inputStream.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> sinkOp = subs.iterator().next();
+ assertTrue(sinkOp instanceof SinkOperator);
+ assertEquals(((SinkOperator) sinkOp).getFunction(), xSink);
+ assertNull(((SinkOperator) sinkOp).getOutputStream());
+ }
+
+ @Test public void testWindow() {
+ MessageStream<TestMessage> inputStream = new MessageStream<>();
+ Windows.SessionWindow<TestMessage, String, Integer> window = mock(Windows.SessionWindow.class);
+ MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window);
+ Collection<Operator> subs = inputStream.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> wndOp = subs.iterator().next();
+ assertTrue(wndOp instanceof WindowOperator);
+ assertEquals(((WindowOperator) wndOp).getOutputStream(), outStream);
+ }
+
+ @Test public void testJoin() {
+ MessageStream<TestMessage> source1 = new MessageStream<>();
+ MessageStream<TestMessage> source2 = new MessageStream<>();
+ BiFunction<TestMessage, TestMessage, TestOutputMessage> joiner = (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp());
+ MessageStream<TestOutputMessage> joinOutput = source1.join(source2, joiner);
+ Collection<Operator> subs = source1.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> joinOp1 = subs.iterator().next();
+ assertTrue(joinOp1 instanceof PartialJoinOperator);
+ assertEquals(((PartialJoinOperator) joinOp1).getOutputStream(), joinOutput);
+ subs = source2.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> joinOp2 = subs.iterator().next();
+ assertTrue(joinOp2 instanceof PartialJoinOperator);
+ assertEquals(((PartialJoinOperator) joinOp2).getOutputStream(), joinOutput);
+ TestMessage joinMsg1 = new TestMessage("test-join-1", "join-msg-001", 11111L);
+ TestMessage joinMsg2 = new TestMessage("test-join-2", "join-msg-002", 22222L);
+ TestOutputMessage xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp1).getFunction().apply(joinMsg1, joinMsg2);
+ assertEquals(xOut.getKey(), "test-join-1");
+ assertEquals(xOut.getMessage(), Integer.valueOf(24));
+ assertEquals(xOut.getTimestamp(), 11111L);
+ xOut = (TestOutputMessage) ((PartialJoinOperator) joinOp2).getFunction().apply(joinMsg2, joinMsg1);
+ assertEquals(xOut.getKey(), "test-join-1");
+ assertEquals(xOut.getMessage(), Integer.valueOf(24));
+ assertEquals(xOut.getTimestamp(), 11111L);
+ }
+
+ @Test public void testMerge() {
+ MessageStream<TestMessage> merge1 = new MessageStream<>();
+ Collection<MessageStream<TestMessage>> others = new ArrayList<MessageStream<TestMessage>>() { {
+ this.add(new MessageStream<>());
+ this.add(new MessageStream<>());
+ } };
+ MessageStream<TestMessage> mergeOutput = merge1.merge(others);
+ validateMergeOperator(merge1, mergeOutput);
+
+ others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+ }
+
+ private void validateMergeOperator(MessageStream<TestMessage> mergeSource, MessageStream<TestMessage> mergeOutput) {
+ Collection<Operator> subs = mergeSource.getSubscribers();
+ assertEquals(subs.size(), 1);
+ Operator<TestMessage> mergeOp = subs.iterator().next();
+ assertTrue(mergeOp instanceof StreamOperator);
+ assertEquals(((StreamOperator) mergeOp).getOutputStream(), mergeOutput);
+ TestMessage mockMsg = mock(TestMessage.class);
+ Collection<TestMessage> outputs = ((StreamOperator<TestMessage, TestMessage>) mergeOp).getFunction().apply(mockMsg);
+ assertEquals(outputs.size(), 1);
+ assertEquals(outputs.iterator().next(), mockMsg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
new file mode 100644
index 0000000..c5fcceb
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestMessageStreams.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.Partition;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestMessageStreams {
+
+ @Test public void testInput() {
+ SystemStreamPartition ssp = new SystemStreamPartition("my-system", "my-stream", new Partition(0));
+ MessageStreams.SystemMessageStream mSysStream = MessageStreams.input(ssp);
+ assertEquals(mSysStream.getSystemStreamPartition(), ssp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
new file mode 100644
index 0000000..14e6562
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.Message;
+
+
+public class TestOutputMessage implements Message<String, Integer> {
+ private final String key;
+ private final Integer value;
+ private final long timestamp;
+
+ public TestOutputMessage(String key, Integer value, long timestamp) {
+ this.key = key;
+ this.value = value;
+ this.timestamp = timestamp;
+ }
+
+ @Override public Integer getMessage() {
+ return this.value;
+ }
+
+ @Override public String getKey() {
+ return this.key;
+ }
+
+ @Override public long getTimestamp() {
+ return this.timestamp;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
new file mode 100644
index 0000000..e6d9e4a
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestTriggerBuilder {
+ private Field earlyTriggerField;
+ private Field lateTriggerField;
+ private Field timerTriggerField;
+ private Field earlyTriggerUpdater;
+ private Field lateTriggerUpdater;
+
+ @Before
+ public void testPrep() throws Exception {
+ this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger");
+ this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger");
+ this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger");
+ this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
+ this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
+
+ this.earlyTriggerField.setAccessible(true);
+ this.lateTriggerField.setAccessible(true);
+ this.timerTriggerField.setAccessible(true);
+ this.earlyTriggerUpdater.setAccessible(true);
+ this.lateTriggerUpdater.setAccessible(true);
+ }
+
+ @Test public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException {
+ TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
+ (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+ WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+ when(mockState.getNumberMessages()).thenReturn(200L);
+ assertFalse(triggerField.apply(null, mockState));
+ when(mockState.getNumberMessages()).thenReturn(2000L);
+ assertTrue(triggerField.apply(null, mockState));
+
+ Function<TestMessage, Boolean> tokenFunc = m -> true;
+ builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
+ triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+ TestMessage m = mock(TestMessage.class);
+ assertTrue(triggerField.apply(m, mockState));
+
+ builder = TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L);
+ triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+ when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
+ when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
+ when(m.getTimestamp()).thenReturn(19999000000L);
+ assertFalse(triggerField.apply(m, mockState));
+ when(m.getTimestamp()).thenReturn(32000000000L);
+ assertTrue(triggerField.apply(m, mockState));
+ when(m.getTimestamp()).thenReturn(1001000000L);
+ when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
+ assertTrue(triggerField.apply(m, mockState));
+
+ BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> mockFunc = mock(BiFunction.class);
+ builder = TriggerBuilder.earlyTrigger(mockFunc);
+ triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+ assertEquals(triggerField, mockFunc);
+
+ builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
+ Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
+ (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+ when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+ assertTrue(timerTrigger.apply(mockState));
+ // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+ when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+ assertFalse(timerTrigger.apply(mockState));
+
+ builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
+ timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+ when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+ assertTrue(timerTrigger.apply(mockState));
+ // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+ when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000));
+ assertFalse(timerTrigger.apply(mockState));
+ }
+
+ @Test public void testAddTimerTriggers() throws IllegalAccessException {
+ TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ builder.addTimeoutSinceFirstMessage(10000L);
+ // exam that both earlyTrigger and timer triggers are set up
+ BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
+ (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+ WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+ when(mockState.getNumberMessages()).thenReturn(200L);
+ assertFalse(triggerField.apply(null, mockState));
+ // check the timer trigger
+ Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
+ (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+ when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+ assertTrue(timerTrigger.apply(mockState));
+ // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+ when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+ assertFalse(timerTrigger.apply(mockState));
+
+ // exam that both early trigger and timer triggers are set up
+ builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+ mockState = mock(WindowState.class);
+ when(mockState.getNumberMessages()).thenReturn(200L);
+ assertFalse(triggerField.apply(null, mockState));
+ builder.addTimeoutSinceLastMessage(20000L);
+ // check the timer trigger
+ timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
+ when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+ assertTrue(timerTrigger.apply(mockState));
+ // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+ when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+ assertFalse(timerTrigger.apply(mockState));
+ }
+
+ @Test public void testAddLateTriggers() throws IllegalAccessException {
+ TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ builder.addLateTriggerOnSizeLimit(10000L);
+ // exam that both earlyTrigger and lateTriggers are set up
+ BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> earlyTrigger =
+ (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+ WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+ when(mockState.getNumberMessages()).thenReturn(200L);
+ assertFalse(earlyTrigger.apply(null, mockState));
+ // check the late trigger
+ BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> lateTrigger =
+ (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
+ assertFalse(lateTrigger.apply(null, mockState));
+ // set the number of messages to 10001 to trigger the late trigger
+ when(mockState.getNumberMessages()).thenReturn(10001L);
+ assertTrue(lateTrigger.apply(null, mockState));
+
+ builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
+ // exam that both earlyTrigger and lateTriggers are set up
+ earlyTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
+ mockState = mock(WindowState.class);
+ when(mockState.getNumberMessages()).thenReturn(200L);
+ assertFalse(earlyTrigger.apply(null, mockState));
+ // exam the lateTrigger
+ when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
+ lateTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
+ assertFalse(lateTrigger.apply(null, mockState));
+ List<TestMessage> mockList = mock(ArrayList.class);
+ when(mockList.size()).thenReturn(200);
+ when(mockState.getOutputValue()).thenReturn(mockList);
+ assertTrue(lateTrigger.apply(null, mockState));
+ }
+
+ @Test public void testAddTriggerUpdater() throws IllegalAccessException {
+ TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ builder.onEarlyTrigger(c -> {
+ c.clear();
+ return c;
+ });
+ List<TestMessage> collection = new ArrayList<TestMessage>() { {
+ for (int i = 0; i < 10; i++) {
+ this.add(new TestMessage(String.format("key-%d", i), "string-value", System.nanoTime()));
+ }
+ } };
+ // exam that earlyTriggerUpdater is set up
+ Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> earlyTriggerUpdater =
+ (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder);
+ WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
+ when(mockState.getOutputValue()).thenReturn(collection);
+ earlyTriggerUpdater.apply(mockState);
+ assertTrue(collection.isEmpty());
+
+ collection.add(new TestMessage("key-to-stay", "string-to-stay", System.nanoTime()));
+ collection.add(new TestMessage("key-to-remove", "string-to-remove", System.nanoTime()));
+ builder.onLateTrigger(c -> {
+ c.removeIf(t -> t.getKey().equals("key-to-remove"));
+ return c;
+ });
+ // check the late trigger updater
+ Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> lateTriggerUpdater =
+ (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder);
+ when(mockState.getOutputValue()).thenReturn(collection);
+ lateTriggerUpdater.apply(mockState);
+ assertTrue(collection.size() == 1);
+ assertFalse(collection.get(0).isDelete());
+ assertEquals(collection.get(0).getKey(), "key-to-stay");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
new file mode 100644
index 0000000..8a25a96
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.Windows.Window;
+import org.apache.samza.operators.internal.Trigger;
+import org.apache.samza.operators.internal.WindowOutput;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestWindows {
+
+ @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException {
+ // test constructing the default session window
+ Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, Collection<TestMessage>>> testWnd = Windows.intoSessions(
+ TestMessage::getKey);
+ assertTrue(testWnd instanceof Windows.SessionWindow);
+ Field wndKeyFuncField = Windows.SessionWindow.class.getDeclaredField("wndKeyFunction");
+ Field aggregatorField = Windows.SessionWindow.class.getDeclaredField("aggregator");
+ wndKeyFuncField.setAccessible(true);
+ aggregatorField.setAccessible(true);
+ Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd);
+ assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "test-key");
+ BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> aggrFunc =
+ (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd);
+ TestMessage mockMsg = mock(TestMessage.class);
+ Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new ArrayList<>());
+ assertTrue(collection.size() == 1);
+ assertTrue(collection.contains(mockMsg));
+
+ // test constructing the session window w/ customized session info
+ Window<TestMessage, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions(
+ m -> String.format("key-%d", m.getTimestamp()), m -> m.getMessage().charAt(0));
+ assertTrue(testWnd2 instanceof Windows.SessionWindow);
+ wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2);
+ aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd2);
+ assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "key-0");
+ when(mockMsg.getMessage()).thenReturn("x-001");
+ collection = aggrFunc.apply(mockMsg, new ArrayList<>());
+ assertTrue(collection.size() == 1);
+ assertTrue(collection.contains('x'));
+
+ // test constructing session window w/ a default counter
+ Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
+ m -> String.format("key-%d", m.getTimestamp()));
+ assertTrue(testCounter instanceof Windows.SessionWindow);
+ wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testCounter);
+ BiFunction<TestMessage, Integer, Integer> counterFn = (BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter);
+ when(mockMsg.getTimestamp()).thenReturn(12345L);
+ assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
+ assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
+ }
+
+ @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException {
+ Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
+ m -> String.format("key-%d", m.getTimestamp()));
+ // test session window w/ a trigger
+ TriggerBuilder<TestMessage, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
+ testCounter.setTriggers(triggerBuilder);
+ Trigger<TestMessage, WindowState<Integer>> expectedTrigger = triggerBuilder.build();
+ Trigger<TestMessage, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger();
+ // examine all trigger fields are expected
+ Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+ Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+ Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+ Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater");
+ Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater");
+ earlyTriggerField.setAccessible(true);
+ lateTriggerField.setAccessible(true);
+ timerTriggerField.setAccessible(true);
+ earlyTriggerUpdater.setAccessible(true);
+ lateTriggerUpdater.setAccessible(true);
+ assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger));
+ assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger));
+ assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger));
+ assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger));
+ assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
new file mode 100644
index 0000000..b734e87
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestIncomingSystemMessage {
+
+ @Test public void testConstructor() {
+ IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
+ IncomingSystemMessage ism = new IncomingSystemMessage(ime);
+
+ Object mockKey = mock(Object.class);
+ Object mockValue = mock(Object.class);
+ LongOffset testOffset = new LongOffset("12345");
+ SystemStreamPartition mockSsp = mock(SystemStreamPartition.class);
+
+ when(ime.getKey()).thenReturn(mockKey);
+ when(ime.getMessage()).thenReturn(mockValue);
+ when(ime.getSystemStreamPartition()).thenReturn(mockSsp);
+ when(ime.getOffset()).thenReturn("12345");
+
+ assertEquals(ism.getKey(), mockKey);
+ assertEquals(ism.getMessage(), mockValue);
+ assertEquals(ism.getSystemStreamPartition(), mockSsp);
+ assertEquals(ism.getOffset(), testOffset);
+ assertFalse(ism.isDelete());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
new file mode 100644
index 0000000..943c47f
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+
+public class TestLongOffset {
+
+ @Test public void testConstructor() throws Exception {
+ LongOffset o1 = new LongOffset("12345");
+ Field offsetField = LongOffset.class.getDeclaredField("offset");
+ offsetField.setAccessible(true);
+ Long x = (Long) offsetField.get(o1);
+ assertEquals(x.longValue(), 12345L);
+
+ o1 = new LongOffset("012345");
+ x = (Long) offsetField.get(o1);
+ assertEquals(x.longValue(), 12345L);
+
+ try {
+ o1 = new LongOffset("xyz");
+ fail("Constructor of LongOffset should have failed w/ mal-formatted numbers");
+ } catch (NumberFormatException nfe) {
+ // expected
+ }
+ }
+
+ @Test public void testComparator() {
+ LongOffset o1 = new LongOffset("11111");
+ Offset other = mock(Offset.class);
+ try {
+ o1.compareTo(other);
+ fail("compareTo() should have have failed when comparing to an object of a different class");
+ } catch (IllegalArgumentException iae) {
+ // expected
+ }
+
+ LongOffset o2 = new LongOffset("-10000");
+ assertEquals(o1.compareTo(o2), 1);
+ LongOffset o3 = new LongOffset("22222");
+ assertEquals(o1.compareTo(o3), -1);
+ LongOffset o4 = new LongOffset("11111");
+ assertEquals(o1.compareTo(o4), 0);
+ }
+
+ @Test public void testEquals() {
+ LongOffset o1 = new LongOffset("12345");
+ Offset other = mock(Offset.class);
+ assertFalse(o1.equals(other));
+
+ LongOffset o2 = new LongOffset("0012345");
+ assertTrue(o1.equals(o2));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
new file mode 100644
index 0000000..d994486
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestOperators {
+
+ private class TestMessage implements Message<String, Object> {
+ private final long timestamp;
+ private final String key;
+ private final Object msg;
+
+
+ TestMessage(String key, Object msg, long timestamp) {
+ this.timestamp = timestamp;
+ this.key = key;
+ this.msg = msg;
+ }
+
+ @Override public Object getMessage() {
+ return this.msg;
+ }
+
+ @Override public String getKey() {
+ return this.key;
+ }
+
+ @Override public long getTimestamp() {
+ return this.timestamp;
+ }
+ }
+
+ @Test public void testGetStreamOperator() {
+ Function<Message, Collection<TestMessage>> transformFn = m -> new ArrayList<TestMessage>() { {
+ this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L));
+ } };
+ Operators.StreamOperator<Message, TestMessage> strmOp = Operators.getStreamOperator(transformFn);
+ assertEquals(strmOp.getFunction(), transformFn);
+ assertTrue(strmOp.getOutputStream() instanceof MessageStream);
+ }
+
+ @Test public void testGetSinkOperator() {
+ MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, c, t) -> { };
+ Operators.SinkOperator<TestMessage> sinkOp = Operators.getSinkOperator(sinkFn);
+ assertEquals(sinkOp.getFunction(), sinkFn);
+ assertTrue(sinkOp.getOutputStream() == null);
+ }
+
+ @Test public void testGetWindowOperator() {
+ WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class);
+ BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null;
+ Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> storeFns = mock(Operators.StoreFunctions.class);
+ Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class);
+ MessageStream<TestMessage> mockInput = mock(MessageStream.class);
+ when(windowFn.getTransformFunc()).thenReturn(xFunction);
+ when(windowFn.getStoreFuncs()).thenReturn(storeFns);
+ when(windowFn.getTrigger()).thenReturn(trigger);
+ when(mockInput.toString()).thenReturn("mockStream1");
+
+ Operators.WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn);
+ assertEquals(windowOp.getFunction(), xFunction);
+ assertEquals(windowOp.getStoreFunctions(), storeFns);
+ assertEquals(windowOp.getTrigger(), trigger);
+ assertEquals(windowOp.getStoreName(mockInput), String.format("input-mockStream1-wndop-%s", windowOp.toString()));
+ }
+
+ @Test public void testGetPartialJoinOperator() {
+ BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger =
+ (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(),
+ Math.max(m1.getTimestamp(), m2.getTimestamp()));
+ MessageStream<TestMessage> joinOutput = new MessageStream<>();
+ Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, ?>, TestMessage> partialJoin =
+ Operators.getPartialJoinOperator(merger, joinOutput);
+
+ assertEquals(partialJoin.getOutputStream(), joinOutput);
+ Message<Object, Object> m = mock(Message.class);
+ Message<Object, Object> s = mock(Message.class);
+ assertEquals(partialJoin.getFunction(), merger);
+ assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
+ assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), m);
+ assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
+ assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater());
+ }
+
+ @Test public void testGetMergeOperator() {
+ MessageStream<TestMessage> output = new MessageStream<>();
+ Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output);
+ Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() { {
+ this.add(t);
+ } };
+ TestMessage t = mock(TestMessage.class);
+ assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t));
+ assertEquals(mergeOp.getOutputStream(), output);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
new file mode 100644
index 0000000..0f35a7c
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.apache.samza.operators.WindowState;
+import org.apache.samza.operators.data.Message;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestTrigger {
+
+ @Test public void testConstructor() throws Exception {
+ BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
+ BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000;
+ Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis();
+ Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> {
+ s.setOutputValue(0);
+ return s;
+ };
+ Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> {
+ s.setOutputValue(1);
+ return s;
+ };
+
+ Trigger<Message<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
+ earlyTriggerUpdater, lateTriggerUpdater);
+
+ Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+ Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+ Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+ Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater");
+ Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater");
+ earlyTriggerField.setAccessible(true);
+ lateTriggerField.setAccessible(true);
+ timerTriggerField.setAccessible(true);
+ earlyTriggerUpdaterField.setAccessible(true);
+ lateTriggerUpdaterField.setAccessible(true);
+
+ assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
+ assertEquals(timerTrigger, timerTriggerField.get(trigger));
+ assertEquals(lateTrigger, lateTriggerField.get(trigger));
+ assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
+ assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
new file mode 100644
index 0000000..268c9fc
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.internal;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+public class TestWindowOutput {
+
+ @Test public void testConstructor() {
+ WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
+ assertEquals(wndOutput.getKey(), "testMsg");
+ assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
+ assertFalse(wndOutput.isDelete());
+ assertEquals(wndOutput.getTimestamp(), 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
index 429573b..75de630 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
@@ -94,8 +94,7 @@ public class MockCoordinatorStreamWrappedConsumer extends BlockingEnvelopeMap {
}
@Override
- public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(
- Set<SystemStreamPartition> systemStreamPartitions, long timeout)
+ public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout)
throws InterruptedException {
if (blockpollFlag) {
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
index 0e73e18..baf146a 100644
--- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamSystemConsumer.java
@@ -134,7 +134,9 @@ public class TestCoordinatorStreamSystemConsumer {
assertEquals(expectedSystemStreamPartition, systemStreamPartition);
}
- public int getRegisterCount() { return registerCount; }
+ public int getRegisterCount() {
+ return registerCount;
+ }
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws InterruptedException {
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> map = new LinkedHashMap<SystemStreamPartition, List<IncomingMessageEnvelope>>();
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
deleted file mode 100644
index b5e1028..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.Windows.Window;
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Operators;
-import org.apache.samza.operators.api.internal.Operators.Operator;
-import org.apache.samza.operators.api.internal.WindowOutput;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines either the input or output streams to/from the operators. Users use the API methods defined here to
- * directly program the stream processing stages that processes a stream and generate another one.
- *
- * @param <M> Type of message in this stream
- */
-public class MessageStream<M extends Message> {
-
- private final Set<Operator> subscribers = new HashSet<>();
-
- /**
- * Helper method to get the corresponding list of subscribers to a specific {@link MessageStream}.
- *
- * NOTE: This should only be used by implementation of {@link org.apache.samza.operators.impl.ChainedOperators}, not directly by programmers.
- *
- * @return A unmodifiable set containing all {@link Operator}s that subscribe to this {@link MessageStream} object
- */
- public Collection<Operator> getSubscribers() {
- return Collections.unmodifiableSet(this.subscribers);
- }
-
- /**
- * Public API methods start here
- */
-
- /**
- * Defines a function API that takes three input parameters w/ types {@code A}, {@code B}, and {@code C} and w/o a return value
- *
- * @param <A> the type of input {@code a}
- * @param <B> the type of input {@code b}
- * @param <C> the type of input {@code c}
- */
- @FunctionalInterface
- public interface VoidFunction3 <A, B, C> {
- public void apply(A a, B b, C c);
- }
-
- /**
- * Method to apply a map function (1:1) on a {@link MessageStream}
- *
- * @param mapper the mapper function to map one input {@link Message} to one output {@link Message}
- * @param <OM> the type of the output {@link Message} in the output {@link MessageStream}
- * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
- */
- public <OM extends Message> MessageStream<OM> map(Function<M, OM> mapper) {
- Operator<OM> op = Operators.<M, OM>getStreamOperator(m -> new ArrayList<OM>() {{
- OM r = mapper.apply(m);
- if (r != null) {
- this.add(r);
- }
- }});
- this.subscribers.add(op);
- return op.getOutputStream();
- }
-
- /**
- * Method to apply a flatMap function (1:n) on a {@link MessageStream}
- *
- * @param flatMapper the flat mapper function to map one input {@link Message} to zero or more output {@link Message}s
- * @param <OM> the type of the output {@link Message} in the output {@link MessageStream}
- * @return the output {@link MessageStream} by applying the map function on the input {@link MessageStream}
- */
- public <OM extends Message> MessageStream<OM> flatMap(Function<M, Collection<OM>> flatMapper) {
- Operator<OM> op = Operators.getStreamOperator(flatMapper);
- this.subscribers.add(op);
- return op.getOutputStream();
- }
-
- /**
- * Method to apply a filter function on a {@link MessageStream}
- *
- * @param filter the filter function to filter input {@link Message}s from the input {@link MessageStream}
- * @return the output {@link MessageStream} after applying the filter function on the input {@link MessageStream}
- */
- public MessageStream<M> filter(Function<M, Boolean> filter) {
- Operator<M> op = Operators.<M, M>getStreamOperator(t -> new ArrayList<M>() {{
- if (filter.apply(t)) {
- this.add(t);
- }
- }});
- this.subscribers.add(op);
- return op.getOutputStream();
- }
-
- /**
- * Method to send an input {@link MessageStream} to an output {@link SystemStream}, and allows the output {@link MessageStream}
- * to be consumed by downstream stream operators again.
- *
- * @param sink the user-defined sink function to send the input {@link Message}s to the external output systems
- */
- public void sink(VoidFunction3<M, MessageCollector, TaskCoordinator> sink) {
- this.subscribers.add(Operators.getSinkOperator(sink));
- }
-
- /**
- * Method to perform a window function (i.e. a group-by, aggregate function) on a {@link MessageStream}
- *
- * @param window the window function to group and aggregate the input {@link Message}s from the input {@link MessageStream}
- * @param <WK> the type of key in the output {@link Message} from the {@link Window} function
- * @param <WV> the type of output value from
- * @param <WS> the type of window state kept in the {@link Window} function
- * @param <WM> the type of {@link WindowOutput} message from the {@link Window} function
- * @return the output {@link MessageStream} after applying the window function on the input {@link MessageStream}
- */
- public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(Window<M, WK, WV, WM> window) {
- Operator<WM> wndOp = Operators.getWindowOperator(Windows.getInternalWindowFn(window));
- this.subscribers.add(wndOp);
- return wndOp.getOutputStream();
- }
-
- /**
- * Method to add an input {@link MessageStream} to a join function. Note that we currently only support 2-way joins.
- *
- * @param other the other stream to be joined w/
- * @param merger the common function to merge messages from this {@link MessageStream} and {@code other}
- * @param <K> the type of join key
- * @param <JM> the type of message in the {@link Message} from the other join stream
- * @param <RM> the type of message in the {@link Message} from the join function
- * @return the output {@link MessageStream} from the join function {@code joiner}
- */
- public <K, JM extends Message<K, ?>, RM extends Message> MessageStream<RM> join(MessageStream<JM> other,
- BiFunction<M, JM, RM> merger) {
- MessageStream<RM> outputStream = new MessageStream<>();
-
- BiFunction<M, JM, RM> parJoin1 = merger::apply;
- BiFunction<JM, M, RM> parJoin2 = (m, t1) -> merger.apply(t1, m);
-
- // TODO: need to add default store functions for the two partial join functions
-
- other.subscribers.add(Operators.<JM, K, M, RM>getPartialJoinOperator(parJoin2, outputStream));
- this.subscribers.add(Operators.<M, K, JM, RM>getPartialJoinOperator(parJoin1, outputStream));
- return outputStream;
- }
-
- /**
- * Method to merge all {@code others} streams w/ this {@link MessageStream}. The merging streams must have the same type {@code M}
- *
- * @param others other streams to be merged w/ this one
- * @return the merged output stream
- */
- public MessageStream<M> merge(Collection<MessageStream<M>> others) {
- MessageStream<M> outputStream = new MessageStream<>();
-
- others.add(this);
- others.forEach(other -> other.subscribers.add(Operators.getMergeOperator(outputStream)));
- return outputStream;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java b/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java
deleted file mode 100644
index 59dd91c..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.data.IncomingSystemMessage;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * This class defines all methods to create a {@link MessageStream} object. Users can use this to create an {@link MessageStream}
- * from a specific input source.
- *
- */
-
-public final class MessageStreams {
-
- /**
- * private constructor to prevent instantiation
- */
- private MessageStreams() {}
-
- /**
- * private class for system input/output {@link MessageStream}
- */
- public static final class SystemMessageStream extends MessageStream<IncomingSystemMessage> {
- /**
- * The corresponding {@link org.apache.samza.system.SystemStream}
- */
- private final SystemStreamPartition ssp;
-
- /**
- * Constructor for input system stream
- *
- * @param ssp the input {@link SystemStreamPartition} for the input {@link SystemMessageStream}
- */
- private SystemMessageStream(SystemStreamPartition ssp) {
- this.ssp = ssp;
- }
-
- /**
- * Getter for the {@link SystemStreamPartition} of the input
- *
- * @return the input {@link SystemStreamPartition}
- */
- public SystemStreamPartition getSystemStreamPartition() {
- return this.ssp;
- }
- }
-
- /**
- * Public static API methods start here
- */
-
- /**
- * Static API method to create a {@link MessageStream} from a system input stream
- *
- * @param ssp the input {@link SystemStreamPartition}
- * @return the {@link MessageStream} object takes {@code ssp} as the input
- */
- public static SystemMessageStream input(SystemStreamPartition ssp) {
- return new SystemMessageStream(ssp);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java b/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java
deleted file mode 100644
index fc3ea37..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/TriggerBuilder.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-import org.apache.samza.operators.api.data.Message;
-import org.apache.samza.operators.api.internal.Trigger;
-
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-
-/**
- * This class defines a builder of {@link Trigger} object for a {@link Windows.Window}. The triggers are categorized into
- * three types:
- *
- * <p>
- * early trigger: defines the condition when the first output from the window function is sent.
- * late trigger: defines the condition when the updated output after the first output is sent.
- * timer trigger: defines a system timeout condition to trigger output if no more inputs are received to enable early/late triggers
- * </p>
- *
- * If multiple conditions are defined for a specific type of trigger, the aggregated trigger is the disjunction of the each individual trigger (i.e. OR).
- *
- * NOTE: Programmers should not use classes defined in {@link org.apache.samza.operators.api.internal} to create triggers
- *
- *
- * @param <M> the type of input {@link Message} to the {@link Windows.Window}
- * @param <V> the type of output value from the {@link Windows.Window}
- */
-public final class TriggerBuilder<M extends Message, V> {
-
- /**
- * Predicate helper to OR multiple trigger conditions
- */
- static class PredicateHelper {
- static <M, S> BiFunction<M, S, Boolean> or(BiFunction<M, S, Boolean> lhs, BiFunction<M, S, Boolean> rhs) {
- return (m, s) -> lhs.apply(m, s) || rhs.apply(m, s);
- }
-
- static <S> Function<S, Boolean> or(Function<S, Boolean> lhs, Function<S, Boolean> rhs) {
- return s -> lhs.apply(s) || rhs.apply(s);
- }
- }
-
- /**
- * The early trigger condition that determines the first output from the {@link Windows.Window}
- */
- private BiFunction<M, WindowState<V>, Boolean> earlyTrigger = null;
-
- /**
- * The late trigger condition that determines the late output(s) from the {@link Windows.Window}
- */
- private BiFunction<M, WindowState<V>, Boolean> lateTrigger = null;
-
- /**
- * The system timer based trigger conditions that guarantees the {@link Windows.Window} proceeds forward
- */
- private Function<WindowState<V>, Boolean> timerTrigger = null;
-
- /**
- * The state updater function to be applied after the first output is triggered
- */
- private Function<WindowState<V>, WindowState<V>> earlyTriggerUpdater = Function.identity();
-
- /**
- * The state updater function to be applied after the late output is triggered
- */
- private Function<WindowState<V>, WindowState<V>> lateTriggerUpdater = Function.identity();
-
- /**
- * Helper method to add a trigger condition
- *
- * @param currentTrigger current trigger condition
- * @param newTrigger new trigger condition
- * @return combined trigger condition that is {@code currentTrigger} OR {@code newTrigger}
- */
- private BiFunction<M, WindowState<V>, Boolean> addTrigger(BiFunction<M, WindowState<V>, Boolean> currentTrigger,
- BiFunction<M, WindowState<V>, Boolean> newTrigger) {
- if (currentTrigger == null) {
- return newTrigger;
- }
-
- return PredicateHelper.or(currentTrigger, newTrigger);
- }
-
- /**
- * Helper method to add a system timer trigger
- *
- * @param currentTimer current timer condition
- * @param newTimer new timer condition
- * @return combined timer condition that is {@code currentTimer} OR {@code newTimer}
- */
- private Function<WindowState<V>, Boolean> addTimerTrigger(Function<WindowState<V>, Boolean> currentTimer,
- Function<WindowState<V>, Boolean> newTimer) {
- if (currentTimer == null) {
- return newTimer;
- }
-
- return PredicateHelper.or(currentTimer, newTimer);
- }
-
- /**
- * default constructor to prevent instantiation
- */
- private TriggerBuilder() {}
-
- /**
- * Constructor that set the size limit as the early trigger for a window
- *
- * @param sizeLimit the number of messages in a window that would trigger the first output
- */
- private TriggerBuilder(long sizeLimit) {
- this.earlyTrigger = (m, s) -> s.getNumberMessages() > sizeLimit;
- }
-
- /**
- * Constructor that set the event time length as the early trigger
- *
- * @param eventTimeFunction the function that calculate the event time in nano-second from the input {@link Message}
- * @param wndLenMs the window length in event time in milli-second
- */
- private TriggerBuilder(Function<M, Long> eventTimeFunction, long wndLenMs) {
- this.earlyTrigger = (m, s) ->
- TimeUnit.NANOSECONDS.toMillis(Math.max(s.getLatestEventTimeNs() - s.getEarliestEventTimeNs(),
- eventTimeFunction.apply(m) - s.getEarliestEventTimeNs())) > wndLenMs;
- }
-
- /**
- * Constructor that set the special token message as the early trigger
- *
- * @param tokenFunc the function that checks whether an input {@link Message} is a token message that triggers window output
- */
- private TriggerBuilder(Function<M, Boolean> tokenFunc) {
- this.earlyTrigger = (m, s) -> tokenFunc.apply(m);
- }
-
- /**
- * Build method that creates an {@link Trigger} object based on the trigger conditions set in {@link TriggerBuilder}
- * This is kept package private and only used by {@link Windows} to convert the mutable {@link TriggerBuilder} object to an immutable {@link Trigger} object
- *
- * @return the final {@link Trigger} object
- */
- Trigger<M, WindowState<V>> build() {
- return Trigger.createTrigger(this.timerTrigger, this.earlyTrigger, this.lateTrigger, this.earlyTriggerUpdater, this.lateTriggerUpdater);
- }
-
- /**
- * Public API methods start here
- */
-
-
- /**
- * API method to allow users to set an update method to update the output value after the first window output is triggered
- * by the early trigger condition
- *
- * @param onTriggerFunc the method to update the output value after the early trigger
- * @return the {@link TriggerBuilder} object
- */
- public TriggerBuilder<M, V> onEarlyTrigger(Function<V, V> onTriggerFunc) {
- this.earlyTriggerUpdater = s -> { s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); return s; };
- return this;
- }
-
- /**
- * API method to allow users to set an update method to update the output value after a late window output is triggered
- * by the late trigger condition
- *
- * @param onTriggerFunc the method to update the output value after the late trigger
- * @return the {@link TriggerBuilder} object
- */
- public TriggerBuilder<M, V> onLateTrigger(Function<V, V> onTriggerFunc) {
- this.lateTriggerUpdater = s -> { s.setOutputValue(onTriggerFunc.apply(s.getOutputValue())); return s; };
- return this;
- }
-
- /**
- * API method to allow users to add a system timer trigger based on timeout after the last message received in the window
- *
- * @param timeoutMs the timeout in ms after the last message received in the window
- * @return the {@link TriggerBuilder} object
- */
- public TriggerBuilder<M, V> addTimeoutSinceLastMessage(long timeoutMs) {
- this.timerTrigger = this.addTimerTrigger(this.timerTrigger,
- s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + timeoutMs < System.currentTimeMillis());
- return this;
- }
-
- /**
- * API method to allow users to add a system timer trigger based on the timeout after the first message received in the window
- *
- * @param timeoutMs the timeout in ms after the first message received in the window
- * @return the {@link TriggerBuilder} object
- */
- public TriggerBuilder<M, V> addTimeoutSinceFirstMessage(long timeoutMs) {
- this.timerTrigger = this.addTimerTrigger(this.timerTrigger, s ->
- TimeUnit.NANOSECONDS.toMillis(s.getFirstMessageTimeNs()) + timeoutMs < System.currentTimeMillis());
- return this;
- }
-
- /**
- * API method allow users to add a late trigger based on the window size limit
- *
- * @param sizeLimit limit on the number of messages in window
- * @return the {@link TriggerBuilder} object
- */
- public TriggerBuilder<M, V> addLateTriggerOnSizeLimit(long sizeLimit) {
- this.lateTrigger = this.addTrigger(this.lateTrigger, (m, s) -> s.getNumberMessages() > sizeLimit);
- return this;
- }
-
- /**
- * API method to allow users to define a customized late trigger function based on input message and the window state
- *
- * @param lateTrigger the late trigger condition based on input {@link Message} and the current {@link WindowState}
- * @return the {@link TriggerBuilder} object
- */
- public TriggerBuilder<M, V> addLateTrigger(BiFunction<M, WindowState<V>, Boolean> lateTrigger) {
- this.lateTrigger = this.addTrigger(this.lateTrigger, lateTrigger);
- return this;
- }
-
- /**
- * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on window size limit
- *
- * @param sizeLimit window size limit
- * @param <M> the type of input {@link Message}
- * @param <V> the type of {@link Windows.Window} output value
- * @return the {@link TriggerBuilder} object
- */
- public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerWhenExceedWndLen(long sizeLimit) {
- return new TriggerBuilder<M, V>(sizeLimit);
- }
-
- /**
- * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on event time window
- *
- *
- * @param eventTimeFunc the function to get the event time from the input message
- * @param eventTimeWndSizeMs the event time window size in Ms
- * @param <M> the type of input {@link Message}
- * @param <V> the type of {@link Windows.Window} output value
- * @return the {@link TriggerBuilder} object
- */
- public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnEventTime(Function<M, Long> eventTimeFunc, long eventTimeWndSizeMs) {
- return new TriggerBuilder<M, V>(eventTimeFunc, eventTimeWndSizeMs);
- }
-
- /**
- * Static API method to create a {@link TriggerBuilder} w/ early trigger condition based on token messages
- *
- * @param tokenFunc the function to determine whether an input message is a window token or not
- * @param <M> the type of input {@link Message}
- * @param <V> the type of {@link Windows.Window} output value
- * @return the {@link TriggerBuilder} object
- */
- public static <M extends Message, V> TriggerBuilder<M, V> earlyTriggerOnTokenMsg(Function<M, Boolean> tokenFunc) {
- return new TriggerBuilder<M, V>(tokenFunc);
- }
-
- /**
- * Static API method to allow customized early trigger condition based on input {@link Message} and the corresponding {@link WindowState}
- *
- * @param earlyTrigger the user defined early trigger condition
- * @param <M> the input message type
- * @param <V> the output value from the window
- * @return the {@link TriggerBuilder} object
- */
- public static <M extends Message, V> TriggerBuilder<M, V> earlyTrigger(BiFunction<M, WindowState<V>, Boolean> earlyTrigger) {
- TriggerBuilder<M, V> newTriggers = new TriggerBuilder<M, V>();
- newTriggers.earlyTrigger = newTriggers.addTrigger(newTriggers.earlyTrigger, earlyTrigger);
- return newTriggers;
- }
-
- /**
- * Static API method to create a {@link TriggerBuilder} w/ system timeout after the last message received in the window
- *
- * @param timeoutMs timeout in ms after the last message received
- * @param <M> the type of input {@link Message}
- * @param <V> the type of {@link Windows.Window} output value
- * @return the {@link TriggerBuilder} object
- */
- public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceLastMessage(long timeoutMs) {
- return new TriggerBuilder<M, V>().addTimeoutSinceLastMessage(timeoutMs);
- }
-
- /**
- * Static API method to create a {@link TriggerBuilder} w/ system timeout after the first message received in the window
- *
- * @param timeoutMs timeout in ms after the first message received
- * @param <M> the type of input {@link Message}
- * @param <V> the type of {@link Windows.Window} output value
- * @return the {@link TriggerBuilder} object
- */
- public static <M extends Message, V> TriggerBuilder<M, V> timeoutSinceFirstMessage(long timeoutMs) {
- return new TriggerBuilder<M, V>().addTimeoutSinceFirstMessage(timeoutMs);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1dac25e1/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java b/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java
deleted file mode 100644
index 402cc42..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.api;
-
-/**
- * This interface defines the methods a window state class has to implement. The programmers are allowed to implement
- * customized window state to be stored in window state stores by implementing this interface class.
- *
- * @param <WV> the type for window output value
- */
-public interface WindowState<WV> {
- /**
- * Method to get the system time when the first message in the window is received
- *
- * @return nano-second of system time for the first message received in the window
- */
- long getFirstMessageTimeNs();
-
- /**
- * Method to get the system time when the last message in the window is received
- *
- * @return nano-second of system time for the last message received in the window
- */
- long getLastMessageTimeNs();
-
- /**
- * Method to get the earliest event time in the window
- *
- * @return the earliest event time in nano-second in the window
- */
- long getEarliestEventTimeNs();
-
- /**
- * Method to get the latest event time in the window
- *
- * @return the latest event time in nano-second in the window
- */
- long getLatestEventTimeNs();
-
- /**
- * Method to get the total number of messages received in the window
- *
- * @return number of messages in the window
- */
- long getNumberMessages();
-
- /**
- * Method to get the corresponding window's output value
- *
- * @return the corresponding window's output value
- */
- WV getOutputValue();
-
- /**
- * Method to set the corresponding window's output value
- *
- * @param value the corresponding window's output value
- */
- void setOutputValue(WV value);
-
-}