You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/12/01 22:51:02 UTC
[2/6] samza git commit: SAMZA-1054: Refactor Operator APIs
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
new file mode 100644
index 0000000..2ad6461
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.MapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.SessionWindow;
+import org.apache.samza.operators.windows.WindowFn;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Unit tests for the {@link OperatorSpec}s that each {@link MessageStreamImpl} operator method registers. */
+public class TestMessageStreamImpl {
+  // map() must register exactly one StreamOperatorSpec whose transform function applies the given MapFunction.
+  @Test
+  public void testMap() {
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+    MapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xMap =
+        m -> new TestOutputMessageEnvelope(m.getKey(), m.getMessage().getValue().length() + 1);
+    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.map(xMap);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size());
+    OperatorSpec<TestOutputMessageEnvelope> mapOp = subs.iterator().next();
+    assertTrue(mapOp instanceof StreamOperatorSpec);
+    assertEquals(outputStream, mapOp.getOutputStream());
+    // assert that the transformation function is what we defined above
+    TestMessageEnvelope xTestMsg = mock(TestMessageEnvelope.class);
+    TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(xTestMsg.getKey()).thenReturn("test-msg-key");
+    when(xTestMsg.getMessage()).thenReturn(mockInnerTestMessage);
+    when(mockInnerTestMessage.getValue()).thenReturn("123456789");
+
+    Collection<TestOutputMessageEnvelope> cOutputMsg = ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) mapOp).getTransformFn().apply(xTestMsg);
+    assertEquals(1, cOutputMsg.size());
+    TestOutputMessageEnvelope outputMessage = cOutputMsg.iterator().next();
+    assertEquals(xTestMsg.getKey(), outputMessage.getKey());
+    assertEquals(Integer.valueOf(xTestMsg.getMessage().getValue().length() + 1), outputMessage.getMessage());
+  }
+  // flatMap() must register exactly one StreamOperatorSpec that holds the very FlatMapFunction passed in.
+  @Test
+  public void testFlatMap() {
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+    Set<TestOutputMessageEnvelope> flatOuts = new HashSet<TestOutputMessageEnvelope>() { {
+        this.add(mock(TestOutputMessageEnvelope.class));
+        this.add(mock(TestOutputMessageEnvelope.class));
+        this.add(mock(TestOutputMessageEnvelope.class));
+      } };
+    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> xFlatMap = m -> flatOuts;
+    MessageStream<TestOutputMessageEnvelope> outputStream = inputStream.flatMap(xFlatMap);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size());
+    OperatorSpec<TestOutputMessageEnvelope> flatMapOp = subs.iterator().next();
+    assertTrue(flatMapOp instanceof StreamOperatorSpec);
+    assertEquals(outputStream, flatMapOp.getOutputStream());
+    // assert that the transformation function is what we defined above
+    assertEquals(xFlatMap, ((StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope>) flatMapOp).getTransformFn());
+  }
+  // filter() must register one StreamOperatorSpec whose transform emits a message only if the FilterFunction accepts it.
+  @Test
+  public void testFilter() {
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+    FilterFunction<TestMessageEnvelope> xFilter = m -> m.getMessage().getEventTime() > 123456L;
+    MessageStream<TestMessageEnvelope> outputStream = inputStream.filter(xFilter);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size());
+    OperatorSpec<TestMessageEnvelope> filterOp = subs.iterator().next();
+    assertTrue(filterOp instanceof StreamOperatorSpec);
+    assertEquals(outputStream, filterOp.getOutputStream());
+    // assert that the transformation function is what we defined above
+    FlatMapFunction<TestMessageEnvelope, TestMessageEnvelope> txfmFn = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) filterOp).getTransformFn();
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    TestMessageEnvelope.MessageType mockInnerTestMessage = mock(TestMessageEnvelope.MessageType.class);
+    when(mockMsg.getMessage()).thenReturn(mockInnerTestMessage);
+    when(mockInnerTestMessage.getEventTime()).thenReturn(11111L);
+    Collection<TestMessageEnvelope> output = txfmFn.apply(mockMsg);
+    assertTrue(output.isEmpty());
+    // getMessage() is already stubbed above; only the event time changes for the accepted case
+    when(mockInnerTestMessage.getEventTime()).thenReturn(999999L);
+    output = txfmFn.apply(mockMsg);
+    assertEquals(1, output.size());
+    assertEquals(mockMsg, output.iterator().next());
+  }
+  // sink() must register one SinkOperatorSpec that holds the given SinkFunction and has no output stream.
+  @Test
+  public void testSink() {
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+    SinkFunction<TestMessageEnvelope> xSink = (m, mc, tc) -> {
+      mc.send(new OutgoingMessageEnvelope(new SystemStream("test-sys", "test-stream"), m.getMessage()));
+      tc.commit(TaskCoordinator.RequestScope.CURRENT_TASK);
+    };
+    inputStream.sink(xSink);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size());
+    OperatorSpec<TestMessageEnvelope> sinkOp = subs.iterator().next();
+    assertTrue(sinkOp instanceof SinkOperatorSpec);
+    assertEquals(xSink, ((SinkOperatorSpec) sinkOp).getSinkFn());
+    assertNull(((SinkOperatorSpec) sinkOp).getOutputStream());
+  }
+  // window() must register one WindowOperatorSpec whose output stream is the stream returned by window().
+  @Test
+  public void testWindow() {
+    MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>();
+    SessionWindow<TestMessageEnvelope, String, Integer> window = mock(SessionWindow.class);
+    doReturn(mock(WindowFn.class)).when(window).getInternalWindowFn();
+    MessageStream<WindowOutput<String, Integer>> outStream = inputStream.window(window);
+    Collection<OperatorSpec> subs = inputStream.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size());
+    OperatorSpec<TestMessageEnvelope> wndOp = subs.iterator().next();
+    assertTrue(wndOp instanceof WindowOperatorSpec);
+    assertEquals(outStream, ((WindowOperatorSpec) wndOp).getOutputStream());
+  }
+  // join() must register one PartialJoinOperatorSpec on each source, both writing to the single joined output stream.
+  @Test
+  public void testJoin() {
+    MessageStreamImpl<TestMessageEnvelope> source1 = new MessageStreamImpl<>();
+    MessageStreamImpl<TestMessageEnvelope> source2 = new MessageStreamImpl<>();
+    JoinFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joiner =
+        (m1, m2) -> new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+    MessageStream<TestOutputMessageEnvelope> joinOutput = source1.join(source2, joiner);
+    Collection<OperatorSpec> subs = source1.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size());
+    OperatorSpec<TestMessageEnvelope> joinOp1 = subs.iterator().next();
+    assertTrue(joinOp1 instanceof PartialJoinOperatorSpec);
+    assertEquals(joinOutput, ((PartialJoinOperatorSpec) joinOp1).getOutputStream());
+    subs = source2.getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size());
+    OperatorSpec<TestMessageEnvelope> joinOp2 = subs.iterator().next();
+    assertTrue(joinOp2 instanceof PartialJoinOperatorSpec);
+    assertEquals(joinOutput, ((PartialJoinOperatorSpec) joinOp2).getOutputStream());
+    TestMessageEnvelope joinMsg1 = new TestMessageEnvelope("test-join-1", "join-msg-001", 11111L);
+    TestMessageEnvelope joinMsg2 = new TestMessageEnvelope("test-join-2", "join-msg-002", 22222L);
+    TestOutputMessageEnvelope xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp1).getTransformFn().apply(joinMsg1, joinMsg2);
+    assertEquals("test-join-1", xOut.getKey());
+    assertEquals(Integer.valueOf(24), xOut.getMessage());
+    xOut = (TestOutputMessageEnvelope) ((PartialJoinOperatorSpec) joinOp2).getTransformFn().apply(joinMsg2, joinMsg1);
+    assertEquals("test-join-1", xOut.getKey()); // the key is expected to come from joinMsg1 even with the arguments reversed
+    assertEquals(Integer.valueOf(24), xOut.getMessage());
+  }
+  // merge() must register a pass-through operator on the calling stream and on every stream it is merged with.
+  @Test
+  public void testMerge() {
+    MessageStream<TestMessageEnvelope> merge1 = new MessageStreamImpl<>();
+    Collection<MessageStream<TestMessageEnvelope>> others = new ArrayList<MessageStream<TestMessageEnvelope>>() { {
+        this.add(new MessageStreamImpl<>());
+        this.add(new MessageStreamImpl<>());
+      } };
+    MessageStream<TestMessageEnvelope> mergeOutput = merge1.merge(others);
+    validateMergeOperator(merge1, mergeOutput);
+
+    others.forEach(merge -> validateMergeOperator(merge, mergeOutput));
+  }
+  // Asserts that mergeSource has exactly one StreamOperatorSpec that outputs to mergeOutput and emits its input unchanged.
+  private void validateMergeOperator(MessageStream<TestMessageEnvelope> mergeSource, MessageStream<TestMessageEnvelope> mergeOutput) {
+    Collection<OperatorSpec> subs = ((MessageStreamImpl<TestMessageEnvelope>) mergeSource).getRegisteredOperatorSpecs();
+    assertEquals(1, subs.size());
+    OperatorSpec<TestMessageEnvelope> mergeOp = subs.iterator().next();
+    assertTrue(mergeOp instanceof StreamOperatorSpec);
+    assertEquals(mergeOutput, ((StreamOperatorSpec) mergeOp).getOutputStream());
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    Collection<TestMessageEnvelope> outputs = ((StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope>) mergeOp).getTransformFn().apply(mockMsg);
+    assertEquals(1, outputs.size());
+    assertEquals(mockMsg, outputs.iterator().next());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java
new file mode 100644
index 0000000..8fa7ccc
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/TestStateStoreImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.windows.StoreFunctions;
+import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.TaskContext;
+import org.junit.Test;
+
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Unit test walking {@link StateStoreImpl} through construction, init(), getState() and updateState() using mocks. */
+public class TestStateStoreImpl {
+  @Test
+  public void testStateStoreImpl() {
+    StoreFunctions<TestMessageEnvelope, String, WindowState> mockStoreFunctions = mock(StoreFunctions.class);
+    // test constructor: bind the mocked store functions to the store name
+    StateStoreImpl<TestMessageEnvelope, String, WindowState> storeImpl = new StateStoreImpl<>(mockStoreFunctions, "myStoreName");
+    TaskContext mockContext = mock(TaskContext.class);
+    KeyValueStore<String, WindowState> mockKvStore = mock(KeyValueStore.class);
+    when(mockContext.getStore("myStoreName")).thenReturn(mockKvStore);
+    // test init(): expected to look the store up from the task context exactly once, by name
+    storeImpl.init(mockContext);
+    verify(mockContext, times(1)).getStore("myStoreName");
+    Function<TestMessageEnvelope, String> wndKeyFn = mock(Function.class);
+    when(mockStoreFunctions.getStoreKeyFn()).thenReturn(wndKeyFn);
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    when(wndKeyFn.apply(mockMsg)).thenReturn("myKey");
+    WindowState mockState = mock(WindowState.class);
+    when(mockKvStore.get("myKey")).thenReturn(mockState);
+    // test getState(): expects the key from the store-key function and the value read from the kv store
+    Entry<String, WindowState> storeEntry = storeImpl.getState(mockMsg);
+    assertEquals("myKey", storeEntry.getKey());
+    assertEquals(mockState, storeEntry.getValue());
+    verify(wndKeyFn, times(1)).apply(mockMsg);
+    verify(mockKvStore, times(1)).get("myKey");
+    Entry<String, WindowState> oldEntry = new Entry<>("myKey", mockState);
+    WindowState mockNewState = mock(WindowState.class);
+    BiFunction<TestMessageEnvelope, WindowState, WindowState> mockUpdaterFn = mock(BiFunction.class);
+    when(mockStoreFunctions.getStateUpdaterFn()).thenReturn(mockUpdaterFn);
+    when(mockUpdaterFn.apply(mockMsg, mockState)).thenReturn(mockNewState);
+    // test updateState(): expects the old entry's key paired with the updater function's result
+    Entry<String, WindowState> newEntry = storeImpl.updateState(mockMsg, oldEntry);
+    assertEquals("myKey", newEntry.getKey());
+    assertEquals(mockNewState, newEntry.getValue());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java b/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
new file mode 100644
index 0000000..f33510e
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/WindowTask.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.operators;
+
+import org.apache.samza.operators.data.IncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.data.Offset;
+import org.apache.samza.operators.windows.TriggerBuilder;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.system.SystemStreamPartition;
+
+import java.util.Map;
+
+
+/**
+ * Example implementation of a simple user-defined tasks w/ window operators
+ *
+ */
+public class WindowTask implements StreamOperatorTask {
+ class MessageType {
+ String field1;
+ String field2;
+ }
+
+ class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope<MessageType> {
+
+ JsonMessageEnvelope(String key, MessageType data, Offset offset, SystemStreamPartition partition) {
+ super(key, data, offset, partition);
+ }
+ }
+
+ @Override public void transform(Map<SystemStreamPartition, MessageStream<IncomingSystemMessageEnvelope>> messageStreams) {
+ messageStreams.values().forEach(source ->
+ source.map(m1 ->
+ new JsonMessageEnvelope(
+ this.myMessageKeyFunction(m1),
+ (MessageType) m1.getMessage(),
+ m1.getOffset(),
+ m1.getSystemStreamPartition())).
+ window(
+ Windows.<JsonMessageEnvelope, String>intoSessionCounter(
+ m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)).
+ setTriggers(TriggerBuilder.<JsonMessageEnvelope, Integer>earlyTriggerWhenExceedWndLen(100).
+ addTimeoutSinceLastMessage(30000)))
+ );
+ }
+
+ String myMessageKeyFunction(MessageEnvelope<Object, Object> m) {
+ return m.getKey().toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
new file mode 100644
index 0000000..9a425d1
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.data;
+
+import org.apache.samza.system.SystemStreamPartition;
+
+
+/**
+ * Example input {@link MessageEnvelope} with a Json message and a string key; it also keeps the
+ * {@link Offset} and {@link SystemStreamPartition} passed to its constructor.
+ * @param <T> type of the message returned by {@link #getMessage()}
+ */
+public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> {
+  private final String messageKey;
+  private final T message;
+  private final Offset messageOffset;
+  private final SystemStreamPartition systemStreamPartition;
+
+  public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) {
+    messageKey = key;
+    message = data;
+    messageOffset = offset;
+    systemStreamPartition = partition;
+  }
+
+  @Override
+  public T getMessage() {
+    return message;
+  }
+
+  @Override
+  public String getKey() {
+    return messageKey;
+  }
+
+  public Offset getOffset() {
+    return messageOffset;
+  }
+
+  public SystemStreamPartition getSystemStreamPartition() {
+    return systemStreamPartition;
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
deleted file mode 100644
index e3a70e8..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.TestOutputMessage;
-import org.apache.samza.operators.Windows;
-import org.apache.samza.task.TaskContext;
-import org.junit.Before;
-import org.junit.Test;
-import org.reactivestreams.Subscriber;
-
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-
-public class TestChainedOperators {
- Field subsField = null;
- Field opSubsField = null;
-
- @Before public void prep() throws NoSuchFieldException {
- subsField = ChainedOperators.class.getDeclaredField("subscribers");
- subsField.setAccessible(true);
- opSubsField = OperatorImpl.class.getDeclaredField("subscribers");
- opSubsField.setAccessible(true);
- }
-
- @Test public void testCreate() {
- // test creation of empty chain
- MessageStream<TestMessage> testStream = new MessageStream<>();
- TaskContext mockContext = mock(TaskContext.class);
- ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testStream, mockContext);
- assertTrue(operatorChain != null);
- }
-
- @Test public void testLinearChain() throws IllegalAccessException {
- // test creation of linear chain
- MessageStream<TestMessage> testInput = new MessageStream<>();
- TaskContext mockContext = mock(TaskContext.class);
- testInput.map(m -> m).window(Windows.intoSessionCounter(TestMessage::getKey));
- ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext);
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessage, TestMessage> firstOpImpl = subsSet.iterator().next();
- Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(firstOpImpl);
- assertEquals(subsOps.size(), 1);
- Subscriber<? super ProcessorContext<TestMessage>> wndOpImpl = subsOps.iterator().next();
- subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(wndOpImpl);
- assertEquals(subsOps.size(), 0);
- }
-
- @Test public void testBroadcastChain() throws IllegalAccessException {
- // test creation of broadcast chain
- MessageStream<TestMessage> testInput = new MessageStream<>();
- TaskContext mockContext = mock(TaskContext.class);
- testInput.filter(m -> m.getTimestamp() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
- testInput.filter(m -> m.getTimestamp() < 123456L).map(m -> m);
- ChainedOperators<TestMessage> operatorChain = ChainedOperators.create(testInput, mockContext);
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(operatorChain);
- assertEquals(subsSet.size(), 2);
- Iterator<OperatorImpl> iter = subsSet.iterator();
- // check the first branch w/ flatMap
- OperatorImpl<TestMessage, TestMessage> opImpl = iter.next();
- Set<Subscriber<? super ProcessorContext<TestMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl);
- assertEquals(subsOps.size(), 1);
- Subscriber<? super ProcessorContext<TestMessage>> flatMapImpl = subsOps.iterator().next();
- subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(flatMapImpl);
- assertEquals(subsOps.size(), 0);
- // check the second branch w/ map
- opImpl = iter.next();
- subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(opImpl);
- assertEquals(subsOps.size(), 1);
- Subscriber<? super ProcessorContext<TestMessage>> mapImpl = subsOps.iterator().next();
- subsOps = (Set<Subscriber<? super ProcessorContext<TestMessage>>>) opSubsField.get(mapImpl);
- assertEquals(subsOps.size(), 0);
- }
-
- @Test public void testJoinChain() throws IllegalAccessException {
- // test creation of join chain
- MessageStream<TestMessage> input1 = new MessageStream<>();
- MessageStream<TestMessage> input2 = new MessageStream<>();
- TaskContext mockContext = mock(TaskContext.class);
- input1.join(input2, (m1, m2) -> new TestOutputMessage(m1.getKey(), m1.getMessage().length() + m2.getMessage().length(), m1.getTimestamp())).map(m -> m);
- // now, we create chained operators from each input sources
- ChainedOperators<TestMessage> chain1 = ChainedOperators.create(input1, mockContext);
- ChainedOperators<TestMessage> chain2 = ChainedOperators.create(input2, mockContext);
- // check that those two chains will merge at map operator
- // first branch of the join
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) subsField.get(chain1);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessage, TestOutputMessage> joinOp1 = subsSet.iterator().next();
- Set<Subscriber<? super ProcessorContext<TestOutputMessage>>> subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp1);
- assertEquals(subsOps.size(), 1);
- // the map operator consumes the common join output, where two branches merge
- Subscriber<? super ProcessorContext<TestOutputMessage>> mapImpl = subsOps.iterator().next();
- // second branch of the join
- subsSet = (Set<OperatorImpl>) subsField.get(chain2);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessage, TestOutputMessage> joinOp2 = subsSet.iterator().next();
- assertNotSame(joinOp1, joinOp2);
- subsOps = (Set<Subscriber<? super ProcessorContext<TestOutputMessage>>>) opSubsField.get(joinOp2);
- assertEquals(subsOps.size(), 1);
- // make sure that the map operator is the same
- assertEquals(mapImpl, subsOps.iterator().next());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
deleted file mode 100644
index cb4576c..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.TestOutputMessage;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.impl.join.PartialJoinOpImpl;
-import org.apache.samza.operators.impl.window.SessionWindowImpl;
-import org.apache.samza.operators.internal.Operators.PartialJoinOperator;
-import org.apache.samza.operators.internal.Operators.SinkOperator;
-import org.apache.samza.operators.internal.Operators.StreamOperator;
-import org.apache.samza.operators.internal.Operators.WindowOperator;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperatorFactory {
-
- @Test public void testGetOperator() throws NoSuchFieldException, IllegalAccessException {
- // get window operator
- WindowOperator mockWnd = mock(WindowOperator.class);
- Entry<OperatorImpl<TestMessage, ? extends Message>, Boolean>
- factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockWnd);
- assertFalse(factoryEntry.getValue());
- OperatorImpl<TestMessage, TestOutputMessage> opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
- assertTrue(opImpl instanceof SessionWindowImpl);
- Field sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd");
- sessWndField.setAccessible(true);
- WindowOperator sessWnd = (WindowOperator) sessWndField.get(opImpl);
- assertEquals(sessWnd, mockWnd);
-
- // get simple operator
- StreamOperator<TestMessage, TestOutputMessage> mockSimpleOp = mock(StreamOperator.class);
- Function<TestMessage, Collection<TestOutputMessage>> mockTxfmFn = mock(Function.class);
- when(mockSimpleOp.getFunction()).thenReturn(mockTxfmFn);
- factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(mockSimpleOp);
- opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
- assertTrue(opImpl instanceof SimpleOperatorImpl);
- Field txfmFnField = SimpleOperatorImpl.class.getDeclaredField("transformFn");
- txfmFnField.setAccessible(true);
- assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
-
- // get sink operator
- MessageStream.VoidFunction3<TestMessage, MessageCollector, TaskCoordinator> sinkFn = (m, mc, tc) -> { };
- SinkOperator<TestMessage> sinkOp = mock(SinkOperator.class);
- when(sinkOp.getFunction()).thenReturn(sinkFn);
- factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(sinkOp);
- opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
- assertTrue(opImpl instanceof SinkOperatorImpl);
- Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFunc");
- sinkFnField.setAccessible(true);
- assertEquals(sinkFn, sinkFnField.get(opImpl));
-
- // get join operator
- PartialJoinOperator<TestMessage, String, TestMessage, TestOutputMessage> joinOp = mock(PartialJoinOperator.class);
- TestOutputMessage mockOutput = mock(TestOutputMessage.class);
- BiFunction<TestMessage, TestMessage, TestOutputMessage> joinFn = (m1, m2) -> mockOutput;
- when(joinOp.getFunction()).thenReturn(joinFn);
- factoryEntry = OperatorFactory.<TestMessage, TestOutputMessage>getOperator(joinOp);
- opImpl = (OperatorImpl<TestMessage, TestOutputMessage>) factoryEntry.getKey();
- assertTrue(opImpl instanceof PartialJoinOpImpl);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
index 4bd467d..361972e 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
@@ -18,52 +18,54 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.TestOutputMessage;
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
+import org.hamcrest.core.IsEqual;
import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.reactivestreams.Subscriber;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
public class TestOperatorImpl {
- TestMessage curInputMsg;
+ TestMessageEnvelope curInputMsg;
MessageCollector curCollector;
TaskCoordinator curCoordinator;
- @Test public void testSubscribers() {
+ @Test
+ public void testSubscribers() {
this.curInputMsg = null;
this.curCollector = null;
this.curCoordinator = null;
- OperatorImpl<TestMessage, TestOutputMessage> opImpl = new OperatorImpl<TestMessage, TestOutputMessage>() {
- @Override protected void onNext(TestMessage message, MessageCollector collector, TaskCoordinator coordinator) {
+ OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = new OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope>() {
+ @Override
+ public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
TestOperatorImpl.this.curInputMsg = message;
TestOperatorImpl.this.curCollector = collector;
TestOperatorImpl.this.curCoordinator = coordinator;
}
};
- // verify subscribe() added the mockSub and nextProcessors() invoked the mockSub.onNext()
- Subscriber<ProcessorContext<TestOutputMessage>> mockSub = mock(Subscriber.class);
- opImpl.subscribe(mockSub);
- TestOutputMessage xOutput = mock(TestOutputMessage.class);
+ // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext()
+ OperatorImpl mockSub = mock(OperatorImpl.class);
+ opImpl.registerNextOperator(mockSub);
+ TestOutputMessageEnvelope xOutput = mock(TestOutputMessageEnvelope.class);
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
- opImpl.nextProcessors(xOutput, mockCollector, mockCoordinator);
- verify(mockSub, times(1)).onNext(argThat(new ArgumentMatcher<ProcessorContext<TestOutputMessage>>() {
- @Override public boolean matches(Object argument) {
- ProcessorContext<TestOutputMessage> pCntx = (ProcessorContext<TestOutputMessage>) argument;
- return pCntx.getMessage().equals(xOutput) && pCntx.getCoordinator().equals(mockCoordinator) && pCntx.getCollector().equals(mockCollector);
- }
- }));
+ opImpl.propagateResult(xOutput, mockCollector, mockCoordinator);
+ verify(mockSub, times(1)).onNext(
+ argThat(new IsEqual<>(xOutput)),
+ argThat(new IsEqual<>(mockCollector)),
+ argThat(new IsEqual<>(mockCoordinator))
+ );
// verify onNext() is invoked correctly
- TestMessage mockInput = mock(TestMessage.class);
- ProcessorContext<TestMessage> inCntx = new ProcessorContext<>(mockInput, mockCollector, mockCoordinator);
- opImpl.onNext(inCntx);
+ TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
+ opImpl.onNext(mockInput, mockCollector, mockCoordinator);
assertEquals(mockInput, this.curInputMsg);
assertEquals(mockCollector, this.curCollector);
assertEquals(mockCoordinator, this.curCoordinator);
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
new file mode 100644
index 0000000..1d0d547
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.task.TaskContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/** Tests {@link OperatorImpls}: creating a single operator impl from a spec, and wiring whole operator chains. */
+public class TestOperatorImpls {
+  Field nextOperatorsField = null; // reflective handle on OperatorImpl's "nextOperators" field, assigned in prep()
+
+  @Before
+  public void prep() throws NoSuchFieldException {
+    nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
+    nextOperatorsField.setAccessible(true);
+  }
+
+  @Test
+  public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException {
+    // window spec -> SessionWindowOperatorImpl whose "windowSpec" field holds the spec it was created from
+    WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
+    OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = OperatorImpls.createOperatorImpl(mockWnd);
+    assertTrue(opImpl instanceof SessionWindowOperatorImpl);
+    Field sessWndField = SessionWindowOperatorImpl.class.getDeclaredField("windowSpec");
+    sessWndField.setAccessible(true);
+    WindowOperatorSpec sessWnd = (WindowOperatorSpec) sessWndField.get(opImpl);
+    assertEquals(mockWnd, sessWnd);
+
+    // stream spec -> StreamOperatorImpl whose "transformFn" field holds the spec's transform function
+    StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
+    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
+    when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
+    opImpl = OperatorImpls.createOperatorImpl(mockSimpleOp);
+    assertTrue(opImpl instanceof StreamOperatorImpl);
+    Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
+    txfmFnField.setAccessible(true);
+    assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
+
+    // sink spec -> SinkOperatorImpl whose "sinkFn" field holds the spec's sink function
+    SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
+    SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
+    when(sinkOp.getSinkFn()).thenReturn(sinkFn);
+    opImpl = OperatorImpls.createOperatorImpl(sinkOp);
+    assertTrue(opImpl instanceof SinkOperatorImpl);
+    Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
+    sinkFnField.setAccessible(true);
+    assertEquals(sinkFn, sinkFnField.get(opImpl));
+
+    // partial-join spec -> PartialJoinOperatorImpl (only the implementation type is checked here)
+    PartialJoinOperatorSpec<TestMessageEnvelope, String, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
+    TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class);
+    BiFunction<TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = (m1, m2) -> mockOutput;
+    when(joinOp.getTransformFn()).thenReturn(joinFn);
+    opImpl = OperatorImpls.createOperatorImpl(joinOp);
+    assertTrue(opImpl instanceof PartialJoinOperatorImpl);
+  }
+
+  @Test
+  public void testEmptyChain() {
+    // a stream with no operators attached must still produce a non-null root operator
+    MessageStreamImpl<TestMessageEnvelope> testStream = new MessageStreamImpl<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testStream, mockContext);
+    assertTrue(operatorChain != null);
+  }
+
+  @Test
+  public void testLinearChain() throws IllegalAccessException {
+    // map -> window: expect exactly one successor under the root, one under that, and none after the last
+    MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    testInput.map(m -> m).window(Windows.intoSessionCounter(TestMessageEnvelope::getKey));
+    RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext);
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
+    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstOpImpl);
+    assertEquals(1, subsOps.size());
+    OperatorImpl wndOpImpl = subsOps.iterator().next();
+    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(wndOpImpl);
+    assertEquals(0, subsOps.size());
+  }
+
+  @Test
+  public void testBroadcastChain() throws IllegalAccessException {
+    // two independent filter branches hang off the same input: filter -> flatMap and filter -> map
+    MessageStreamImpl<TestMessageEnvelope> testInput = new MessageStreamImpl<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
+    testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
+    RootOperatorImpl operatorChain = OperatorImpls.createOperatorImpls(testInput, mockContext);
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
+    assertEquals(2, subsSet.size());
+    Iterator<OperatorImpl> iter = subsSet.iterator();
+    // one branch; Set iteration order is not asserted, both branches must simply have the same 1 -> 0 shape
+    OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> opImpl = iter.next();
+    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
+    assertEquals(1, subsOps.size());
+    OperatorImpl flatMapImpl = subsOps.iterator().next();
+    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(flatMapImpl);
+    assertEquals(0, subsOps.size());
+    // the other branch, checked for the same shape
+    opImpl = iter.next();
+    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
+    assertEquals(1, subsOps.size());
+    OperatorImpl mapImpl = subsOps.iterator().next();
+    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(mapImpl);
+    assertEquals(0, subsOps.size());
+  }
+
+  @Test
+  public void testJoinChain() throws IllegalAccessException {
+    // input1 joined with input2, followed by a map on the join output
+    MessageStreamImpl<TestMessageEnvelope> input1 = new MessageStreamImpl<>();
+    MessageStreamImpl<TestMessageEnvelope> input2 = new MessageStreamImpl<>();
+    TaskContext mockContext = mock(TaskContext.class);
+    input1
+        .join(input2, (m1, m2) ->
+            new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length()))
+        .map(m -> m);
+    // build one operator chain per input stream
+    RootOperatorImpl chain1 = OperatorImpls.createOperatorImpls(input1, mockContext);
+    RootOperatorImpl chain2 = OperatorImpls.createOperatorImpls(input2, mockContext);
+    // the two chains are expected to have distinct join operators that feed the same map operator
+    // side of the join reached from input1
+    Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp1 = subsSet.iterator().next();
+    Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp1);
+    assertEquals(1, subsOps.size());
+    // remember the single successor of the first join operator (the map) for the comparison below
+    OperatorImpl mapImpl = subsOps.iterator().next();
+    // side of the join reached from input2
+    subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain2);
+    assertEquals(1, subsSet.size());
+    OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp2 = subsSet.iterator().next();
+    assertNotSame(joinOp1, joinOp2);
+    subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp2);
+    assertEquals(1, subsOps.size());
+    // the successor of the second join operator must equal the one found under the first
+    assertEquals(mapImpl, subsOps.iterator().next());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
deleted file mode 100644
index 224245e..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestProcessorContext.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-
-
-public class TestProcessorContext {
- @Test public void testConstructor() {
- TestMessage mockMsg = mock(TestMessage.class);
- MessageCollector mockCollector = mock(MessageCollector.class);
- TaskCoordinator mockTaskCoordinator = mock(TaskCoordinator.class);
- ProcessorContext<TestMessage> pCntx = new ProcessorContext<>(mockMsg, mockCollector, mockTaskCoordinator);
- assertEquals(pCntx.getMessage(), mockMsg);
- assertEquals(pCntx.getCollector(), mockCollector);
- assertEquals(pCntx.getCoordinator(), mockTaskCoordinator);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSessionWindowImpl.java
new file mode 100644
index 0000000..c302df7
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSessionWindowImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.windows.StoreFunctions;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.storage.kv.Entry;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+import java.lang.reflect.Field;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/** Tests {@link SessionWindowOperatorImpl}: construction from a window spec, and init() followed by onNext(). */
+public class TestSessionWindowImpl {
+  Field wndStoreField = null; // SessionWindowOperatorImpl's "stateStore" field; looked up in prep(), not read by the tests below
+  Field sessWndField = null; // SessionWindowOperatorImpl's "windowSpec" field
+
+  @Before public void prep() throws NoSuchFieldException {
+    wndStoreField = SessionWindowOperatorImpl.class.getDeclaredField("stateStore");
+    sessWndField = SessionWindowOperatorImpl.class.getDeclaredField("windowSpec");
+    wndStoreField.setAccessible(true);
+    sessWndField.setAccessible(true);
+  }
+
+  @Test
+  public void testConstructor() throws IllegalAccessException {
+    // the operator impl must keep exactly the spec it was constructed with in its "windowSpec" field
+    WindowOperatorSpec<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperatorSpec.class);
+    SessionWindowOperatorImpl<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowOperatorImpl<>(wndOp);
+    assertEquals(wndOp, sessWndField.get(sessWnd));
+  }
+
+  @Test
+  public void testInitAndProcess() throws IllegalAccessException {
+    WindowOperatorSpec<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperatorSpec.class);
+    BiFunction<TestMessageEnvelope, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class);
+    SessionWindowOperatorImpl<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowOperatorImpl<>(wndOp);
+
+    // stub the spec, its store functions and the task context's store, then init the operator on the mocked input stream
+    MessageStreamImpl<TestMessageEnvelope> mockInputStrm = mock(MessageStreamImpl.class);
+    StoreFunctions<TestMessageEnvelope, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class);
+    Function<TestMessageEnvelope, String> wndKeyFn = m -> "test-msg-key";
+    when(mockStoreFns.getStoreKeyFn()).thenReturn(wndKeyFn);
+    when(wndOp.getStoreFns()).thenReturn(mockStoreFns);
+    when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store");
+    when(wndOp.getTransformFn()).thenReturn(mockTxfmFn);
+    TaskContext mockContext = mock(TaskContext.class);
+    KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class);
+    when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore);
+    sessWnd.init(mockInputStrm, mockContext);
+
+    // onNext(): transform fn must get (message, key + old state), the updater must run once, the new state must be stored
+    TestMessageEnvelope mockMsg = mock(TestMessageEnvelope.class);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+    BiFunction<TestMessageEnvelope, WindowState<Integer>, WindowState<Integer>> stateUpdaterFn = mock(BiFunction.class);
+    when(mockStoreFns.getStateUpdaterFn()).thenReturn(stateUpdaterFn);
+    WindowState<Integer> mockNewState = mock(WindowState.class);
+    WindowState<Integer> oldState = mock(WindowState.class);
+    when(mockKvStore.get("test-msg-key")).thenReturn(oldState);
+    when(stateUpdaterFn.apply(mockMsg, oldState)).thenReturn(mockNewState);
+    sessWnd.onNext(mockMsg, mockCollector, mockCoordinator);
+    verify(mockTxfmFn, times(1)).apply(argThat(new ArgumentMatcher<TestMessageEnvelope>() {
+      @Override public boolean matches(Object argument) {
+        // compare from the known non-null side so an unexpected null argument is a mismatch, not an NPE
+        return mockMsg.equals(argument);
+      }
+    }), argThat(new ArgumentMatcher<Entry<String, WindowState<Integer>>>() {
+      @Override public boolean matches(Object argument) {
+        Entry<String, WindowState<Integer>> xIn = (Entry<String, WindowState<Integer>>) argument;
+        return xIn != null && "test-msg-key".equals(xIn.getKey()) && oldState.equals(xIn.getValue());
+      }
+    }));
+    verify(stateUpdaterFn, times(1)).apply(mockMsg, oldState);
+    verify(mockKvStore, times(1)).put("test-msg-key", mockNewState);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
deleted file mode 100644
index de029ea..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.TestOutputMessage;
-import org.apache.samza.operators.internal.Operators.StreamOperator;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.Function;
-
-import static org.mockito.Mockito.*;
-
-
-public class TestSimpleOperatorImpl {
-
- @Test public void testSimpleOperator() {
- StreamOperator<TestMessage, TestOutputMessage> mockOp = mock(StreamOperator.class);
- Function<TestMessage, Collection<TestOutputMessage>> txfmFn = mock(Function.class);
- when(mockOp.getFunction()).thenReturn(txfmFn);
-
- SimpleOperatorImpl<TestMessage, TestOutputMessage> opImpl = spy(new SimpleOperatorImpl<>(mockOp));
- TestMessage inMsg = mock(TestMessage.class);
- TestOutputMessage outMsg = mock(TestOutputMessage.class);
- Collection<TestOutputMessage> mockOutputs = new ArrayList() { {
- this.add(outMsg);
- } };
- when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
- MessageCollector mockCollector = mock(MessageCollector.class);
- TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
- opImpl.onNext(inMsg, mockCollector, mockCoordinator);
- verify(txfmFn, times(1)).apply(inMsg);
- verify(opImpl, times(1)).nextProcessors(outMsg, mockCollector, mockCoordinator);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
index cdac3fc..ba5b6f8 100644
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -18,25 +18,28 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TestOutputMessage;
-import org.apache.samza.operators.internal.Operators.SinkOperator;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.junit.Test;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestSinkOperatorImpl {
- @Test public void testSinkOperator() {
- SinkOperator<TestOutputMessage> sinkOp = mock(SinkOperator.class);
- MessageStream.VoidFunction3<TestOutputMessage, MessageCollector, TaskCoordinator> sinkFn = mock(
- MessageStream.VoidFunction3.class);
- when(sinkOp.getFunction()).thenReturn(sinkFn);
- SinkOperatorImpl<TestOutputMessage> sinkImpl = new SinkOperatorImpl<>(sinkOp);
- TestOutputMessage mockMsg = mock(TestOutputMessage.class);
+ @Test
+ public void testSinkOperator() {
+ SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
+ SinkFunction<TestOutputMessageEnvelope> sinkFn = mock(SinkFunction.class);
+ when(sinkOp.getSinkFn()).thenReturn(sinkFn);
+ SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp);
+ TestOutputMessageEnvelope mockMsg = mock(TestOutputMessageEnvelope.class);
MessageCollector mockCollector = mock(MessageCollector.class);
TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
deleted file mode 100644
index 5ede757..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStateStoreImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.internal.Operators.StoreFunctions;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.TaskContext;
-import org.junit.Test;
-
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-
-
-public class TestStateStoreImpl {
- @Test public void testStateStoreImpl() {
- StoreFunctions<TestMessage, String, WindowState> mockStoreFunctions = mock(StoreFunctions.class);
- // test constructor
- StateStoreImpl<TestMessage, String, WindowState> storeImpl = new StateStoreImpl<>(mockStoreFunctions, "myStoreName");
- TaskContext mockContext = mock(TaskContext.class);
- KeyValueStore<String, WindowState> mockKvStore = mock(KeyValueStore.class);
- when(mockContext.getStore("myStoreName")).thenReturn(mockKvStore);
- // test init()
- storeImpl.init(mockContext);
- verify(mockContext, times(1)).getStore("myStoreName");
- Function<TestMessage, String> wndKeyFn = mock(Function.class);
- when(mockStoreFunctions.getStoreKeyFinder()).thenReturn(wndKeyFn);
- TestMessage mockMsg = mock(TestMessage.class);
- when(wndKeyFn.apply(mockMsg)).thenReturn("myKey");
- WindowState mockState = mock(WindowState.class);
- when(mockKvStore.get("myKey")).thenReturn(mockState);
- // test getState()
- Entry<String, WindowState> storeEntry = storeImpl.getState(mockMsg);
- assertEquals(storeEntry.getKey(), "myKey");
- assertEquals(storeEntry.getValue(), mockState);
- verify(wndKeyFn, times(1)).apply(mockMsg);
- verify(mockKvStore, times(1)).get("myKey");
- Entry<String, WindowState> oldEntry = new Entry<>("myKey", mockState);
- WindowState mockNewState = mock(WindowState.class);
- BiFunction<TestMessage, WindowState, WindowState> mockUpdaterFn = mock(BiFunction.class);
- when(mockStoreFunctions.getStateUpdater()).thenReturn(mockUpdaterFn);
- when(mockUpdaterFn.apply(mockMsg, mockState)).thenReturn(mockNewState);
- // test updateState()
- Entry<String, WindowState> newEntry = storeImpl.updateState(mockMsg, oldEntry);
- assertEquals(newEntry.getKey(), "myKey");
- assertEquals(newEntry.getValue(), mockNewState);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
new file mode 100644
index 0000000..5a3840c
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit test for {@link StreamOperatorImpl}.
+ */
+public class TestStreamOperatorImpl {
+
+  /**
+   * Verifies that {@code onNext()} applies the spec's transform function to the input message exactly once
+   * and passes the resulting output message to {@code propagateResult()} with the same collector and coordinator.
+   */
+  @Test
+  public void testSimpleOperator() {
+    // The operator spec is mocked so that it hands out a transform function we can stub and verify.
+    StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
+    FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
+    when(mockOp.getTransformFn()).thenReturn(txfmFn);
+
+    // Spy on a real operator instance so the call to propagateResult() can be verified below.
+    StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp));
+    TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
+    TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
+    // Single-element output that the stubbed transform function returns for inMsg.
+    Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList<>();
+    mockOutputs.add(outMsg);
+    when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
+    MessageCollector mockCollector = mock(MessageCollector.class);
+    TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+
+    opImpl.onNext(inMsg, mockCollector, mockCoordinator);
+
+    verify(txfmFn, times(1)).apply(inMsg);
+    verify(opImpl, times(1)).propagateResult(outMsg, mockCollector, mockCoordinator);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
deleted file mode 100644
index 719ab99..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl.window;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.TestMessage;
-import org.apache.samza.operators.WindowState;
-import org.apache.samza.operators.internal.Operators.StoreFunctions;
-import org.apache.samza.operators.internal.Operators.WindowOperator;
-import org.apache.samza.operators.internal.WindowOutput;
-import org.apache.samza.storage.kv.Entry;
-import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-
-import java.lang.reflect.Field;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.*;
-
-
-public class TestSessionWindowImpl {
- Field wndStoreField = null;
- Field sessWndField = null;
-
- @Before public void prep() throws NoSuchFieldException {
- wndStoreField = SessionWindowImpl.class.getDeclaredField("wndStore");
- sessWndField = SessionWindowImpl.class.getDeclaredField("sessWnd");
- wndStoreField.setAccessible(true);
- sessWndField.setAccessible(true);
- }
-
- @Test public void testConstructor() throws IllegalAccessException, NoSuchFieldException {
- // test constructing a SessionWindowImpl w/ expected mock functions
- WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class);
- SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp);
- assertEquals(wndOp, sessWndField.get(sessWnd));
- }
-
- @Test public void testInitAndProcess() throws IllegalAccessException {
- WindowOperator<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> wndOp = mock(WindowOperator.class);
- BiFunction<TestMessage, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> mockTxfmFn = mock(BiFunction.class);
- SessionWindowImpl<TestMessage, String, WindowState<Integer>, WindowOutput<String, Integer>> sessWnd = new SessionWindowImpl<>(wndOp);
-
- // construct and init the SessionWindowImpl object
- MessageStream<TestMessage> mockInputStrm = mock(MessageStream.class);
- StoreFunctions<TestMessage, String, WindowState<Integer>> mockStoreFns = mock(StoreFunctions.class);
- Function<TestMessage, String> wndKeyFn = m -> "test-msg-key";
- when(mockStoreFns.getStoreKeyFinder()).thenReturn(wndKeyFn);
- when(wndOp.getStoreFunctions()).thenReturn(mockStoreFns);
- when(wndOp.getStoreName(mockInputStrm)).thenReturn("test-wnd-store");
- when(wndOp.getFunction()).thenReturn(mockTxfmFn);
- TaskContext mockContext = mock(TaskContext.class);
- KeyValueStore<String, WindowState<Integer>> mockKvStore = mock(KeyValueStore.class);
- when(mockContext.getStore("test-wnd-store")).thenReturn(mockKvStore);
- sessWnd.init(mockInputStrm, mockContext);
-
- // test onNext() method. Make sure the transformation function and the state update functions are invoked.
- TestMessage mockMsg = mock(TestMessage.class);
- MessageCollector mockCollector = mock(MessageCollector.class);
- TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
- BiFunction<TestMessage, WindowState<Integer>, WindowState<Integer>> stateUpdaterFn = mock(BiFunction.class);
- when(mockStoreFns.getStateUpdater()).thenReturn(stateUpdaterFn);
- WindowState<Integer> mockNewState = mock(WindowState.class);
- WindowState<Integer> oldState = mock(WindowState.class);
- when(mockKvStore.get("test-msg-key")).thenReturn(oldState);
- when(stateUpdaterFn.apply(mockMsg, oldState)).thenReturn(mockNewState);
- sessWnd.onNext(mockMsg, mockCollector, mockCoordinator);
- verify(mockTxfmFn, times(1)).apply(argThat(new ArgumentMatcher<TestMessage>() {
- @Override public boolean matches(Object argument) {
- TestMessage xIn = (TestMessage) argument;
- return xIn.equals(mockMsg);
- }
- }), argThat(new ArgumentMatcher<Entry<String, WindowState<Integer>>>() {
- @Override public boolean matches(Object argument) {
- Entry<String, WindowState<Integer>> xIn = (Entry<String, WindowState<Integer>>) argument;
- return xIn.getKey().equals("test-msg-key") && xIn.getValue().equals(oldState);
- }
- }));
- verify(stateUpdaterFn, times(1)).apply(mockMsg, oldState);
- verify(mockKvStore, times(1)).put("test-msg-key", mockNewState);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
new file mode 100644
index 0000000..028bd67
--- /dev/null
+++ b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.windows.StoreFunctions;
+import org.apache.samza.operators.windows.Trigger;
+import org.apache.samza.operators.windows.WindowFn;
+import org.apache.samza.operators.windows.WindowOutput;
+import org.apache.samza.operators.windows.WindowState;
+import org.apache.samza.storage.kv.Entry;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for the operator spec factory methods of {@link OperatorSpecs}.
+ */
+public class TestOperatorSpecs {
+  /**
+   * Verifies that a stream operator spec exposes the transform function it was created with and that its
+   * output stream is a {@link MessageStreamImpl}.
+   */
+  @Test
+  public void testGetStreamOperator() {
+    FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> {
+      Collection<TestMessageEnvelope> out = new ArrayList<>();
+      out.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
+      return out;
+    };
+    StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperator(transformFn);
+    assertEquals(transformFn, strmOp.getTransformFn());
+    assertTrue(strmOp.getOutputStream() instanceof MessageStreamImpl);
+  }
+
+  /**
+   * Verifies that a sink operator spec exposes the sink function it was created with and has no output stream.
+   */
+  @Test
+  public void testGetSinkOperator() {
+    SinkFunction<TestMessageEnvelope> sinkFn = (m, c, t) -> { };
+    SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperator(sinkFn);
+    assertEquals(sinkFn, sinkOp.getSinkFn());
+    assertNull(sinkOp.getOutputStream());
+  }
+
+  /**
+   * Verifies that a window operator spec exposes the transform function, store functions and trigger supplied by
+   * its {@link WindowFn}, and that its store name has the form {@code input-<input stream>-wndop-<operator>}.
+   */
+  @Test
+  public void testGetWindowOperator() {
+    WindowFn<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> windowFn = mock(WindowFn.class);
+    BiFunction<TestMessageEnvelope, Entry<String, WindowState<Integer>>, WindowOutput<String, Integer>> xFunction = (m, e) -> null;
+    StoreFunctions<TestMessageEnvelope, String, WindowState<Integer>> storeFns = mock(StoreFunctions.class);
+    Trigger<TestMessageEnvelope, WindowState<Integer>> trigger = mock(Trigger.class);
+    MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class);
+    when(windowFn.getTransformFn()).thenReturn(xFunction);
+    when(windowFn.getStoreFns()).thenReturn(storeFns);
+    when(windowFn.getTrigger()).thenReturn(trigger);
+    when(mockInput.toString()).thenReturn("mockStream1");
+
+    WindowOperatorSpec<TestMessageEnvelope, String, WindowState<Integer>, WindowOutput<String, Integer>> windowOp = OperatorSpecs
+        .createWindowOperator(windowFn);
+    assertEquals(xFunction, windowOp.getTransformFn());
+    assertEquals(storeFns, windowOp.getStoreFns());
+    assertEquals(trigger, windowOp.getTrigger());
+    assertEquals(String.format("input-mockStream1-wndop-%s", windowOp.toString()), windowOp.getStoreName(mockInput));
+  }
+
+  /**
+   * Verifies that a partial join operator spec exposes the merge function and output stream it was created with,
+   * that its self store functions return the incoming message as the new state, and that its join store functions have no updater.
+   */
+  @Test
+  public void testGetPartialJoinOperator() {
+    BiFunction<MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
+        (m1, m2) -> new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
+    MessageStreamImpl<TestMessageEnvelope> joinOutput = new MessageStreamImpl<>();
+    PartialJoinOperatorSpec<MessageEnvelope<Object, ?>, Object, MessageEnvelope<Object, ?>, TestMessageEnvelope> partialJoin =
+        OperatorSpecs.createPartialJoinOperator(merger, joinOutput);
+
+    assertEquals(joinOutput, partialJoin.getOutputStream());
+    MessageEnvelope<Object, Object> m = mock(MessageEnvelope.class);
+    MessageEnvelope<Object, Object> s = mock(MessageEnvelope.class);
+    assertEquals(merger, partialJoin.getTransformFn());
+    // NOTE(review): m.getKey() is never stubbed, so with Mockito's default answers it presumably returns null and the
+    // two store-key assertions below compare null with null; consider stubbing getKey() to make them meaningful.
+    assertEquals(m.getKey(), partialJoin.getSelfStoreFns().getStoreKeyFn().apply(m));
+    assertEquals(m, partialJoin.getSelfStoreFns().getStateUpdaterFn().apply(m, s));
+    assertEquals(m.getKey(), partialJoin.getJoinStoreFns().getStoreKeyFn().apply(m));
+    assertNull(partialJoin.getJoinStoreFns().getStateUpdaterFn());
+  }
+
+  /**
+   * Verifies that a merge operator spec writes to the output stream it was created with and that its transform
+   * function maps a message to a collection equal to a single-element list holding that message.
+   */
+  @Test
+  public void testGetMergeOperator() {
+    MessageStreamImpl<TestMessageEnvelope> output = new MessageStreamImpl<>();
+    StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperator(output);
+    Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = msg -> {
+      Collection<TestMessageEnvelope> merged = new ArrayList<>();
+      merged.add(msg);
+      return merged;
+    };
+    TestMessageEnvelope t = mock(TestMessageEnvelope.class);
+    assertEquals(mergeFn.apply(t), mergeOp.getTransformFn().apply(t));
+    assertEquals(output, mergeOp.getOutputStream());
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java b/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
deleted file mode 100644
index d1b0a88..0000000
--- a/samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.MessageStreams.SystemMessageStream;
-import org.apache.samza.operators.TriggerBuilder;
-import org.apache.samza.operators.Windows;
-import org.apache.samza.operators.data.IncomingSystemMessage;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.operators.task.StreamOperatorTask;
-import org.apache.samza.system.SystemStreamPartition;
-
-import java.util.Collection;
-
-
-/**
- * Example implementation of split stream tasks
- *
- */
-public class BroadcastOperatorTask implements StreamOperatorTask {
- class MessageType {
- String field1;
- String field2;
- String field3;
- String field4;
- String parKey;
- private long timestamp;
-
- public long getTimestamp() {
- return this.timestamp;
- }
- }
-
- class JsonMessage extends InputJsonSystemMessage<MessageType> {
-
- JsonMessage(String key, MessageType data, Offset offset, long timestamp, SystemStreamPartition partition) {
- super(key, data, offset, timestamp, partition);
- }
- }
-
- @Override public void initOperators(Collection<SystemMessageStream> sources) {
- sources.forEach(source -> {
- MessageStream<JsonMessage> inputStream = source.map(this::getInputMessage);
-
- inputStream.filter(this::myFilter1).
- window(Windows.<JsonMessage, String>intoSessionCounter(
- m -> String.format("%s-%s", m.getMessage().field1, m.getMessage().field2)).
- setTriggers(TriggerBuilder.<JsonMessage, Integer>earlyTriggerWhenExceedWndLen(100).
- addLateTriggerOnSizeLimit(10).
- addTimeoutSinceLastMessage(30000)));
-
- inputStream.filter(this::myFilter2).
- window(Windows.<JsonMessage, String>intoSessions(
- m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4)).
- setTriggers(TriggerBuilder.<JsonMessage, Collection<JsonMessage>>earlyTriggerWhenExceedWndLen(100).
- addTimeoutSinceLastMessage(30000)));
-
- inputStream.filter(this::myFilter3).
- window(Windows.<JsonMessage, String, MessageType>intoSessions(
- m -> String.format("%s-%s", m.getMessage().field3, m.getMessage().field4), m -> m.getMessage()).
- setTriggers(TriggerBuilder.<JsonMessage, Collection<MessageType>>earlyTriggerOnEventTime(m -> m.getTimestamp(), 30000).
- addTimeoutSinceFirstMessage(60000)));
- }
- );
- }
-
- JsonMessage getInputMessage(IncomingSystemMessage m1) {
- return (JsonMessage) m1.getMessage();
- }
-
- boolean myFilter1(JsonMessage m1) {
- // Do user defined processing here
- return m1.getMessage().parKey.equals("key1");
- }
-
- boolean myFilter2(JsonMessage m1) {
- // Do user defined processing here
- return m1.getMessage().parKey.equals("key2");
- }
-
- boolean myFilter3(JsonMessage m1) {
- return m1.getMessage().parKey.equals("key3");
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/00543804/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java b/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
deleted file mode 100644
index 88aa159..0000000
--- a/samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.task;
-
-import org.apache.samza.operators.data.InputSystemMessage;
-import org.apache.samza.operators.data.Message;
-import org.apache.samza.operators.data.Offset;
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * Example input message w/ Json message body and string as the key.
- */
-
-public class InputJsonSystemMessage<T> implements Message<String, T>, InputSystemMessage<Offset> {
-
- private final String key;
- private final T data;
- private final Offset offset;
- private final long timestamp;
- private final SystemStreamPartition partition;
-
- InputJsonSystemMessage(String key, T data, Offset offset, long timestamp, SystemStreamPartition partition) {
- this.key = key;
- this.data = data;
- this.offset = offset;
- this.timestamp = timestamp;
- this.partition = partition;
- }
-
- @Override public T getMessage() {
- return this.data;
- }
-
- @Override public String getKey() {
- return this.key;
- }
-
- @Override public long getTimestamp() {
- return this.timestamp;
- }
-
- @Override public Offset getOffset() {
- return this.offset;
- }
-
- @Override public SystemStreamPartition getSystemStreamPartition() {
- return this.partition;
- }
-}
-