You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/02/23 19:27:48 UTC
[02/13] samza git commit: SAMZA-1073: moving all operator classes
into samza-core
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java b/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
deleted file mode 100644
index c4e9f51..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/TestMessageStreamImplUtil.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-
-public class TestMessageStreamImplUtil {
- public static <M> MessageStreamImpl<M> getMessageStreamImpl(StreamGraphImpl graph) {
- return new MessageStreamImpl<M>(graph);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java b/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
deleted file mode 100644
index 9a425d1..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/data/JsonIncomingSystemMessageEnvelope.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.data;
-
-import org.apache.samza.system.SystemStreamPartition;
-
-
-/**
- * Example input {@link MessageEnvelope} w/ Json message and string as the key.
- */
-
-public class JsonIncomingSystemMessageEnvelope<T> implements MessageEnvelope<String, T> {
-
- private final String key;
- private final T data;
- private final Offset offset;
- private final SystemStreamPartition partition;
-
- public JsonIncomingSystemMessageEnvelope(String key, T data, Offset offset, SystemStreamPartition partition) {
- this.key = key;
- this.data = data;
- this.offset = offset;
- this.partition = partition;
- }
-
- @Override
- public T getMessage() {
- return this.data;
- }
-
- @Override
- public String getKey() {
- return this.key;
- }
-
- public Offset getOffset() {
- return this.offset;
- }
-
- public SystemStreamPartition getSystemStreamPartition() {
- return this.partition;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
deleted file mode 100644
index 361972e..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.hamcrest.core.IsEqual;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.argThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-
-public class TestOperatorImpl {
-
- TestMessageEnvelope curInputMsg;
- MessageCollector curCollector;
- TaskCoordinator curCoordinator;
-
- @Test
- public void testSubscribers() {
- this.curInputMsg = null;
- this.curCollector = null;
- this.curCoordinator = null;
- OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = new OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope>() {
- @Override
- public void onNext(TestMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
- TestOperatorImpl.this.curInputMsg = message;
- TestOperatorImpl.this.curCollector = collector;
- TestOperatorImpl.this.curCoordinator = coordinator;
- }
- };
- // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext()
- OperatorImpl mockSub = mock(OperatorImpl.class);
- opImpl.registerNextOperator(mockSub);
- TestOutputMessageEnvelope xOutput = mock(TestOutputMessageEnvelope.class);
- MessageCollector mockCollector = mock(MessageCollector.class);
- TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
- opImpl.propagateResult(xOutput, mockCollector, mockCoordinator);
- verify(mockSub, times(1)).onNext(
- argThat(new IsEqual<>(xOutput)),
- argThat(new IsEqual<>(mockCollector)),
- argThat(new IsEqual<>(mockCoordinator))
- );
- // verify onNext() is invoked correctly
- TestMessageEnvelope mockInput = mock(TestMessageEnvelope.class);
- opImpl.onNext(mockInput, mockCollector, mockCoordinator);
- assertEquals(mockInput, this.curInputMsg);
- assertEquals(mockCollector, this.curCollector);
- assertEquals(mockCoordinator, this.curCoordinator);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
deleted file mode 100644
index 02637a3..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.windows.Windows;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.task.TaskContext;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.lang.reflect.Field;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-
-public class TestOperatorImpls {
- Field nextOperatorsField = null;
- Method createOpMethod = null;
- Method createOpsMethod = null;
-
- @Before
- public void prep() throws NoSuchFieldException, NoSuchMethodException {
- nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
- nextOperatorsField.setAccessible(true);
-
- createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
- OperatorSpec.class, Config.class, TaskContext.class);
- createOpMethod.setAccessible(true);
-
- createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
- createOpsMethod.setAccessible(true);
- }
-
- @Test
- public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
- // get window operator
- WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
- WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null);
- when(mockWnd.getWindow()).thenReturn(windowInternal);
- MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class);
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
-
- OperatorGraph opGraph = new OperatorGraph();
- OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>)
- createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
- assertTrue(opImpl instanceof WindowOperatorImpl);
- Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
- wndInternalField.setAccessible(true);
- WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl);
- assertEquals(wndInternal, windowInternal);
-
- // get simple operator
- StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
- FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
- when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
- opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
- assertTrue(opImpl instanceof StreamOperatorImpl);
- Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
- txfmFnField.setAccessible(true);
- assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
-
- // get sink operator
- SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
- SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
- when(sinkOp.getSinkFn()).thenReturn(sinkFn);
- opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
- assertTrue(opImpl instanceof SinkOperatorImpl);
- Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
- sinkFnField.setAccessible(true);
- assertEquals(sinkFn, sinkFnField.get(opImpl));
-
- // get join operator
- PartialJoinOperatorSpec<TestMessageEnvelope, String, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
- TestOutputMessageEnvelope mockOutput = mock(TestOutputMessageEnvelope.class);
- PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
- when(joinOp.getTransformFn()).thenReturn(joinFn);
- opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
- assertTrue(opImpl instanceof PartialJoinOperatorImpl);
- }
-
- @Test
- public void testEmptyChain() throws InvocationTargetException, IllegalAccessException {
- // test creation of empty chain
- MessageStreamImpl<TestMessageEnvelope> testStream = mock(MessageStreamImpl.class);
- TaskContext mockContext = mock(TaskContext.class);
- Config mockConfig = mock(Config.class);
- OperatorGraph opGraph = new OperatorGraph();
- RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testStream, mockConfig, mockContext);
- assertTrue(operatorChain != null);
- }
-
- @Test
- public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
- // test creation of linear chain
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
- TaskContext mockContext = mock(TaskContext.class);
- Config mockConfig = mock(Config.class);
- testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
- OperatorGraph opGraph = new OperatorGraph();
- RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
- Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstOpImpl);
- assertEquals(subsOps.size(), 1);
- OperatorImpl wndOpImpl = subsOps.iterator().next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(wndOpImpl);
- assertEquals(subsOps.size(), 0);
- }
-
- @Test
- public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
- // test creation of broadcast chain
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
- TaskContext mockContext = mock(TaskContext.class);
- Config mockConfig = mock(Config.class);
- testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
- testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
- OperatorGraph opGraph = new OperatorGraph();
- RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
- assertEquals(subsSet.size(), 2);
- Iterator<OperatorImpl> iter = subsSet.iterator();
- // check the first branch w/ flatMap
- OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> opImpl = iter.next();
- Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
- assertEquals(subsOps.size(), 1);
- OperatorImpl flatMapImpl = subsOps.iterator().next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(flatMapImpl);
- assertEquals(subsOps.size(), 0);
- // check the second branch w/ map
- opImpl = iter.next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
- assertEquals(subsOps.size(), 1);
- OperatorImpl mapImpl = subsOps.iterator().next();
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(mapImpl);
- assertEquals(subsOps.size(), 0);
- }
-
- @Test
- public void testJoinChain() throws IllegalAccessException, InvocationTargetException {
- // test creation of join chain
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
- MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
- TaskContext mockContext = mock(TaskContext.class);
- Config mockConfig = mock(Config.class);
- input1
- .join(input2,
- new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
- @Override
- public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
- return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
- }
-
- @Override
- public String getFirstKey(TestMessageEnvelope message) {
- return message.getKey();
- }
-
- @Override
- public String getSecondKey(TestMessageEnvelope message) {
- return message.getKey();
- }
- })
- .map(m -> m);
- OperatorGraph opGraph = new OperatorGraph();
- // now, we create chained operators from each input sources
- RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
- RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
- // check that those two chains will merge at map operator
- // first branch of the join
- Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp1 = subsSet.iterator().next();
- Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp1);
- assertEquals(subsOps.size(), 1);
- // the map operator consumes the common join output, where two branches merge
- OperatorImpl mapImpl = subsOps.iterator().next();
- // second branch of the join
- subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain2);
- assertEquals(subsSet.size(), 1);
- OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp2 = subsSet.iterator().next();
- assertNotSame(joinOp1, joinOp2);
- subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp2);
- assertEquals(subsOps.size(), 1);
- // make sure that the map operator is the same
- assertEquals(mapImpl, subsOps.iterator().next());
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
deleted file mode 100644
index ce9fdd2..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-
-public class TestSinkOperatorImpl {
-
- @Test
- public void testSinkOperator() {
- SinkOperatorSpec<TestOutputMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
- SinkFunction<TestOutputMessageEnvelope> sinkFn = mock(SinkFunction.class);
- when(sinkOp.getSinkFn()).thenReturn(sinkFn);
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- SinkOperatorImpl<TestOutputMessageEnvelope> sinkImpl = new SinkOperatorImpl<>(sinkOp, mockConfig, mockContext);
- TestOutputMessageEnvelope mockMsg = mock(TestOutputMessageEnvelope.class);
- MessageCollector mockCollector = mock(MessageCollector.class);
- TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
-
- sinkImpl.onNext(mockMsg, mockCollector, mockCoordinator);
- verify(sinkFn, times(1)).apply(mockMsg, mockCollector, mockCoordinator);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
deleted file mode 100644
index 010a210..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestOutputMessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import static org.mockito.Mockito.*;
-
-
-public class TestStreamOperatorImpl {
-
- @Test
- public void testSimpleOperator() {
- StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
- FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
- when(mockOp.getTransformFn()).thenReturn(txfmFn);
- MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class);
- Config mockConfig = mock(Config.class);
- TaskContext mockContext = mock(TaskContext.class);
- StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext));
- TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
- TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
- Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList() { {
- this.add(outMsg);
- } };
- when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
- MessageCollector mockCollector = mock(MessageCollector.class);
- TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
- opImpl.onNext(inMsg, mockCollector, mockCoordinator);
- verify(txfmFn, times(1)).apply(inMsg);
- verify(opImpl, times(1)).propagateResult(outMsg, mockCollector, mockCoordinator);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
deleted file mode 100644
index 31257a4..0000000
--- a/samza-operator/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.TestMessageEnvelope;
-import org.apache.samza.operators.TestMessageStreamImplUtil;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-
-public class TestOperatorSpecs {
- @Test
- public void testGetStreamOperator() {
- FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
- this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
- } };
- MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput);
- assertEquals(strmOp.getTransformFn(), transformFn);
- assertEquals(strmOp.getNextStream(), mockOutput);
- }
-
- @Test
- public void testGetSinkOperator() {
- SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
- TaskCoordinator taskCoordinator) -> { };
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph);
- assertEquals(sinkOp.getSinkFn(), sinkFn);
- assertTrue(sinkOp.getNextStream() == null);
- }
-
- @Test
- public void testGetWindowOperator() throws Exception {
- Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
- BiFunction<TestMessageEnvelope, Integer, Integer> aggregator = (m, c) -> c + 1;
-
- //instantiate a window using reflection
- WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null);
-
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
- WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
- assertEquals(spec.getWindow(), window);
- assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor);
- assertEquals(spec.getWindow().getFoldFunction(), aggregator);
- }
-
- @Test
- public void testGetPartialJoinOperator() {
- PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
- new PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope>() {
- @Override
- public TestMessageEnvelope apply(MessageEnvelope<Object, ?> m1, MessageEnvelope<Object, ?> m2) {
- return new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
- }
-
- @Override
- public Object getKey(MessageEnvelope<Object, ?> message) {
- return message.getKey();
- }
-
- @Override
- public Object getOtherKey(MessageEnvelope<Object, ?> message) {
- return message.getKey();
- }
- };
-
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> joinOutput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
- PartialJoinOperatorSpec<MessageEnvelope<Object, ?>, Object, MessageEnvelope<Object, ?>, TestMessageEnvelope> partialJoin =
- OperatorSpecs.createPartialJoinOperatorSpec(merger, mockGraph, joinOutput);
-
- assertEquals(partialJoin.getNextStream(), joinOutput);
- MessageEnvelope<Object, Object> m = mock(MessageEnvelope.class);
- MessageEnvelope<Object, Object> s = mock(MessageEnvelope.class);
- assertEquals(partialJoin.getTransformFn(), merger);
- }
-
- @Test
- public void testGetMergeOperator() {
- StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
- MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
- StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output);
- Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { {
- this.add(t);
- } };
- TestMessageEnvelope t = mock(TestMessageEnvelope.class);
- assertEquals(mergeOp.getTransformFn().apply(t), mergeFn.apply(t));
- assertEquals(mergeOp.getNextStream(), output);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 813882c..5de30d8 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -20,7 +20,6 @@ include \
'samza-api',
'samza-elasticsearch',
'samza-log4j',
- 'samza-operator',
'samza-rest',
'samza-shell'