You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2018/09/07 06:36:12 UTC
[6/9] samza git commit: SAMZA-1789: unify ApplicationDescriptor and
ApplicationRunner for high- and low-level APIs in YARN and standalone
environment
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
new file mode 100644
index 0000000..db85e33
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/application/TestStreamApplicationDescriptorImpl.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.operators.BaseTableDescriptor;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.data.TestMessageEnvelope;
+import org.apache.samza.operators.descriptors.GenericInputDescriptor;
+import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
+import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.ExpandingInputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.operators.descriptors.base.system.TransformingInputDescriptorProvider;
+import org.apache.samza.operators.functions.InputTransformer;
+import org.apache.samza.operators.functions.StreamExpander;
+import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec.OpCode;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
+import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.table.TableSpec;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link StreamApplicationDescriptorImpl}
+ */
+public class TestStreamApplicationDescriptorImpl {
+
+ /**
+  * Checks that constructing the descriptor calls {@code describe} on the application with the
+  * newly created descriptor, and that the descriptor exposes the config it was given.
+  */
+ @Test
+ public void testConstructor() {
+   Config config = mock(Config.class);
+   StreamApplication application = mock(StreamApplication.class);
+   StreamApplicationDescriptorImpl descriptor = new StreamApplicationDescriptorImpl(application, config);
+   verify(application).describe(descriptor);
+   assertEquals(config, descriptor.config);
+ }
+
+ /**
+  * Registers one input stream described with a plain value serde and checks the resulting input
+  * operator spec: INPUT op code, matching stream id, a {@link NoOpSerde} key serde and the
+  * supplied value serde. The descriptor must also be retrievable by stream id.
+  */
+ @Test
+ public void testGetInputStreamWithValueSerde() {
+   final String streamId = "test-stream-1";
+   final Serde valueSerde = mock(Serde.class);
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericInputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(streamId, valueSerde);
+   StreamApplication app = appDesc -> appDesc.getInputStream(inputDescriptor);
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(app, mock(Config.class));
+
+   InputOperatorSpec spec = streamAppDesc.getInputOperators().get(streamId);
+   assertEquals(OpCode.INPUT, spec.getOpCode());
+   assertEquals(streamId, spec.getStreamId());
+   assertEquals(inputDescriptor, streamAppDesc.getInputDescriptors().get(streamId));
+   assertTrue(spec.getKeySerde() instanceof NoOpSerde);
+   assertEquals(valueSerde, spec.getValueSerde());
+ }
+
+ /**
+  * Registers one input stream described with a {@link KVSerde} and checks that the input
+  * operator spec reports the key and value serdes returned by that KVSerde.
+  */
+ @Test
+ public void testGetInputStreamWithKeyValueSerde() {
+   final String streamId = "test-stream-1";
+   Serde keySerde = mock(Serde.class);
+   Serde valueSerde = mock(Serde.class);
+   KVSerde kvSerde = mock(KVSerde.class);
+   doReturn(keySerde).when(kvSerde).getKeySerde();
+   doReturn(valueSerde).when(kvSerde).getValueSerde();
+
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericInputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(streamId, kvSerde);
+   StreamApplication app = appDesc -> appDesc.getInputStream(inputDescriptor);
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(app, mock(Config.class));
+
+   InputOperatorSpec spec = streamAppDesc.getInputOperators().get(streamId);
+   assertEquals(OpCode.INPUT, spec.getOpCode());
+   assertEquals(streamId, spec.getStreamId());
+   assertEquals(inputDescriptor, streamAppDesc.getInputDescriptors().get(streamId));
+   assertEquals(keySerde, spec.getKeySerde());
+   assertEquals(valueSerde, spec.getValueSerde());
+ }
+
+ /**
+  * Expects an {@link IllegalArgumentException} when an input descriptor is built with a
+  * {@code null} serde and used as an input stream of the application.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetInputStreamWithNullSerde() {
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericInputDescriptor nullSerdeDescriptor = systemDescriptor.getInputDescriptor("mockStreamId", null);
+   StreamApplication app = appDesc -> appDesc.getInputStream(nullSerdeDescriptor);
+   new StreamApplicationDescriptorImpl(app, mock(Config.class));
+ }
+
+ /**
+  * Registers an input stream whose system descriptor carries an {@link InputTransformer} and
+  * checks that the input operator spec exposes that same transformer.
+  */
+ @Test
+ public void testGetInputStreamWithTransformFunction() {
+   final String streamId = "test-stream-1";
+   final Serde valueSerde = mock(Serde.class);
+   InputTransformer identityTransformer = ime -> ime;
+   MockTransformingSystemDescriptor systemDescriptor =
+       new MockTransformingSystemDescriptor("mockSystem", identityTransformer);
+   MockInputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(streamId, valueSerde);
+   StreamApplication app = appDesc -> appDesc.getInputStream(inputDescriptor);
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(app, mock(Config.class));
+
+   InputOperatorSpec spec = streamAppDesc.getInputOperators().get(streamId);
+   assertEquals(OpCode.INPUT, spec.getOpCode());
+   assertEquals(streamId, spec.getStreamId());
+   assertEquals(inputDescriptor, streamAppDesc.getInputDescriptors().get(streamId));
+   assertEquals(identityTransformer, spec.getTransformer());
+ }
+
+ /**
+  * Registers an input stream on a system descriptor that carries a {@link StreamExpander}.
+  * The expander registers a different ("expanded") stream instead; the test checks that the
+  * expander ran exactly once, that only the expanded stream id shows up in the input operators
+  * and input descriptors, and that the original stream id does not.
+  */
+ @Test
+ public void testGetInputStreamWithExpandingSystem() {
+   final String originalStreamId = "test-stream-1";
+   final String expandedStreamId = "expanded-stream";
+   final AtomicInteger expanderInvocations = new AtomicInteger();
+   StreamExpander expander = (appDescriptor, sourceDescriptor) -> {
+     expanderInvocations.incrementAndGet();
+     GenericSystemDescriptor expandedSystem = new GenericSystemDescriptor("expanded-system", "mockFactoryClass");
+     InputDescriptor expandedISD = expandedSystem.getInputDescriptor(expandedStreamId, new IntegerSerde());
+
+     return appDescriptor.getInputStream(expandedISD);
+   };
+   MockExpandingSystemDescriptor systemDescriptor = new MockExpandingSystemDescriptor("mock-system", expander);
+   MockInputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(originalStreamId, new IntegerSerde());
+   StreamApplication app = appDesc -> appDesc.getInputStream(inputDescriptor);
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(app, mock(Config.class));
+
+   InputOperatorSpec expandedSpec = streamAppDesc.getInputOperators().get(expandedStreamId);
+   assertEquals(OpCode.INPUT, expandedSpec.getOpCode());
+   assertEquals(1, expanderInvocations.get());
+   assertFalse(streamAppDesc.getInputOperators().containsKey(originalStreamId));
+   assertFalse(streamAppDesc.getInputDescriptors().containsKey(originalStreamId));
+   assertTrue(streamAppDesc.getInputDescriptors().containsKey(expandedStreamId));
+   assertEquals(expandedStreamId, expandedSpec.getStreamId());
+ }
+
+ /**
+  * Registers an input stream using raw (untyped) descriptor and serde references and checks the
+  * op code, stream id and registered descriptor of the resulting input operator spec.
+  */
+ @Test
+ public void testGetInputStreamWithRelaxedTypes() {
+   final String streamId = "test-stream-1";
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericInputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(streamId, mock(Serde.class));
+   StreamApplication app = appDesc -> appDesc.getInputStream(inputDescriptor);
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(app, mock(Config.class));
+
+   InputOperatorSpec spec = streamAppDesc.getInputOperators().get(streamId);
+   assertEquals(OpCode.INPUT, spec.getOpCode());
+   assertEquals(streamId, spec.getStreamId());
+   assertEquals(inputDescriptor, streamAppDesc.getInputDescriptors().get(streamId));
+ }
+
+ /**
+  * Registers two distinct input streams on the same system descriptor and checks that both the
+  * input operators and the input descriptors contain exactly those two entries.
+  */
+ @Test
+ public void testMultipleGetInputStreams() {
+   final String firstStreamId = "test-stream-1";
+   final String secondStreamId = "test-stream-2";
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericInputDescriptor firstDescriptor = systemDescriptor.getInputDescriptor(firstStreamId, mock(Serde.class));
+   GenericInputDescriptor secondDescriptor = systemDescriptor.getInputDescriptor(secondStreamId, mock(Serde.class));
+
+   StreamApplication app = appDesc -> {
+     appDesc.getInputStream(firstDescriptor);
+     appDesc.getInputStream(secondDescriptor);
+   };
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(app, mock(Config.class));
+
+   InputOperatorSpec firstSpec = streamAppDesc.getInputOperators().get(firstStreamId);
+   InputOperatorSpec secondSpec = streamAppDesc.getInputOperators().get(secondStreamId);
+
+   assertEquals(2, streamAppDesc.getInputOperators().size());
+   assertEquals(firstStreamId, firstSpec.getStreamId());
+   assertEquals(secondStreamId, secondSpec.getStreamId());
+   assertEquals(2, streamAppDesc.getInputDescriptors().size());
+   assertEquals(firstDescriptor, streamAppDesc.getInputDescriptors().get(firstStreamId));
+   assertEquals(secondDescriptor, streamAppDesc.getInputDescriptors().get(secondStreamId));
+ }
+
+ /**
+  * Expects an {@link IllegalStateException} when two input descriptors with the same stream id
+  * are registered on one application descriptor.
+  */
+ @Test(expected = IllegalStateException.class)
+ public void testGetSameInputStreamTwice() {
+   final String duplicatedStreamId = "test-stream-1";
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericInputDescriptor firstDescriptor = systemDescriptor.getInputDescriptor(duplicatedStreamId, mock(Serde.class));
+   GenericInputDescriptor secondDescriptor = systemDescriptor.getInputDescriptor(duplicatedStreamId, mock(Serde.class));
+   StreamApplication app = appDesc -> {
+     appDesc.getInputStream(firstDescriptor);
+     // the second registration of the same stream id should throw
+     appDesc.getInputStream(secondDescriptor);
+   };
+   new StreamApplicationDescriptorImpl(app, mock(Config.class));
+ }
+
+ /**
+  * Two separate {@link GenericSystemDescriptor} instances that share a system name must not be
+  * mixed within one application. The test checks that an {@link IllegalStateException} is raised
+  * for an input stream, for an output stream, and for an input stream whose system name matches
+  * the default system but comes from a different descriptor instance.
+  */
+ @Test
+ public void testMultipleSystemDescriptorForSameSystemName() {
+   GenericSystemDescriptor firstSystem = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericSystemDescriptor secondSystem = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericInputDescriptor inputOnFirst = firstSystem.getInputDescriptor("test-stream-1", mock(Serde.class));
+   GenericInputDescriptor inputOnSecond = secondSystem.getInputDescriptor("test-stream-2", mock(Serde.class));
+   GenericOutputDescriptor outputOnSecond = secondSystem.getOutputDescriptor("test-stream-3", mock(Serde.class));
+
+   StreamApplication mixedStreamsApp = appDesc -> {
+     appDesc.getInputStream(inputOnFirst);
+     try {
+       appDesc.getInputStream(inputOnSecond);
+       fail("Adding input stream with the same system name but different SystemDescriptor should have failed");
+     } catch (IllegalStateException expected) {
+       // expected: conflicting system descriptor for the input stream
+     }
+
+     try {
+       appDesc.getOutputStream(outputOnSecond);
+       fail("adding output stream with the same system name but different SystemDescriptor should have failed");
+     } catch (IllegalStateException expected) {
+       // expected: conflicting system descriptor for the output stream
+     }
+   };
+   new StreamApplicationDescriptorImpl(mixedStreamsApp, mock(Config.class));
+
+   StreamApplication defaultSystemApp = appDesc -> {
+     appDesc.withDefaultSystem(secondSystem);
+     try {
+       appDesc.getInputStream(inputOnFirst);
+       fail("Adding input stream with the same system name as the default system but different SystemDescriptor should have failed");
+     } catch (IllegalStateException expected) {
+       // expected: conflicts with the default system descriptor
+     }
+   };
+   new StreamApplicationDescriptorImpl(defaultSystemApp, mock(Config.class));
+ }
+
+ /**
+  * Registers an output stream described with a {@link KVSerde} and checks that the registered
+  * output stream reports the key and value serdes returned by that KVSerde.
+  */
+ @Test
+ public void testGetOutputStreamWithKeyValueSerde() {
+   final String streamId = "test-stream-1";
+   Serde keySerde = mock(Serde.class);
+   Serde valueSerde = mock(Serde.class);
+   KVSerde kvSerde = mock(KVSerde.class);
+   doReturn(keySerde).when(kvSerde).getKeySerde();
+   doReturn(valueSerde).when(kvSerde).getValueSerde();
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericOutputDescriptor outputDescriptor = systemDescriptor.getOutputDescriptor(streamId, kvSerde);
+
+   StreamApplication app = appDesc -> appDesc.getOutputStream(outputDescriptor);
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(app, mock(Config.class));
+
+   OutputStreamImpl<TestMessageEnvelope> outputStream = streamAppDesc.getOutputStreams().get(streamId);
+   assertEquals(streamId, outputStream.getStreamId());
+   assertEquals(outputDescriptor, streamAppDesc.getOutputDescriptors().get(streamId));
+   assertEquals(keySerde, outputStream.getKeySerde());
+   assertEquals(valueSerde, outputStream.getValueSerde());
+ }
+
+ /**
+  * Expects an {@link IllegalArgumentException} when an output descriptor is built with a
+  * {@code null} serde and used as an output stream of the application.
+  */
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetOutputStreamWithNullSerde() {
+   final String streamId = "test-stream-1";
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericOutputDescriptor nullSerdeDescriptor = systemDescriptor.getOutputDescriptor(streamId, null);
+   StreamApplication app = appDesc -> appDesc.getOutputStream(nullSerdeDescriptor);
+   new StreamApplicationDescriptorImpl(app, mock(Config.class));
+ }
+
+ /**
+  * Registers an output stream described with a plain value serde and checks that the registered
+  * output stream has a {@link NoOpSerde} key serde and the supplied value serde.
+  */
+ @Test
+ public void testGetOutputStreamWithValueSerde() {
+   final String streamId = "test-stream-1";
+   final Serde valueSerde = mock(Serde.class);
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericOutputDescriptor outputDescriptor = systemDescriptor.getOutputDescriptor(streamId, valueSerde);
+
+   StreamApplication app = appDesc -> appDesc.getOutputStream(outputDescriptor);
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(app, mock(Config.class));
+
+   OutputStreamImpl<TestMessageEnvelope> outputStream = streamAppDesc.getOutputStreams().get(streamId);
+   assertEquals(streamId, outputStream.getStreamId());
+   assertEquals(outputDescriptor, streamAppDesc.getOutputDescriptors().get(streamId));
+   assertTrue(outputStream.getKeySerde() instanceof NoOpSerde);
+   assertEquals(valueSerde, outputStream.getValueSerde());
+ }
+
+ /**
+  * Expects an {@link IllegalStateException} when the default system is set after an input stream
+  * has already been registered.
+  */
+ @Test(expected = IllegalStateException.class)
+ public void testSetDefaultSystemDescriptorAfterGettingInputStream() {
+   final String streamId = "test-stream-1";
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericInputDescriptor inputDescriptor = systemDescriptor.getInputDescriptor(streamId, mock(Serde.class));
+
+   StreamApplication app = appDesc -> {
+     appDesc.getInputStream(inputDescriptor);
+     // setting the default system this late should throw
+     appDesc.withDefaultSystem(systemDescriptor);
+   };
+   new StreamApplicationDescriptorImpl(app, mock(Config.class));
+ }
+
+ /**
+  * Expects an {@link IllegalStateException} when the default system is set after an output
+  * stream has already been registered.
+  */
+ @Test(expected = IllegalStateException.class)
+ public void testSetDefaultSystemDescriptorAfterGettingOutputStream() {
+   final String streamId = "test-stream-1";
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericOutputDescriptor outputDescriptor = systemDescriptor.getOutputDescriptor(streamId, mock(Serde.class));
+   StreamApplication app = appDesc -> {
+     appDesc.getOutputStream(outputDescriptor);
+     // setting the default system this late should throw
+     appDesc.withDefaultSystem(systemDescriptor);
+   };
+   new StreamApplicationDescriptorImpl(app, mock(Config.class));
+ }
+
+ /**
+  * Setting the default system after an intermediate stream has been created must be rejected
+  * with an {@link IllegalStateException}.
+  *
+  * The expectation is scoped to the {@code withDefaultSystem} call only, so an exception raised
+  * while creating the intermediate stream fails the test instead of being mistaken for the
+  * expected one.
+  */
+ @Test
+ public void testSetDefaultSystemDescriptorAfterGettingIntermediateStream() {
+   String streamId = "test-stream-1";
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+   // Set-up: must succeed; any exception here is a test failure.
+   streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
+   try {
+     streamAppDesc.withDefaultSystem(mock(SystemDescriptor.class));
+     fail("Setting the default system after getting an intermediate stream should have failed");
+   } catch (IllegalStateException expected) {
+     // expected: the default system can no longer be changed
+   }
+ }
+
+ /**
+  * Expects an {@link IllegalStateException} when two output descriptors with the same stream id
+  * are registered on one application descriptor.
+  */
+ @Test(expected = IllegalStateException.class)
+ public void testGetSameOutputStreamTwice() {
+   final String duplicatedStreamId = "test-stream-1";
+   GenericSystemDescriptor systemDescriptor =
+       new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   GenericOutputDescriptor firstDescriptor = systemDescriptor.getOutputDescriptor(duplicatedStreamId, mock(Serde.class));
+   GenericOutputDescriptor secondDescriptor = systemDescriptor.getOutputDescriptor(duplicatedStreamId, mock(Serde.class));
+   StreamApplication app = appDesc -> {
+     appDesc.getOutputStream(firstDescriptor);
+     // the second registration of the same stream id should throw
+     appDesc.getOutputStream(secondDescriptor);
+   };
+   new StreamApplicationDescriptorImpl(app, mock(Config.class));
+ }
+
+ /**
+  * Creates an intermediate stream with a plain value serde and checks that it is registered as
+  * both an input operator and an output stream under the same id, each with a {@link NoOpSerde}
+  * key serde and the supplied value serde.
+  */
+ @Test
+ public void testGetIntermediateStreamWithValueSerde() {
+   final String streamId = "stream-1";
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+
+   final Serde valueSerde = mock(Serde.class);
+   IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStream =
+       streamAppDesc.getIntermediateStream(streamId, valueSerde, false);
+
+   assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStream.getOperatorSpec());
+   assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStream.getOutputStream());
+   assertEquals(streamId, intermediateStream.getStreamId());
+   assertTrue(intermediateStream.getOutputStream().getKeySerde() instanceof NoOpSerde);
+   assertEquals(valueSerde, intermediateStream.getOutputStream().getValueSerde());
+
+   // The operator spec of an intermediate stream is viewed as an input operator spec here.
+   InputOperatorSpec inputSpec = (InputOperatorSpec) (OperatorSpec) intermediateStream.getOperatorSpec();
+   assertTrue(inputSpec.getKeySerde() instanceof NoOpSerde);
+   assertEquals(valueSerde, inputSpec.getValueSerde());
+ }
+
+ /**
+  * Creates an intermediate stream with a {@link KVSerde} and checks that both its output stream
+  * and its input operator spec report the key and value serdes returned by that KVSerde.
+  */
+ @Test
+ public void testGetIntermediateStreamWithKeyValueSerde() {
+   final String streamId = "streamId";
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+
+   Serde keySerde = mock(Serde.class);
+   Serde valueSerde = mock(Serde.class);
+   KVSerde kvSerde = mock(KVSerde.class);
+   doReturn(keySerde).when(kvSerde).getKeySerde();
+   doReturn(valueSerde).when(kvSerde).getValueSerde();
+   IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStream =
+       streamAppDesc.getIntermediateStream(streamId, kvSerde, false);
+
+   assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStream.getOperatorSpec());
+   assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStream.getOutputStream());
+   assertEquals(streamId, intermediateStream.getStreamId());
+   assertEquals(keySerde, intermediateStream.getOutputStream().getKeySerde());
+   assertEquals(valueSerde, intermediateStream.getOutputStream().getValueSerde());
+
+   // The operator spec of an intermediate stream is viewed as an input operator spec here.
+   InputOperatorSpec inputSpec = (InputOperatorSpec) (OperatorSpec) intermediateStream.getOperatorSpec();
+   assertEquals(keySerde, inputSpec.getKeySerde());
+   assertEquals(valueSerde, inputSpec.getValueSerde());
+ }
+
+ /**
+  * Sets a default system first and then creates an intermediate stream; checks that the stream
+  * is registered as both an input operator and an output stream under its id.
+  */
+ @Test
+ public void testGetIntermediateStreamWithDefaultSystemDescriptor() {
+   final String streamId = "streamId";
+   Config config = mock(Config.class);
+
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
+   GenericSystemDescriptor defaultSystem = new GenericSystemDescriptor("mock-system", "mock-system-factory");
+   streamAppDesc.withDefaultSystem(defaultSystem);
+   IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStream =
+       streamAppDesc.getIntermediateStream(streamId, mock(Serde.class), false);
+
+   assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStream.getOperatorSpec());
+   assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStream.getOutputStream());
+   assertEquals(streamId, intermediateStream.getStreamId());
+ }
+
+ /**
+  * Creates an intermediate stream with a {@code null} serde and checks that the key and value
+  * serdes of both its output stream and its input operator spec are {@code null}.
+  */
+ @Test
+ public void testGetIntermediateStreamWithNoSerde() {
+   final String streamId = "streamId";
+   Config config = mock(Config.class);
+
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
+   IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStream =
+       streamAppDesc.getIntermediateStream(streamId, null, false);
+
+   assertEquals(streamAppDesc.getInputOperators().get(streamId), intermediateStream.getOperatorSpec());
+   assertEquals(streamAppDesc.getOutputStreams().get(streamId), intermediateStream.getOutputStream());
+   assertEquals(streamId, intermediateStream.getStreamId());
+   assertNull(intermediateStream.getOutputStream().getKeySerde());
+   assertNull(intermediateStream.getOutputStream().getValueSerde());
+
+   // The operator spec of an intermediate stream is viewed as an input operator spec here.
+   InputOperatorSpec inputSpec = (InputOperatorSpec) (OperatorSpec) intermediateStream.getOperatorSpec();
+   assertNull(inputSpec.getKeySerde());
+   assertNull(inputSpec.getValueSerde());
+ }
+
+ /**
+  * Requesting an intermediate stream with an id that is already in use must be rejected with an
+  * {@link IllegalStateException}.
+  *
+  * The expectation is scoped to the second call only, so an exception raised by the first
+  * (set-up) call fails the test instead of being mistaken for the expected one.
+  */
+ @Test
+ public void testGetSameIntermediateStreamTwice() {
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mock(Config.class));
+   // Set-up: the first request for this id must succeed.
+   streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
+   try {
+     streamAppDesc.getIntermediateStream("test-stream-1", mock(Serde.class), false);
+     fail("Getting the same intermediate stream twice should have failed");
+   } catch (IllegalStateException expected) {
+     // expected: duplicate intermediate stream id
+   }
+ }
+
+ /**
+  * Checks the operator ids produced by {@code getNextOpId} for a config with job name
+  * "jobName" and job id "1234": ids are prefixed with job name and job id, a user-defined id
+  * replaces the numeric suffix, and the numeric suffix of the third request is 2 even though the
+  * second request used a user-defined id.
+  */
+ @Test
+ public void testGetNextOpIdIncrementsId() {
+   Config config = mock(Config.class);
+   when(config.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+   when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, config);
+
+   String mergeOpId = streamAppDesc.getNextOpId(OpCode.MERGE, null);
+   assertEquals("jobName-1234-merge-0", mergeOpId);
+
+   String joinOpId = streamAppDesc.getNextOpId(OpCode.JOIN, "customName");
+   assertEquals("jobName-1234-join-customName", joinOpId);
+
+   String mapOpId = streamAppDesc.getNextOpId(OpCode.MAP, null);
+   assertEquals("jobName-1234-map-2", mapOpId);
+ }
+
+ /**
+  * A user-defined operator id may be used only once: the first request returns the expected id
+  * and a second request with the same op code and user-defined id must raise a
+  * {@link SamzaException}.
+  *
+  * The expectation is scoped to the second call only; with a method-wide expected exception, a
+  * {@link SamzaException} thrown by the first call would have let the test pass without the
+  * assertion on the first id ever being evaluated.
+  */
+ @Test
+ public void testGetNextOpIdRejectsDuplicates() {
+   Config mockConfig = mock(Config.class);
+   when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+   when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+   // The first use of the user-defined id must succeed.
+   assertEquals("jobName-1234-join-customName", streamAppDesc.getNextOpId(OpCode.JOIN, "customName"));
+   try {
+     streamAppDesc.getNextOpId(OpCode.JOIN, "customName");
+     fail("Reusing the user-defined operator ID 'customName' should have failed");
+   } catch (SamzaException expected) {
+     // expected: duplicate operator ids are rejected
+   }
+ }
+
+ @Test
+ public void testOpIdValidation() {
+ Config mockConfig = mock(Config.class);
+ when(mockConfig.get(eq(JobConfig.JOB_NAME()))).thenReturn("jobName");
+ when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("1234");
+
+ StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> { }, mockConfig);
+
+ // null and empty userDefinedIDs should fall back to autogenerated IDs.
+ try {
+ streamAppDesc.getNextOpId(OpCode.FILTER, null);
+ streamAppDesc.getNextOpId(OpCode.FILTER, "");
+ streamAppDesc.getNextOpId(OpCode.FILTER, " ");
+ streamAppDesc.getNextOpId(OpCode.FILTER, "\t");
+ } catch (SamzaException e) {
+ fail("Received an error with a null or empty operator ID instead of defaulting to auto-generated ID.");
+ }
+
+ List<String> validOpIds = ImmutableList.of("op_id", "op-id", "1000", "op_1", "OP_ID");
+ for (String validOpId: validOpIds) {
+ try {
+ streamAppDesc.getNextOpId(OpCode.FILTER, validOpId);
+ } catch (Exception e) {
+ fail("Received an exception with a valid operator ID: " + validOpId);
+ }
+ }
+
+ List<String> invalidOpIds = ImmutableList.of("op id", "op#id");
+ for (String invalidOpId: invalidOpIds) {
+ try {
+ streamAppDesc.getNextOpId(OpCode.FILTER, invalidOpId);
+ fail("Did not receive an exception with an invalid operator ID: " + invalidOpId);
+ } catch (SamzaException e) { }
+ }
+ }
+
+ /**
+  * Registers three input streams and checks that iterating the values of
+  * {@code getInputOperators()} yields them in the order in which they were registered.
+  *
+  * Assertions use JUnit's (expected, actual) argument order so that failure messages report the
+  * expected and actual values correctly.
+  */
+ @Test
+ public void testGetInputStreamPreservesInsertionOrder() {
+   Config mockConfig = mock(Config.class);
+
+   String testStreamId1 = "test-stream-1";
+   String testStreamId2 = "test-stream-2";
+   String testStreamId3 = "test-stream-3";
+
+   GenericSystemDescriptor sd = new GenericSystemDescriptor("mockSystem", "mockSystemFactoryClass");
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(appDesc -> {
+     appDesc.getInputStream(sd.getInputDescriptor(testStreamId1, mock(Serde.class)));
+     appDesc.getInputStream(sd.getInputDescriptor(testStreamId2, mock(Serde.class)));
+     appDesc.getInputStream(sd.getInputDescriptor(testStreamId3, mock(Serde.class)));
+   }, mockConfig);
+
+   // Snapshot the values in iteration order so they can be checked by position.
+   List<InputOperatorSpec> inputSpecs = new ArrayList<>(streamAppDesc.getInputOperators().values());
+   assertEquals(3, inputSpecs.size());
+   assertEquals(testStreamId1, inputSpecs.get(0).getStreamId());
+   assertEquals(testStreamId2, inputSpecs.get(1).getStreamId());
+   assertEquals(testStreamId3, inputSpecs.get(2).getStreamId());
+ }
+
+ /**
+  * Registers a mocked table descriptor and checks that the descriptor's tables contain an entry
+  * for the {@link TableSpec} returned by that descriptor.
+  */
+ @Test
+ public void testGetTable() throws Exception {
+   Config config = mock(Config.class);
+
+   TableSpec tableSpec = new TableSpec("t1", KVSerde.of(new NoOpSerde(), new NoOpSerde()), "", new HashMap<>());
+   BaseTableDescriptor tableDescriptor = mock(BaseTableDescriptor.class);
+   when(tableDescriptor.getTableSpec()).thenReturn(tableSpec);
+   when(tableDescriptor.getTableId()).thenReturn(tableSpec.getId());
+
+   StreamApplication app = appDesc -> appDesc.getTable(tableDescriptor);
+   StreamApplicationDescriptorImpl streamAppDesc = new StreamApplicationDescriptorImpl(app, config);
+   assertNotNull(streamAppDesc.getTables().get(tableSpec));
+ }
+
+ /**
+  * Checks that a {@link ContextManager} supplied through {@code withContextManager} is the one
+  * returned by {@code getContextManager()}.
+  *
+  * The assertion uses JUnit's (expected, actual) argument order so a failure message reports
+  * the values correctly.
+  */
+ @Test
+ public void testContextManager() {
+   ContextManager cntxMan = mock(ContextManager.class);
+   StreamApplication testApp = appDesc -> appDesc.withContextManager(cntxMan);
+   StreamApplicationDescriptorImpl appSpec = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+   assertEquals(cntxMan, appSpec.getContextManager());
+ }
+
+ /**
+  * Checks that a {@link ProcessorLifecycleListenerFactory} supplied through
+  * {@code withProcessorLifecycleListenerFactory} is the one returned by
+  * {@code getProcessorLifecycleListenerFactory()}.
+  *
+  * The assertion uses JUnit's (expected, actual) argument order so a failure message reports
+  * the values correctly.
+  */
+ @Test
+ public void testProcessorLifecycleListenerFactory() {
+   ProcessorLifecycleListenerFactory mockFactory = mock(ProcessorLifecycleListenerFactory.class);
+   StreamApplication testApp = appSpec -> appSpec.withProcessorLifecycleListenerFactory(mockFactory);
+   StreamApplicationDescriptorImpl appDesc = new StreamApplicationDescriptorImpl(testApp, mock(Config.class));
+   assertEquals(mockFactory, appDesc.getProcessorLifecycleListenerFactory());
+ }
+
+ /**
+  * Expects an {@link IllegalStateException} when a table descriptor whose id is "my.table" is
+  * registered with the application descriptor.
+  */
+ @Test(expected = IllegalStateException.class)
+ public void testGetTableWithBadId() {
+   Config config = mock(Config.class);
+   StreamApplication app = appDesc -> {
+     BaseTableDescriptor badIdDescriptor = mock(BaseTableDescriptor.class);
+     when(badIdDescriptor.getTableId()).thenReturn("my.table");
+     appDesc.getTable(badIdDescriptor);
+   };
+   new StreamApplicationDescriptorImpl(app, config);
+ }
+
+ /**
+  * Test-only system descriptor that passes a {@link StreamExpander} (and a {@code null}
+  * transformer) to the {@link SystemDescriptor} constructor, with a fixed factory class name.
+  * Declared static because it uses no state of the enclosing test instance.
+  */
+ static class MockExpandingSystemDescriptor extends SystemDescriptor<MockExpandingSystemDescriptor> implements ExpandingInputDescriptorProvider<Integer> {
+   public MockExpandingSystemDescriptor(String systemName, StreamExpander expander) {
+     super(systemName, "factory.class", null, expander);
+   }
+
+   /** Creates a {@link MockInputDescriptor} for the given stream id and serde on this system. */
+   @Override
+   public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
+     return new MockInputDescriptor<>(streamId, this, serde);
+   }
+ }
+
+ /**
+  * Test-only system descriptor that passes an {@link InputTransformer} (and a {@code null}
+  * expander) to the {@link SystemDescriptor} constructor, with a fixed factory class name.
+  * Declared static because it uses no state of the enclosing test instance.
+  */
+ static class MockTransformingSystemDescriptor extends SystemDescriptor<MockTransformingSystemDescriptor> implements TransformingInputDescriptorProvider<Integer> {
+   public MockTransformingSystemDescriptor(String systemName, InputTransformer transformer) {
+     super(systemName, "factory.class", transformer, null);
+   }
+
+   /** Creates a {@link MockInputDescriptor} for the given stream id and serde on this system. */
+   @Override
+   public MockInputDescriptor<Integer> getInputDescriptor(String streamId, Serde serde) {
+     return new MockInputDescriptor<>(streamId, this, serde);
+   }
+ }
+
+ /**
+  * Minimal {@link InputDescriptor} used by the mock system descriptors in this test. It forwards
+  * the stream id, serde and system descriptor to the super constructor and passes {@code null}
+  * as the remaining argument.
+  * Declared static so the static mock system descriptors can instantiate it without an
+  * enclosing test instance.
+  */
+ public static class MockInputDescriptor<StreamMessageType> extends InputDescriptor<StreamMessageType, MockInputDescriptor<StreamMessageType>> {
+   MockInputDescriptor(String streamId, SystemDescriptor systemDescriptor, Serde serde) {
+     super(streamId, serde, systemDescriptor, null);
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
new file mode 100644
index 0000000..9418c1f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/application/TestTaskApplicationDescriptorImpl.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.application;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.TableDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.runtime.ProcessorLifecycleListenerFactory;
+import org.apache.samza.task.TaskFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link TaskApplicationDescriptorImpl}.
+ *
+ * Each test builds a descriptor from a {@link TaskApplication} (a mock or a lambda) and checks that
+ * whatever the application registered while being described can be read back from the descriptor.
+ */
+public class TestTaskApplicationDescriptorImpl {
+
+ private final Config config = mock(Config.class);
+ private final String defaultSystemName = "test-system";
+ private final SystemDescriptor defaultSystemDescriptor = mock(SystemDescriptor.class);
+ // Plain collections filled in setUp(). Double-brace initialization is avoided on purpose: it
+ // creates an anonymous collection subclass per field that captures the enclosing test instance.
+ private final List<InputDescriptor> mockInputs = new ArrayList<>();
+ private final List<OutputDescriptor> mockOutputs = new ArrayList<>();
+ private final Set<TableDescriptor> mockTables = new HashSet<>();
+
+ /** Stubs the shared system descriptor and creates two mock inputs, outputs and tables. */
+ @Before
+ public void setUp() {
+ when(defaultSystemDescriptor.getSystemName()).thenReturn(defaultSystemName);
+ mockInputs.add(mockInput("test-input1"));
+ mockInputs.add(mockInput("test-input2"));
+ mockOutputs.add(mockOutput("test-output1"));
+ mockOutputs.add(mockOutput("test-output2"));
+ mockTables.add(mockTable("test-table1"));
+ mockTables.add(mockTable("test-table2"));
+ }
+
+ /** The constructor must call {@code describe} with the new descriptor and keep the config. */
+ @Test
+ public void testConstructor() {
+ TaskApplication mockApp = mock(TaskApplication.class);
+ TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(mockApp, config);
+ verify(mockApp).describe(appDesc);
+ assertEquals(config, appDesc.config);
+ }
+
+ /** Inputs added by the application are returned in the order they were added. */
+ @Test
+ public void testAddInputStreams() {
+ TaskApplication testApp = appDesc -> mockInputs.forEach(appDesc::addInputStream);
+ TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+ // Compare as lists: same order-sensitive, element-wise check as before, without relying on
+ // JUnit's deprecated assertEquals(Object[], Object[]) overload.
+ assertEquals(mockInputs, new ArrayList<>(appDesc.getInputDescriptors().values()));
+ }
+
+ /** Outputs added by the application are returned in the order they were added. */
+ @Test
+ public void testAddOutputStreams() {
+ TaskApplication testApp = appDesc -> mockOutputs.forEach(appDesc::addOutputStream);
+ TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+ // See testAddInputStreams for why lists are compared instead of arrays.
+ assertEquals(mockOutputs, new ArrayList<>(appDesc.getOutputDescriptors().values()));
+ }
+
+ /** Tables added by the application are all exposed by the descriptor. */
+ @Test
+ public void testAddTables() {
+ TaskApplication testApp = appDesc -> mockTables.forEach(appDesc::addTable);
+ TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+ assertEquals(mockTables, appDesc.getTableDescriptors());
+ }
+
+ /** The task factory set by the application is returned unchanged. */
+ @Test
+ public void testSetTaskFactory() {
+ TaskFactory mockTf = mock(TaskFactory.class);
+ TaskApplication testApp = appDesc -> appDesc.setTaskFactory(mockTf);
+ TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+ // Expected value first, so a failure message labels the two values correctly.
+ assertEquals(mockTf, appDesc.getTaskFactory());
+ }
+
+ /** The context manager set by the application is returned unchanged. */
+ @Test
+ public void testContextManager() {
+ ContextManager cntxMan = mock(ContextManager.class);
+ TaskApplication testApp = appDesc -> appDesc.withContextManager(cntxMan);
+ TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+ assertEquals(cntxMan, appDesc.getContextManager());
+ }
+
+ /** The lifecycle listener factory set by the application is returned unchanged. */
+ @Test
+ public void testProcessorLifecycleListener() {
+ ProcessorLifecycleListenerFactory mockFactory = mock(ProcessorLifecycleListenerFactory.class);
+ TaskApplication testApp = appDesc -> appDesc.withProcessorLifecycleListenerFactory(mockFactory);
+ TaskApplicationDescriptorImpl appDesc = new TaskApplicationDescriptorImpl(testApp, config);
+ assertEquals(mockFactory, appDesc.getProcessorLifecycleListenerFactory());
+ }
+
+ /** Creates a mock input descriptor reporting the given stream id and the default system. */
+ private InputDescriptor mockInput(String streamId) {
+ InputDescriptor input = mock(InputDescriptor.class);
+ when(input.getStreamId()).thenReturn(streamId);
+ when(input.getSystemDescriptor()).thenReturn(defaultSystemDescriptor);
+ return input;
+ }
+
+ /** Creates a mock output descriptor reporting the given stream id and the default system. */
+ private OutputDescriptor mockOutput(String streamId) {
+ OutputDescriptor output = mock(OutputDescriptor.class);
+ when(output.getStreamId()).thenReturn(streamId);
+ when(output.getSystemDescriptor()).thenReturn(defaultSystemDescriptor);
+ return output;
+ }
+
+ /** Creates a mock table descriptor reporting the given table id. */
+ private TableDescriptor mockTable(String tableId) {
+ TableDescriptor table = mock(TableDescriptor.class);
+ when(table.getTableId()).thenReturn(tableId);
+ return table;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 9912d8b..61cf6c5 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -28,13 +28,13 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.samza.Partition;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
-import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
@@ -107,24 +107,24 @@ public class TestExecutionPlanner {
};
}
- private StreamGraphSpec createSimpleGraph() {
+ private StreamApplicationDescriptorImpl createSimpleGraph() {
/**
* a simple graph of partitionBy and map
*
* input1 -> partitionBy -> map -> output1
*
*/
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
- MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream(input1Descriptor);
- OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor);
- input1
- .partitionBy(m -> m.key, m -> m.value, "p1")
- .map(kv -> kv)
- .sendTo(output1);
- return graphSpec;
+ return new StreamApplicationDescriptorImpl(appDesc-> {
+ MessageStream<KV<Object, Object>> input1 = appDesc.getInputStream(input1Descriptor);
+ OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+ input1
+ .partitionBy(m -> m.key, m -> m.value, "p1")
+ .map(kv -> kv)
+ .sendTo(output1);
+ }, config);
}
- private StreamGraphSpec createStreamGraphWithJoin() {
+ private StreamApplicationDescriptorImpl createStreamGraphWithJoin() {
/**
* the graph looks like the following. number of partitions in parentheses. quotes indicate expected value.
@@ -136,80 +136,77 @@ public class TestExecutionPlanner {
* input3 (32) -> filter -> partitionBy ("64") -> map -> join -> output2 (16)
*
*/
-
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
- MessageStream<KV<Object, Object>> messageStream1 =
- graphSpec.getInputStream(input1Descriptor)
- .map(m -> m);
- MessageStream<KV<Object, Object>> messageStream2 =
- graphSpec.getInputStream(input2Descriptor)
- .partitionBy(m -> m.key, m -> m.value, "p1")
- .filter(m -> true);
- MessageStream<KV<Object, Object>> messageStream3 =
- graphSpec.getInputStream(input3Descriptor)
- .filter(m -> true)
- .partitionBy(m -> m.key, m -> m.value, "p2")
- .map(m -> m);
- OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor);
- OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream(output2Descriptor);
-
- messageStream1
- .join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
- .sendTo(output1);
- messageStream3
- .join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
- .sendTo(output2);
-
- return graphSpec;
+ return new StreamApplicationDescriptorImpl(appDesc -> {
+ MessageStream<KV<Object, Object>> messageStream1 =
+ appDesc.getInputStream(input1Descriptor)
+ .map(m -> m);
+ MessageStream<KV<Object, Object>> messageStream2 =
+ appDesc.getInputStream(input2Descriptor)
+ .partitionBy(m -> m.key, m -> m.value, "p1")
+ .filter(m -> true);
+ MessageStream<KV<Object, Object>> messageStream3 =
+ appDesc.getInputStream(input3Descriptor)
+ .filter(m -> true)
+ .partitionBy(m -> m.key, m -> m.value, "p2")
+ .map(m -> m);
+ OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+ OutputStream<KV<Object, Object>> output2 = appDesc.getOutputStream(output2Descriptor);
+
+ messageStream1
+ .join(messageStream2,
+ (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
+ .sendTo(output1);
+ messageStream3
+ .join(messageStream2,
+ (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
+ .sendTo(output2);
+ }, config);
}
- private StreamGraphSpec createStreamGraphWithJoinAndWindow() {
-
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
- MessageStream<KV<Object, Object>> messageStream1 =
- graphSpec.getInputStream(input1Descriptor)
- .map(m -> m);
- MessageStream<KV<Object, Object>> messageStream2 =
- graphSpec.getInputStream(input2Descriptor)
- .partitionBy(m -> m.key, m -> m.value, "p1")
- .filter(m -> true);
- MessageStream<KV<Object, Object>> messageStream3 =
- graphSpec.getInputStream(input3Descriptor)
- .filter(m -> true)
- .partitionBy(m -> m.key, m -> m.value, "p2")
- .map(m -> m);
- OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor);
- OutputStream<KV<Object, Object>> output2 = graphSpec.getOutputStream(output2Descriptor);
-
- messageStream1.map(m -> m)
- .filter(m->true)
- .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
-
- messageStream2.map(m -> m)
- .filter(m->true)
- .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
-
- messageStream1
- .join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1")
- .sendTo(output1);
- messageStream3
- .join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2")
- .sendTo(output2);
- messageStream3
- .join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3")
- .sendTo(output2);
-
- return graphSpec;
+ private StreamApplicationDescriptorImpl createStreamGraphWithJoinAndWindow() {
+
+ return new StreamApplicationDescriptorImpl(appDesc -> {
+ MessageStream<KV<Object, Object>> messageStream1 =
+ appDesc.getInputStream(input1Descriptor)
+ .map(m -> m);
+ MessageStream<KV<Object, Object>> messageStream2 =
+ appDesc.getInputStream(input2Descriptor)
+ .partitionBy(m -> m.key, m -> m.value, "p1")
+ .filter(m -> true);
+ MessageStream<KV<Object, Object>> messageStream3 =
+ appDesc.getInputStream(input3Descriptor)
+ .filter(m -> true)
+ .partitionBy(m -> m.key, m -> m.value, "p2")
+ .map(m -> m);
+ OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+ OutputStream<KV<Object, Object>> output2 = appDesc.getOutputStream(output2Descriptor);
+
+ messageStream1.map(m -> m)
+ .filter(m->true)
+ .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8), mock(Serde.class), mock(Serde.class)), "w1");
+
+ messageStream2.map(m -> m)
+ .filter(m->true)
+ .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16), mock(Serde.class), mock(Serde.class)), "w2");
+
+ messageStream1
+ .join(messageStream2,
+ (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(1600), "j1")
+ .sendTo(output1);
+ messageStream3
+ .join(messageStream2,
+ (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(100), "j2")
+ .sendTo(output2);
+ messageStream3
+ .join(messageStream2,
+ (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofMillis(252), "j3")
+ .sendTo(output2);
+ }, config);
}
@Before
@@ -265,7 +262,7 @@ public class TestExecutionPlanner {
@Test
public void testCreateProcessorGraph() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamGraphSpec graphSpec = createStreamGraphWithJoin();
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
assertTrue(jobGraph.getSources().size() == 3);
@@ -276,7 +273,7 @@ public class TestExecutionPlanner {
@Test
public void testFetchExistingStreamPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamGraphSpec graphSpec = createStreamGraphWithJoin();
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
@@ -294,7 +291,7 @@ public class TestExecutionPlanner {
@Test
public void testCalculateJoinInputPartitions() {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamGraphSpec graphSpec = createStreamGraphWithJoin();
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
ExecutionPlanner.updateExistingPartitions(jobGraph, streamManager);
@@ -313,7 +310,7 @@ public class TestExecutionPlanner {
Config cfg = new MapConfig(map);
ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
- StreamGraphSpec graphSpec = createSimpleGraph();
+ StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
JobGraph jobGraph = planner.createJobGraph(graphSpec.getOperatorSpecGraph());
planner.calculatePartitions(jobGraph);
@@ -330,7 +327,7 @@ public class TestExecutionPlanner {
Config cfg = new MapConfig(map);
ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
- StreamGraphSpec graphSpec = createStreamGraphWithJoin();
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoin();
ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
List<JobConfig> jobConfigs = plan.getJobConfigs();
for (JobConfig config : jobConfigs) {
@@ -345,7 +342,7 @@ public class TestExecutionPlanner {
Config cfg = new MapConfig(map);
ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
- StreamGraphSpec graphSpec = createStreamGraphWithJoinAndWindow();
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoinAndWindow();
ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
List<JobConfig> jobConfigs = plan.getJobConfigs();
assertEquals(1, jobConfigs.size());
@@ -362,7 +359,7 @@ public class TestExecutionPlanner {
Config cfg = new MapConfig(map);
ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
- StreamGraphSpec graphSpec = createStreamGraphWithJoinAndWindow();
+ StreamApplicationDescriptorImpl graphSpec = createStreamGraphWithJoinAndWindow();
ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
List<JobConfig> jobConfigs = plan.getJobConfigs();
assertEquals(1, jobConfigs.size());
@@ -379,7 +376,7 @@ public class TestExecutionPlanner {
Config cfg = new MapConfig(map);
ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
- StreamGraphSpec graphSpec = createSimpleGraph();
+ StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
List<JobConfig> jobConfigs = plan.getJobConfigs();
assertEquals(1, jobConfigs.size());
@@ -394,7 +391,7 @@ public class TestExecutionPlanner {
Config cfg = new MapConfig(map);
ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
- StreamGraphSpec graphSpec = createSimpleGraph();
+ StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
List<JobConfig> jobConfigs = plan.getJobConfigs();
assertEquals(1, jobConfigs.size());
@@ -404,7 +401,7 @@ public class TestExecutionPlanner {
@Test
public void testCalculateIntStreamPartitions() throws Exception {
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamGraphSpec graphSpec = createSimpleGraph();
+ StreamApplicationDescriptorImpl graphSpec = createSimpleGraph();
JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph());
// the partitions should be the same as input1
@@ -437,11 +434,12 @@ public class TestExecutionPlanner {
int partitionLimit = ExecutionPlanner.MAX_INFERRED_PARTITIONS;
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ MessageStream<KV<Object, Object>> input1 = appDesc.getInputStream(input4Descriptor);
+ OutputStream<KV<Object, Object>> output1 = appDesc.getOutputStream(output1Descriptor);
+ input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output1);
+ }, config);
- MessageStream<KV<Object, Object>> input1 = graphSpec.getInputStream(input4Descriptor);
- OutputStream<KV<Object, Object>> output1 = graphSpec.getOutputStream(output1Descriptor);
- input1.partitionBy(m -> m.key, m -> m.value, "p1").map(kv -> kv).sendTo(output1);
JobGraph jobGraph = (JobGraph) planner.plan(graphSpec.getOperatorSpecGraph());
// the partitions should be the same as input1
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 960693f..ae6e25e 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -22,13 +22,13 @@ package org.apache.samza.execution;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
@@ -102,43 +102,44 @@ public class TestJobGraphJsonGenerator {
when(systemAdmins.getSystemAdmin("system2")).thenReturn(systemAdmin2);
StreamManager streamManager = new StreamManager(systemAdmins);
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
- KVSerde<Object, Object> kvSerde = new KVSerde<>(new NoOpSerde(), new NoOpSerde());
- String mockSystemFactoryClass = "factory.class.name";
- GenericSystemDescriptor system1 = new GenericSystemDescriptor("system1", mockSystemFactoryClass);
- GenericSystemDescriptor system2 = new GenericSystemDescriptor("system2", mockSystemFactoryClass);
- GenericInputDescriptor<KV<Object, Object>> input1Descriptor = system1.getInputDescriptor("input1", kvSerde);
- GenericInputDescriptor<KV<Object, Object>> input2Descriptor = system2.getInputDescriptor("input2", kvSerde);
- GenericInputDescriptor<KV<Object, Object>> input3Descriptor = system2.getInputDescriptor("input3", kvSerde);
- GenericOutputDescriptor<KV<Object, Object>> output1Descriptor = system1.getOutputDescriptor("output1", kvSerde);
- GenericOutputDescriptor<KV<Object, Object>> output2Descriptor = system2.getOutputDescriptor("output2", kvSerde);
-
- MessageStream<KV<Object, Object>> messageStream1 =
- graphSpec.getInputStream(input1Descriptor)
- .map(m -> m);
- MessageStream<KV<Object, Object>> messageStream2 =
- graphSpec.getInputStream(input2Descriptor)
- .partitionBy(m -> m.key, m -> m.value, "p1")
- .filter(m -> true);
- MessageStream<KV<Object, Object>> messageStream3 =
- graphSpec.getInputStream(input3Descriptor)
- .filter(m -> true)
- .partitionBy(m -> m.key, m -> m.value, "p2")
- .map(m -> m);
- OutputStream<KV<Object, Object>> outputStream1 = graphSpec.getOutputStream(output1Descriptor);
- OutputStream<KV<Object, Object>> outputStream2 = graphSpec.getOutputStream(output2Descriptor);
-
- messageStream1
- .join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
- .sendTo(outputStream1);
- messageStream2.sink((message, collector, coordinator) -> { });
- messageStream3
- .join(messageStream2,
- (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
- mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
- .sendTo(outputStream2);
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ KVSerde<Object, Object> kvSerde = new KVSerde<>(new NoOpSerde(), new NoOpSerde());
+ String mockSystemFactoryClass = "factory.class.name";
+ GenericSystemDescriptor system1 = new GenericSystemDescriptor("system1", mockSystemFactoryClass);
+ GenericSystemDescriptor system2 = new GenericSystemDescriptor("system2", mockSystemFactoryClass);
+ GenericInputDescriptor<KV<Object, Object>> input1Descriptor = system1.getInputDescriptor("input1", kvSerde);
+ GenericInputDescriptor<KV<Object, Object>> input2Descriptor = system2.getInputDescriptor("input2", kvSerde);
+ GenericInputDescriptor<KV<Object, Object>> input3Descriptor = system2.getInputDescriptor("input3", kvSerde);
+ GenericOutputDescriptor<KV<Object, Object>> output1Descriptor = system1.getOutputDescriptor("output1", kvSerde);
+ GenericOutputDescriptor<KV<Object, Object>> output2Descriptor = system2.getOutputDescriptor("output2", kvSerde);
+
+ MessageStream<KV<Object, Object>> messageStream1 =
+ appDesc.getInputStream(input1Descriptor)
+ .map(m -> m);
+ MessageStream<KV<Object, Object>> messageStream2 =
+ appDesc.getInputStream(input2Descriptor)
+ .partitionBy(m -> m.key, m -> m.value, "p1")
+ .filter(m -> true);
+ MessageStream<KV<Object, Object>> messageStream3 =
+ appDesc.getInputStream(input3Descriptor)
+ .filter(m -> true)
+ .partitionBy(m -> m.key, m -> m.value, "p2")
+ .map(m -> m);
+ OutputStream<KV<Object, Object>> outputStream1 = appDesc.getOutputStream(output1Descriptor);
+ OutputStream<KV<Object, Object>> outputStream2 = appDesc.getOutputStream(output2Descriptor);
+
+ messageStream1
+ .join(messageStream2,
+ (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(2), "j1")
+ .sendTo(outputStream1);
+ messageStream2.sink((message, collector, coordinator) -> { });
+ messageStream3
+ .join(messageStream2,
+ (JoinFunction<Object, KV<Object, Object>, KV<Object, Object>, KV<Object, Object>>) mock(JoinFunction.class),
+ mock(Serde.class), mock(Serde.class), mock(Serde.class), Duration.ofHours(1), "j2")
+ .sendTo(outputStream2);
+ }, config);
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
@@ -177,27 +178,28 @@ public class TestJobGraphJsonGenerator {
when(systemAdmins.getSystemAdmin("kafka")).thenReturn(systemAdmin2);
StreamManager streamManager = new StreamManager(systemAdmins);
- StreamGraphSpec graphSpec = new StreamGraphSpec(config);
- KVSerde<String, PageViewEvent> pvSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class));
- GenericSystemDescriptor isd = new GenericSystemDescriptor("hdfs", "mockSystemFactoryClass");
- GenericInputDescriptor<KV<String, PageViewEvent>> pageView = isd.getInputDescriptor("PageView", pvSerde);
-
- KVSerde<String, Long> pvcSerde = KVSerde.of(new StringSerde(), new LongSerde());
- GenericSystemDescriptor osd = new GenericSystemDescriptor("kafka", "mockSystemFactoryClass");
- GenericOutputDescriptor<KV<String, Long>> pageViewCount = osd.getOutputDescriptor("PageViewCount", pvcSerde);
-
- MessageStream<KV<String, PageViewEvent>> inputStream = graphSpec.getInputStream(pageView);
- OutputStream<KV<String, Long>> outputStream = graphSpec.getOutputStream(pageViewCount);
- inputStream
- .partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), "keyed-by-country")
- .window(Windows.keyedTumblingWindow(kv -> kv.getValue().getCountry(),
- Duration.ofSeconds(10L),
- () -> 0L,
- (m, c) -> c + 1L,
- new StringSerde(),
- new LongSerde()), "count-by-country")
- .map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage()))
- .sendTo(outputStream);
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ KVSerde<String, PageViewEvent> pvSerde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class));
+ GenericSystemDescriptor isd = new GenericSystemDescriptor("hdfs", "mockSystemFactoryClass");
+ GenericInputDescriptor<KV<String, PageViewEvent>> pageView = isd.getInputDescriptor("PageView", pvSerde);
+
+ KVSerde<String, Long> pvcSerde = KVSerde.of(new StringSerde(), new LongSerde());
+ GenericSystemDescriptor osd = new GenericSystemDescriptor("kafka", "mockSystemFactoryClass");
+ GenericOutputDescriptor<KV<String, Long>> pageViewCount = osd.getOutputDescriptor("PageViewCount", pvcSerde);
+
+ MessageStream<KV<String, PageViewEvent>> inputStream = appDesc.getInputStream(pageView);
+ OutputStream<KV<String, Long>> outputStream = appDesc.getOutputStream(pageViewCount);
+ inputStream
+ .partitionBy(kv -> kv.getValue().getCountry(), kv -> kv.getValue(), "keyed-by-country")
+ .window(Windows.keyedTumblingWindow(kv -> kv.getValue().getCountry(),
+ Duration.ofSeconds(10L),
+ () -> 0L,
+ (m, c) -> c + 1L,
+ new StringSerde(),
+ new LongSerde()), "count-by-country")
+ .map(pane -> new KV<>(pane.getKey().getKey(), pane.getMessage()))
+ .sendTo(outputStream);
+ }, config);
ExecutionPlanner planner = new ExecutionPlanner(config, streamManager);
ExecutionPlan plan = planner.plan(graphSpec.getOperatorSpecGraph());
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
index 864c3fc..163b094 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -24,6 +24,7 @@ import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
@@ -31,7 +32,6 @@ import org.apache.samza.config.SerializerConfig;
import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
-import org.apache.samza.operators.StreamGraphSpec;
import org.apache.samza.operators.descriptors.GenericInputDescriptor;
import org.apache.samza.operators.descriptors.GenericOutputDescriptor;
import org.apache.samza.operators.descriptors.GenericSystemDescriptor;
@@ -49,8 +49,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
public class TestJobNode {
@@ -66,23 +65,24 @@ public class TestJobNode {
when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
- StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
- KVSerde<String, Object> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>());
- GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClass");
- GenericInputDescriptor<KV<String, Object>> inputDescriptor1 = sd.getInputDescriptor("input1", serde);
- GenericInputDescriptor<KV<String, Object>> inputDescriptor2 = sd.getInputDescriptor("input2", serde);
- GenericOutputDescriptor<KV<String, Object>> outputDescriptor = sd.getOutputDescriptor("output", serde);
- MessageStream<KV<String, Object>> input1 = graphSpec.getInputStream(inputDescriptor1);
- MessageStream<KV<String, Object>> input2 = graphSpec.getInputStream(inputDescriptor2);
- OutputStream<KV<String, Object>> output = graphSpec.getOutputStream(outputDescriptor);
- JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class);
- input1
- .partitionBy(KV::getKey, KV::getValue, serde, "p1")
- .map(kv -> kv.value)
- .join(input2.map(kv -> kv.value), mockJoinFn,
- new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class),
- Duration.ofHours(1), "j1")
- .sendTo(output);
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ KVSerde<String, Object> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>());
+ GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClass");
+ GenericInputDescriptor<KV<String, Object>> inputDescriptor1 = sd.getInputDescriptor("input1", serde);
+ GenericInputDescriptor<KV<String, Object>> inputDescriptor2 = sd.getInputDescriptor("input2", serde);
+ GenericOutputDescriptor<KV<String, Object>> outputDescriptor = sd.getOutputDescriptor("output", serde);
+ MessageStream<KV<String, Object>> input1 = appDesc.getInputStream(inputDescriptor1);
+ MessageStream<KV<String, Object>> input2 = appDesc.getInputStream(inputDescriptor2);
+ OutputStream<KV<String, Object>> output = appDesc.getOutputStream(outputDescriptor);
+ JoinFunction<String, Object, Object, KV<String, Object>> mockJoinFn = mock(JoinFunction.class);
+ input1
+ .partitionBy(KV::getKey, KV::getValue, serde, "p1")
+ .map(kv -> kv.value)
+ .join(input2.map(kv -> kv.value), mockJoinFn,
+ new StringSerde(), new JsonSerdeV2<>(Object.class), new JsonSerdeV2<>(Object.class),
+ Duration.ofHours(1), "j1")
+ .sendTo(output);
+ }, mockConfig);
JobNode jobNode = new JobNode("jobName", "jobId", graphSpec.getOperatorSpecGraph(), mockConfig);
Config config = new MapConfig();
@@ -188,12 +188,13 @@ public class TestJobNode {
when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("jobName");
when(mockConfig.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
- StreamGraphSpec graphSpec = new StreamGraphSpec(mockConfig);
- GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClassName");
- GenericInputDescriptor<KV<String, Object>> inputDescriptor1 =
- sd.getInputDescriptor("input", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
- MessageStream<KV<String, Object>> input = graphSpec.getInputStream(inputDescriptor1);
- input.partitionBy(KV::getKey, KV::getValue, "p1");
+ StreamApplicationDescriptorImpl graphSpec = new StreamApplicationDescriptorImpl(appDesc -> {
+ GenericSystemDescriptor sd = new GenericSystemDescriptor("system1", "mockSystemFactoryClassName");
+ GenericInputDescriptor<KV<String, Object>> inputDescriptor1 =
+ sd.getInputDescriptor("input", KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
+ MessageStream<KV<String, Object>> input = appDesc.getInputStream(inputDescriptor1);
+ input.partitionBy(KV::getKey, KV::getValue, "p1");
+ }, mockConfig);
JobNode jobNode = new JobNode("jobName", "jobId", graphSpec.getOperatorSpecGraph(), mockConfig);
Config config = new MapConfig();
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
new file mode 100644
index 0000000..9ed57fa
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationUtilsFactory;
+import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link LocalJobPlanner}
+ *
+ * TODO: consolidate this with unit tests for ExecutionPlanner after SAMZA-1811
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LocalJobPlanner.class, JobCoordinatorConfig.class})
+public class TestLocalJobPlanner {
+
+ private static final String PLAN_JSON =
+ "{" + "\"jobs\":[{" + "\"jobName\":\"test-application\"," + "\"jobId\":\"1\"," + "\"operatorGraph\":{"
+ + "\"intermediateStreams\":{%s}," + "\"applicationName\":\"test-application\",\"applicationId\":\"1\"}";
+ private static final String STREAM_SPEC_JSON_FORMAT =
+ "\"%s\":{" + "\"streamSpec\":{" + "\"id\":\"%s\"," + "\"systemName\":\"%s\"," + "\"physicalName\":\"%s\","
+ + "\"partitionCount\":2}," + "\"sourceJobs\":[\"test-app\"]," + "\"targetJobs\":[\"test-target-app\"]},";
+
+ private LocalJobPlanner localPlanner;
+
+ @Test
+ public void testStreamCreation()
+ throws Exception {
+ localPlanner = createLocalJobPlanner(mock(StreamApplicationDescriptorImpl.class));
+ StreamManager streamManager = mock(StreamManager.class);
+ doReturn(streamManager).when(localPlanner).buildAndStartStreamManager(any(Config.class));
+
+ ExecutionPlan plan = mock(ExecutionPlan.class);
+ when(plan.getIntermediateStreams()).thenReturn(
+ Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
+ when(plan.getPlanAsJson()).thenReturn("");
+ when(plan.getJobConfigs()).thenReturn(Collections.singletonList(mock(JobConfig.class)));
+ doReturn(plan).when(localPlanner).getExecutionPlan(any());
+
+ CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
+ JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
+ when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
+ PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
+
+ localPlanner.prepareJobs();
+
+ ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+ verify(streamManager).createStreams(captor.capture());
+ List<StreamSpec> streamSpecs = captor.getValue();
+ assertEquals(streamSpecs.size(), 1);
+ assertEquals(streamSpecs.get(0).getId(), "test-stream");
+ verify(streamManager).stop();
+ }
+
+ @Test
+ public void testStreamCreationWithCoordination()
+ throws Exception {
+ localPlanner = createLocalJobPlanner(mock(StreamApplicationDescriptorImpl.class));
+ StreamManager streamManager = mock(StreamManager.class);
+ doReturn(streamManager).when(localPlanner).buildAndStartStreamManager(any(Config.class));
+
+ ExecutionPlan plan = mock(ExecutionPlan.class);
+ when(plan.getIntermediateStreams()).thenReturn(Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
+ when(plan.getPlanAsJson()).thenReturn("");
+ when(plan.getJobConfigs()).thenReturn(Collections.singletonList(mock(JobConfig.class)));
+ doReturn(plan).when(localPlanner).getExecutionPlan(any());
+
+ CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
+ CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
+ JobCoordinatorConfig mockJcConfig = mock(JobCoordinatorConfig.class);
+ when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
+ PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
+
+ DistributedLockWithState lock = mock(DistributedLockWithState.class);
+ when(lock.lockIfNotSet(anyLong(), anyObject())).thenReturn(true);
+ when(coordinationUtils.getLockWithState(anyString())).thenReturn(lock);
+ when(coordinationUtilsFactory.getCoordinationUtils(anyString(), anyString(), anyObject()))
+ .thenReturn(coordinationUtils);
+
+ localPlanner.prepareJobs();
+
+ ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+ verify(streamManager).createStreams(captor.capture());
+
+ List<StreamSpec> streamSpecs = captor.getValue();
+ assertEquals(streamSpecs.size(), 1);
+ assertEquals(streamSpecs.get(0).getId(), "test-stream");
+ verify(streamManager).stop();
+ }
+
+ /**
+ * A test case to verify if the plan results in different hash if there is change in topological sort order.
+ * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases.
+ */
+ @Test
+ public void testPlanIdWithShuffledStreamSpecs() {
+ List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+ String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
+
+ List<StreamSpec> shuffledStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
+
+ assertFalse("Expected both of the latch ids to be different",
+ planIdBeforeShuffle.equals(getExecutionPlanId(shuffledStreamSpecs)));
+ }
+
+ /**
+ * A test case to verify if the plan results in same hash in case of same plan.
+ * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases.
+ */
+ @Test
+ public void testGeneratePlanIdWithSameStreamSpecs() {
+ List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+ String planIdForFirstAttempt = getExecutionPlanId(streamSpecs);
+ String planIdForSecondAttempt = getExecutionPlanId(streamSpecs);
+
+ assertEquals("Expected latch ids to match!", "1447946713", planIdForFirstAttempt);
+ assertEquals("Expected latch ids to match for the second attempt!", planIdForFirstAttempt, planIdForSecondAttempt);
+ }
+
+ /**
+ * A test case to verify plan results in different hash in case of different intermediate stream.
+ * Note: the overall JOB PLAN remains the same outside the scope of intermediate streams the sake of these test cases.
+ */
+ @Test
+ public void testGeneratePlanIdWithDifferentStreamSpecs() {
+ List<StreamSpec> streamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-2", "stream-2", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+ String planIdBeforeShuffle = getExecutionPlanId(streamSpecs);
+
+ List<StreamSpec> updatedStreamSpecs = ImmutableList.of(new StreamSpec("test-stream-1", "stream-1", "testStream"),
+ new StreamSpec("test-stream-4", "stream-4", "testStream"),
+ new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
+
+ assertFalse("Expected both of the latch ids to be different",
+ planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs)));
+ }
+
+ private LocalJobPlanner createLocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+ return spy(new LocalJobPlanner(appDesc));
+ }
+
+ private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
+ String intermediateStreamJson =
+ updatedStreamSpecs.stream().map(this::streamSpecToJson).collect(Collectors.joining(","));
+
+ int planId = String.format(PLAN_JSON, intermediateStreamJson).hashCode();
+
+ return String.valueOf(planId);
+ }
+
+ private String streamSpecToJson(StreamSpec streamSpec) {
+ return String.format(STREAM_SPEC_JSON_FORMAT, streamSpec.getId(), streamSpec.getId(), streamSpec.getSystemName(),
+ streamSpec.getPhysicalName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/abf49eaa/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java
new file mode 100644
index 0000000..988fb34
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestRemoteJobPlanner.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.execution;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.samza.application.ApplicationDescriptor;
+import org.apache.samza.application.ApplicationDescriptorImpl;
+import org.apache.samza.application.StreamApplicationDescriptorImpl;
+import org.apache.samza.config.ApplicationConfig;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@link RemoteJobPlanner}.
+ *
+ * <p>The planner under test is a Mockito spy whose {@code buildAndStartStreamManager} and
+ * {@code getExecutionPlan} methods are stubbed, so the test only observes how
+ * {@code prepareJobs()} drives the {@link StreamManager}.
+ *
+ * TODO: consolidate this with unit tests for ExecutionPlanner after SAMZA-1811
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(RemoteJobPlanner.class)
+public class TestRemoteJobPlanner {
+
+  private RemoteJobPlanner remotePlanner;
+
+  /**
+   * Verifies that, with the application mode stubbed to STREAM, {@code prepareJobs()} never clears
+   * streams from a previous run, creates the plan's single intermediate stream through the
+   * {@link StreamManager}, and stops the manager afterwards.
+   */
+  @Test
+  public void testStreamCreation() throws Exception {
+    remotePlanner = createRemoteJobPlanner(mock(StreamApplicationDescriptorImpl.class));
+    StreamManager streamManager = mock(StreamManager.class);
+    doReturn(streamManager).when(remotePlanner).buildAndStartStreamManager(any(Config.class));
+
+    // Plan mock exposing one intermediate stream, an empty plan JSON and one job config.
+    ExecutionPlan plan = mock(ExecutionPlan.class);
+    when(plan.getIntermediateStreams()).thenReturn(
+        Collections.singletonList(new StreamSpec("test-stream", "test-stream", "test-system")));
+    when(plan.getPlanAsJson()).thenReturn("");
+    when(plan.getJobConfigs()).thenReturn(Collections.singletonList(mock(JobConfig.class)));
+    ApplicationConfig mockAppConfig = mock(ApplicationConfig.class);
+    when(mockAppConfig.getAppMode()).thenReturn(ApplicationConfig.ApplicationMode.STREAM);
+    when(plan.getApplicationConfig()).thenReturn(mockAppConfig);
+    doReturn(plan).when(remotePlanner).getExecutionPlan(any(), any());
+
+    remotePlanner.prepareJobs();
+
+    verify(streamManager, times(0)).clearStreamsFromPreviousRun(any());
+    ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
+    verify(streamManager).createStreams(captor.capture());
+    List<StreamSpec> streamSpecs = captor.getValue();
+    // JUnit's contract is assertEquals(expected, actual).
+    assertEquals(1, streamSpecs.size());
+    assertEquals("test-stream", streamSpecs.get(0).getId());
+    verify(streamManager).stop();
+  }
+
+  /** Wraps a new {@link RemoteJobPlanner} for {@code appDesc} in a Mockito spy so its methods can be stubbed. */
+  private RemoteJobPlanner createRemoteJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
+    return spy(new RemoteJobPlanner(appDesc));
+  }
+}