You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/05/25 16:36:49 UTC
[06/10] samza git commit: SAMZA-1659: Serializable OperatorSpec
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
new file mode 100644
index 0000000..e476abc
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphSpec.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for THE
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+import com.google.common.collect.ImmutableList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestStreamGraphSpec {
+
+ @Test
+ public void testGetInputStreamWithValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+
+ Serde mockValueSerde = mock(Serde.class);
+ MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1", mockValueSerde);
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+ }
+
+ @Test
+ public void testGetInputStreamWithKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1", mockKVSerde);
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
+ assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testGetInputStreamWithNullSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+
+ graphSpec.getInputStream("test-stream-1", null);
+ }
+
+ @Test
+ public void testGetInputStreamWithDefaultValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+
+ Serde mockValueSerde = mock(Serde.class);
+ graphSpec.setDefaultSerde(mockValueSerde);
+ MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1");
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+ }
+
+ @Test
+ public void testGetInputStreamWithDefaultKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ graphSpec.setDefaultSerde(mockKVSerde);
+ MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1");
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
+ assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+ }
+
+ @Test
+ public void testGetInputStreamWithDefaultDefaultSerde() {
+ // default default serde == user hasn't provided a default serde
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+
+ MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1");
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+ assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
+ }
+
+ @Test
+ public void testGetInputStreamWithRelaxedTypes() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+
+ MessageStream<TestMessageEnvelope> inputStream = graphSpec.getInputStream("test-stream-1");
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ }
+
+ @Test
+ public void testMultipleGetInputStreams() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec1 = mock(StreamSpec.class);
+ StreamSpec mockStreamSpec2 = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec1);
+ when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+ MessageStream<Object> inputStream1 = graphSpec.getInputStream("test-stream-1");
+ MessageStream<Object> inputStream2 = graphSpec.getInputStream("test-stream-2");
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 =
+ (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec2 =
+ (InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
+
+ assertEquals(graphSpec.getInputOperators().size(), 2);
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec1), inputOpSpec1);
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec2), inputOpSpec2);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testGetSameInputStreamTwice() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+ graphSpec.getInputStream("test-stream-1");
+ // should throw exception
+ graphSpec.getInputStream("test-stream-1");
+ }
+
+ @Test
+ public void testGetOutputStreamWithValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+
+ Serde mockValueSerde = mock(Serde.class);
+ OutputStream<TestMessageEnvelope> outputStream =
+ graphSpec.getOutputStream("test-stream-1", mockValueSerde);
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+ }
+
+ @Test
+ public void testGetOutputStreamWithKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ graphSpec.setDefaultSerde(mockKVSerde);
+ OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream("test-stream-1", mockKVSerde);
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
+ assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testGetOutputStreamWithNullSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+
+ graphSpec.getOutputStream("test-stream-1", null);
+ }
+
+ @Test
+ public void testGetOutputStreamWithDefaultValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ Serde mockValueSerde = mock(Serde.class);
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+ graphSpec.setDefaultSerde(mockValueSerde);
+ OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream("test-stream-1");
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+ }
+
+ @Test
+ public void testGetOutputStreamWithDefaultKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ graphSpec.setDefaultSerde(mockKVSerde);
+
+ OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream("test-stream-1");
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
+ assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+ }
+
+ @Test
+ public void testGetOutputStreamWithDefaultDefaultSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+
+ OutputStream<TestMessageEnvelope> outputStream = graphSpec.getOutputStream("test-stream-1");
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+ assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testSetDefaultSerdeAfterGettingStreams() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+ graphSpec.getInputStream("test-stream-1");
+ graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testSetDefaultSerdeAfterGettingOutputStream() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+ graphSpec.getOutputStream("test-stream-1");
+ graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testSetDefaultSerdeAfterGettingIntermediateStream() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+ graphSpec.getIntermediateStream("test-stream-1", null);
+ graphSpec.setDefaultSerde(mock(Serde.class)); // should throw exception
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testGetSameOutputStreamTwice() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+ graphSpec.getOutputStream("test-stream-1");
+ graphSpec.getOutputStream("test-stream-1"); // should throw exception
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ String mockStreamName = "mockStreamName";
+ when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+
+ Serde mockValueSerde = mock(Serde.class);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graphSpec.getIntermediateStream(mockStreamName, mockValueSerde);
+
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+ assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ String mockStreamName = "mockStreamName";
+ when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graphSpec.getIntermediateStream(mockStreamName, mockKVSerde);
+
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
+ assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+ assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+ assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithDefaultValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ String mockStreamName = "mockStreamName";
+ when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
+
+ StreamGraphSpec graph = new StreamGraphSpec(mockRunner, mockConfig);
+
+ Serde mockValueSerde = mock(Serde.class);
+ graph.setDefaultSerde(mockValueSerde);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graph.getIntermediateStream(mockStreamName, null);
+
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+ assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithDefaultKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ String mockStreamName = "mockStreamName";
+ when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ graphSpec.setDefaultSerde(mockKVSerde);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graphSpec.getIntermediateStream(mockStreamName, null);
+
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
+ assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+ assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+ assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithDefaultDefaultSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ String mockStreamName = "mockStreamName";
+ when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graphSpec.getIntermediateStream(mockStreamName, null);
+
+ assertEquals(graphSpec.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graphSpec.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+ assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() instanceof NoOpSerde);
+ assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+ assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde() instanceof NoOpSerde);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testGetSameIntermediateStreamTwice() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
+ graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class));
+ graphSpec.getIntermediateStream("test-stream-1", mock(Serde.class));
+ }
+
+ @Test
+ public void testGetNextOpIdIncrementsId() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+ when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+ assertEquals("jobName-1234-merge-0", graphSpec.getNextOpId(OpCode.MERGE, null));
+ assertEquals("jobName-1234-join-customName", graphSpec.getNextOpId(OpCode.JOIN, "customName"));
+ assertEquals("jobName-1234-map-2", graphSpec.getNextOpId(OpCode.MAP, null));
+ }
+
+ @Test(expected = SamzaException.class)
+ public void testGetNextOpIdRejectsDuplicates() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+ when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+ assertEquals("jobName-1234-join-customName", graphSpec.getNextOpId(OpCode.JOIN, "customName"));
+ graphSpec.getNextOpId(OpCode.JOIN, "customName"); // should throw
+ }
+
+ @Test
+ public void testUserDefinedIdValidation() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+ when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+
+ // null and empty userDefinedIDs should fall back to autogenerated IDs.
+ try {
+ graphSpec.getNextOpId(OpCode.FILTER, null);
+ graphSpec.getNextOpId(OpCode.FILTER, "");
+ graphSpec.getNextOpId(OpCode.FILTER, " ");
+ graphSpec.getNextOpId(OpCode.FILTER, "\t");
+ } catch (SamzaException e) {
+ fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID.");
+ }
+
+ List<String> validOpIds = ImmutableList.of("op.id", "op_id", "op-id", "1000", "op_1", "OP_ID");
+ for (String validOpId: validOpIds) {
+ try {
+ graphSpec.getNextOpId(OpCode.FILTER, validOpId);
+ } catch (Exception e) {
+ fail("Received an exception with a valid operator ID: " + validOpId);
+ }
+ }
+
+ List<String> invalidOpIds = ImmutableList.of("op id", "op#id");
+ for (String invalidOpId: invalidOpIds) {
+ try {
+ graphSpec.getNextOpId(OpCode.FILTER, invalidOpId);
+ fail("Did not receive an exception with an invalid operator ID: " + invalidOpId);
+ } catch (SamzaException e) { }
+ }
+ }
+
+ @Test
+ public void testGetInputStreamPreservesInsertionOrder() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+
+ StreamSpec testStreamSpec1 = new StreamSpec("test-stream-1", "physical-stream-1", "test-system");
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(testStreamSpec1);
+
+ StreamSpec testStreamSpec2 = new StreamSpec("test-stream-2", "physical-stream-2", "test-system");
+ when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(testStreamSpec2);
+
+ StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system");
+ when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
+
+ graphSpec.getInputStream("test-stream-1");
+ graphSpec.getInputStream("test-stream-2");
+ graphSpec.getInputStream("test-stream-3");
+
+ List<InputOperatorSpec> inputSpecs = new ArrayList<>(graphSpec.getInputOperators().values());
+ assertEquals(inputSpecs.size(), 3);
+ assertEquals(inputSpecs.get(0).getStreamSpec(), testStreamSpec1);
+ assertEquals(inputSpecs.get(1).getStreamSpec(), testStreamSpec2);
+ assertEquals(inputSpecs.get(2).getStreamSpec(), testStreamSpec3);
+ }
+
+ @Test
+ public void testGetTable() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+
+ BaseTableDescriptor mockTableDescriptor = mock(BaseTableDescriptor.class);
+ when(mockTableDescriptor.getTableSpec()).thenReturn(
+ new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>()));
+ assertNotNull(graphSpec.getTable(mockTableDescriptor));
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java b/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
index f9537a3..519e5df 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/data/TestOutputMessageEnvelope.java
@@ -35,5 +35,19 @@ public class TestOutputMessageEnvelope {
public String getKey() {
return this.key;
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (!(other instanceof TestOutputMessageEnvelope)) {
+ return false;
+ }
+ TestOutputMessageEnvelope otherMsg = (TestOutputMessageEnvelope) other;
+ return this.key.equals(otherMsg.key) && this.value.equals(otherMsg.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return String.format("%s:%d", key, value).hashCode();
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 2d8d1eb..b87e5ed 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -21,11 +21,17 @@ package org.apache.samza.operators.impl;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
import org.apache.samza.Partition;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
@@ -39,9 +45,11 @@ import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.functions.ClosableFunction;
import org.apache.samza.operators.functions.FilterFunction;
+import org.apache.samza.operators.functions.InitableFunction;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.impl.store.TimestampedValue;
@@ -58,34 +66,160 @@ import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskContext;
+import java.util.List;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.util.Clock;
import org.apache.samza.util.SystemClock;
+import org.junit.After;
import org.junit.Test;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestOperatorImplGraph {
+ private void addOperatorRecursively(HashSet<OperatorImpl> s, OperatorImpl op) {
+ List<OperatorImpl> operators = new ArrayList<>();
+ operators.add(op);
+ while (!operators.isEmpty()) {
+ OperatorImpl opImpl = operators.remove(0);
+ s.add(opImpl);
+ if (!opImpl.registeredOperators.isEmpty()) {
+ operators.addAll(opImpl.registeredOperators);
+ }
+ }
+ }
+
+ static class TestMapFunction<M, OM> extends BaseTestFunction implements MapFunction<M, OM> {
+ final Function<M, OM> mapFn;
+
+ public TestMapFunction(String opId, Function<M, OM> mapFn) {
+ super(opId);
+ this.mapFn = mapFn;
+ }
+
+ @Override
+ public OM apply(M message) {
+ return this.mapFn.apply(message);
+ }
+ }
+
+ static class TestJoinFunction<K, M, JM, RM> extends BaseTestFunction implements JoinFunction<K, M, JM, RM> {
+ final BiFunction<M, JM, RM> joiner;
+ final Function<M, K> firstKeyFn;
+ final Function<JM, K> secondKeyFn;
+ final Collection<RM> joinResults = new HashSet<>();
+
+ public TestJoinFunction(String opId, BiFunction<M, JM, RM> joiner, Function<M, K> firstKeyFn, Function<JM, K> secondKeyFn) {
+ super(opId);
+ this.joiner = joiner;
+ this.firstKeyFn = firstKeyFn;
+ this.secondKeyFn = secondKeyFn;
+ }
+
+ @Override
+ public RM apply(M message, JM otherMessage) {
+ RM result = this.joiner.apply(message, otherMessage);
+ this.joinResults.add(result);
+ return result;
+ }
+
+ @Override
+ public K getFirstKey(M message) {
+ return this.firstKeyFn.apply(message);
+ }
+
+ @Override
+ public K getSecondKey(JM message) {
+ return this.secondKeyFn.apply(message);
+ }
+ }
+
+ static abstract class BaseTestFunction implements InitableFunction, ClosableFunction, Serializable {
+
+ static Map<TaskName, Map<String, BaseTestFunction>> perTaskFunctionMap = new HashMap<>();
+ static Map<TaskName, List<String>> perTaskInitList = new HashMap<>();
+ static Map<TaskName, List<String>> perTaskCloseList = new HashMap<>();
+ int numInitCalled = 0;
+ int numCloseCalled = 0;
+ TaskName taskName = null;
+ final String opId;
+
+ public BaseTestFunction(String opId) {
+ this.opId = opId;
+ }
+
+ static public void reset() {
+ perTaskFunctionMap.clear();
+ perTaskCloseList.clear();
+ perTaskInitList.clear();
+ }
+
+ static public BaseTestFunction getInstanceByTaskName(TaskName taskName, String opId) {
+ return perTaskFunctionMap.get(taskName).get(opId);
+ }
+
+ static public List<String> getInitListByTaskName(TaskName taskName) {
+ return perTaskInitList.get(taskName);
+ }
+
+ static public List<String> getCloseListByTaskName(TaskName taskName) {
+ return perTaskCloseList.get(taskName);
+ }
+
+ @Override
+ public void close() {
+ if (this.taskName == null) {
+ throw new IllegalStateException("Close called before init");
+ }
+ if (perTaskFunctionMap.get(this.taskName) == null || !perTaskFunctionMap.get(this.taskName).containsKey(opId)) {
+ throw new IllegalStateException("Close called before init");
+ }
+
+ if (perTaskCloseList.get(this.taskName) == null) {
+ perTaskCloseList.put(taskName, new ArrayList<String>() { { this.add(opId); } });
+ } else {
+ perTaskCloseList.get(taskName).add(opId);
+ }
+
+ this.numCloseCalled++;
+ }
+
+ @Override
+ public void init(Config config, TaskContext context) {
+ if (perTaskFunctionMap.get(context.getTaskName()) == null) {
+ perTaskFunctionMap.put(context.getTaskName(), new HashMap<String, BaseTestFunction>() { { this.put(opId, BaseTestFunction.this); } });
+ } else {
+ if (perTaskFunctionMap.get(context.getTaskName()).containsKey(opId)) {
+ throw new IllegalStateException(String.format("Multiple init called for op %s in the same task instance %s", opId, this.taskName.getTaskName()));
+ }
+ perTaskFunctionMap.get(context.getTaskName()).put(opId, this);
+ }
+ if (perTaskInitList.get(context.getTaskName()) == null) {
+ perTaskInitList.put(context.getTaskName(), new ArrayList<String>() { { this.add(opId); } });
+ } else {
+ perTaskInitList.get(context.getTaskName()).add(opId);
+ }
+ this.taskName = context.getTaskName();
+ this.numInitCalled++;
+ }
+ }
+
+ @After
+ public void tearDown() {
+ BaseTestFunction.reset();
+ }
+
@Test
public void testEmptyChain() {
- StreamGraphImpl streamGraph = new StreamGraphImpl(mock(ApplicationRunner.class), mock(Config.class));
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mock(ApplicationRunner.class), mock(Config.class));
OperatorImplGraph opGraph =
- new OperatorImplGraph(streamGraph, mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class));
+ new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mock(TaskContextImpl.class), mock(Clock.class));
assertEquals(0, opGraph.getAllInputOperators().size());
}
@@ -94,10 +228,10 @@ public class TestOperatorImplGraph {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class));
- StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream = streamGraph.getInputStream("input");
- OutputStream<Object> outputStream = streamGraph.getOutputStream("output");
+ MessageStream<Object> inputStream = graphSpec.getInputStream("input");
+ OutputStream<Object> outputStream = graphSpec.getOutputStream("output");
inputStream
.filter(mock(FilterFunction.class))
@@ -108,7 +242,7 @@ public class TestOperatorImplGraph {
when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
when(mockTaskContext.getTaskName()).thenReturn(new TaskName("task 0"));
OperatorImplGraph opImplGraph =
- new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+ new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class));
InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
assertEquals(1, inputOpImpl.registeredOperators.size());
@@ -136,9 +270,9 @@ public class TestOperatorImplGraph {
Config mockConfig = mock(Config.class);
when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
- StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
- MessageStream<Object> inputStream = streamGraph.getInputStream("input");
- OutputStream<KV<Integer, String>> outputStream = streamGraph
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+ MessageStream<Object> inputStream = graphSpec.getInputStream("input");
+ OutputStream<KV<Integer, String>> outputStream = graphSpec
.getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
inputStream
@@ -160,7 +294,7 @@ public class TestOperatorImplGraph {
new SamzaContainerContext("0", mockConfig, Collections.singleton(new TaskName("task 0")), new MetricsRegistryMap());
when(mockTaskContext.getSamzaContainerContext()).thenReturn(containerContext);
OperatorImplGraph opImplGraph =
- new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, mock(Clock.class));
+ new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockTaskContext, mock(Clock.class));
InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
assertEquals(1, inputOpImpl.registeredOperators.size());
@@ -182,16 +316,16 @@ public class TestOperatorImplGraph {
public void testBroadcastChain() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
- StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+ MessageStream<Object> inputStream = graphSpec.getInputStream("input");
inputStream.filter(mock(FilterFunction.class));
inputStream.map(mock(MapFunction.class));
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
OperatorImplGraph opImplGraph =
- new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+ new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class));
InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
assertEquals(2, inputOpImpl.registeredOperators.size());
@@ -204,23 +338,36 @@ public class TestOperatorImplGraph {
@Test
public void testMergeChain() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
- when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
- StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ when(mockRunner.getStreamSpec(eq("input")))
+ .thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+ MessageStream<Object> inputStream = graphSpec.getInputStream("input");
MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class));
MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2));
- MapFunction mockMapFunction = mock(MapFunction.class);
- mergedStream.map(mockMapFunction);
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
+ TaskName mockTaskName = mock(TaskName.class);
+ when(mockTaskContext.getTaskName()).thenReturn(mockTaskName);
when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+
+ MapFunction testMapFunction = new TestMapFunction<Object, Object>("test-map-1", (Function & Serializable) m -> m);
+ mergedStream.map(testMapFunction);
+
OperatorImplGraph opImplGraph =
- new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+ new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mock(Config.class), mockTaskContext, mock(Clock.class));
+
+ Set<OperatorImpl> opSet = opImplGraph.getAllInputOperators().stream().collect(HashSet::new,
+ (s, op) -> addOperatorRecursively(s, op), HashSet::addAll);
+ Object[] mergeOps = opSet.stream().filter(op -> op.getOperatorSpec().getOpCode() == OpCode.MERGE).toArray();
+ assertEquals(mergeOps.length, 1);
+ assertEquals(((OperatorImpl) mergeOps[0]).registeredOperators.size(), 1);
+ OperatorImpl mapOp = (OperatorImpl) ((OperatorImpl) mergeOps[0]).registeredOperators.iterator().next();
+ assertEquals(mapOp.getOperatorSpec().getOpCode(), OpCode.MAP);
// verify that the DAG after merge is only traversed & initialized once
- verify(mockMapFunction, times(1)).init(any(Config.class), any(TaskContext.class));
+ assertEquals(TestMapFunction.getInstanceByTaskName(mockTaskName, "test-map-1").numInitCalled, 1);
}
@Test
@@ -231,25 +378,30 @@ public class TestOperatorImplGraph {
Config mockConfig = mock(Config.class);
when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
- StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
-
- JoinFunction mockJoinFunction = mock(JoinFunction.class);
- MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>());
- MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>());
- inputStream1.join(inputStream2, mockJoinFunction,
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
+
+ Integer joinKey = new Integer(1);
+ Function<Object, Integer> keyFn = (Function & Serializable) m -> joinKey;
+ JoinFunction testJoinFunction = new TestJoinFunction("jobName-jobId-join-j1",
+ (BiFunction & Serializable) (m1, m2) -> KV.of(m1, m2), keyFn, keyFn);
+ MessageStream<Object> inputStream1 = graphSpec.getInputStream("input1", new NoOpSerde<>());
+ MessageStream<Object> inputStream2 = graphSpec.getInputStream("input2", new NoOpSerde<>());
+ inputStream1.join(inputStream2, testJoinFunction,
mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j1");
+ TaskName mockTaskName = mock(TaskName.class);
TaskContextImpl mockTaskContext = mock(TaskContextImpl.class);
+ when(mockTaskContext.getTaskName()).thenReturn(mockTaskName);
when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
KeyValueStore mockLeftStore = mock(KeyValueStore.class);
when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-L"))).thenReturn(mockLeftStore);
KeyValueStore mockRightStore = mock(KeyValueStore.class);
when(mockTaskContext.getStore(eq("jobName-jobId-join-j1-R"))).thenReturn(mockRightStore);
OperatorImplGraph opImplGraph =
- new OperatorImplGraph(streamGraph, mockConfig, mockTaskContext, mock(Clock.class));
+ new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockTaskContext, mock(Clock.class));
// verify that join function is initialized once.
- verify(mockJoinFunction, times(1)).init(any(Config.class), any(TaskContext.class));
+ assertEquals(TestJoinFunction.getInstanceByTaskName(mockTaskName, "jobName-jobId-join-j1").numInitCalled, 1);
InputOperatorImpl inputOpImpl1 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream1"));
InputOperatorImpl inputOpImpl2 = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream2"));
@@ -261,24 +413,23 @@ public class TestOperatorImplGraph {
assertEquals(leftPartialJoinOpImpl.getOperatorSpec(), rightPartialJoinOpImpl.getOperatorSpec());
assertNotSame(leftPartialJoinOpImpl, rightPartialJoinOpImpl);
- Object joinKey = new Object();
// verify that left partial join operator calls getFirstKey
Object mockLeftMessage = mock(Object.class);
long currentTimeMillis = System.currentTimeMillis();
when(mockLeftStore.get(eq(joinKey))).thenReturn(new TimestampedValue<>(mockLeftMessage, currentTimeMillis));
- when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey);
inputOpImpl1.onMessage(KV.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
- verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage);
// verify that right partial join operator calls getSecondKey
Object mockRightMessage = mock(Object.class);
when(mockRightStore.get(eq(joinKey))).thenReturn(new TimestampedValue<>(mockRightMessage, currentTimeMillis));
- when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey);
inputOpImpl2.onMessage(KV.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
- verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage);
+
// verify that the join function apply is called with the correct messages on match
- verify(mockJoinFunction, times(1)).apply(mockLeftMessage, mockRightMessage);
+ assertEquals(((TestJoinFunction) TestJoinFunction.getInstanceByTaskName(mockTaskName, "jobName-jobId-join-j1")).joinResults.size(), 1);
+ KV joinResult = (KV) ((TestJoinFunction) TestJoinFunction.getInstanceByTaskName(mockTaskName, "jobName-jobId-join-j1")).joinResults.iterator().next();
+ assertEquals(joinResult.getKey(), mockLeftMessage);
+ assertEquals(joinResult.getValue(), mockRightMessage);
}
@Test
@@ -287,23 +438,25 @@ public class TestOperatorImplGraph {
when(mockRunner.getStreamSpec("input1")).thenReturn(new StreamSpec("input1", "input-stream1", "input-system"));
when(mockRunner.getStreamSpec("input2")).thenReturn(new StreamSpec("input2", "input-stream2", "input-system"));
Config mockConfig = mock(Config.class);
+ TaskName mockTaskName = mock(TaskName.class);
TaskContextImpl mockContext = mock(TaskContextImpl.class);
+ when(mockContext.getTaskName()).thenReturn(mockTaskName);
when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
- StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
+ StreamGraphSpec graphSpec = new StreamGraphSpec(mockRunner, mockConfig);
- MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1");
- MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2");
+ MessageStream<Object> inputStream1 = graphSpec.getInputStream("input1");
+ MessageStream<Object> inputStream2 = graphSpec.getInputStream("input2");
- List<String> initializedOperators = new ArrayList<>();
- List<String> closedOperators = new ArrayList<>();
+ Function mapFn = (Function & Serializable) m -> m;
+ inputStream1.map(new TestMapFunction<Object, Object>("1", mapFn))
+ .map(new TestMapFunction<Object, Object>("2", mapFn));
- inputStream1.map(createMapFunction("1", initializedOperators, closedOperators))
- .map(createMapFunction("2", initializedOperators, closedOperators));
+ inputStream2.map(new TestMapFunction<Object, Object>("3", mapFn))
+ .map(new TestMapFunction<Object, Object>("4", mapFn));
- inputStream2.map(createMapFunction("3", initializedOperators, closedOperators))
- .map(createMapFunction("4", initializedOperators, closedOperators));
+ OperatorImplGraph opImplGraph = new OperatorImplGraph(graphSpec.getOperatorSpecGraph(), mockConfig, mockContext, SystemClock.instance());
- OperatorImplGraph opImplGraph = new OperatorImplGraph(streamGraph, mockConfig, mockContext, SystemClock.instance());
+ List<String> initializedOperators = BaseTestFunction.getInitListByTaskName(mockTaskName);
// Assert that initialization occurs in topological order.
assertEquals(initializedOperators.get(0), "1");
@@ -313,35 +466,13 @@ public class TestOperatorImplGraph {
// Assert that finalization occurs in reverse topological order.
opImplGraph.close();
+ List<String> closedOperators = BaseTestFunction.getCloseListByTaskName(mockTaskName);
assertEquals(closedOperators.get(0), "4");
assertEquals(closedOperators.get(1), "3");
assertEquals(closedOperators.get(2), "2");
assertEquals(closedOperators.get(3), "1");
}
- /**
- * Creates an identity map function that appends to the provided lists when init/close is invoked.
- */
- private MapFunction<Object, Object> createMapFunction(String id,
- List<String> initializedOperators, List<String> finalizedOperators) {
- return new MapFunction<Object, Object>() {
- @Override
- public void init(Config config, TaskContext context) {
- initializedOperators.add(id);
- }
-
- @Override
- public void close() {
- finalizedOperators.add(id);
- }
-
- @Override
- public Object apply(Object message) {
- return message;
- }
- };
- }
-
@Test
public void testGetStreamToConsumerTasks() {
String system = "test-system";
@@ -409,16 +540,16 @@ public class TestOperatorImplGraph {
when(runner.getStreamSpec("test-app-1-partition_by-p2")).thenReturn(int1);
when(runner.getStreamSpec("test-app-1-partition_by-p1")).thenReturn(int2);
- StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- MessageStream messageStream1 = streamGraph.getInputStream("input1").map(m -> m);
- MessageStream messageStream2 = streamGraph.getInputStream("input2").filter(m -> true);
+ StreamGraphSpec graphSpec = new StreamGraphSpec(runner, config);
+ MessageStream messageStream1 = graphSpec.getInputStream("input1").map(m -> m);
+ MessageStream messageStream2 = graphSpec.getInputStream("input2").filter(m -> true);
MessageStream messageStream3 =
- streamGraph.getInputStream("input3")
+ graphSpec.getInputStream("input3")
.filter(m -> true)
.partitionBy(m -> "hehe", m -> m, "p1")
.map(m -> m);
- OutputStream<Object> outputStream1 = streamGraph.getOutputStream("output1");
- OutputStream<Object> outputStream2 = streamGraph.getOutputStream("output2");
+ OutputStream<Object> outputStream1 = graphSpec.getOutputStream("output1");
+ OutputStream<Object> outputStream2 = graphSpec.getOutputStream("output2");
messageStream1
.join(messageStream2, mock(JoinFunction.class),
@@ -430,7 +561,8 @@ public class TestOperatorImplGraph {
mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
.sendTo(outputStream2);
- Multimap<SystemStream, SystemStream> outputToInput = OperatorImplGraph.getIntermediateToInputStreamsMap(streamGraph);
+ Multimap<SystemStream, SystemStream> outputToInput =
+ OperatorImplGraph.getIntermediateToInputStreamsMap(graphSpec.getOperatorSpecGraph());
Collection<SystemStream> inputs = outputToInput.get(int1.toSystemStream());
assertEquals(inputs.size(), 2);
assertTrue(inputs.contains(input1.toSystemStream()));
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index a91c1af..873cd3c 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -48,7 +48,7 @@ public class TestStreamOperatorImpl {
Config mockConfig = mock(Config.class);
TaskContext mockContext = mock(TaskContext.class);
StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
- new StreamOperatorImpl<>(mockOp, mockConfig, mockContext);
+ new StreamOperatorImpl<>(mockOp);
TestMessageEnvelope inMsg = mock(TestMessageEnvelope.class);
Collection<TestOutputMessageEnvelope> mockOutputs = mock(Collection.class);
when(txfmFn.apply(inMsg)).thenReturn(mockOutputs);
@@ -69,7 +69,7 @@ public class TestStreamOperatorImpl {
TaskContext mockContext = mock(TaskContext.class);
StreamOperatorImpl<TestMessageEnvelope, TestOutputMessageEnvelope> opImpl =
- new StreamOperatorImpl<>(mockOp, mockConfig, mockContext);
+ new StreamOperatorImpl<>(mockOp);
// ensure that close is not called yet
verify(txfmFn, times(0)).close();
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
index 7d0c623..9741fc4 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestWindowOperator.java
@@ -22,19 +22,20 @@ package org.apache.samza.operators.impl;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import junit.framework.Assert;
import org.apache.samza.Partition;
-import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.container.TaskContextImpl;
import org.apache.samza.container.TaskName;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphSpec;
+import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.impl.store.TestInMemoryStore;
import org.apache.samza.operators.impl.store.TimeSeriesKeySerde;
+import org.apache.samza.operators.OperatorSpecGraph;
import org.apache.samza.operators.triggers.FiringType;
import org.apache.samza.operators.triggers.Trigger;
import org.apache.samza.operators.triggers.Triggers;
@@ -54,19 +55,25 @@ import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamOperatorTask;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.testUtils.TestClock;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
-import java.util.function.Function;
+import java.util.Map;
+import java.util.Collections;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class TestWindowOperator {
private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class);
@@ -83,26 +90,32 @@ public class TestWindowOperator {
taskContext = mock(TaskContextImpl.class);
runner = mock(ApplicationRunner.class);
Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
- Serde storeValSerde = new IntegerEnvelopeSerde();
+ Serde storeValSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
.of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
-
when(taskContext.getStore("jobName-jobId-window-w1"))
.thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
+
+ Map<String, String> mapConfig = new HashMap<>();
+ mapConfig.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
+ mapConfig.put("job.default.system", "kafka");
+ mapConfig.put("job.name", "jobName");
+ mapConfig.put("job.id", "jobId");
+ config = new MapConfig(mapConfig);
}
@Test
public void testTumblingWindowsDiscardingMode() throws Exception {
- StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
- Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+ OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
task.init(config, taskContext);
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
@@ -130,12 +143,12 @@ public class TestWindowOperator {
@Test
public void testNonKeyedTumblingWindowsDiscardingMode() throws Exception {
- StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
- Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000)));
+ OperatorSpecGraph sgb = this.getTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(1000))).getOperatorSpecGraph();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
task.init(config, taskContext);
MessageCollector messageCollector =
@@ -159,12 +172,12 @@ public class TestWindowOperator {
when(taskContext.getStore("jobName-jobId-window-w1"))
.thenReturn(new TestInMemoryStore<>(new TimeSeriesKeySerde(new IntegerSerde()), new IntegerSerde()));
- StreamApplication sgb = new AggregateTumblingWindowStreamApplication(AccumulationMode.DISCARDING,
- Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+ OperatorSpecGraph sgb = this.getAggregateTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
List<WindowPane<Integer, Integer>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
task.init(config, taskContext);
MessageCollector messageCollector = envelope -> windowPanes.add((WindowPane<Integer, Integer>) envelope.getMessage());
integers.forEach(n -> task.process(new IntegerEnvelope(n), messageCollector, taskCoordinator));
@@ -181,11 +194,11 @@ public class TestWindowOperator {
@Test
public void testTumblingWindowsAccumulatingMode() throws Exception {
- StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
- Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+ OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
task.init(config, taskContext);
MessageCollector messageCollector =
@@ -210,10 +223,11 @@ public class TestWindowOperator {
@Test
public void testSessionWindowsDiscardingMode() throws Exception {
- StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+ OperatorSpecGraph sgb =
+ this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();
TestClock testClock = new TestClock();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
task.init(config, taskContext);
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
@@ -255,10 +269,10 @@ public class TestWindowOperator {
@Test
public void testSessionWindowsAccumulatingMode() throws Exception {
- StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING,
- Duration.ofMillis(500));
+ OperatorSpecGraph sgb = this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING,
+ Duration.ofMillis(500)).getOperatorSpecGraph();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
MessageCollector messageCollector =
@@ -287,10 +301,10 @@ public class TestWindowOperator {
@Test
public void testCancellationOfOnceTrigger() throws Exception {
- StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING,
- Duration.ofSeconds(1), Triggers.count(2));
+ OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING,
+ Duration.ofSeconds(1), Triggers.count(2)).getOperatorSpecGraph();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
task.init(config, taskContext);
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
@@ -331,10 +345,10 @@ public class TestWindowOperator {
@Test
public void testCancellationOfAnyTrigger() throws Exception {
- StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
- Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))));
+ OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+ Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))).getOperatorSpecGraph();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
task.init(config, taskContext);
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
@@ -389,15 +403,15 @@ public class TestWindowOperator {
@Test
public void testCancelationOfRepeatingNestedTriggers() throws Exception {
- StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
- Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))));
+ OperatorSpecGraph sgb = this.getKeyedTumblingWindowStreamGraph(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1),
+ Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))))).getOperatorSpecGraph();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
MessageCollector messageCollector =
envelope -> windowPanes.add((WindowPane<Integer, Collection<IntegerEnvelope>>) envelope.getMessage());
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
task.init(config, taskContext);
task.process(new IntegerEnvelope(1), messageCollector, taskCoordinator);
@@ -434,12 +448,12 @@ public class TestWindowOperator {
when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
- StreamApplication sgb = new TumblingWindowStreamApplication(AccumulationMode.DISCARDING,
- Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2)));
+ OperatorSpecGraph sgb = this.getTumblingWindowStreamGraph(AccumulationMode.DISCARDING,
+ Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))).getOperatorSpecGraph();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
TestClock testClock = new TestClock();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
task.init(config, taskContext);
MessageCollector messageCollector =
@@ -475,10 +489,11 @@ public class TestWindowOperator {
when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
- StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+ OperatorSpecGraph sgb =
+ this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();
TestClock testClock = new TestClock();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
task.init(config, taskContext);
MessageCollector messageCollector =
@@ -511,10 +526,11 @@ public class TestWindowOperator {
when(taskContext.fetchObject(EndOfStreamStates.class.getName())).thenReturn(endOfStreamStates);
when(taskContext.fetchObject(WatermarkStates.class.getName())).thenReturn(mock(WatermarkStates.class));
- StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500));
+ OperatorSpecGraph sgb =
+ this.getKeyedSessionWindowStreamGraph(AccumulationMode.DISCARDING, Duration.ofMillis(500)).getOperatorSpecGraph();
TestClock testClock = new TestClock();
List<WindowPane<Integer, Collection<IntegerEnvelope>>> windowPanes = new ArrayList<>();
- StreamOperatorTask task = new StreamOperatorTask(sgb, runner, testClock);
+ StreamOperatorTask task = new StreamOperatorTask(sgb, null, testClock);
task.init(config, taskContext);
MessageCollector messageCollector =
@@ -534,144 +550,83 @@ public class TestWindowOperator {
verify(taskCoordinator, times(1)).shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
}
- private class KeyedTumblingWindowStreamApplication implements StreamApplication {
-
- private final AccumulationMode mode;
- private final Duration duration;
- private final Trigger<IntegerEnvelope> earlyTrigger;
- private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ private StreamGraphSpec getKeyedTumblingWindowStreamGraph(AccumulationMode mode,
+ Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException {
+ StreamGraphSpec graph = new StreamGraphSpec(runner, config);
- KeyedTumblingWindowStreamApplication(AccumulationMode mode,
- Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
- this.mode = mode;
- this.duration = timeDuration;
- this.earlyTrigger = earlyTrigger;
- }
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+ graph.getInputStream("integers", kvSerde)
+ .window(Windows.keyedTumblingWindow(KV::getKey, duration, new IntegerSerde(), kvSerde)
+ .setEarlyTrigger(earlyTrigger).setAccumulationMode(mode), "w1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
- @Override
- public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream =
- graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
- .map(kv -> new IntegerEnvelope(kv.getKey()));
- Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
- inStream
- .map(m -> m)
- .window(Windows.keyedTumblingWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde())
- .setEarlyTrigger(earlyTrigger)
- .setAccumulationMode(mode), "w1")
- .sink((message, messageCollector, taskCoordinator) -> {
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
- }
+ return graph;
}
- private class TumblingWindowStreamApplication implements StreamApplication {
-
- private final AccumulationMode mode;
- private final Duration duration;
- private final Trigger<IntegerEnvelope> earlyTrigger;
- private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ private StreamGraphSpec getTumblingWindowStreamGraph(AccumulationMode mode,
+ Duration duration, Trigger<KV<Integer, Integer>> earlyTrigger) throws IOException {
+ StreamGraphSpec graph = new StreamGraphSpec(runner, config);
- TumblingWindowStreamApplication(AccumulationMode mode,
- Duration timeDuration, Trigger<IntegerEnvelope> earlyTrigger) {
- this.mode = mode;
- this.duration = timeDuration;
- this.earlyTrigger = earlyTrigger;
- }
-
- @Override
- public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream =
- graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
- .map(kv -> new IntegerEnvelope(kv.getKey()));
- Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
- inStream
- .map(m -> m)
- .window(Windows.tumblingWindow(duration, new IntegerEnvelopeSerde())
- .setEarlyTrigger(earlyTrigger)
- .setAccumulationMode(mode), "w1")
- .sink((message, messageCollector, taskCoordinator) -> {
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
- }
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+ graph.getInputStream("integers", kvSerde)
+ .window(Windows.tumblingWindow(duration, kvSerde).setEarlyTrigger(earlyTrigger)
+ .setAccumulationMode(mode), "w1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+ return graph;
}
- private class AggregateTumblingWindowStreamApplication implements StreamApplication {
+ private StreamGraphSpec getKeyedSessionWindowStreamGraph(AccumulationMode mode, Duration duration) throws IOException {
+ StreamGraphSpec graph = new StreamGraphSpec(runner, config);
- private final AccumulationMode mode;
- private final Duration duration;
- private final Trigger<IntegerEnvelope> earlyTrigger;
- private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
-
- AggregateTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration,
- Trigger<IntegerEnvelope> earlyTrigger) {
- this.mode = mode;
- this.duration = timeDuration;
- this.earlyTrigger = earlyTrigger;
- }
-
- @Override
- public void init(StreamGraph graph, Config config) {
- MessageStream<KV<Integer, Integer>> integers = graph.getInputStream("integers",
- KVSerde.of(new IntegerSerde(), new IntegerSerde()));
-
- integers
- .map(kv -> new IntegerEnvelope(kv.getKey()))
- .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(this.duration, () -> 0, (m, c) -> c + 1, new IntegerSerde())
- .setEarlyTrigger(earlyTrigger)
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
+ graph.getInputStream("integers", kvSerde)
+ .window(Windows.keyedSessionWindow(KV::getKey, duration, new IntegerSerde(), kvSerde)
.setAccumulationMode(mode), "w1")
.sink((message, messageCollector, taskCoordinator) -> {
+ SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
});
- }
+ return graph;
}
- private class KeyedSessionWindowStreamApplication implements StreamApplication {
+ private StreamGraphSpec getAggregateTumblingWindowStreamGraph(AccumulationMode mode, Duration timeDuration,
+ Trigger<IntegerEnvelope> earlyTrigger) throws IOException {
+ StreamGraphSpec graph = new StreamGraphSpec(runner, config);
- private final AccumulationMode mode;
- private final Duration duration;
- private final SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ MessageStream<KV<Integer, Integer>> integers = graph.getInputStream("integers",
+ KVSerde.of(new IntegerSerde(), new IntegerSerde()));
- KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) {
- this.mode = mode;
- this.duration = duration;
- }
-
- @Override
- public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream =
- graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
- .map(kv -> new IntegerEnvelope(kv.getKey()));
- Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
-
- inStream
- .map(m -> m)
- .window(Windows.keyedSessionWindow(keyFn, duration, new IntegerSerde(), new IntegerEnvelopeSerde())
- .setAccumulationMode(mode), "w1")
- .sink((message, messageCollector, taskCoordinator) -> {
- messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
- });
- }
+ integers
+ .map(new KVMapFunction())
+ .window(Windows.<IntegerEnvelope, Integer>tumblingWindow(timeDuration, () -> 0, (m, c) -> c + 1, new IntegerSerde())
+ .setEarlyTrigger(earlyTrigger)
+ .setAccumulationMode(mode), "w1")
+ .sink((message, messageCollector, taskCoordinator) -> {
+ SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
+ messageCollector.send(new OutgoingMessageEnvelope(outputSystemStream, message));
+ });
+ return graph;
}
- private class IntegerEnvelope extends IncomingMessageEnvelope {
+ private static class IntegerEnvelope extends IncomingMessageEnvelope {
IntegerEnvelope(Integer key) {
- super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, key);
+ super(new SystemStreamPartition("kafka", "integers", new Partition(0)), null, key, key);
}
}
- private class IntegerEnvelopeSerde implements Serde<IntegerEnvelope> {
- private final IntegerSerde intSerde = new IntegerSerde();
+ private static class KVMapFunction implements MapFunction<KV<Integer, Integer>, IntegerEnvelope> {
@Override
- public byte[] toBytes(IntegerEnvelope object) {
- return intSerde.toBytes((Integer) object.getKey());
- }
-
- @Override
- public IntegerEnvelope fromBytes(byte[] bytes) {
- return new IntegerEnvelope(intSerde.fromBytes(bytes));
+ public IntegerEnvelope apply(KV<Integer, Integer> message) {
+ return new IntegerEnvelope(message.getKey());
}
}
+
}