You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/12/01 22:51:04 UTC
[4/6] samza git commit: SAMZA-1054: Refactor Operator APIs
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
deleted file mode 100644
index 14e6562..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.data.Message;
-
-
-public class TestOutputMessage implements Message<String, Integer> {
- private final String key;
- private final Integer value;
- private final long timestamp;
-
- public TestOutputMessage(String key, Integer value, long timestamp) {
- this.key = key;
- this.value = value;
- this.timestamp = timestamp;
- }
-
- @Override public Integer getMessage() {
- return this.value;
- }
-
- @Override public String getKey() {
- return this.key;
- }
-
- @Override public long getTimestamp() {
- return this.timestamp;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
new file mode 100644
index 0000000..284b30b
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/TestOutputMessageEnvelope.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+
+
+public class TestOutputMessageEnvelope implements MessageEnvelope<String, Integer> {
+ private final String key;
+ private final Integer value;
+
+ public TestOutputMessageEnvelope(String key, Integer value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public Integer getMessage() {
+ return this.value;
+ }
+
+ @Override
+ public String getKey() {
+ return this.key;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
deleted file mode 100644
index 927b14b..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestTriggerBuilder.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestTriggerBuilder {
- private Field earlyTriggerField;
- private Field lateTriggerField;
- private Field timerTriggerField;
- private Field earlyTriggerUpdater;
- private Field lateTriggerUpdater;
-
- @Before
- public void testPrep() throws Exception {
- this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger");
- this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger");
- this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger");
- this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
- this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
-
- this.earlyTriggerField.setAccessible(true);
- this.lateTriggerField.setAccessible(true);
- this.timerTriggerField.setAccessible(true);
- this.earlyTriggerUpdater.setAccessible(true);
- this.lateTriggerUpdater.setAccessible(true);
- }
-
- @Test public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException {
- TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
- BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
- (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
- WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
- when(mockState.getNumberMessages()).thenReturn(200L);
- assertFalse(triggerField.apply(null, mockState));
- when(mockState.getNumberMessages()).thenReturn(2000L);
- assertTrue(triggerField.apply(null, mockState));
-
- Function<TestMessage, Boolean> tokenFunc = m -> true;
- builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
- triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
- TestMessage m = mock(TestMessage.class);
- assertTrue(triggerField.apply(m, mockState));
-
- builder = TriggerBuilder.earlyTriggerOnEventTime(TestMessage::getTimestamp, 30000L);
- triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
- when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
- when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
- when(m.getTimestamp()).thenReturn(19999000000L);
- assertFalse(triggerField.apply(m, mockState));
- when(m.getTimestamp()).thenReturn(32000000000L);
- assertTrue(triggerField.apply(m, mockState));
- when(m.getTimestamp()).thenReturn(1001000000L);
- when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
- assertTrue(triggerField.apply(m, mockState));
-
- BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> mockFunc = mock(BiFunction.class);
- builder = TriggerBuilder.earlyTrigger(mockFunc);
- triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
- assertEquals(triggerField, mockFunc);
-
- builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
- Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
- (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
- when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
- assertTrue(timerTrigger.apply(mockState));
- // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
- when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
- assertFalse(timerTrigger.apply(mockState));
-
- builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
- timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
- when(mockState.getLastMessageTimeNs()).thenReturn(0L);
- assertTrue(timerTrigger.apply(mockState));
- // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
- when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000));
- assertFalse(timerTrigger.apply(mockState));
- }
-
- @Test public void testAddTimerTriggers() throws IllegalAccessException {
- TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
- builder.addTimeoutSinceFirstMessage(10000L);
- // exam that both earlyTrigger and timer triggers are set up
- BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> triggerField =
- (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
- WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
- when(mockState.getNumberMessages()).thenReturn(200L);
- assertFalse(triggerField.apply(null, mockState));
- // check the timer trigger
- Function<WindowState<Collection<TestMessage>>, Boolean> timerTrigger =
- (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
- when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
- assertTrue(timerTrigger.apply(mockState));
- // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
- when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
- assertFalse(timerTrigger.apply(mockState));
-
- // exam that both early trigger and timer triggers are set up
- builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
- triggerField = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
- mockState = mock(WindowState.class);
- when(mockState.getNumberMessages()).thenReturn(200L);
- assertFalse(triggerField.apply(null, mockState));
- builder.addTimeoutSinceLastMessage(20000L);
- // check the timer trigger
- timerTrigger = (Function<WindowState<Collection<TestMessage>>, Boolean>) this.timerTriggerField.get(builder);
- when(mockState.getLastMessageTimeNs()).thenReturn(0L);
- assertTrue(timerTrigger.apply(mockState));
- // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
- when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
- assertFalse(timerTrigger.apply(mockState));
- }
-
- @Test public void testAddLateTriggers() throws IllegalAccessException {
- TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
- builder.addLateTriggerOnSizeLimit(10000L);
- // exam that both earlyTrigger and lateTriggers are set up
- BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> earlyTrigger =
- (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
- WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
- when(mockState.getNumberMessages()).thenReturn(200L);
- assertFalse(earlyTrigger.apply(null, mockState));
- // check the late trigger
- BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean> lateTrigger =
- (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
- assertFalse(lateTrigger.apply(null, mockState));
- // set the number of messages to 10001 to trigger the late trigger
- when(mockState.getNumberMessages()).thenReturn(10001L);
- assertTrue(lateTrigger.apply(null, mockState));
-
- builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
- builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
- // exam that both earlyTrigger and lateTriggers are set up
- earlyTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.earlyTriggerField.get(builder);
- mockState = mock(WindowState.class);
- when(mockState.getNumberMessages()).thenReturn(200L);
- assertFalse(earlyTrigger.apply(null, mockState));
- // exam the lateTrigger
- when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
- lateTrigger = (BiFunction<TestMessage, WindowState<Collection<TestMessage>>, Boolean>) this.lateTriggerField.get(builder);
- assertFalse(lateTrigger.apply(null, mockState));
- List<TestMessage> mockList = mock(ArrayList.class);
- when(mockList.size()).thenReturn(200);
- when(mockState.getOutputValue()).thenReturn(mockList);
- assertTrue(lateTrigger.apply(null, mockState));
- }
-
- @Test public void testAddTriggerUpdater() throws IllegalAccessException {
- TriggerBuilder<TestMessage, Collection<TestMessage>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
- builder.onEarlyTrigger(c -> {
- c.clear();
- return c;
- });
- List<TestMessage> collection = new ArrayList<TestMessage>() { {
- for (int i = 0; i < 10; i++) {
- this.add(new TestMessage(String.format("key-%d", i), "string-value", System.nanoTime()));
- }
- } };
- // exam that earlyTriggerUpdater is set up
- Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> earlyTriggerUpdater =
- (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.earlyTriggerUpdater.get(builder);
- WindowState<Collection<TestMessage>> mockState = mock(WindowState.class);
- when(mockState.getOutputValue()).thenReturn(collection);
- earlyTriggerUpdater.apply(mockState);
- assertTrue(collection.isEmpty());
-
- collection.add(new TestMessage("key-to-stay", "string-to-stay", System.nanoTime()));
- collection.add(new TestMessage("key-to-remove", "string-to-remove", System.nanoTime()));
- builder.onLateTrigger(c -> {
- c.removeIf(t -> t.getKey().equals("key-to-remove"));
- return c;
- });
- // check the late trigger updater
- Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>> lateTriggerUpdater =
- (Function<WindowState<Collection<TestMessage>>, WindowState<Collection<TestMessage>>>) this.lateTriggerUpdater.get(builder);
- when(mockState.getOutputValue()).thenReturn(collection);
- lateTriggerUpdater.apply(mockState);
- assertTrue(collection.size() == 1);
- assertFalse(collection.get(0).isDelete());
- assertEquals(collection.get(0).getKey(), "key-to-stay");
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
deleted file mode 100644
index 8a25a96..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/TestWindows.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import org.apache.samza.operators.Windows.Window;
-import org.apache.samza.operators.internal.Trigger;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestWindows {
-
- @Test public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException {
- // test constructing the default session window
- Window<TestMessage, String, Collection<TestMessage>, WindowOutput<String, Collection<TestMessage>>> testWnd = Windows.intoSessions(
- TestMessage::getKey);
- assertTrue(testWnd instanceof Windows.SessionWindow);
- Field wndKeyFuncField = Windows.SessionWindow.class.getDeclaredField("wndKeyFunction");
- Field aggregatorField = Windows.SessionWindow.class.getDeclaredField("aggregator");
- wndKeyFuncField.setAccessible(true);
- aggregatorField.setAccessible(true);
- Function<TestMessage, String> wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd);
- assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "test-key");
- BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>> aggrFunc =
- (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd);
- TestMessage mockMsg = mock(TestMessage.class);
- Collection<TestMessage> collection = aggrFunc.apply(mockMsg, new ArrayList<>());
- assertTrue(collection.size() == 1);
- assertTrue(collection.contains(mockMsg));
-
- // test constructing the session window w/ customized session info
- Window<TestMessage, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions(
- m -> String.format("key-%d", m.getTimestamp()), m -> m.getMessage().charAt(0));
- assertTrue(testWnd2 instanceof Windows.SessionWindow);
- wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testWnd2);
- aggrFunc = (BiFunction<TestMessage, Collection<TestMessage>, Collection<TestMessage>>) aggregatorField.get(testWnd2);
- assertEquals(wndKeyFunc.apply(new TestMessage("test-key", "test-value", 0)), "key-0");
- when(mockMsg.getMessage()).thenReturn("x-001");
- collection = aggrFunc.apply(mockMsg, new ArrayList<>());
- assertTrue(collection.size() == 1);
- assertTrue(collection.contains('x'));
-
- // test constructing session window w/ a default counter
- Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
- m -> String.format("key-%d", m.getTimestamp()));
- assertTrue(testCounter instanceof Windows.SessionWindow);
- wndKeyFunc = (Function<TestMessage, String>) wndKeyFuncField.get(testCounter);
- BiFunction<TestMessage, Integer, Integer> counterFn = (BiFunction<TestMessage, Integer, Integer>) aggregatorField.get(testCounter);
- when(mockMsg.getTimestamp()).thenReturn(12345L);
- assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
- assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
- }
-
- @Test public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException {
- Window<TestMessage, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
- m -> String.format("key-%d", m.getTimestamp()));
- // test session window w/ a trigger
- TriggerBuilder<TestMessage, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
- testCounter.setTriggers(triggerBuilder);
- Trigger<TestMessage, WindowState<Integer>> expectedTrigger = triggerBuilder.build();
- Trigger<TestMessage, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger();
- // examine all trigger fields are expected
- Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
- Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
- Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
- Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater");
- Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater");
- earlyTriggerField.setAccessible(true);
- lateTriggerField.setAccessible(true);
- timerTriggerField.setAccessible(true);
- earlyTriggerUpdater.setAccessible(true);
- lateTriggerUpdater.setAccessible(true);
- assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger));
- assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger));
- assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger));
- assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger));
- assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger));
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
index b734e87..7bd62a7 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestIncomingSystemMessage.java
@@ -30,9 +30,10 @@ import static org.mockito.Mockito.when;
public class TestIncomingSystemMessage {
- @Test public void testConstructor() {
+ @Test
+ public void testConstructor() {
IncomingMessageEnvelope ime = mock(IncomingMessageEnvelope.class);
- IncomingSystemMessage ism = new IncomingSystemMessage(ime);
+ IncomingSystemMessageEnvelope ism = new IncomingSystemMessageEnvelope(ime);
Object mockKey = mock(Object.class);
Object mockValue = mock(Object.class);
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
index 943c47f..7838896 100644
--- a/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
+++ b/samza-api/src/test/java/org/apache/samza/operators/data/TestLongOffset.java
@@ -28,7 +28,8 @@ import static org.mockito.Mockito.mock;
public class TestLongOffset {
- @Test public void testConstructor() throws Exception {
+ @Test
+ public void testConstructor() throws Exception {
LongOffset o1 = new LongOffset("12345");
Field offsetField = LongOffset.class.getDeclaredField("offset");
offsetField.setAccessible(true);
@@ -47,7 +48,8 @@ public class TestLongOffset {
}
}
- @Test public void testComparator() {
+ @Test
+ public void testComparator() {
LongOffset o1 = new LongOffset("11111");
Offset other = mock(Offset.class);
try {
@@ -65,7 +67,8 @@ public class TestLongOffset {
assertEquals(o1.compareTo(o4), 0);
}
- @Test public void testEquals() {
+ @Test
+ public void testEquals() {
LongOffset o1 = new LongOffset("12345");
Offset other = mock(Offset.class);
assertFalse(o1.equals(other));
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
deleted file mode 100644
index d994486..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/internal/TestOperators.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperators {
-
- private class TestMessage implements Message<String, Object> {
- private final long timestamp;
- private final String key;
- private final Object msg;
-
-
- TestMessage(String key, Object msg, long timestamp) {
- this.timestamp = timestamp;
- this.key = key;
- this.msg = msg;
- }
-
- @Override public Object getMessage() {
- return this.msg;
- }
-
- @Override public String getKey() {
- return this.key;
- }
-
- @Override public long getTimestamp() {
- return this.timestamp;
- }
- }
-
- @Test public void testGetStreamOperator() {
- Function<Message, Collection<TestMessage>> transformFn = m -> new ArrayList<TestMessage>() { {
- this.add(new TestMessage(m.getKey().toString(), m.getMessage(), 12345L));
- } };
- Operators.StreamOperator<Message, TestMessage> strmOp = Operators.getStreamOperator(transformFn);
- assertEquals(strmOp.getFunction(), transformFn);
- assertTrue(strmOp.getOutputStream() instanceof MessageStream);
- }
-
- @Test public void testGetSinkOperator() {
- MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, c, t) -> { };
- Operators.SinkOperator<TestMessage> sinkOp = Operators.getSinkOperator(sinkFn);
- assertEquals(sinkOp.getFunction(), sinkFn);
- assertTrue(sinkOp.getOutputStream() == null);
- }
-
- @Test public void testGetWindowOperator() {
- WindowFn<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class);
- BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null;
- Operators.StoreFunctions<TestMessage, String, WindowState<Integer>> storeFns = mock(Operators.StoreFunctions.class);
- Trigger<TestMessage, WindowState<Integer>> trigger = mock(Trigger.class);
- MessageStream<TestMessage> mockInput = mock(MessageStream.class);
- when(windowFn.getTransformFunc()).thenReturn(xFunction);
- when(windowFn.getStoreFuncs()).thenReturn(storeFns);
- when(windowFn.getTrigger()).thenReturn(trigger);
- when(mockInput.toString()).thenReturn("mockStream1");
-
- Operators.WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = Operators.getWindowOperator(windowFn);
- assertEquals(windowOp.getFunction(), xFunction);
- assertEquals(windowOp.getStoreFunctions(), storeFns);
- assertEquals(windowOp.getTrigger(), trigger);
- assertEquals(windowOp.getStoreName(mockInput), String.format("input-mockStream1-wndop-%s", windowOp.toString()));
- }
-
- @Test public void testGetPartialJoinOperator() {
- BiFunction<Message<Object, ?>, Message<Object, ?>, TestMessage> merger =
- (m1, m2) -> new TestMessage(m1.getKey().toString(), m2.getMessage(),
- Math.max(m1.getTimestamp(), m2.getTimestamp()));
- MessageStream<TestMessage> joinOutput = new MessageStream<>();
- Operators.PartialJoinOperator<Message<Object, ?>, Object, Message<Object, ?>, TestMessage> partialJoin =
- Operators.getPartialJoinOperator(merger, joinOutput);
-
- assertEquals(partialJoin.getOutputStream(), joinOutput);
- Message<Object, Object> m = mock(Message.class);
- Message<Object, Object> s = mock(Message.class);
- assertEquals(partialJoin.getFunction(), merger);
- assertEquals(partialJoin.getSelfStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
- assertEquals(partialJoin.getSelfStoreFunctions().getStateUpdater().apply(m, s), m);
- assertEquals(partialJoin.getJoinStoreFunctions().getStoreKeyFinder().apply(m), m.getKey());
- assertNull(partialJoin.getJoinStoreFunctions().getStateUpdater());
- }
-
- @Test public void testGetMergeOperator() {
- MessageStream<TestMessage> output = new MessageStream<>();
- Operators.StreamOperator<TestMessage, TestMessage> mergeOp = Operators.getMergeOperator(output);
- Function<TestMessage, Collection<TestMessage>> mergeFn = t -> new ArrayList<TestMessage>() { {
- this.add(t);
- } };
- TestMessage t = mock(TestMessage.class);
- assertEquals(mergeOp.getFunction().apply(t), mergeFn.apply(t));
- assertEquals(mergeOp.getOutputStream(), output);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
deleted file mode 100644
index 0f35a7c..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/internal/TestTrigger.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class TestTrigger {
-
- @Test public void testConstructor() throws Exception {
- BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
- BiFunction<Message<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000;
- Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis();
- Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> {
- s.setOutputValue(0);
- return s;
- };
- Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> {
- s.setOutputValue(1);
- return s;
- };
-
- Trigger<Message<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
- earlyTriggerUpdater, lateTriggerUpdater);
-
- Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
- Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
- Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
- Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater");
- Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater");
- earlyTriggerField.setAccessible(true);
- lateTriggerField.setAccessible(true);
- timerTriggerField.setAccessible(true);
- earlyTriggerUpdaterField.setAccessible(true);
- lateTriggerUpdaterField.setAccessible(true);
-
- assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
- assertEquals(timerTrigger, timerTriggerField.get(trigger));
- assertEquals(lateTrigger, lateTriggerField.get(trigger));
- assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
- assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
deleted file mode 100644
index 268c9fc..0000000
--- a/samza-api/src/test/java/org/apache/samza/operators/internal/TestWindowOutput.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.internal;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-
-public class TestWindowOutput {
-
- @Test public void testConstructor() {
- WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
- assertEquals(wndOutput.getKey(), "testMsg");
- assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
- assertFalse(wndOutput.isDelete());
- assertEquals(wndOutput.getTimestamp(), 0);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
new file mode 100644
index 0000000..a91af24
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTrigger.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class TestTrigger {
+
+ @Test
+ public void testConstructor() throws Exception {
+ BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> earlyTrigger = (m, s) -> s.getOutputValue() > 1000;
+ BiFunction<MessageEnvelope<Object, Object>, WindowState<Integer>, Boolean> lateTrigger = (m, s) -> s.getOutputValue() > 1000;
+ Function<WindowState<Integer>, Boolean> timerTrigger = s -> TimeUnit.NANOSECONDS.toMillis(s.getLastMessageTimeNs()) + 50000 < System.currentTimeMillis();
+ Function<WindowState<Integer>, WindowState<Integer>> earlyTriggerUpdater = s -> {
+ s.setOutputValue(0);
+ return s;
+ };
+ Function<WindowState<Integer>, WindowState<Integer>> lateTriggerUpdater = s -> {
+ s.setOutputValue(1);
+ return s;
+ };
+
+ Trigger<MessageEnvelope<Object, Object>, WindowState<Integer>> trigger = Trigger.createTrigger(timerTrigger, earlyTrigger, lateTrigger,
+ earlyTriggerUpdater, lateTriggerUpdater);
+
+ Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+ Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+ Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+ Field earlyTriggerUpdaterField = Trigger.class.getDeclaredField("earlyTriggerUpdater");
+ Field lateTriggerUpdaterField = Trigger.class.getDeclaredField("lateTriggerUpdater");
+ earlyTriggerField.setAccessible(true);
+ lateTriggerField.setAccessible(true);
+ timerTriggerField.setAccessible(true);
+ earlyTriggerUpdaterField.setAccessible(true);
+ lateTriggerUpdaterField.setAccessible(true);
+
+ assertEquals(earlyTrigger, earlyTriggerField.get(trigger));
+ assertEquals(timerTrigger, timerTriggerField.get(trigger));
+ assertEquals(lateTrigger, lateTriggerField.get(trigger));
+ assertEquals(earlyTriggerUpdater, earlyTriggerUpdaterField.get(trigger));
+ assertEquals(lateTriggerUpdater, lateTriggerUpdaterField.get(trigger));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
new file mode 100644
index 0000000..6a9b55d
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestTriggerBuilder.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestTriggerBuilder {
+ private Field earlyTriggerField;
+ private Field lateTriggerField;
+ private Field timerTriggerField;
+ private Field earlyTriggerUpdater;
+ private Field lateTriggerUpdater;
+
+ @Before
+ public void testPrep() throws Exception {
+ this.earlyTriggerField = TriggerBuilder.class.getDeclaredField("earlyTrigger");
+ this.lateTriggerField = TriggerBuilder.class.getDeclaredField("lateTrigger");
+ this.timerTriggerField = TriggerBuilder.class.getDeclaredField("timerTrigger");
+ this.earlyTriggerUpdater = TriggerBuilder.class.getDeclaredField("earlyTriggerUpdater");
+ this.lateTriggerUpdater = TriggerBuilder.class.getDeclaredField("lateTriggerUpdater");
+
+ this.earlyTriggerField.setAccessible(true);
+ this.lateTriggerField.setAccessible(true);
+ this.timerTriggerField.setAccessible(true);
+ this.earlyTriggerUpdater.setAccessible(true);
+ this.lateTriggerUpdater.setAccessible(true);
+ }
+
+ @Test
+ public void testStaticCreators() throws NoSuchFieldException, IllegalAccessException {
+ TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField =
+ (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+ WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class);
+ when(mockState.getNumberMessages()).thenReturn(200L);
+ assertFalse(triggerField.apply(null, mockState));
+ when(mockState.getNumberMessages()).thenReturn(2000L);
+ assertTrue(triggerField.apply(null, mockState));
+
+ Function<TestMessageEnvelope, Boolean> tokenFunc = m -> true;
+ builder = TriggerBuilder.earlyTriggerOnTokenMsg(tokenFunc);
+ triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+ TestMessageEnvelope m = mock(TestMessageEnvelope.class);
+ assertTrue(triggerField.apply(m, mockState));
+
+ builder = TriggerBuilder.earlyTriggerOnEventTime(tm -> tm.getMessage().getEventTime(), 30000L);
+ triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+ when(mockState.getEarliestEventTimeNs()).thenReturn(1000000000L);
+ when(mockState.getLatestEventTimeNs()).thenReturn(20000000000L);
+ TestMessageEnvelope.MessageType mockInnerMessage;
+ mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
+ when(mockInnerMessage.getEventTime()).thenReturn(19999000000L);
+ when(m.getMessage()).thenReturn(mockInnerMessage);
+ assertFalse(triggerField.apply(m, mockState));
+ mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
+ when(mockInnerMessage.getEventTime()).thenReturn(32000000000L);
+ when(m.getMessage()).thenReturn(mockInnerMessage);
+ assertTrue(triggerField.apply(m, mockState));
+ mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
+ when(m.getMessage()).thenReturn(mockInnerMessage);
+ when(mockInnerMessage.getEventTime()).thenReturn(1001000000L);
+ when(mockState.getLatestEventTimeNs()).thenReturn(32000000000L);
+ assertTrue(triggerField.apply(m, mockState));
+
+ BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> mockFunc = mock(BiFunction.class);
+ builder = TriggerBuilder.earlyTrigger(mockFunc);
+ triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+ assertEquals(triggerField, mockFunc);
+
+ builder = TriggerBuilder.timeoutSinceFirstMessage(10000L);
+ Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> timerTrigger =
+ (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder);
+ when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+ assertTrue(timerTrigger.apply(mockState));
+ // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+ when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+ assertFalse(timerTrigger.apply(mockState));
+
+ builder = TriggerBuilder.timeoutSinceLastMessage(10000L);
+ timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder);
+ when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+ assertTrue(timerTrigger.apply(mockState));
+ // set the lastMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+ when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000));
+ assertFalse(timerTrigger.apply(mockState));
+ }
+
+ @Test
+ public void testAddTimerTriggers() throws IllegalAccessException {
+ TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ builder.addTimeoutSinceFirstMessage(10000L);
+ // exam that both earlyTrigger and timer triggers are set up
+ BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> triggerField =
+ (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+ WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class);
+ when(mockState.getNumberMessages()).thenReturn(200L);
+ assertFalse(triggerField.apply(null, mockState));
+ // check the timer trigger
+ Function<WindowState<Collection<TestMessageEnvelope>>, Boolean> timerTrigger =
+ (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder);
+ when(mockState.getFirstMessageTimeNs()).thenReturn(0L);
+ assertTrue(timerTrigger.apply(mockState));
+ // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+ when(mockState.getFirstMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+ assertFalse(timerTrigger.apply(mockState));
+
+ // exam that both early trigger and timer triggers are set up
+ builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ triggerField = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+ mockState = mock(WindowState.class);
+ when(mockState.getNumberMessages()).thenReturn(200L);
+ assertFalse(triggerField.apply(null, mockState));
+ builder.addTimeoutSinceLastMessage(20000L);
+ // check the timer trigger
+ timerTrigger = (Function<WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.timerTriggerField.get(builder);
+ when(mockState.getLastMessageTimeNs()).thenReturn(0L);
+ assertTrue(timerTrigger.apply(mockState));
+ // set the firstMessageTimeNs to 9 second earlier, giving the test 1 second to fire up the timerTrigger before assertion
+ when(mockState.getLastMessageTimeNs()).thenReturn(TimeUnit.MILLISECONDS.toNanos(System.currentTimeMillis() - 9000L));
+ assertFalse(timerTrigger.apply(mockState));
+ }
+
+ @Test
+ public void testAddLateTriggers() throws IllegalAccessException {
+ TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ builder.addLateTriggerOnSizeLimit(10000L);
+ // exam that both earlyTrigger and lateTriggers are set up
+ BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> earlyTrigger =
+ (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+ WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class);
+ when(mockState.getNumberMessages()).thenReturn(200L);
+ assertFalse(earlyTrigger.apply(null, mockState));
+ // check the late trigger
+ BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean> lateTrigger =
+ (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.lateTriggerField.get(builder);
+ assertFalse(lateTrigger.apply(null, mockState));
+ // set the number of messages to 10001 to trigger the late trigger
+ when(mockState.getNumberMessages()).thenReturn(10001L);
+ assertTrue(lateTrigger.apply(null, mockState));
+
+ builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ builder.addLateTrigger((m, s) -> s.getOutputValue().size() > 0);
+ // exam that both earlyTrigger and lateTriggers are set up
+ earlyTrigger = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.earlyTriggerField.get(builder);
+ mockState = mock(WindowState.class);
+ when(mockState.getNumberMessages()).thenReturn(200L);
+ assertFalse(earlyTrigger.apply(null, mockState));
+ // exam the lateTrigger
+ when(mockState.getOutputValue()).thenReturn(new ArrayList<>());
+ lateTrigger = (BiFunction<TestMessageEnvelope, WindowState<Collection<TestMessageEnvelope>>, Boolean>) this.lateTriggerField.get(builder);
+ assertFalse(lateTrigger.apply(null, mockState));
+ List<TestMessageEnvelope> mockList = mock(ArrayList.class);
+ when(mockList.size()).thenReturn(200);
+ when(mockState.getOutputValue()).thenReturn(mockList);
+ assertTrue(lateTrigger.apply(null, mockState));
+ }
+
+ @Test
+ public void testAddTriggerUpdater() throws IllegalAccessException {
+ TriggerBuilder<TestMessageEnvelope, Collection<TestMessageEnvelope>> builder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000);
+ builder.onEarlyTrigger(c -> {
+ c.clear();
+ return c;
+ });
+ List<TestMessageEnvelope> collection = new ArrayList<TestMessageEnvelope>() { {
+ for (int i = 0; i < 10; i++) {
+ this.add(new TestMessageEnvelope(String.format("key-%d", i), "string-value", System.nanoTime()));
+ }
+ } };
+ // exam that earlyTriggerUpdater is set up
+ Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>> earlyTriggerUpdater =
+ (Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>>) this.earlyTriggerUpdater.get(builder);
+ WindowState<Collection<TestMessageEnvelope>> mockState = mock(WindowState.class);
+ when(mockState.getOutputValue()).thenReturn(collection);
+ earlyTriggerUpdater.apply(mockState);
+ assertTrue(collection.isEmpty());
+
+ collection.add(new TestMessageEnvelope("key-to-stay", "string-to-stay", System.nanoTime()));
+ collection.add(new TestMessageEnvelope("key-to-remove", "string-to-remove", System.nanoTime()));
+ builder.onLateTrigger(c -> {
+ c.removeIf(t -> t.getKey().equals("key-to-remove"));
+ return c;
+ });
+ // check the late trigger updater
+ Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>> lateTriggerUpdater =
+ (Function<WindowState<Collection<TestMessageEnvelope>>, WindowState<Collection<TestMessageEnvelope>>>) this.lateTriggerUpdater.get(builder);
+ when(mockState.getOutputValue()).thenReturn(collection);
+ lateTriggerUpdater.apply(mockState);
+ assertTrue(collection.size() == 1);
+ assertFalse(collection.get(0).isDelete());
+ assertEquals(collection.get(0).getKey(), "key-to-stay");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
new file mode 100644
index 0000000..7f81fc9
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowOutput.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+
+public class TestWindowOutput {
+
+ @Test
+ public void testConstructor() {
+ WindowOutput<String, Integer> wndOutput = WindowOutput.of("testMsg", 10);
+ assertEquals(wndOutput.getKey(), "testMsg");
+ assertEquals(wndOutput.getMessage(), Integer.valueOf(10));
+ assertFalse(wndOutput.isDelete());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
new file mode 100644
index 0000000..26af26e
--- /dev/null
+++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindows.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.windows;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestWindows {
+
+ @Test
+ public void testSessionWindows() throws NoSuchFieldException, IllegalAccessException {
+ // test constructing the default session window
+ Window<TestMessageEnvelope, String, Collection<TestMessageEnvelope>, WindowOutput<String, Collection<TestMessageEnvelope>>> testWnd = Windows
+ .intoSessions(
+ TestMessageEnvelope::getKey);
+ assertTrue(testWnd instanceof SessionWindow);
+ Field wndKeyFuncField = SessionWindow.class.getDeclaredField("wndKeyFunction");
+ Field aggregatorField = SessionWindow.class.getDeclaredField("aggregator");
+ wndKeyFuncField.setAccessible(true);
+ aggregatorField.setAccessible(true);
+ Function<TestMessageEnvelope, String> wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testWnd);
+ assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", "test-value", 0)), "test-key");
+ BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>> aggrFunc =
+ (BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>>) aggregatorField.get(testWnd);
+ TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+ Collection<TestMessageEnvelope> collection = aggrFunc.apply(mockMsg, new ArrayList<>());
+ assertTrue(collection.size() == 1);
+ assertTrue(collection.contains(mockMsg));
+
+ // test constructing the session window w/ customized session info
+ Window<TestMessageEnvelope, String, Collection<Character>, WindowOutput<String, Collection<Character>>> testWnd2 = Windows.intoSessions(
+ m -> String.format("key-%d", m.getMessage().getEventTime()), m -> m.getMessage().getValue().charAt(0));
+ assertTrue(testWnd2 instanceof SessionWindow);
+ wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testWnd2);
+ aggrFunc = (BiFunction<TestMessageEnvelope, Collection<TestMessageEnvelope>, Collection<TestMessageEnvelope>>) aggregatorField.get(testWnd2);
+ assertEquals(wndKeyFunc.apply(new TestMessageEnvelope("test-key", "test-value", 0)), "key-0");
+ TestMessageEnvelope.MessageType mockInnerMessage = mock(TestMessageEnvelope.MessageType.class);
+ when(mockMsg.getMessage()).thenReturn(mockInnerMessage);
+ when(mockInnerMessage.getValue()).thenReturn("x-001");
+ collection = aggrFunc.apply(mockMsg, new ArrayList<>());
+ assertTrue(collection.size() == 1);
+ assertTrue(collection.contains('x'));
+
+ // test constructing session window w/ a default counter
+ Window<TestMessageEnvelope, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
+ m -> String.format("key-%d", m.getMessage().getEventTime()));
+ assertTrue(testCounter instanceof SessionWindow);
+ wndKeyFunc = (Function<TestMessageEnvelope, String>) wndKeyFuncField.get(testCounter);
+ BiFunction<TestMessageEnvelope, Integer, Integer> counterFn = (BiFunction<TestMessageEnvelope, Integer, Integer>) aggregatorField.get(testCounter);
+ when(mockMsg.getMessage().getEventTime()).thenReturn(12345L);
+ assertEquals(wndKeyFunc.apply(mockMsg), "key-12345");
+ assertEquals(counterFn.apply(mockMsg, 1), Integer.valueOf(2));
+ }
+
+ @Test
+ public void testSetTriggers() throws NoSuchFieldException, IllegalAccessException {
+ Window<TestMessageEnvelope, String, Integer, WindowOutput<String, Integer>> testCounter = Windows.intoSessionCounter(
+ m -> String.format("key-%d", m.getMessage().getEventTime()));
+ // test session window w/ a trigger
+ TriggerBuilder<TestMessageEnvelope, Integer> triggerBuilder = TriggerBuilder.earlyTriggerWhenExceedWndLen(1000L);
+ testCounter.setTriggers(triggerBuilder);
+ Trigger<TestMessageEnvelope, WindowState<Integer>> expectedTrigger = triggerBuilder.build();
+ Trigger<TestMessageEnvelope, WindowState<Integer>> actualTrigger = Windows.getInternalWindowFn(testCounter).getTrigger();
+ // examine all trigger fields are expected
+ Field earlyTriggerField = Trigger.class.getDeclaredField("earlyTrigger");
+ Field lateTriggerField = Trigger.class.getDeclaredField("lateTrigger");
+ Field timerTriggerField = Trigger.class.getDeclaredField("timerTrigger");
+ Field earlyTriggerUpdater = Trigger.class.getDeclaredField("earlyTriggerUpdater");
+ Field lateTriggerUpdater = Trigger.class.getDeclaredField("lateTriggerUpdater");
+ earlyTriggerField.setAccessible(true);
+ lateTriggerField.setAccessible(true);
+ timerTriggerField.setAccessible(true);
+ earlyTriggerUpdater.setAccessible(true);
+ lateTriggerUpdater.setAccessible(true);
+ assertEquals(earlyTriggerField.get(expectedTrigger), earlyTriggerField.get(actualTrigger));
+ assertEquals(lateTriggerField.get(expectedTrigger), lateTriggerField.get(actualTrigger));
+ assertEquals(timerTriggerField.get(expectedTrigger), timerTriggerField.get(actualTrigger));
+ assertEquals(earlyTriggerUpdater.get(expectedTrigger), earlyTriggerUpdater.get(actualTrigger));
+ assertEquals(lateTriggerUpdater.get(expectedTrigger), lateTriggerUpdater.get(actualTrigger));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
new file mode 100644
index 0000000..231d3f5
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpecs;
+import org.apache.samza.operators.windows.Window;
+import org.apache.samza.operators.windows.WindowFn;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.operators.windows.WindowState;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+
+/**
+ * The implementation for input/output {@link MessageStream}s to/from the operators.
+ * Users use the {@link MessageStream} API methods to describe and chain the operators specs.
+ *
+ * @param <M> type of {@link MessageEnvelope}s in this {@link MessageStream}
+ */
+public class MessageStreamImpl<M extends MessageEnvelope> implements MessageStream<M> {
+
+ /**
+ * The set of operators that consume the {@link MessageEnvelope}s in this {@link MessageStream}
+ */
+ private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();
+
+ @Override
+ public <OM extends MessageEnvelope> MessageStream<OM> map(MapFunction<M, OM> mapFn) {
+ OperatorSpec<OM> op = OperatorSpecs.<M, OM>createStreamOperator(m -> new ArrayList<OM>() { {
+ OM r = mapFn.apply(m);
+ if (r != null) {
+ this.add(r);
+ }
+ } });
+ this.registeredOperatorSpecs.add(op);
+ return op.getOutputStream();
+ }
+
+ @Override
+ public <OM extends MessageEnvelope> MessageStream<OM> flatMap(FlatMapFunction<M, OM> flatMapFn) {
+ OperatorSpec<OM> op = OperatorSpecs.createStreamOperator(flatMapFn);
+ this.registeredOperatorSpecs.add(op);
+ return op.getOutputStream();
+ }
+
+ @Override
+ public MessageStream<M> filter(FilterFunction<M> filterFn) {
+ OperatorSpec<M> op = OperatorSpecs.<M, M>createStreamOperator(t -> new ArrayList<M>() { {
+ if (filterFn.apply(t)) {
+ this.add(t);
+ }
+ } });
+ this.registeredOperatorSpecs.add(op);
+ return op.getOutputStream();
+ }
+
+ @Override
+ public void sink(SinkFunction<M> sinkFn) {
+ this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperator(sinkFn));
+ }
+
+ @Override
+ public <WK, WV, WS extends WindowState<WV>, WM extends WindowOutput<WK, WV>> MessageStream<WM> window(
+ Window<M, WK, WV, WM> window) {
+ OperatorSpec<WM> wndOp = OperatorSpecs.createWindowOperator((WindowFn<M, WK, WS, WM>) window.getInternalWindowFn());
+ this.registeredOperatorSpecs.add(wndOp);
+ return wndOp.getOutputStream();
+ }
+
+ @Override
+ public <K, JM extends MessageEnvelope<K, ?>, RM extends MessageEnvelope> MessageStream<RM> join(
+ MessageStream<JM> otherStream, JoinFunction<M, JM, RM> joinFn) {
+ MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>();
+
+ BiFunction<M, JM, RM> parJoin1 = joinFn::apply;
+ BiFunction<JM, M, RM> parJoin2 = (m, t1) -> joinFn.apply(t1, m);
+
+ // TODO: need to add default store functions for the two partial join functions
+
+ ((MessageStreamImpl<JM>) otherStream).registeredOperatorSpecs.add(
+ OperatorSpecs.createPartialJoinOperator(parJoin2, outputStream));
+ this.registeredOperatorSpecs.add(OperatorSpecs.createPartialJoinOperator(parJoin1, outputStream));
+ return outputStream;
+ }
+
+ @Override
+ public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
+ MessageStreamImpl<M> outputStream = new MessageStreamImpl<>();
+
+ otherStreams.add(this);
+ otherStreams.forEach(other ->
+ ((MessageStreamImpl<M>) other).registeredOperatorSpecs.add(OperatorSpecs.createMergeOperator(outputStream)));
+ return outputStream;
+ }
+
+ /**
+ * Gets the operator specs registered to consume the output of this {@link MessageStream}. This is an internal API and
+ * should not be exposed to users.
+ *
+ * @return a collection containing all {@link OperatorSpec}s that are registered with this {@link MessageStream}.
+ */
+ public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
+ return Collections.unmodifiableSet(this.registeredOperatorSpecs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
new file mode 100644
index 0000000..2572f14
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/StateStoreImpl.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.windows.StoreFunctions;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+
+
+/**
+ * The base class for all state stores
+ */
+public class StateStoreImpl<M extends MessageEnvelope, SK, SS> {
+ private final String storeName;
+ private final StoreFunctions<M, SK, SS> storeFunctions;
+ private KeyValueStore<SK, SS> kvStore = null;
+
+ public StateStoreImpl(StoreFunctions<M, SK, SS> store, String storeName) {
+ this.storeFunctions = store;
+ this.storeName = storeName;
+ }
+
+ public void init(TaskContext context) {
+ this.kvStore = (KeyValueStore<SK, SS>) context.getStore(this.storeName);
+ }
+
+ public Entry<SK, SS> getState(M m) {
+ SK key = this.storeFunctions.getStoreKeyFn().apply(m);
+ SS state = this.kvStore.get(key);
+ return new Entry<>(key, state);
+ }
+
+ public Entry<SK, SS> updateState(M m, Entry<SK, SS> oldEntry) {
+ SS newValue = this.storeFunctions.getStateUpdaterFn().apply(m, oldEntry.getValue());
+ this.kvStore.put(oldEntry.getKey(), newValue);
+ return new Entry<>(oldEntry.getKey(), newValue);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
new file mode 100644
index 0000000..152cd92
--- /dev/null
+++ b/samza-operator/src/main/java/org/apache/samza/operators/StreamOperatorAdaptorTask.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.impl.OperatorImpl;
+import org.apache.samza.operators.impl.OperatorImpls;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.apache.samza.task.WindowableTask;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * An {@link StreamTask} implementation that receives {@link IncomingSystemMessageEnvelope}s and propagates them
+ * through the user's stream transformations defined in {@link StreamOperatorTask#transform(Map)} using the
+ * {@link MessageStream} APIs.
+ * <p>
+ * This class brings all the operator API implementation components together and feeds the
+ * {@link IncomingSystemMessageEnvelope}s into the transformation chains.
+ * <p>
+ * It accepts an instance of the user implemented {@link StreamOperatorTask}. When its own {@link #init(Config, TaskContext)}
+ * method is called during startup, it creates a {@link MessageStreamImpl} corresponding to each of its input
+ * {@link SystemStreamPartition}s and then calls the user's {@link StreamOperatorTask#transform(Map)} method.
+ * <p>
+ * When users invoke the methods on the {@link MessageStream} API to describe their stream transformations in the
+ * {@link StreamOperatorTask#transform(Map)} method, the underlying {@link MessageStreamImpl} creates the
+ * corresponding {@link org.apache.samza.operators.spec.OperatorSpec} to record information about the desired
+ * transformation, and returns the output {@link MessageStream} to allow further transform chaining.
+ * <p>
+ * Once the user's transformation DAGs have been described for all {@link MessageStream}s (i.e., when the
+ * {@link StreamOperatorTask#transform(Map)} call returns), it calls
+ * {@link OperatorImpls#createOperatorImpls(MessageStreamImpl, TaskContext)} for each of the input
+ * {@link MessageStreamImpl}. This instantiates the {@link org.apache.samza.operators.impl.OperatorImpl} DAG
+ * corresponding to the aforementioned {@link org.apache.samza.operators.spec.OperatorSpec} DAG and returns the
+ * root node of the DAG, which this class saves.
+ * <p>
+ * Now that it has the root for the DAG corresponding to each {@link SystemStreamPartition}, it can pass the message
+ * envelopes received in {@link StreamTask#process(IncomingMessageEnvelope, MessageCollector, TaskCoordinator)} along
+ * to the appropriate root nodes. From then on, each {@link org.apache.samza.operators.impl.OperatorImpl} propagates
+ * its transformed output to the next set of {@link org.apache.samza.operators.impl.OperatorImpl}s.
+ */
+public final class StreamOperatorAdaptorTask implements StreamTask, InitableTask, WindowableTask {
+
+ /**
+ * A mapping from each {@link SystemStreamPartition} to the root node of its operator chain DAG.
+ */
+ private final Map<SystemStreamPartition, OperatorImpl<IncomingSystemMessageEnvelope, ? extends MessageEnvelope>> operatorChains = new HashMap<>();
+
+ private final StreamOperatorTask userTask;
+
+ public StreamOperatorAdaptorTask(StreamOperatorTask userTask) {
+ this.userTask = userTask;
+ }
+
+ @Override
+ public final void init(Config config, TaskContext context) throws Exception {
+ if (this.userTask instanceof InitableTask) {
+ ((InitableTask) this.userTask).init(config, context);
+ }
+ Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams = new HashMap<>();
+ context.getSystemStreamPartitions().forEach(ssp -> messageStreams.put(ssp, new MessageStreamImpl<>()));
+ this.userTask.transform(messageStreams);
+ messageStreams.forEach((ssp, ms) ->
+ operatorChains.put(ssp, OperatorImpls.createOperatorImpls((MessageStreamImpl<IncomingSystemMessageEnvelope>) ms, context)));
+ }
+
+ @Override
+ public final void process(IncomingMessageEnvelope ime, MessageCollector collector, TaskCoordinator coordinator) {
+ this.operatorChains.get(ime.getSystemStreamPartition())
+ .onNext(new IncomingSystemMessageEnvelope(ime), collector, coordinator);
+ }
+
+ @Override
+ public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception {
+ if (this.userTask instanceof WindowableTask) {
+ ((WindowableTask) this.userTask).window(collector, coordinator);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
deleted file mode 100644
index 1eee2dc..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators.Operator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * Implementation class for a chain of operators from the single input {@code source}
- *
- * @param <M> type of message in the input stream {@code source}
- */
-public class ChainedOperators<M extends Message> {
-
- private final Set<OperatorImpl> subscribers = new HashSet<>();
-
- /**
- * Private constructor
- *
- * @param source the input source {@link MessageStream}
- * @param context the {@link TaskContext} object that we need to instantiate the state stores
- */
- private ChainedOperators(MessageStream<M> source, TaskContext context) {
- // create the pipeline/topology starting from source
- source.getSubscribers().forEach(sub -> {
- // pass in the context s.t. stateful stream operators can initialize their stores
- OperatorImpl subImpl = this.createAndSubscribe(sub, source, context);
- this.subscribers.add(subImpl);
- });
- }
-
- /**
- * Private function to recursively instantiate the implementation of operators and the chains
- *
- * @param operator the operator that subscribe to {@code source}
- * @param source the source {@link MessageStream}
- * @param context the context of the task
- * @return the implementation object of the corresponding {@code operator}
- */
- private OperatorImpl<M, ? extends Message> createAndSubscribe(Operator operator, MessageStream source,
- TaskContext context) {
- Entry<OperatorImpl<M, ? extends Message>, Boolean> factoryEntry = OperatorFactory.getOperator(operator);
- if (factoryEntry.getValue()) {
- // The operator has already been instantiated and we do not need to traverse and create the subscribers any more.
- return factoryEntry.getKey();
- }
- OperatorImpl<M, ? extends Message> opImpl = factoryEntry.getKey();
- MessageStream outStream = operator.getOutputStream();
- Collection<Operator> subs = outStream.getSubscribers();
- subs.forEach(sub -> {
- OperatorImpl subImpl = this.createAndSubscribe(sub, operator.getOutputStream(), context);
- opImpl.subscribe(subImpl);
- });
- // initialize the operator's state store
- opImpl.init(source, context);
- return opImpl;
- }
-
- /**
- * Static method to create a {@link ChainedOperators} from the {@code source} stream
- *
- * @param source the input source {@link MessageStream}
- * @param context the {@link TaskContext} object used to initialize the {@link StateStoreImpl}
- * @param <M> the type of input {@link Message}
- * @return a {@link ChainedOperators} object takes the {@code source} as input
- */
- public static <M extends Message> ChainedOperators create(MessageStream<M> source, TaskContext context) {
- return new ChainedOperators<>(source, context);
- }
-
- /**
- * Method to navigate the incoming {@code message} through the processing chains
- *
- * @param message the incoming message to this {@link ChainedOperators}
- * @param collector the {@link MessageCollector} object within the process context
- * @param coordinator the {@link TaskCoordinator} object within the process context
- */
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
- this.subscribers.forEach(sub -> sub.onNext(message, collector, coordinator));
- }
-
- /**
- * Method to handle timer events
- *
- * @param collector the {@link MessageCollector} object within the process context
- * @param coordinator the {@link TaskCoordinator} object within the process context
- */
- public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
- long nanoTime = System.nanoTime();
- this.subscribers.forEach(sub -> sub.onTimer(nanoTime, collector, coordinator));
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
deleted file mode 100644
index ea90878..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.internal.Operators.*;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
-import org.apache.samza.operators.impl.window.SessionWindowImpl;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-
-/**
- * The factory class that instantiates all implementation of {@link OperatorImpl} classes.
- */
-public class OperatorFactory {
-
- /**
- * the static operatorMap that includes all operator implementation instances
- */
- private static final Map<Operator, OperatorImpl<? extends Message, ? extends Message>> OPERATOR_MAP = new ConcurrentHashMap<>();
-
- /**
- * The method to actually create the implementation instances of operators
- *
- * @param operator the immutable definition of {@link Operator}
- * @param <M> type of input {@link Message}
- * @param <RM> type of output {@link Message}
- * @return the implementation object of {@link OperatorImpl}
- */
- private static <M extends Message, RM extends Message> OperatorImpl<M, ? extends Message> createOperator(Operator<RM> operator) {
- if (operator instanceof StreamOperator) {
- return new SimpleOperatorImpl<>((StreamOperator<M, RM>) operator);
- } else if (operator instanceof SinkOperator) {
- return new SinkOperatorImpl<>((SinkOperator<M>) operator);
- } else if (operator instanceof WindowOperator) {
- return new SessionWindowImpl<>((WindowOperator<M, ?, ? extends WindowState, ? extends WindowOutput>) operator);
- } else if (operator instanceof PartialJoinOperator) {
- return new PartialJoinOpImpl<>((PartialJoinOperator) operator);
- }
- throw new IllegalArgumentException(
- String.format("The type of operator is not supported. Operator class name: %s", operator.getClass().getName()));
- }
-
- /**
- * The method to get the unique implementation instance of {@link Operator}
- *
- * @param operator the {@link Operator} to instantiate
- * @param <M> type of input {@link Message}
- * @param <RM> type of output {@link Message}
- * @return A pair of entry that include the unique implementation instance to the {@code operator} and a boolean value indicating whether
- * the operator instance has already been created or not. True means the operator instance has already created, false means the operator
- * was not created.
- */
- public static <M extends Message, RM extends Message> Entry<OperatorImpl<M, ? extends Message>, Boolean> getOperator(Operator<RM> operator) {
- if (!OPERATOR_MAP.containsKey(operator)) {
- OperatorImpl<M, ? extends Message> operatorImpl = OperatorFactory.createOperator(operator);
- if (OPERATOR_MAP.putIfAbsent(operator, operatorImpl) == null) {
- return new Entry<OperatorImpl<M, ? extends Message>, Boolean>(operatorImpl, false) { };
- }
- }
- return new Entry<OperatorImpl<M, ? extends Message>, Boolean>((OperatorImpl<M, ? extends Message>) OPERATOR_MAP.get(operator), true) { };
- }
-
-}