You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/02/23 19:27:50 UTC
[04/13] samza git commit: SAMZA-1073: moving all operator classes
into samza-core
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
new file mode 100644
index 0000000..02637a3
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestMessageStreamImplUtil;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.JoinFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.WindowOperatorSpec;
+import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.task.TaskContext;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class TestOperatorImpls {
+ Field nextOperatorsField = null;
+ Method createOpMethod = null;
+ Method createOpsMethod = null;
+
+ @Before
+ public void prep() throws NoSuchFieldException, NoSuchMethodException {
+ nextOperatorsField = OperatorImpl.class.getDeclaredField("nextOperators");
+ nextOperatorsField.setAccessible(true);
+ // reflective handle on OperatorGraph.createOperatorImpl(MessageStreamImpl, OperatorSpec, Config, TaskContext)
+ createOpMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpl", MessageStreamImpl.class,
+ OperatorSpec.class, Config.class, TaskContext.class);
+ createOpMethod.setAccessible(true);
+ // reflective handle on OperatorGraph.createOperatorImpls(MessageStreamImpl, Config, TaskContext)
+ createOpsMethod = OperatorGraph.class.getDeclaredMethod("createOperatorImpls", MessageStreamImpl.class, Config.class, TaskContext.class);
+ createOpsMethod.setAccessible(true);
+ }
+
+ @Test
+ public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException {
+ // window spec: must produce a WindowOperatorImpl holding the window returned by the spec
+ WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class);
+ WindowInternal<TestMessageEnvelope, String, Integer> windowInternal = new WindowInternal<>(null, null, null, null);
+ when(mockWnd.getWindow()).thenReturn(windowInternal);
+ MessageStreamImpl<TestMessageEnvelope> mockStream = mock(MessageStreamImpl.class);
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = mock(TaskContext.class);
+
+ OperatorGraph opGraph = new OperatorGraph();
+ OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope> opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>)
+ createOpMethod.invoke(opGraph, mockStream, mockWnd, mockConfig, mockContext);
+ assertTrue(opImpl instanceof WindowOperatorImpl);
+ Field wndInternalField = WindowOperatorImpl.class.getDeclaredField("window");
+ wndInternalField.setAccessible(true);
+ WindowInternal wndInternal = (WindowInternal) wndInternalField.get(opImpl);
+ assertEquals(windowInternal, wndInternal); // expected value first, as JUnit's assertEquals requires
+
+ // stream spec: must produce a StreamOperatorImpl holding the spec's transform function
+ StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockSimpleOp = mock(StreamOperatorSpec.class);
+ FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> mockTxfmFn = mock(FlatMapFunction.class);
+ when(mockSimpleOp.getTransformFn()).thenReturn(mockTxfmFn);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, mockSimpleOp, mockConfig, mockContext);
+ assertTrue(opImpl instanceof StreamOperatorImpl);
+ Field txfmFnField = StreamOperatorImpl.class.getDeclaredField("transformFn");
+ txfmFnField.setAccessible(true);
+ assertEquals(mockTxfmFn, txfmFnField.get(opImpl));
+
+ // sink spec: must produce a SinkOperatorImpl holding the spec's sink function
+ SinkFunction<TestMessageEnvelope> sinkFn = (m, mc, tc) -> { };
+ SinkOperatorSpec<TestMessageEnvelope> sinkOp = mock(SinkOperatorSpec.class);
+ when(sinkOp.getSinkFn()).thenReturn(sinkFn);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, sinkOp, mockConfig, mockContext);
+ assertTrue(opImpl instanceof SinkOperatorImpl);
+ Field sinkFnField = SinkOperatorImpl.class.getDeclaredField("sinkFn");
+ sinkFnField.setAccessible(true);
+ assertEquals(sinkFn, sinkFnField.get(opImpl));
+
+ // partial-join spec: must produce a PartialJoinOperatorImpl
+ PartialJoinOperatorSpec<TestMessageEnvelope, String, TestMessageEnvelope, TestOutputMessageEnvelope> joinOp = mock(PartialJoinOperatorSpec.class);
+ // only the resulting operator type is asserted for the join; the join function is never invoked
+ PartialJoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope> joinFn = mock(PartialJoinFunction.class);
+ when(joinOp.getTransformFn()).thenReturn(joinFn);
+ opImpl = (OperatorImpl<TestMessageEnvelope, ? extends MessageEnvelope>) createOpMethod.invoke(opGraph, mockStream, joinOp, mockConfig, mockContext);
+ assertTrue(opImpl instanceof PartialJoinOperatorImpl);
+ }
+
+ @Test
+ public void testEmptyChain() throws InvocationTargetException, IllegalAccessException {
+ // building the operator impls for a mocked stream (nothing chained onto it) must still return a root operator
+ OperatorGraph graph = new OperatorGraph();
+ Config config = mock(Config.class);
+ TaskContext context = mock(TaskContext.class);
+ MessageStreamImpl<TestMessageEnvelope> emptyStream = mock(MessageStreamImpl.class);
+ RootOperatorImpl root = (RootOperatorImpl) createOpsMethod.invoke(graph, emptyStream, config, context);
+ assertTrue(null != root);
+ }
+
+ @Test
+ public void testLinearChain() throws IllegalAccessException, InvocationTargetException {
+ // map followed by window: expect root -> map -> window, with the window operator as the leaf
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
+ TaskContext mockContext = mock(TaskContext.class);
+ Config mockConfig = mock(Config.class);
+ testInput.map(m -> m).window(Windows.keyedSessionWindow(TestMessageEnvelope::getKey, Duration.ofMinutes(10)));
+ OperatorGraph opGraph = new OperatorGraph();
+ RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
+ Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
+ assertEquals(1, subsSet.size()); // expected count first so a failure message reads correctly
+ OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> firstOpImpl = subsSet.iterator().next();
+ Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstOpImpl);
+ assertEquals(1, subsOps.size());
+ OperatorImpl wndOpImpl = subsOps.iterator().next();
+ subsOps = (Set<OperatorImpl>) nextOperatorsField.get(wndOpImpl);
+ assertEquals(0, subsOps.size()); // nothing is chained after the window
+ }
+
+ @Test
+ public void testBroadcastChain() throws IllegalAccessException, InvocationTargetException {
+ // one input feeding two filter branches: the root must fan out to both, each followed by exactly one operator
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> testInput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
+ TaskContext mockContext = mock(TaskContext.class);
+ Config mockConfig = mock(Config.class);
+ testInput.filter(m -> m.getMessage().getEventTime() > 123456L).flatMap(m -> new ArrayList() { { this.add(m); this.add(m); } });
+ testInput.filter(m -> m.getMessage().getEventTime() < 123456L).map(m -> m);
+ OperatorGraph opGraph = new OperatorGraph();
+ RootOperatorImpl operatorChain = (RootOperatorImpl) createOpsMethod.invoke(opGraph, testInput, mockConfig, mockContext);
+ Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(operatorChain);
+ assertEquals(2, subsSet.size());
+ Iterator<OperatorImpl> iter = subsSet.iterator();
+ // check one branch; which one comes first is not assumed, both have the same shape (filter -> one leaf)
+ OperatorImpl<TestMessageEnvelope, TestMessageEnvelope> opImpl = iter.next();
+ Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
+ assertEquals(1, subsOps.size());
+ OperatorImpl firstLeaf = subsOps.iterator().next();
+ subsOps = (Set<OperatorImpl>) nextOperatorsField.get(firstLeaf);
+ assertEquals(0, subsOps.size());
+ // check the other branch
+ opImpl = iter.next();
+ subsOps = (Set<OperatorImpl>) nextOperatorsField.get(opImpl);
+ assertEquals(1, subsOps.size());
+ OperatorImpl secondLeaf = subsOps.iterator().next();
+ subsOps = (Set<OperatorImpl>) nextOperatorsField.get(secondLeaf);
+ assertEquals(0, subsOps.size());
+ }
+
+ @Test
+ public void testJoinChain() throws IllegalAccessException, InvocationTargetException {
+ // two inputs joined and then mapped: each input gets its own join operator, both feeding one shared map operator
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> input1 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
+ MessageStreamImpl<TestMessageEnvelope> input2 = TestMessageStreamImplUtil.getMessageStreamImpl(mockGraph);
+ TaskContext mockContext = mock(TaskContext.class);
+ Config mockConfig = mock(Config.class);
+ input1
+ .join(input2,
+ new JoinFunction<String, TestMessageEnvelope, TestMessageEnvelope, TestOutputMessageEnvelope>() {
+ @Override
+ public TestOutputMessageEnvelope apply(TestMessageEnvelope m1, TestMessageEnvelope m2) {
+ return new TestOutputMessageEnvelope(m1.getKey(), m1.getMessage().getValue().length() + m2.getMessage().getValue().length());
+ }
+
+ @Override
+ public String getFirstKey(TestMessageEnvelope message) {
+ return message.getKey();
+ }
+
+ @Override
+ public String getSecondKey(TestMessageEnvelope message) {
+ return message.getKey();
+ }
+ })
+ .map(m -> m);
+ OperatorGraph opGraph = new OperatorGraph();
+ // now, we create chained operators from each input sources
+ RootOperatorImpl chain1 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input1, mockConfig, mockContext);
+ RootOperatorImpl chain2 = (RootOperatorImpl) createOpsMethod.invoke(opGraph, input2, mockConfig, mockContext);
+ // check that those two chains will merge at map operator
+ // first branch of the join
+ Set<OperatorImpl> subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain1);
+ assertEquals(1, subsSet.size()); // expected count first, as JUnit's assertEquals requires
+ OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp1 = subsSet.iterator().next();
+ Set<OperatorImpl> subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp1);
+ assertEquals(1, subsOps.size());
+ // the map operator consumes the common join output, where two branches merge
+ OperatorImpl mapImpl = subsOps.iterator().next();
+ // second branch of the join
+ subsSet = (Set<OperatorImpl>) nextOperatorsField.get(chain2);
+ assertEquals(1, subsSet.size());
+ OperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> joinOp2 = subsSet.iterator().next();
+ assertNotSame(joinOp1, joinOp2);
+ subsOps = (Set<OperatorImpl>) nextOperatorsField.get(joinOp2);
+ assertEquals(1, subsOps.size());
+ // make sure that the map operator is the same
+ assertEquals(mapImpl, subsOps.iterator().next());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
new file mode 100644
index 0000000..ce9fdd2
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestSinkOperatorImpl {
+ // onNext on a sink operator must hand the message, collector and coordinator to the spec's sink function
+ @Test
+ public void testSinkOperator() {
+ SinkFunction<TestOutputMessageEnvelope> fn = mock(SinkFunction.class);
+ SinkOperatorSpec<TestOutputMessageEnvelope> spec = mock(SinkOperatorSpec.class);
+ when(spec.getSinkFn()).thenReturn(fn);
+ TaskContext context = mock(TaskContext.class);
+ Config config = mock(Config.class);
+ SinkOperatorImpl<TestOutputMessageEnvelope> operator = new SinkOperatorImpl<>(spec, config, context);
+ TaskCoordinator coordinator = mock(TaskCoordinator.class);
+ MessageCollector collector = mock(MessageCollector.class);
+ TestOutputMessageEnvelope message = mock(TestOutputMessageEnvelope.class);
+ // exercise the operator, then confirm the sink function saw exactly one call with the same arguments
+ operator.onNext(message, collector, coordinator);
+ verify(fn, times(1)).apply(message, collector, coordinator);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
new file mode 100644
index 0000000..010a210
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestOutputMessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+public class TestStreamOperatorImpl {
+ // onNext must apply the spec's transform function once and propagate each element it returns
+ @Test
+ public void testSimpleOperator() {
+ StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
+ FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
+ when(mockOp.getTransformFn()).thenReturn(txfmFn);
+ MessageStreamImpl<TestMessageEnvelope> mockInput = mock(MessageStreamImpl.class);
+ Config mockConfig = mock(Config.class);
+ TaskContext mockContext = mock(TaskContext.class);
+ StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl = spy(new StreamOperatorImpl<>(mockOp, mockInput, mockConfig, mockContext));
+ TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
+ TestOutputMessageEnvelope outMsg = mock(TestOutputMessageEnvelope.class);
+ // a plain typed list replaces the raw, double-brace anonymous ArrayList subclass (no unchecked conversion)
+ Collection<TestOutputMessageEnvelope> mockOutputs = new ArrayList<>();
+ mockOutputs.add(outMsg);
+ when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
+ MessageCollector mockCollector = mock(MessageCollector.class);
+ TaskCoordinator mockCoordinator = mock(TaskCoordinator.class);
+ opImpl.onNext(inMsg, mockCollector, mockCoordinator);
+ verify(txfmFn, times(1)).apply(inMsg);
+ verify(opImpl, times(1)).propagateResult(outMsg, mockCollector, mockCoordinator);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
new file mode 100644
index 0000000..31257a4
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.MessageStreamImpl;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.TestMessageEnvelope;
+import org.apache.samza.operators.TestMessageStreamImplUtil;
+import org.apache.samza.operators.data.MessageEnvelope;
+import org.apache.samza.operators.functions.FlatMapFunction;
+import org.apache.samza.operators.functions.PartialJoinFunction;
+import org.apache.samza.operators.functions.SinkFunction;
+import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskCoordinator;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+
+public class TestOperatorSpecs {
+ @Test
+ public void testGetStreamOperator() {
+ FlatMapFunction<MessageEnvelope, TestMessageEnvelope> transformFn = m -> new ArrayList<TestMessageEnvelope>() { {
+ this.add(new TestMessageEnvelope(m.getKey().toString(), m.getMessage().toString(), 12345L));
+ } };
+ MessageStreamImpl<TestMessageEnvelope> mockOutput = mock(MessageStreamImpl.class);
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ StreamOperatorSpec<MessageEnvelope, TestMessageEnvelope> strmOp = OperatorSpecs.createStreamOperatorSpec(transformFn, mockGraph, mockOutput);
+ assertEquals(transformFn, strmOp.getTransformFn()); // the spec must expose the function it was created with
+ assertEquals(mockOutput, strmOp.getNextStream()); // and the output stream it was created with
+ }
+
+ @Test
+ public void testGetSinkOperator() {
+ SinkFunction<TestMessageEnvelope> sinkFn = (TestMessageEnvelope message, MessageCollector messageCollector,
+ TaskCoordinator taskCoordinator) -> { };
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ SinkOperatorSpec<TestMessageEnvelope> sinkOp = OperatorSpecs.createSinkOperatorSpec(sinkFn, mockGraph);
+ assertEquals(sinkFn, sinkOp.getSinkFn()); // expected value first, as JUnit's assertEquals requires
+ assertTrue(sinkOp.getNextStream() == null); // this test expects a sink spec to have no downstream stream
+ }
+
+ @Test
+ public void testGetWindowOperator() throws Exception {
+ Function<TestMessageEnvelope, String> keyExtractor = m -> "globalkey";
+ BiFunction<TestMessageEnvelope, Integer, Integer> aggregator = (m, c) -> c + 1;
+
+ // instantiate the window directly through its constructor (no reflection involved)
+ WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null);
+
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<WindowPane<String, Integer>> mockWndOut = mock(MessageStreamImpl.class);
+ WindowOperatorSpec spec = OperatorSpecs.<TestMessageEnvelope, String, Integer>createWindowOperatorSpec(window, mockGraph, mockWndOut);
+ assertEquals(window, spec.getWindow()); // expected value first, as JUnit's assertEquals requires
+ assertEquals(keyExtractor, spec.getWindow().getKeyExtractor());
+ assertEquals(aggregator, spec.getWindow().getFoldFunction());
+ }
+
+ @Test
+ public void testGetPartialJoinOperator() {
+ PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope> merger =
+ new PartialJoinFunction<Object, MessageEnvelope<Object, ?>, MessageEnvelope<Object, ?>, TestMessageEnvelope>() {
+ @Override
+ public TestMessageEnvelope apply(MessageEnvelope<Object, ?> m1, MessageEnvelope<Object, ?> m2) {
+ return new TestMessageEnvelope(m1.getKey().toString(), m2.getMessage().toString(), System.nanoTime());
+ }
+
+ @Override
+ public Object getKey(MessageEnvelope<Object, ?> message) {
+ return message.getKey();
+ }
+
+ @Override
+ public Object getOtherKey(MessageEnvelope<Object, ?> message) {
+ return message.getKey();
+ }
+ };
+
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> joinOutput = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
+ PartialJoinOperatorSpec<MessageEnvelope<Object, ?>, Object, MessageEnvelope<Object, ?>, TestMessageEnvelope> partialJoin =
+ OperatorSpecs.createPartialJoinOperatorSpec(merger, mockGraph, joinOutput);
+
+ assertEquals(joinOutput, partialJoin.getNextStream());
+ // the spec must hand back the very merger it was created with;
+ // the merger itself is never invoked here, so no message mocks are needed
+ assertEquals(merger, partialJoin.getTransformFn());
+ }
+
+ @Test
+ public void testGetMergeOperator() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ MessageStreamImpl<TestMessageEnvelope> output = TestMessageStreamImplUtil.<TestMessageEnvelope>getMessageStreamImpl(mockGraph);
+ StreamOperatorSpec<TestMessageEnvelope, TestMessageEnvelope> mergeOp = OperatorSpecs.createMergeOperatorSpec(mockGraph, output);
+ Function<TestMessageEnvelope, Collection<TestMessageEnvelope>> mergeFn = t -> new ArrayList<TestMessageEnvelope>() { {
+ this.add(t);
+ } };
+ TestMessageEnvelope t = mock(TestMessageEnvelope.class);
+ assertEquals(mergeFn.apply(t), mergeOp.getTransformFn().apply(t)); // reference single-element result is the expected value
+ assertEquals(output, mergeOp.getNextStream());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/README.md
----------------------------------------------------------------------
diff --git a/samza-operator/README.md b/samza-operator/README.md
deleted file mode 100644
index 15d2092..0000000
--- a/samza-operator/README.md
+++ /dev/null
@@ -1,17 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-samza-operator is an experimental module that is under development (SAMZA-552).
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
deleted file mode 100644
index 830e4a5..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.function.Function;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.functions.FilterFunction;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.JoinFunction;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OperatorSpecs;
-import org.apache.samza.operators.windows.Window;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * The implementation for input/output {@link MessageStream}s to/from the operators.
- * Users use the {@link MessageStream} API methods to describe and chain the operators specs.
- *
- * @param <M> type of messages in this {@link MessageStream}
- */
-public class MessageStreamImpl<M> implements MessageStream<M> {
- /**
- * The {@link StreamGraphImpl} object that contains this {@link MessageStreamImpl}
- */
- private final StreamGraphImpl graph;
-
- /**
- * The set of operators that consume the messages in this {@link MessageStream}
- */
- private final Set<OperatorSpec> registeredOperatorSpecs = new HashSet<>();
-
- /**
- * Default constructor
- *
- * @param graph the {@link StreamGraphImpl} object that this stream belongs to
- */
- MessageStreamImpl(StreamGraphImpl graph) {
- this.graph = graph;
- }
-
- @Override public <TM> MessageStream<TM> map(MapFunction<M, TM> mapFn) {
- OperatorSpec<TM> op = OperatorSpecs.<M, TM>createMapOperatorSpec(mapFn, this.graph, new MessageStreamImpl<>(this.graph));
- this.registeredOperatorSpecs.add(op);
- return op.getNextStream();
- }
-
- @Override public MessageStream<M> filter(FilterFunction<M> filterFn) {
- OperatorSpec<M> op = OperatorSpecs.<M>createFilterOperatorSpec(filterFn, this.graph, new MessageStreamImpl<>(this.graph));
- this.registeredOperatorSpecs.add(op);
- return op.getNextStream();
- }
-
- @Override
- public <TM> MessageStream<TM> flatMap(FlatMapFunction<M, TM> flatMapFn) {
- OperatorSpec<TM> op = OperatorSpecs.createStreamOperatorSpec(flatMapFn, this.graph, new MessageStreamImpl<>(this.graph));
- this.registeredOperatorSpecs.add(op);
- return op.getNextStream();
- }
-
- @Override
- public void sink(SinkFunction<M> sinkFn) {
- this.registeredOperatorSpecs.add(OperatorSpecs.createSinkOperatorSpec(sinkFn, this.graph));
- }
-
- @Override public void sendTo(OutputStream<M> stream) {
- this.registeredOperatorSpecs.add(OperatorSpecs.createSendToOperatorSpec(stream.getSinkFunction(), this.graph, stream));
- }
-
- @Override
- public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window) {
- OperatorSpec<WindowPane<K, WV>> wndOp = OperatorSpecs.createWindowOperatorSpec((WindowInternal<M, K, WV>) window,
- this.graph, new MessageStreamImpl<>(this.graph));
- this.registeredOperatorSpecs.add(wndOp);
- return wndOp.getNextStream();
- }
-
- @Override public <K, OM, RM> MessageStream<RM> join(MessageStream<OM> otherStream, JoinFunction<K, M, OM, RM> joinFn) {
- MessageStreamImpl<RM> outputStream = new MessageStreamImpl<>(this.graph);
-
- PartialJoinFunction<K, M, OM, RM> parJoin1 = new PartialJoinFunction<K, M, OM, RM>() {
- @Override
- public RM apply(M m1, OM om) {
- return joinFn.apply(m1, om);
- }
-
- @Override
- public K getKey(M message) {
- return joinFn.getFirstKey(message);
- }
-
- @Override
- public K getOtherKey(OM message) {
- return joinFn.getSecondKey(message);
- }
-
- @Override
- public void init(Config config, TaskContext context) {
- joinFn.init(config, context);
- }
- };
-
- PartialJoinFunction<K, OM, M, RM> parJoin2 = new PartialJoinFunction<K, OM, M, RM>() {
- @Override
- public RM apply(OM m1, M m) {
- return joinFn.apply(m, m1);
- }
-
- @Override
- public K getKey(OM message) {
- return joinFn.getSecondKey(message);
- }
-
- @Override
- public K getOtherKey(M message) {
- return joinFn.getFirstKey(message);
- }
- };
-
- // TODO: need to add default store functions for the two partial join functions
-
- ((MessageStreamImpl<OM>) otherStream).registeredOperatorSpecs.add(
- OperatorSpecs.<OM, K, M, RM>createPartialJoinOperatorSpec(parJoin2, this.graph, outputStream));
- this.registeredOperatorSpecs.add(OperatorSpecs.<M, K, OM, RM>createPartialJoinOperatorSpec(parJoin1, this.graph, outputStream));
- return outputStream;
- }
-
- @Override
- public MessageStream<M> merge(Collection<MessageStream<M>> otherStreams) {
- MessageStreamImpl<M> outputStream = new MessageStreamImpl<>(this.graph);
-
- otherStreams.add(this);
- otherStreams.forEach(other -> ((MessageStreamImpl<M>) other).registeredOperatorSpecs.
- add(OperatorSpecs.createMergeOperatorSpec(this.graph, outputStream)));
- return outputStream;
- }
-
- @Override
- public <K> MessageStream<M> partitionBy(Function<M, K> parKeyExtractor) {
- MessageStreamImpl<M> intStream = this.graph.createIntStream(parKeyExtractor);
- OutputStream<M> outputStream = this.graph.getOutputStream(intStream);
- this.registeredOperatorSpecs.add(OperatorSpecs.createPartitionOperatorSpec(outputStream.getSinkFunction(),
- this.graph, outputStream));
- return intStream;
- }
- /**
- * Gets the operator specs registered to consume the output of this {@link MessageStream}. This is an internal API and
- * should not be exposed to users.
- *
- * @return a collection containing all {@link OperatorSpec}s that are registered with this {@link MessageStream}.
- */
- public Collection<OperatorSpec> getRegisteredOperatorSpecs() {
- return Collections.unmodifiableSet(this.registeredOperatorSpecs);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
deleted file mode 100644
index dca3469..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators;
-
-import java.util.Properties;
-import java.util.function.Function;
-import org.apache.samza.operators.data.MessageEnvelope;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.OutgoingMessageEnvelope;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The implementation of {@link StreamGraph} interface. This class provides implementation of methods to allow users to
- * create system input/output/intermediate streams.
- */
-public class StreamGraphImpl implements StreamGraph {
-
- /**
- * Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} added to transform the {@link MessageEnvelope}
- * in the input {@link MessageStream}s.
- */
- private int opId = 0;
-
- private class InputStreamImpl<K, V, M extends MessageEnvelope<K, V>> extends MessageStreamImpl<M> {
- final StreamSpec spec;
- final Serde<K> keySerde;
- final Serde<V> msgSerde;
-
- InputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
- super(graph);
- this.spec = streamSpec;
- this.keySerde = keySerde;
- this.msgSerde = msgSerde;
- }
-
- StreamSpec getSpec() {
- return this.spec;
- }
-
- }
-
- private class OutputStreamImpl<K, V, M extends MessageEnvelope<K, V>> implements OutputStream<M> {
- final StreamSpec spec;
- final Serde<K> keySerde;
- final Serde<V> msgSerde;
-
- OutputStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
- this.spec = streamSpec;
- this.keySerde = keySerde;
- this.msgSerde = msgSerde;
- }
-
- StreamSpec getSpec() {
- return this.spec;
- }
-
- @Override
- public SinkFunction<M> getSinkFunction() {
- return (M message, MessageCollector mc, TaskCoordinator tc) -> {
- // TODO: need to find a way to directly pass in the serde class names
- // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
- // message.getKey(), message.getKey(), message.getMessage()));
- mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
- };
- }
- }
-
- private class IntermediateStreamImpl<PK, K, V, M extends MessageEnvelope<K, V>> extends InputStreamImpl<K, V, M> implements OutputStream<M> {
- final Function<M, PK> parKeyFn;
-
- /**
- * Default constructor
- *
- * @param graph the {@link StreamGraphImpl} object that this stream belongs to
- */
- IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
- this(graph, streamSpec, keySerde, msgSerde, null);
- }
-
- IntermediateStreamImpl(StreamGraphImpl graph, StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde, Function<M, PK> parKeyFn) {
- super(graph, streamSpec, keySerde, msgSerde);
- this.parKeyFn = parKeyFn;
- }
-
- @Override
- public SinkFunction<M> getSinkFunction() {
- return (M message, MessageCollector mc, TaskCoordinator tc) -> {
- // TODO: need to find a way to directly pass in the serde class names
- // mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.keySerde.getClass().getName(), this.msgSerde.getClass().getName(),
- // message.getKey(), message.getKey(), message.getMessage()));
- if (this.parKeyFn == null) {
- mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), message.getKey(), message.getMessage()));
- } else {
- // apply partition key function
- mc.send(new OutgoingMessageEnvelope(this.spec.getSystemStream(), this.parKeyFn.apply(message), message.getKey(), message.getMessage()));
- }
- };
- }
- }
-
- /**
- * Maps keeping all {@link SystemStream}s that are input and output of operators in {@link StreamGraphImpl}
- */
- private final Map<SystemStream, MessageStream> inStreams = new HashMap<>();
- private final Map<SystemStream, OutputStream> outStreams = new HashMap<>();
-
- private ContextManager contextManager = new ContextManager() { };
-
- @Override
- public <K, V, M extends MessageEnvelope<K, V>> MessageStream<M> createInStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
- if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
- this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new InputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
- }
- return this.inStreams.get(streamSpec.getSystemStream());
- }
-
- /**
- * Helper method to be used by {@link MessageStreamImpl} class
- *
- * @param streamSpec the {@link StreamSpec} object defining the {@link SystemStream} as the output
- * @param <M> the type of {@link MessageEnvelope}s in the output {@link SystemStream}
- * @return the {@link OutputStream} object
- */
- @Override
- public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createOutStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
- if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
- this.outStreams.putIfAbsent(streamSpec.getSystemStream(), new OutputStreamImpl<K, V, M>(this, streamSpec, keySerde, msgSerde));
- }
- return this.outStreams.get(streamSpec.getSystemStream());
- }
-
- /**
- * Helper method to be used by {@link MessageStreamImpl} class
- *
- * @param streamSpec the {@link StreamSpec} object defining the {@link SystemStream} as an intermediate {@link SystemStream}
- * @param <M> the type of {@link MessageEnvelope}s in the output {@link SystemStream}
- * @return the {@link MessageStreamImpl} object
- */
- @Override
- public <K, V, M extends MessageEnvelope<K, V>> OutputStream<M> createIntStream(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> msgSerde) {
- if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
- this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl<K, K, V, M>(this, streamSpec, keySerde, msgSerde));
- }
- IntermediateStreamImpl<K, K, V, M> intStream = (IntermediateStreamImpl<K, K, V, M>) this.inStreams.get(streamSpec.getSystemStream());
- if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
- this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
- }
- return intStream;
- }
-
- @Override public Map<StreamSpec, MessageStream> getInStreams() {
- Map<StreamSpec, MessageStream> inStreamMap = new HashMap<>();
- this.inStreams.forEach((ss, entry) -> inStreamMap.put(((InputStreamImpl) entry).getSpec(), entry));
- return Collections.unmodifiableMap(inStreamMap);
- }
-
- @Override public Map<StreamSpec, OutputStream> getOutStreams() {
- Map<StreamSpec, OutputStream> outStreamMap = new HashMap<>();
- this.outStreams.forEach((ss, entry) -> outStreamMap.put(((OutputStreamImpl) entry).getSpec(), entry));
- return Collections.unmodifiableMap(outStreamMap);
- }
-
- @Override
- public StreamGraph withContextManager(ContextManager manager) {
- this.contextManager = manager;
- return this;
- }
-
- public int getNextOpId() {
- return this.opId++;
- }
-
- public ContextManager getContextManager() {
- return this.contextManager;
- }
-
- /**
- * Helper method to get the input stream via {@link SystemStream}
- *
- * @param systemStream the {@link SystemStream}
- * @return a {@link MessageStreamImpl} object corresponding to the {@code systemStream}
- */
- public MessageStreamImpl getInputStream(SystemStream systemStream) {
- if (this.inStreams.containsKey(systemStream)) {
- return (MessageStreamImpl) this.inStreams.get(systemStream);
- }
- return null;
- }
-
- <M> OutputStream<M> getOutputStream(MessageStreamImpl<M> intStream) {
- if (this.outStreams.containsValue(intStream)) {
- return (OutputStream<M>) intStream;
- }
- return null;
- }
-
- <M> MessageStream<M> getIntStream(OutputStream<M> outStream) {
- if (this.inStreams.containsValue(outStream)) {
- return (MessageStream<M>) outStream;
- }
- return null;
- }
-
- /**
- * Method to create intermediate topics for {@link MessageStreamImpl#partitionBy(Function)} method.
- *
- * @param parKeyFn the function to extract the partition key from the input message
- * @param <PK> the type of partition key
- * @param <M> the type of input message
- * @return the {@link OutputStream} object for the re-partitioned stream
- */
- <PK, M> MessageStreamImpl<M> createIntStream(Function<M, PK> parKeyFn) {
- // TODO: placeholder to auto-generate intermediate streams via {@link StreamSpec}
- StreamSpec streamSpec = new StreamSpec() {
- @Override
- public SystemStream getSystemStream() {
- // TODO: should auto-generate intermediate stream name here
- return new SystemStream("intermediate", String.format("par-%d", StreamGraphImpl.this.opId));
- }
-
- @Override
- public Properties getProperties() {
- return null;
- }
- };
-
- if (!this.inStreams.containsKey(streamSpec.getSystemStream())) {
- this.inStreams.putIfAbsent(streamSpec.getSystemStream(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
- }
- IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getSystemStream());
- if (!this.outStreams.containsKey(streamSpec.getSystemStream())) {
- this.outStreams.putIfAbsent(streamSpec.getSystemStream(), intStream);
- }
- return intStream;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java b/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
deleted file mode 100644
index 809a70a..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/functions/PartialJoinFunction.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.functions;
-
-import org.apache.samza.annotation.InterfaceStability;
-
-
-/**
- * This defines the interface of a two-way join function that takes input messages from two input
- * {@link org.apache.samza.operators.MessageStream}s and merges them into a single joined output message in the join output
- */
-@InterfaceStability.Unstable
-public interface PartialJoinFunction<K, M, OM, RM> extends InitableFunction {
-
- /**
- * Method to perform the join on the two input messages
- *
- * @param m1 message from the first input stream
- * @param om message from the second input stream
- * @return the joined message in the output stream
- */
- RM apply(M m1, OM om);
-
- /**
- * Method to get the key from the input message
- *
- * @param message the input message from the first stream
- * @return the join key in the {@code message}
- */
- K getKey(M message);
-
- /**
- * Method to get the key from the input message in the other stream
- *
- * @param message the input message from the other stream
- * @return the join key in the {@code message}
- */
- K getOtherKey(OM message);
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
deleted file mode 100644
index 66336f8..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.system.SystemStream;
-import org.apache.samza.task.TaskContext;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-
-/**
- * Instantiates the DAG of {@link OperatorImpl}s corresponding to the {@link OperatorSpec}s for a
- * {@link MessageStreamImpl}
- */
-public class OperatorGraph {
-
- /**
- * A {@link Map} from {@link OperatorSpec} to {@link OperatorImpl}. This map registers all {@link OperatorImpl} in the DAG
- * of {@link OperatorImpl} in a {@link org.apache.samza.container.TaskInstance}. Each {@link OperatorImpl} is created
- * according to a single instance of {@link OperatorSpec}.
- */
- private final Map<OperatorSpec, OperatorImpl> operators = new HashMap<>();
-
- /**
- * This {@link Map} describes the DAG of {@link OperatorImpl} that are chained together to process the input messages.
- */
- private final Map<SystemStream, RootOperatorImpl> operatorGraph = new HashMap<>();
-
- /**
- * Initialize the whole DAG of {@link OperatorImpl}s, based on the input {@link MessageStreamImpl} from the {@link org.apache.samza.operators.StreamGraph}.
- * This method will traverse each input {@link org.apache.samza.operators.MessageStream} in the {@code inputStreams} and
- * instantiate the corresponding {@link OperatorImpl} chains that take the {@link org.apache.samza.operators.MessageStream} as input.
- *
- * @param inputStreams the map of input {@link org.apache.samza.operators.MessageStream}s
- * @param config the {@link Config} required to instantiate operators
- * @param context the {@link TaskContext} required to instantiate operators
- */
- public void init(Map<SystemStream, MessageStreamImpl> inputStreams, Config config, TaskContext context) {
- inputStreams.forEach((ss, mstream) -> this.operatorGraph.put(ss, this.createOperatorImpls(mstream, config, context)));
- }
-
- /**
- * Method to get the corresponding {@link RootOperatorImpl}
- *
- * @param ss input {@link SystemStream}
- * @param <M> the type of input message
- * @return the {@link OperatorImpl} that starts processing the input message
- */
- public <M> OperatorImpl<M, M> get(SystemStream ss) {
- return this.operatorGraph.get(ss);
- }
-
- /**
- * Traverses the DAG of {@link OperatorSpec}s starting from the provided {@link MessageStreamImpl},
- * creates the corresponding DAG of {@link OperatorImpl}s, and returns its root {@link RootOperatorImpl} node.
- *
- * @param source the input {@link MessageStreamImpl} to instantiate {@link OperatorImpl}s for
- * @param <M> the type of messages in the {@code source} {@link MessageStreamImpl}
- * @param config the {@link Config} required to instantiate operators
- * @param context the {@link TaskContext} required to instantiate operators
- * @return root node for the {@link OperatorImpl} DAG
- */
- private <M> RootOperatorImpl<M> createOperatorImpls(MessageStreamImpl<M> source, Config config,
- TaskContext context) {
- // since the source message stream might have multiple operator specs registered on it,
- // create a new root node as a single point of entry for the DAG.
- RootOperatorImpl<M> rootOperator = new RootOperatorImpl<>();
- // create the pipeline/topology starting from the source
- source.getRegisteredOperatorSpecs().forEach(registeredOperator -> {
- // pass in the source and context s.t. stateful stream operators can initialize their stores
- OperatorImpl<M, ?> operatorImpl =
- this.createAndRegisterOperatorImpl(registeredOperator, source, config, context);
- rootOperator.registerNextOperator(operatorImpl);
- });
- return rootOperator;
- }
-
- /**
- * Helper method to recursively traverse the {@link OperatorSpec} DAG and instantiate and link the corresponding
- * {@link OperatorImpl}s.
- *
- * @param operatorSpec the operatorSpec registered with the {@code source}
- * @param source the source {@link MessageStreamImpl}
- * @param <M> type of input message
- * @param config the {@link Config} required to instantiate operators
- * @param context the {@link TaskContext} required to instantiate operators
- * @return the operator implementation for the operatorSpec
- */
- private <M> OperatorImpl<M, ?> createAndRegisterOperatorImpl(OperatorSpec operatorSpec,
- MessageStreamImpl<M> source, Config config, TaskContext context) {
- if (!operators.containsKey(operatorSpec)) {
- OperatorImpl<M, ?> operatorImpl = createOperatorImpl(source, operatorSpec, config, context);
- if (operators.putIfAbsent(operatorSpec, operatorImpl) == null) {
- // this is the first time we've added the operatorImpl corresponding to the operatorSpec,
- // so traverse and initialize and register the rest of the DAG.
- // initialize the corresponding operator function
- operatorSpec.init(config, context);
- MessageStreamImpl nextStream = operatorSpec.getNextStream();
- if (nextStream != null) {
- Collection<OperatorSpec> registeredSpecs = nextStream.getRegisteredOperatorSpecs();
- registeredSpecs.forEach(registeredSpec -> {
- OperatorImpl subImpl = this.createAndRegisterOperatorImpl(registeredSpec, nextStream, config, context);
- operatorImpl.registerNextOperator(subImpl);
- });
- }
- return operatorImpl;
- }
- }
-
- // the implementation corresponding to operatorSpec has already been instantiated
- // and registered, so we do not need to traverse the DAG further.
- return operators.get(operatorSpec);
- }
-
- /**
- * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}.
- *
- * @param source the source {@link MessageStreamImpl}
- * @param <M> type of input message
- * @param operatorSpec the immutable {@link OperatorSpec} definition.
- * @param config the {@link Config} required to instantiate operators
- * @param context the {@link TaskContext} required to instantiate operators
- * @return the {@link OperatorImpl} implementation instance
- */
- private static <M> OperatorImpl<M, ?> createOperatorImpl(MessageStreamImpl<M> source, OperatorSpec operatorSpec, Config config, TaskContext context) {
- if (operatorSpec instanceof StreamOperatorSpec) {
- StreamOperatorSpec<M, ?> streamOpSpec = (StreamOperatorSpec<M, ?>) operatorSpec;
- return new StreamOperatorImpl<>(streamOpSpec, source, config, context);
- } else if (operatorSpec instanceof SinkOperatorSpec) {
- return new SinkOperatorImpl<>((SinkOperatorSpec<M>) operatorSpec, config, context);
- } else if (operatorSpec instanceof WindowOperatorSpec) {
- return new WindowOperatorImpl<>((WindowOperatorSpec<M, ?, ?>) operatorSpec, source, config, context);
- } else if (operatorSpec instanceof PartialJoinOperatorSpec) {
- return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context);
- }
- throw new IllegalArgumentException(
- String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
deleted file mode 100644
index abb1fa9..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-import java.util.HashSet;
-import java.util.Set;
-
-
-/**
- * Abstract base class for all stream operator implementations.
- */
-public abstract class OperatorImpl<M, RM> {
-
- private final Set<OperatorImpl<RM, ?>> nextOperators = new HashSet<>();
-
- /**
- * Register the next operator in the chain that this operator should propagate its output to.
- * @param nextOperator the next operator in the chain.
- */
- void registerNextOperator(OperatorImpl<RM, ?> nextOperator) {
- nextOperators.add(nextOperator);
- }
-
- /**
- * Perform the transformation required for this operator and call the downstream operators.
- *
- * Must call {@link #propagateResult} to propagate the output to registered downstream operators correctly.
- *
- * @param message the input message
- * @param collector the {@link MessageCollector} in the context
- * @param coordinator the {@link TaskCoordinator} in the context
- */
- public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator);
-
- /**
- * Helper method to propagate the output of this operator to all registered downstream operators.
- *
- * This method <b>must</b> be called from {@link #onNext} to propagate the operator output correctly.
- *
- * @param outputMessage output message
- * @param collector the {@link MessageCollector} in the context
- * @param coordinator the {@link TaskCoordinator} in the context
- */
- void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) {
- nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
deleted file mode 100644
index c8515e1..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.spec.PartialJoinOperatorSpec;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Implementation of a {@link PartialJoinOperatorSpec}. This class implements a function
- * that takes in only one input stream among all inputs to the join and generates the join output.
- *
- * @param <M> type of messages in the input stream
- * @param <JM> type of messages in the stream to join with
- * @param <RM> type of messages in the joined stream
- */
-class PartialJoinOperatorImpl<M, K, JM, RM> extends OperatorImpl<M, RM> {
-
- PartialJoinOperatorImpl(PartialJoinOperatorSpec<M, K, JM, RM> joinOp, MessageStreamImpl<M> source, Config config, TaskContext context) {
- // TODO: implement PartialJoinOperatorImpl constructor
- }
-
- @Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
- // TODO: implement PartialJoinOperatorImpl processing logic
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
deleted file mode 100644
index 4b30a5d..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/RootOperatorImpl.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * A no-op operator implementation that forwards incoming messages to all of its subscribers.
- * @param <M> type of incoming messages
- */
-final class RootOperatorImpl<M> extends OperatorImpl<M, M> {
-
- @Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
- this.propagateResult(message, collector, coordinator);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
deleted file mode 100644
index 2bb362c..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SessionWindowOperatorImpl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Default implementation class of a {@link WindowOperatorSpec} for a session window.
- *
- * @param <M> the type of input message
- * @param <RK> the type of window key
- * @param <WV> the type of window state
- */
-class SessionWindowOperatorImpl<M, RK, WV> extends OperatorImpl<M, WindowPane<RK, WV>> {
-
- private final WindowOperatorSpec<M, RK, WV> windowSpec;
-
- SessionWindowOperatorImpl(WindowOperatorSpec<M, RK, WV> windowSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
- this.windowSpec = windowSpec;
- }
-
- @Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
- }
-
- public void onTimer(MessageCollector collector, TaskCoordinator coordinator) {
- // This is to periodically check the timeout triggers to get the list of window states to be updated
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
deleted file mode 100644
index 41d1778..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.spec.SinkOperatorSpec;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * Implementation for {@link SinkOperatorSpec}
- */
-class SinkOperatorImpl<M> extends OperatorImpl<M, M> {
-
- private final SinkFunction<M> sinkFn;
-
- SinkOperatorImpl(SinkOperatorSpec<M> sinkOp, Config config, TaskContext context) {
- this.sinkFn = sinkOp.getSinkFn();
- }
-
- @Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
- this.sinkFn.apply(message, collector, coordinator);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
deleted file mode 100644
index 644de20..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-
-/**
- * A StreamOperator that accepts a 1:n transform function and applies it to each incoming message.
- *
- * @param <M> type of message in the input stream
- * @param <RM> type of message in the output stream
- */
-class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> {
-
- private final FlatMapFunction<M, RM> transformFn;
-
- StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOperatorSpec, MessageStreamImpl<M> source, Config config, TaskContext context) {
- this.transformFn = streamOperatorSpec.getTransformFn();
- }
-
- @Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
- // call the transform function and then for each output call propagateResult()
- this.transformFn.apply(message).forEach(r -> this.propagateResult(r, collector, coordinator));
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
deleted file mode 100644
index af00553..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.impl;
-
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.spec.WindowOperatorSpec;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-import org.apache.samza.task.MessageCollector;
-import org.apache.samza.task.TaskContext;
-import org.apache.samza.task.TaskCoordinator;
-
-public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK, WV>> {
-
- private final WindowInternal<M, WK, WV> window;
-
- public WindowOperatorImpl(WindowOperatorSpec spec, MessageStreamImpl<M> source, Config config, TaskContext context) {
- // source, config, and context are used to initialize the window kv-store
- window = spec.getWindow();
- }
-
- @Override
- public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
deleted file mode 100644
index 1444662..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.operators.spec;
-
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * A stateless serializable stream operator specification that holds all the information required
- * to transform the input {@link MessageStreamImpl} and produce the output {@link MessageStreamImpl}.
- *
- * @param <OM> the type of output message from the operator
- */
-@InterfaceStability.Unstable
-public interface OperatorSpec<OM> {
-
- enum OpCode {
- MAP,
- FLAT_MAP,
- FILTER,
- SINK,
- SEND_TO,
- JOIN,
- WINDOW,
- MERGE,
- PARTITION_BY
- }
-
-
- /**
- * Get the output stream containing transformed messages produced by this operator.
- * @return the output stream containing transformed messages produced by this operator.
- */
- MessageStreamImpl<OM> getNextStream();
-
- /**
- * Init method to initialize the context for this {@link OperatorSpec}. The default implementation is NO-OP.
- *
- * @param config the {@link Config} object for this task
- * @param context the {@link TaskContext} object for this task
- */
- default void init(Config config, TaskContext context) { }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/8515448a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
deleted file mode 100644
index d626852..0000000
--- a/samza-operator/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.operators.spec;
-
-import java.util.Collection;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.MessageStreamImpl;
-import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
-import org.apache.samza.operators.functions.FilterFunction;
-import org.apache.samza.operators.functions.FlatMapFunction;
-import org.apache.samza.operators.functions.MapFunction;
-import org.apache.samza.operators.functions.PartialJoinFunction;
-import org.apache.samza.operators.functions.SinkFunction;
-import org.apache.samza.operators.windows.WindowPane;
-import org.apache.samza.operators.windows.internal.WindowInternal;
-
-import java.util.ArrayList;
-import org.apache.samza.task.TaskContext;
-
-
-/**
- * Factory methods for creating {@link OperatorSpec} instances.
- */
-public class OperatorSpecs {
-
- private OperatorSpecs() {}
-
- /**
- * Creates a {@link StreamOperatorSpec} for {@link MapFunction}
- *
- * @param mapFn the map function
- * @param graph the {@link StreamGraphImpl} object
- * @param output the output {@link MessageStreamImpl} object
- * @param <M> type of input message
- * @param <OM> type of output message
- * @return the {@link StreamOperatorSpec}
- */
- public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec(MapFunction<M, OM> mapFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
- return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() {
- @Override
- public Collection<OM> apply(M message) {
- return new ArrayList<OM>() {
- {
- OM r = mapFn.apply(message);
- if (r != null) {
- this.add(r);
- }
- }
- };
- }
-
- @Override
- public void init(Config config, TaskContext context) {
- mapFn.init(config, context);
- }
- }, output, OperatorSpec.OpCode.MAP, graph.getNextOpId());
- }
-
- /**
- * Creates a {@link StreamOperatorSpec} for {@link FilterFunction}
- *
- * @param filterFn the transformation function
- * @param graph the {@link StreamGraphImpl} object
- * @param output the output {@link MessageStreamImpl} object
- * @param <M> type of input message
- * @return the {@link StreamOperatorSpec}
- */
- public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec(FilterFunction<M> filterFn, StreamGraphImpl graph, MessageStreamImpl<M> output) {
- return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() {
- @Override
- public Collection<M> apply(M message) {
- return new ArrayList<M>() {
- {
- if (filterFn.apply(message)) {
- this.add(message);
- }
- }
- };
- }
-
- @Override
- public void init(Config config, TaskContext context) {
- filterFn.init(config, context);
- }
- }, output, OperatorSpec.OpCode.FILTER, graph.getNextOpId());
- }
-
- /**
- * Creates a {@link StreamOperatorSpec}.
- *
- * @param transformFn the transformation function
- * @param graph the {@link StreamGraphImpl} object
- * @param output the output {@link MessageStreamImpl} object
- * @param <M> type of input message
- * @param <OM> type of output message
- * @return the {@link StreamOperatorSpec}
- */
- public static <M, OM> StreamOperatorSpec<M, OM> createStreamOperatorSpec(
- FlatMapFunction<M, OM> transformFn, StreamGraphImpl graph, MessageStreamImpl<OM> output) {
- return new StreamOperatorSpec<>(transformFn, output, OperatorSpec.OpCode.FLAT_MAP, graph.getNextOpId());
- }
-
- /**
- * Creates a {@link SinkOperatorSpec}.
- *
- * @param sinkFn the sink function
- * @param <M> type of input message
- * @param graph the {@link StreamGraphImpl} object
- * @return the {@link SinkOperatorSpec}
- */
- public static <M> SinkOperatorSpec<M> createSinkOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph) {
- return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SINK, graph.getNextOpId());
- }
-
- /**
- * Creates a {@link SinkOperatorSpec}.
- *
- * @param sinkFn the sink function
- * @param graph the {@link StreamGraphImpl} object
- * @param stream the {@link OutputStream} where the message is sent to
- * @param <M> type of input message
- * @return the {@link SinkOperatorSpec}
- */
- public static <M> SinkOperatorSpec<M> createSendToOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
- return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.SEND_TO, graph.getNextOpId(), stream);
- }
-
- /**
- * Creates a {@link SinkOperatorSpec}.
- *
- * @param sinkFn the sink function
- * @param graph the {@link StreamGraphImpl} object
- * @param stream the {@link OutputStream} where the message is sent to
- * @param <M> type of input message
- * @return the {@link SinkOperatorSpec}
- */
- public static <M> SinkOperatorSpec<M> createPartitionOperatorSpec(SinkFunction<M> sinkFn, StreamGraphImpl graph, OutputStream<M> stream) {
- return new SinkOperatorSpec<>(sinkFn, OperatorSpec.OpCode.PARTITION_BY, graph.getNextOpId(), stream);
- }
-
- /**
- * Creates a {@link WindowOperatorSpec}.
- *
- * @param window the description of the window.
- * @param graph the {@link StreamGraphImpl} object
- * @param wndOutput the window output {@link MessageStreamImpl} object
- * @param <M> the type of input message
- * @param <WK> the type of key in the {@link WindowPane}
- * @param <WV> the type of value in the window
- * @return the {@link WindowOperatorSpec}
- */
-
- public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec(
- WindowInternal<M, WK, WV> window, StreamGraphImpl graph, MessageStreamImpl<WindowPane<WK, WV>> wndOutput) {
- return new WindowOperatorSpec<>(window, wndOutput, graph.getNextOpId());
- }
-
- /**
- * Creates a {@link PartialJoinOperatorSpec}.
- *
- * @param partialJoinFn the join function
- * @param graph the {@link StreamGraphImpl} object
- * @param joinOutput the output {@link MessageStreamImpl}
- * @param <M> type of input message
- * @param <K> type of join key
- * @param <JM> the type of message in the other join stream
- * @param <OM> the type of message in the join output
- * @return the {@link PartialJoinOperatorSpec}
- */
- public static <M, K, JM, OM> PartialJoinOperatorSpec<M, K, JM, OM> createPartialJoinOperatorSpec(
- PartialJoinFunction<K, M, JM, OM> partialJoinFn, StreamGraphImpl graph, MessageStreamImpl<OM> joinOutput) {
- return new PartialJoinOperatorSpec<>(partialJoinFn, joinOutput, graph.getNextOpId());
- }
-
- /**
- * Creates a {@link StreamOperatorSpec} with a merger function.
- *
- * @param graph the {@link StreamGraphImpl} object
- * @param mergeOutput the output {@link MessageStreamImpl} from the merger
- * @param <M> the type of input message
- * @return the {@link StreamOperatorSpec} for the merge
- */
- public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(StreamGraphImpl graph, MessageStreamImpl<M> mergeOutput) {
- return new StreamOperatorSpec<M, M>(message ->
- new ArrayList<M>() {
- {
- this.add(message);
- }
- },
- mergeOutput, OperatorSpec.OpCode.MERGE, graph.getNextOpId());
- }
-}