You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by pm...@apache.org on 2018/09/19 19:21:21 UTC
samza git commit: SAMZA-1840: Refactor TestRunner APIs to use StreamDescriptor and SystemDescriptor
Repository: samza
Updated Branches:
refs/heads/master 3bb24c8ee -> 1755268cf
SAMZA-1840: Refactor TestRunner APIs to use StreamDescriptor and SystemDescriptor
- CollectionStream is replaced by InMemoryInputDescriptor and InMemoryOutputDescriptor
- CollectionStreamSystemSpec is replaced by InMemorySystemDescriptor
Author: Sanil Jain <sn...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>, Cameron Lee <ca...@linkedin.com>
Closes #634 from Sanil15/SAMZA-1840
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1755268c
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1755268c
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1755268c
Branch: refs/heads/master
Commit: 1755268cff201663a41ca06e9dcc4602d41fc306
Parents: 3bb24c8
Author: Sanil Jain <sn...@linkedin.com>
Authored: Wed Sep 19 12:21:12 2018 -0700
Committer: Prateek Maheshwari <pm...@apache.org>
Committed: Wed Sep 19 12:21:12 2018 -0700
----------------------------------------------------------------------
.../samza/example/PageViewCounterExample.java | 1 -
.../samza/test/framework/StreamAssert.java | 73 ++---
.../apache/samza/test/framework/TestRunner.java | 286 +++++++++----------
.../test/framework/stream/CollectionStream.java | 204 -------------
.../system/CollectionStreamSystemSpec.java | 90 ------
.../system/InMemoryInputDescriptor.java | 42 +++
.../system/InMemoryOutputDescriptor.java | 46 +++
.../system/InMemorySystemDescriptor.java | 118 ++++++++
.../AsyncStreamTaskIntegrationTest.java | 108 ++++---
.../StreamApplicationIntegrationTest.java | 45 ++-
.../framework/StreamTaskIntegrationTest.java | 112 +++++---
.../table/TestLocalTableWithSideInputs.java | 32 ++-
12 files changed, 573 insertions(+), 584 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
index b540585..e2ebc93 100644
--- a/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-test/src/main/java/org/apache/samza/example/PageViewCounterExample.java
@@ -53,7 +53,6 @@ public class PageViewCounterExample implements StreamApplication {
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
PageViewCounterExample app = new PageViewCounterExample();
ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
-
runner.run();
runner.waitForFinish();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
index 9972d7f..42379f3 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/StreamAssert.java
@@ -22,9 +22,9 @@ package org.apache.samza.test.framework;
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.util.stream.Collectors;
-import org.apache.samza.test.framework.stream.CollectionStream;
import java.util.List;
import java.util.Map;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.hamcrest.collection.IsIterableContainingInOrder;
@@ -32,22 +32,24 @@ import static org.junit.Assert.assertThat;
/**
- * Assertion utils non the content of a {@link CollectionStream}.
+ * Assertion utils on the content of a stream described by
+ * {@link org.apache.samza.operators.descriptors.base.stream.StreamDescriptor}.
*/
public class StreamAssert {
/**
- * Util to assert presence of messages in a stream with single partition in any order
+ * Verifies that the {@code expected} messages are present in any order in the single partition stream
+ * represented by {@code outputDescriptor}
*
- * @param collectionStream represents the actual stream which will be consumed to compare against expected list
- * @param expected represents the expected stream of messages
+ * @param expected expected stream of messages
+ * @param outputDescriptor describes the stream which will be consumed to compare against expected list
* @param timeout maximum time to wait for consuming the stream
- * @param <M> represents the type of Message in the stream
+ * @param <StreamMessageType> type of messages in the stream
* @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages
*/
- public static <M> void containsInAnyOrder(CollectionStream<M> collectionStream, final List<M> expected, Duration timeout)
- throws InterruptedException {
- Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream");
- assertThat(TestRunner.consumeStream(collectionStream, timeout)
+ public static <StreamMessageType> void containsInAnyOrder(List<StreamMessageType> expected,
+ InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException {
+ Preconditions.checkNotNull(outputDescriptor);
+ assertThat(TestRunner.consumeStream(outputDescriptor, timeout)
.entrySet()
.stream()
.flatMap(entry -> entry.getValue().stream())
@@ -55,19 +57,20 @@ public class StreamAssert {
}
/**
- * Util to assert presence of messages in a stream with multiple partition in any order
+ * Verifies that the {@code expected} messages are present in any order in the multi partition stream
+ * represented by {@code outputDescriptor}
*
- * @param collectionStream represents the actual stream which will be consumed to compare against expected partition map
- * @param expected represents a map of partitionId as key and list of messages in stream as value
+ * @param expected map of partitionId as key and list of messages in stream as value
+ * @param outputDescriptor describes the stream which will be consumed to compare against expected partition map
* @param timeout maximum time to wait for consuming the stream
- * @param <M> represents the type of Message in the stream
+ * @param <StreamMessageType> type of messages in the stream
* @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages
*
*/
- public static <M> void containsInAnyOrder(CollectionStream<M> collectionStream, final Map<Integer, List<M>> expected,
- Duration timeout) throws InterruptedException {
- Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream");
- Map<Integer, List<M>> actual = TestRunner.consumeStream(collectionStream, timeout);
+ public static <StreamMessageType> void containsInAnyOrder(Map<Integer, List<StreamMessageType>> expected,
+ InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException {
+ Preconditions.checkNotNull(outputDescriptor);
+ Map<Integer, List<StreamMessageType>> actual = TestRunner.consumeStream(outputDescriptor, timeout);
for (Integer paritionId : expected.keySet()) {
assertThat(actual.get(paritionId),
IsIterableContainingInAnyOrder.containsInAnyOrder(expected.get(paritionId).toArray()));
@@ -75,18 +78,19 @@ public class StreamAssert {
}
/**
- * Util to assert ordering of messages in a stream with single partition
+ * Verifies that the {@code expected} messages are present in order in the single partition stream
+ * represented by {@code outputDescriptor}
*
- * @param collectionStream represents the actual stream which will be consumed to compare against expected list
- * @param expected represents the expected stream of messages
+ * @param expected expected stream of messages
+ * @param outputDescriptor describes the stream which will be consumed to compare against expected list
* @param timeout maximum time to wait for consuming the stream
- * @param <M> represents the type of Message in the stream
+ * @param <StreamMessageType> type of messages in the stream
* @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages
*/
- public static <M> void containsInOrder(CollectionStream<M> collectionStream, final List<M> expected, Duration timeout)
- throws InterruptedException {
- Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream");
- assertThat(TestRunner.consumeStream(collectionStream, timeout)
+ public static <StreamMessageType> void containsInOrder(List<StreamMessageType> expected,
+ InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException {
+ Preconditions.checkNotNull(outputDescriptor);
+ assertThat(TestRunner.consumeStream(outputDescriptor, timeout)
.entrySet()
.stream()
.flatMap(entry -> entry.getValue().stream())
@@ -94,18 +98,19 @@ public class StreamAssert {
}
/**
- * Util to assert ordering of messages in a multi-partitioned stream
+ * Verifies that the {@code expected} messages are present in order in the multi partition stream
+ * represented by {@code outputDescriptor}
*
- * @param collectionStream represents the actual stream which will be consumed to compare against expected partition map
- * @param expected represents a map of partitionId as key and list of messages as value
+ * @param expected map of partitionId as key and list of messages as value
+ * @param outputDescriptor describes the stream which will be consumed to compare against expected partition map
* @param timeout maximum time to wait for consuming the stream
- * @param <M> represents the type of Message in the stream
+ * @param <StreamMessageType> type of messages in the stream
* @throws InterruptedException when {@code consumeStream} is interrupted by another thread during polling messages
*/
- public static <M> void containsInOrder(CollectionStream<M> collectionStream, final Map<Integer, List<M>> expected,
- Duration timeout) throws InterruptedException {
- Preconditions.checkNotNull(collectionStream, "This util is intended to use only on CollectionStream");
- Map<Integer, List<M>> actual = TestRunner.consumeStream(collectionStream, timeout);
+ public static <StreamMessageType> void containsInOrder(Map<Integer, List<StreamMessageType>> expected,
+ InMemoryOutputDescriptor<StreamMessageType> outputDescriptor, Duration timeout) throws InterruptedException {
+ Preconditions.checkNotNull(outputDescriptor);
+ Map<Integer, List<StreamMessageType>> actual = TestRunner.consumeStream(outputDescriptor, timeout);
for (Integer paritionId : expected.keySet()) {
assertThat(actual.get(paritionId), IsIterableContainingInOrder.contains(expected.get(paritionId).toArray()));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index 033bcdf..5c4ba3b 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -35,7 +35,6 @@ import org.apache.samza.application.SamzaApplication;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.config.Config;
-import org.apache.samza.config.InMemorySystemConfig;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.MapConfig;
@@ -62,58 +61,47 @@ import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.task.TaskFactory;
-import org.apache.samza.test.framework.stream.CollectionStream;
-import org.apache.samza.test.framework.system.CollectionStreamSystemSpec;
+import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
import org.junit.Assert;
/**
- * TestRunner provides apis to quickly set up tests for Samza low level and high level apis. Default running mode
- * for test is Single container without any distributed coordination service. Test runner maintains global job config
- * {@code configs} that are used to run the Samza job
+ * TestRunner provides APIs to set up integration tests for a Samza application.
+ * Running mode for test is Single container mode
+ * Test sets following configuration for the application
*
- * For single container mode following configs are set by default
+ * The following configs are set by default
* <ol>
* <li>"job.coordination.utils.factory" = {@link PassthroughCoordinationUtilsFactory}</li>
* <li>"job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}</li>
* <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li>
* <li>"job.name" = "test-samza"</li>
* <li>"processor.id" = "1"</li>
- * <li>"inmemory.scope = " Scope id generated to isolate the run for InMemorySystem</li>
* </ol>
*
*/
public class TestRunner {
-
- private static final String JOB_NAME = "test-samza";
- public enum Mode {
- SINGLE_CONTAINER, MULTI_CONTAINER
- }
+ public static final String JOB_NAME = "samza-test";
private Map<String, String> configs;
- private Map<String, CollectionStreamSystemSpec> systems;
private Class taskClass;
private StreamApplication app;
- private String testId;
- private SystemFactory factory;
-
- /**
- * Mode defines single or multi container running configuration, by default a single container configuration is assumed
+ /*
+ * inMemoryScope is a unique global key per TestRunner, this key when configured with {@link InMemorySystemDescriptor}
+ * provides an isolated state to run with in memory system
*/
- private Mode mode;
+ private String inMemoryScope;
private TestRunner() {
- this.testId = RandomStringUtils.random(10, true, true);
- this.systems = new HashMap<String, CollectionStreamSystemSpec>();
this.configs = new HashMap<>();
- this.mode = Mode.SINGLE_CONTAINER;
- this.factory = new InMemorySystemFactory();
- configs.put(InMemorySystemConfig.INMEMORY_SCOPE, testId);
+ this.inMemoryScope = RandomStringUtils.random(10, true, true);
configs.put(JobConfig.JOB_NAME(), JOB_NAME);
- configs.putIfAbsent(JobConfig.PROCESSOR_ID(), "1");
- configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
- configs.putIfAbsent(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
- configs.putIfAbsent(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
+ configs.put(JobConfig.PROCESSOR_ID(), "1");
+ configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
+ configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
+ configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
}
/**
@@ -129,7 +117,7 @@ public class TestRunner {
/**
* Constructs a new {@link TestRunner} from following components
- * @param app represent a class containing Samza job logic implementing {@link StreamApplication}
+ * @param app samza job implementing {@link StreamApplication}
*/
private TestRunner(StreamApplication app) {
this();
@@ -138,20 +126,9 @@ public class TestRunner {
}
/**
- * Registers a system with TestRunner if not already registered and configures all the system configs to global
- * job configs
- */
- private void registerSystem(String systemName) {
- if (!systems.containsKey(systemName)) {
- systems.put(systemName, CollectionStreamSystemSpec.create(systemName, JOB_NAME));
- configs.putAll(systems.get(systemName).getSystemConfigs());
- }
- }
-
- /**
* Creates an instance of {@link TestRunner} for Low Level Samza Api
- * @param taskClass represent a class extending either {@link StreamTask} or {@link AsyncStreamTask}
- * @return a {@link TestRunner} for {@code taskClass}
+ * @param taskClass samza job extending either {@link StreamTask} or {@link AsyncStreamTask}
+ * @return this {@link TestRunner}
*/
public static TestRunner of(Class taskClass) {
Preconditions.checkNotNull(taskClass);
@@ -162,8 +139,8 @@ public class TestRunner {
/**
* Creates an instance of {@link TestRunner} for High Level/Fluent Samza Api
- * @param app represent a class representing Samza job by implementing {@link StreamApplication}
- * @return a {@link TestRunner} for {@code app}
+ * @param app samza job implementing {@link StreamApplication}
+ * @return this {@link TestRunner}
*/
public static TestRunner of(StreamApplication app) {
Preconditions.checkNotNull(app);
@@ -171,11 +148,11 @@ public class TestRunner {
}
/**
- * Only adds a config from {@code config} to global {@code configs} if they dont exist in it.
- * @param config represents the {@link Config} supposed to be added to global configs
- * @return calling instance of {@link TestRunner} with added configs if they don't exist
+ * Only adds a config from {@code config} to samza job {@code configs} if they dont exist in it.
+ * @param config configs for the application
+ * @return this {@link TestRunner}
*/
- public TestRunner addConfigs(Config config) {
+ public TestRunner addConfigs(Map<String, String> config) {
Preconditions.checkNotNull(config);
config.forEach(this.configs::putIfAbsent);
return this;
@@ -186,7 +163,7 @@ public class TestRunner {
* exisiting in {@code configs}
* @param key key of the config
* @param value value of the config
- * @return calling instance of {@link TestRunner} with added config
+ * @return this {@link TestRunner}
*/
public TestRunner addOverrideConfig(String key, String value) {
Preconditions.checkNotNull(key);
@@ -197,94 +174,72 @@ public class TestRunner {
}
/**
- * Configures {@code stream} with the TestRunner, adds all the stream specific configs to global job configs.
- * <p>
- * Every stream belongs to a System (here a {@link CollectionStreamSystemSpec}), this utility also registers the system with
- * {@link TestRunner} if not registered already. Then it creates and initializes the stream partitions with messages for
- * the registered System
- * <p>
- * @param stream represents the stream that is supposed to be configured with {@link TestRunner}
- * @return calling instance of {@link TestRunner} with {@code stream} configured with it
+ * Adds the provided input stream with mock data to the test application.
+ *
+ * @param descriptor describes the stream that is supposed to be input to Samza application
+ * @param messages messages used to initialize the single partition stream
+ * @param <StreamMessageType> a message with null key or a KV {@link org.apache.samza.operators.KV}.
+ * key of KV represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or
+ * {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message
+ * @return this {@link TestRunner}
*/
- public TestRunner addInputStream(CollectionStream stream) {
- Preconditions.checkNotNull(stream);
- registerSystem(stream.getSystemName());
- initializeInput(stream);
- stream.setTestId(testId);
- if (configs.containsKey(TaskConfig.INPUT_STREAMS())) {
- configs.put(TaskConfig.INPUT_STREAMS(),
- configs.get(TaskConfig.INPUT_STREAMS()).concat("," + stream.getSystemName() + "." + stream.getPhysicalName()));
- } else {
- configs.put(TaskConfig.INPUT_STREAMS(), stream.getSystemName() + "." + stream.getPhysicalName());
- }
- stream.getStreamConfig().forEach((key, val) -> {
- configs.putIfAbsent((String) key, (String) val);
- });
-
+ public <StreamMessageType> TestRunner addInputStream(InMemoryInputDescriptor descriptor,
+ List<StreamMessageType> messages) {
+ Preconditions.checkNotNull(descriptor, messages);
+ Map<Integer, Iterable<StreamMessageType>> partitionData = new HashMap<Integer, Iterable<StreamMessageType>>();
+ partitionData.put(0, messages);
+ initializeInMemoryInputStream(descriptor, partitionData);
return this;
}
/**
- * Creates an in memory stream with {@link InMemorySystemFactory} and initializes the metadata for the stream.
- * Initializes each partition of that stream with messages from {@code stream.getInitPartitions}
- *
- * @param stream represents the stream to initialize with the in memory system
- * @param <T> can represent a message or a KV {@link org.apache.samza.operators.KV}, key of which represents key of a
- * {@link org.apache.samza.system.IncomingMessageEnvelope} or {@link org.apache.samza.system.OutgoingMessageEnvelope}
- * and value represents the message
+ * Adds the provided input stream with mock data to the test application.
+ * @param descriptor describes the stream that is supposed to be input to Samza application
+ * @param messages map whose key is partitionId and value is messages in the partition
+ * @param <StreamMessageType> message with null key or a KV {@link org.apache.samza.operators.KV}.
+ * A key of which represents key of {@link org.apache.samza.system.IncomingMessageEnvelope} or
+ * {@link org.apache.samza.system.OutgoingMessageEnvelope} and value is message
+ * @return this {@link TestRunner}
*/
- private <T> void initializeInput(CollectionStream stream) {
- Preconditions.checkNotNull(stream);
- Preconditions.checkState(stream.getInitPartitions().size() >= 1);
- String streamName = stream.getStreamName();
- String systemName = stream.getSystemName();
- Map<Integer, Iterable<T>> partitions = stream.getInitPartitions();
- StreamSpec spec = new StreamSpec(streamName, stream.getPhysicalName(), systemName, partitions.size());
- factory.getAdmin(systemName, new MapConfig(configs)).createStream(spec);
- SystemProducer producer = factory.getProducer(systemName, new MapConfig(configs), null);
- partitions.forEach((partitionId, partition) -> {
- partition.forEach(e -> {
- Object key = e instanceof KV ? ((KV) e).getKey() : null;
- Object value = e instanceof KV ? ((KV) e).getValue() : e;
- producer.send(systemName,
- new OutgoingMessageEnvelope(new SystemStream(systemName, stream.getPhysicalName()), Integer.valueOf(partitionId), key,
- value));
- });
- producer.send(systemName,
- new OutgoingMessageEnvelope(new SystemStream(systemName, stream.getPhysicalName()), Integer.valueOf(partitionId), null,
- new EndOfStreamMessage(null)));
- });
+ public <StreamMessageType> TestRunner addInputStream(InMemoryInputDescriptor descriptor,
+ Map<Integer, ? extends Iterable<StreamMessageType>> messages) {
+ Preconditions.checkNotNull(descriptor, messages);
+ Map<Integer, Iterable<StreamMessageType>> partitionData = new HashMap<Integer, Iterable<StreamMessageType>>();
+ partitionData.putAll(messages);
+ initializeInMemoryInputStream(descriptor, partitionData);
+ return this;
}
/**
- * Configures {@code stream} with the TestRunner, adds all the stream specific configs to global job configs.
- * <p>
- * Every stream belongs to a System (here a {@link CollectionStreamSystemSpec}), this utility also registers the system with
- * {@link TestRunner} if not registered already. Then it creates the stream partitions with the registered System
- * <p>
- * @param stream represents the stream that is supposed to be configured with {@link TestRunner}
- * @return calling instance of {@link TestRunner} with {@code stream} configured with it
+ * Adds the provided output stream to the test application.
+ * @param streamDescriptor describes the stream that is supposed to be output for the Samza application
+ * @param partitionCount partition count of output stream
+ * @return this {@link TestRunner}
*/
- public TestRunner addOutputStream(CollectionStream stream) {
- Preconditions.checkNotNull(stream);
- Preconditions.checkState(stream.getInitPartitions().size() >= 1);
- registerSystem(stream.getSystemName());
- stream.setTestId(testId);
- StreamSpec spec = new StreamSpec(stream.getStreamName(), stream.getPhysicalName(), stream.getSystemName(), stream.getInitPartitions().size());
+ public TestRunner addOutputStream(InMemoryOutputDescriptor streamDescriptor, int partitionCount) {
+ Preconditions.checkNotNull(streamDescriptor);
+ Preconditions.checkState(partitionCount >= 1);
+ InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) streamDescriptor.getSystemDescriptor();
+ imsd.withInMemoryScope(this.inMemoryScope);
+ Config config = new MapConfig(streamDescriptor.toConfig(), streamDescriptor.getSystemDescriptor().toConfig());
+ InMemorySystemFactory factory = new InMemorySystemFactory();
+ String physicalName = (String) streamDescriptor.getPhysicalName().orElse(streamDescriptor.getStreamId());
+ StreamSpec spec = new StreamSpec(streamDescriptor.getStreamId(), physicalName, streamDescriptor.getSystemName(),
+ partitionCount);
factory
- .getAdmin(stream.getSystemName(), new MapConfig(configs))
+ .getAdmin(streamDescriptor.getSystemName(), config)
.createStream(spec);
- configs.putAll(stream.getStreamConfig());
+ addConfigs(streamDescriptor.toConfig());
+ addConfigs(streamDescriptor.getSystemDescriptor().toConfig());
return this;
}
-
/**
- * Utility to run a test configured using TestRunner
+ * Run the application with the specified timeout
*
- * @param timeout time to wait for the high level application or low level task to finish. This timeout does not include
+ * @param timeout time to wait for the application to finish. This timeout does not include
* input stream initialization time or the assertion time over output streams. This timeout just accounts
- * for time that samza job takes run. Samza job won't be invoked with negative or zero timeout
+ * for time that samza job takes run. Timeout must be greater than 0.
* @throws SamzaException if Samza job fails with exception and returns UnsuccessfulFinish as the statuscode
*/
public void run(Duration timeout) {
@@ -301,34 +256,33 @@ public class TestRunner {
throw new SamzaException(ExceptionUtils.getStackTrace(status.getThrowable()));
}
}
+
/**
- * Utility to read the messages from a stream from the beginning, this is supposed to be used after executing the
- * TestRunner in order to assert over the streams (ex output streams).
+ * Gets the contents of the output stream represented by {@code outputDescriptor} after {@link TestRunner#run(Duration)}
+ * has completed
*
- * @param stream represents {@link CollectionStream} whose current state of partitions is requested to be fetched
- * @param timeout poll timeout in Ms
- * @param <T> represents type of message
+ * @param outputDescriptor describes the stream to be consumed
+ * @param timeout timeout for consumption of stream in Ms
+ * @param <StreamMessageType> type of message
*
- * @return a map key of which represents the {@code partitionId} and value represents the current state of the partition
- * i.e messages in the partition
- * @throws InterruptedException Thrown when a blocking poll has been interrupted by another thread.
+ * @return a map whose key is {@code partitionId} and value is messages in partition
+ * @throws SamzaException Thrown when a poll is incomplete
*/
- public static <T> Map<Integer, List<T>> consumeStream(CollectionStream stream, Duration timeout) throws InterruptedException {
- Preconditions.checkNotNull(stream);
- Preconditions.checkNotNull(stream.getSystemName());
- String streamName = stream.getStreamName();
- String systemName = stream.getSystemName();
+ public static <StreamMessageType> Map<Integer, List<StreamMessageType>> consumeStream(
+ InMemoryOutputDescriptor outputDescriptor, Duration timeout) throws SamzaException {
+ Preconditions.checkNotNull(outputDescriptor);
+ String streamId = outputDescriptor.getStreamId();
+ String systemName = outputDescriptor.getSystemName();
Set<SystemStreamPartition> ssps = new HashSet<>();
- Set<String> streamNames = new HashSet<>();
- streamNames.add(streamName);
+ Set<String> streamIds = new HashSet<>();
+ streamIds.add(streamId);
SystemFactory factory = new InMemorySystemFactory();
- HashMap<String, String> config = new HashMap<>();
- config.put(InMemorySystemConfig.INMEMORY_SCOPE, stream.getTestId());
- Map<String, SystemStreamMetadata> metadata =
- factory.getAdmin(systemName, new MapConfig(config)).getSystemStreamMetadata(streamNames);
- SystemConsumer consumer = factory.getConsumer(systemName, new MapConfig(config), null);
- metadata.get(stream.getPhysicalName()).getSystemStreamPartitionMetadata().keySet().forEach(partition -> {
- SystemStreamPartition temp = new SystemStreamPartition(systemName, streamName, partition);
+ Config config = new MapConfig(outputDescriptor.toConfig(), outputDescriptor.getSystemDescriptor().toConfig());
+ Map<String, SystemStreamMetadata> metadata = factory.getAdmin(systemName, config).getSystemStreamMetadata(streamIds);
+ SystemConsumer consumer = factory.getConsumer(systemName, config, null);
+ String name = (String) outputDescriptor.getPhysicalName().orElse(streamId);
+ metadata.get(name).getSystemStreamPartitionMetadata().keySet().forEach(partition -> {
+ SystemStreamPartition temp = new SystemStreamPartition(systemName, streamId, partition);
ssps.add(temp);
consumer.register(temp, "0");
});
@@ -337,12 +291,17 @@ public class TestRunner {
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> output = new HashMap<>();
HashSet<SystemStreamPartition> didNotReachEndOfStream = new HashSet<>(ssps);
while (System.currentTimeMillis() < t + timeout.toMillis()) {
- Map<SystemStreamPartition, List<IncomingMessageEnvelope>> currentState = consumer.poll(ssps, 10);
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> currentState = null;
+ try {
+ currentState = consumer.poll(ssps, 10);
+ } catch (InterruptedException e) {
+ throw new SamzaException("Timed out while consuming stream \n" + e.getMessage());
+ }
for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> entry : currentState.entrySet()) {
SystemStreamPartition ssp = entry.getKey();
output.computeIfAbsent(ssp, k -> new LinkedList<IncomingMessageEnvelope>());
List<IncomingMessageEnvelope> currentBuffer = entry.getValue();
- Integer totalMessagesToFetch = Integer.valueOf(metadata.get(stream.getStreamName())
+ Integer totalMessagesToFetch = Integer.valueOf(metadata.get(outputDescriptor.getStreamId())
.getSystemStreamPartitionMetadata()
.get(ssp.getPartition())
.getNewestOffset());
@@ -364,7 +323,7 @@ public class TestRunner {
return output.entrySet()
.stream()
.collect(Collectors.toMap(entry -> entry.getKey().getPartition().getPartitionId(),
- entry -> entry.getValue().stream().map(e -> (T) e.getMessage()).collect(Collectors.toList())));
+ entry -> entry.getValue().stream().map(e -> (StreamMessageType) e.getMessage()).collect(Collectors.toList())));
}
private TaskFactory createTaskFactory() {
@@ -388,4 +347,41 @@ public class TestRunner {
throw new SamzaException(String.format("Not supported task.class %s. task.class has to implement either StreamTask "
+ "or AsyncStreamTask", taskClass.getName()));
}
+
+  /**
+   * Creates an in-memory input stream with {@link InMemorySystemFactory} and seeds each of its partitions with the
+   * supplied messages, followed by an {@link EndOfStreamMessage}.
+   * <p>
+   * The stream is also appended to the task input streams config, and the stream and system descriptor configs
+   * (with this runner's in-memory scope applied) are added to the test configs. A message that is a {@link KV} is
+   * sent with the KV key as the message key and the KV value as the message; any other message is sent as-is with a
+   * null key.
+   *
+   * @param descriptor describes the stream to initialize with the in-memory system
+   * @param partitionData key of the map represents the partition id and value represents the messages in that
+   *                      partition
+   * @param <StreamMessageType> type of the messages in the stream
+   */
+  private <StreamMessageType> void initializeInMemoryInputStream(InMemoryInputDescriptor<?> descriptor,
+      Map<Integer, Iterable<StreamMessageType>> partitionData) {
+    String systemName = descriptor.getSystemName();
+    String streamName = (String) descriptor.getPhysicalName().orElse(descriptor.getStreamId());
+    // Register the stream as a task input, appending to any input streams registered earlier.
+    configs.merge(TaskConfig.INPUT_STREAMS(), systemName + "." + streamName,
+        (existing, added) -> existing + "," + added);
+
+    // The scope has to be set before generating the system config so that the config carries it.
+    InMemorySystemDescriptor imsd = (InMemorySystemDescriptor) descriptor.getSystemDescriptor();
+    imsd.withInMemoryScope(this.inMemoryScope);
+    Map<String, String> streamConfigs = descriptor.toConfig();
+    Map<String, String> systemConfigs = imsd.toConfig();
+    addConfigs(streamConfigs);
+    addConfigs(systemConfigs);
+
+    // NOTE(review): the stream is created with partitionData.size() partitions while each map key is passed as the
+    // envelope's partition key, so keys are presumably expected to be 0..size-1 — confirm.
+    StreamSpec spec = new StreamSpec(descriptor.getStreamId(), streamName, systemName, partitionData.size());
+    SystemFactory factory = new InMemorySystemFactory();
+    Config config = new MapConfig(streamConfigs, systemConfigs);
+    factory.getAdmin(systemName, config).createStream(spec);
+    SystemProducer producer = factory.getProducer(systemName, config, null);
+    SystemStream sysStream = new SystemStream(systemName, streamName);
+    partitionData.forEach((partitionId, messages) -> {
+      for (StreamMessageType message : messages) {
+        Object key = null;
+        Object value = message;
+        if (message instanceof KV) {
+          KV<?, ?> kv = (KV<?, ?>) message;
+          key = kv.getKey();
+          value = kv.getValue();
+        }
+        producer.send(systemName, new OutgoingMessageEnvelope(sysStream, partitionId, key, value));
+      }
+      // Terminate each partition with an end-of-stream marker.
+      producer.send(systemName, new OutgoingMessageEnvelope(sysStream, partitionId, null,
+          new EndOfStreamMessage(null)));
+    });
+  }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java b/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
deleted file mode 100644
index 320a0ac..0000000
--- a/samza-test/src/main/java/org/apache/samza/test/framework/stream/CollectionStream.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework.stream;
-
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * A CollectionStream represents an in memory stream of messages that can either have single or multiple partitions.
- * Every CollectionStream is coupled with a {@link org.apache.samza.test.framework.system.CollectionStreamSystemSpec} that
- * contains the system-level specification.
- *<p>
- * When sending messages using {@code CollectionStream<KV<K, V>>}, messages use K as key and V as message.
- * When sending messages using {@code CollectionStream<T>}, messages use a null key.
- *</p>
- * @param <T>
- * can represent a message with null key or a KV {@link org.apache.samza.operators.KV}, key of which represents key of a
- * {@link org.apache.samza.system.IncomingMessageEnvelope} or {@link org.apache.samza.system.OutgoingMessageEnvelope}
- * and value represents the message of the same
- */
-public class CollectionStream<T> {
- private String testId;
- private final String streamName;
- private final String physicalName;
- private final String systemName;
- private Map<Integer, Iterable<T>> initPartitions;
- private Map<String, String> streamConfig;
- private static final String STREAM_TO_SYSTEM = "streams.%s.samza.system";
- private static final String PHYSICAL_NAME = "streams.%s.samza.physical.name";
-
- /**
- * Constructs a new CollectionStream from specified components.
- * Also records the stream-to-system and physical-name configs for this stream in {@code streamConfig}.
- * @param systemName represents name of the system stream is associated with
- * @param streamName represents name of the stream
- */
- private CollectionStream(String systemName, String streamName) {
- Preconditions.checkNotNull(systemName);
- Preconditions.checkNotNull(streamName);
- this.systemName = systemName;
- this.streamName = streamName;
- this.streamConfig = new HashMap<>();
- // TODO: Once SAMZA-1737 is resolved, generate a randomized physical name
- this.physicalName = streamName;
- streamConfig.put(String.format(STREAM_TO_SYSTEM, this.streamName), systemName);
- streamConfig.put(String.format(PHYSICAL_NAME, this.streamName), physicalName);
- }
-
-
- /**
- * Constructs a new CollectionStream with multiple empty partitions from specified components.
- * @param systemName represents name of the system stream is associated with
- * @param streamName represents name of the stream
- * @param partitionCount represents number of partitions, each of these partitions will be empty; must be positive
- */
- private CollectionStream(String systemName, String streamName, Integer partitionCount) {
- this(systemName, streamName);
- Preconditions.checkState(partitionCount > 0);
- initPartitions = new HashMap<>();
- for (int i = 0; i < partitionCount; i++) {
- initPartitions.put(i, new ArrayList<>());
- }
- }
-
- /**
- * Constructs a new CollectionStream with single partition from specified components.
- * @param systemName represents name of the system stream is associated with
- * @param streamName represents name of the stream
- * @param initPartition represents the messages that the stream will be initialized with; the partitionId of
- * this single partition stream is 0
- */
- private CollectionStream(String systemName, String streamName, Iterable<T> initPartition) {
- this(systemName, streamName);
- Preconditions.checkNotNull(initPartition);
- initPartitions = new HashMap<>();
- initPartitions.put(0, initPartition);
- }
-
- /**
- * Constructs a new CollectionStream with multiple partitions from specified components.
- * @param systemName represents name of the system stream is associated with
- * @param streamName represents name of the stream
- * @param initPartitions represents the partition state, key of the map represents partitionId and value represents
- * the messages that partition will be initialized with (the map is copied)
- */
- private CollectionStream(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> initPartitions) {
- this(systemName, streamName);
- Preconditions.checkNotNull(initPartitions);
- this.initPartitions = new HashMap<>(initPartitions);
- }
-
- /**
- * @return The Map of partitions that input stream is supposed to be initialized with, this method is
- * used internally and should not be used for asserting over streams.
- * The true state of stream is determined by {@code consumeStream()} of {@link org.apache.samza.test.framework.TestRunner}
- */
- public Map<Integer, Iterable<T>> getInitPartitions() {
- return initPartitions;
- }
-
- /** @return the name of this stream */
- public String getStreamName() {
- return streamName;
- }
-
- /** @return the name of the system this stream is associated with */
- public String getSystemName() {
- return systemName;
- }
-
- /** @return the stream-to-system and physical-name configs generated for this stream (the live, mutable map) */
- public Map<String, String> getStreamConfig() {
- return streamConfig;
- }
-
- /** @return the test id assigned via {@link #setTestId(String)}, or null if none was set */
- public String getTestId() {
- return testId;
- }
-
- /** @param testId the test id to associate with this stream */
- public void setTestId(String testId) {
- this.testId = testId;
- }
-
- /** @return the physical name of this stream; currently always equal to the stream name */
- public String getPhysicalName() {
- return physicalName;
- }
-
- /**
- * Creates an in memory stream with the name {@code streamName} and initializes the stream to only one partition
- *
- * @param systemName represents name of the system stream is associated with
- * @param streamName represents the name of the Stream
- * @param <T> represents the type of each message in a stream
- * @return an {@link CollectionStream} with only one partition that can contain messages of the type
- */
- public static <T> CollectionStream<T> empty(String systemName, String streamName) {
- return new CollectionStream<>(systemName, streamName, 1);
- }
-
- /**
- * Creates an in memory stream with the name {@code streamName} and initializes the stream to have as many partitions
- * as specified by {@code partitionCount}. These partitions are empty and are supposed to be used by Samza job to produce
- * messages to.
- *
- * @param systemName represents name of the system stream is associated with
- * @param streamName represents the name of the Stream
- * @param partitionCount represents the number of partitions the stream would have
- * @param <T> represents the type of each message in a stream
- * @return an empty {@link CollectionStream} with multiple partitions that can contain messages of the type {@code T}
- */
- public static <T> CollectionStream<T> empty(String systemName, String streamName, int partitionCount) {
- return new CollectionStream<>(systemName, streamName, partitionCount);
- }
-
- /**
- * Creates an in memory stream with the name {@code streamName}. Stream is created with single partition having
- * {@code partitionId} 0. This partition is initialized with messages of type T
- *
- * @param systemName represents name of the system stream is associated with
- * @param streamName represents the name of the Stream
- * @param partition represents the messages that the {@link org.apache.samza.system.SystemStreamPartition} will be
- * initialized with
- * @param <T> represents the type of a message in the stream
- * @return a {@link CollectionStream} with only one partition containing messages of the type {@code T}
- *
- */
- public static <T> CollectionStream<T> of(String systemName, String streamName, Iterable<T> partition) {
- return new CollectionStream<>(systemName, streamName, partition);
- }
-
- /**
- * Creates an in memory stream with the name {@code streamName} and initializes the stream to have as many partitions
- * as the size of {@code partitions} map. Key of the map {@code partitions} represents the {@code partitionId} of
- * each {@link org.apache.samza.Partition} for a {@link org.apache.samza.system.SystemStreamPartition} and value is
- * an Iterable of messages that the {@link org.apache.samza.system.SystemStreamPartition} should be initialized with.
- *
- * @param systemName represents name of the system stream is associated with
- * @param streamName represents the name of the Stream
- * @param partitions Key of an entry in partitions represents a {@code partitionId} of a {@link org.apache.samza.Partition}
- * and value represents the stream of messages the {@link org.apache.samza.system.SystemStreamPartition}
- * will be initialized with
- * @param <T> represents the type of a message in the stream
- * @return a {@link CollectionStream} with multiple partitions each containing messages of the type {@code T}
- *
- */
- public static <T> CollectionStream<T> of(String systemName, String streamName, Map<Integer, ? extends Iterable<T>> partitions) {
- return new CollectionStream<>(systemName, streamName, partitions);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
deleted file mode 100644
index 5658f61..0000000
--- a/samza-test/src/main/java/org/apache/samza/test/framework/system/CollectionStreamSystemSpec.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.test.framework.system;
-
-import com.google.common.base.Preconditions;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.samza.system.inmemory.InMemorySystemFactory;
-
-
-/**
- * CollectionStreamSystemSpec represents a system that interacts with an underlying {@link InMemorySystemFactory} to create
- * various input and output streams and initialize {@link org.apache.samza.system.SystemStreamPartition} with messages
- * <p>
- * Following system level configs are set by default
- * <ol>
- * <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li>
- * <li>"jobs.job-name.systems.%s.default.stream.samza.offset.default" = "oldest"</li>
- * <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
- * <li>"jobs.job-name.systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
- * </ol>
- * The "systems.*" configs are required since the planner uses the system to get metadata about streams during
- * planning. The "jobs.job-name.systems.*" configs are required since configs generated from user provided
- * system/stream descriptors override configs originally supplied to the planner. Configs in the "jobs.job-name.*"
- * scope have the highest precedence.
- */
-public class CollectionStreamSystemSpec {
- private static final String CONFIG_OVERRIDE_PREFIX = "jobs.%s."; // prefix to override configs generated by the planner
- private static final String SYSTEM_FACTORY = "systems.%s.samza.factory";
- private static final String SYSTEM_OFFSET = "systems.%s.default.stream.samza.offset.default";
-
- private String systemName;
- private Map<String, String> systemConfigs;
-
- /**
- * Constructs a new CollectionStreamSystemSpec from specified components.
- * <p>
- * Every {@link CollectionStreamSystemSpec} is assumed to consume from the oldest offset, since stream is in memory and
- * is used for testing purpose. System uses {@link InMemorySystemFactory} to initialize in memory streams.
- * <p>
- * @param systemName represents unique name of the system
- * @param jobName name of the job, used to build the "jobs.job-name." prefixed override configs
- */
- private CollectionStreamSystemSpec(String systemName, String jobName) {
- this.systemName = systemName;
- systemConfigs = new HashMap<String, String>();
- systemConfigs.put(String.format(SYSTEM_FACTORY, systemName), InMemorySystemFactory.class.getName());
- systemConfigs.put(String.format(CONFIG_OVERRIDE_PREFIX + SYSTEM_FACTORY, jobName, systemName), InMemorySystemFactory.class.getName());
- systemConfigs.put(String.format(SYSTEM_OFFSET, systemName), "oldest");
- systemConfigs.put(String.format(CONFIG_OVERRIDE_PREFIX + SYSTEM_OFFSET, jobName, systemName), "oldest");
- }
-
- /** @return the name of this system */
- public String getSystemName() {
- return systemName;
- }
-
- /** @return the factory and offset-default configs generated for this system (the live, mutable map) */
- public Map<String, String> getSystemConfigs() {
- return systemConfigs;
- }
-
- /**
- * Creates a {@link CollectionStreamSystemSpec} with name {@code systemName}
- * @param systemName represents name of the {@link CollectionStreamSystemSpec}
- * @param jobName name of the job
- * @return an instance of {@link CollectionStreamSystemSpec}
- * @throws IllegalStateException if {@code systemName} or {@code jobName} is blank
- */
- public static CollectionStreamSystemSpec create(String systemName, String jobName) {
- Preconditions.checkState(StringUtils.isNotBlank(systemName));
- Preconditions.checkState(StringUtils.isNotBlank(jobName));
- return new CollectionStreamSystemSpec(systemName, jobName);
- }
-}
-
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
new file mode 100644
index 0000000..6065bf0
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryInputDescriptor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system;
+
+import org.apache.samza.operators.descriptors.base.stream.InputDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+
+/**
+ * A descriptor for an in-memory input stream of messages that can have either a single partition or multiple
+ * partitions.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
+ * The descriptor always uses a {@link NoOpSerde}.
+ *
+ * @param <StreamMessageType> type of messages in input stream
+ */
+public class InMemoryInputDescriptor<StreamMessageType>
+    extends InputDescriptor<StreamMessageType, InMemoryInputDescriptor<StreamMessageType>> {
+  /**
+   * Constructs a new InMemoryInputDescriptor from specified components.
+   *
+   * @param streamId id of the stream
+   * @param systemDescriptor the in-memory system descriptor this stream descriptor was obtained from
+   */
+  InMemoryInputDescriptor(String streamId, InMemorySystemDescriptor systemDescriptor) {
+    // NOTE(review): the trailing null fills the last InputDescriptor constructor argument, presumably the optional
+    // input transformer (none is applied here) — confirm against InputDescriptor.
+    super(streamId, new NoOpSerde<>(), systemDescriptor, null);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
new file mode 100644
index 0000000..75fe7ae
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemoryOutputDescriptor.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system;
+
+import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.NoOpSerde;
+
+/**
+ * A descriptor for an in-memory output stream.
+ * <p>
+ * An instance of this descriptor may be obtained from an appropriately configured {@link InMemorySystemDescriptor}.
+ * The descriptor always uses a {@link NoOpSerde}.
+ * <p>
+ * Stream properties configured using a descriptor override corresponding properties provided in configuration.
+ *
+ * @param <StreamMessageType> type of messages in this stream.
+ */
+public class InMemoryOutputDescriptor<StreamMessageType>
+    extends OutputDescriptor<StreamMessageType, InMemoryOutputDescriptor<StreamMessageType>> {
+
+  /**
+   * Constructs an {@link InMemoryOutputDescriptor} instance.
+   *
+   * @param streamId id of the stream
+   * @param systemDescriptor system descriptor this stream descriptor was obtained from
+   */
+  InMemoryOutputDescriptor(String streamId, SystemDescriptor<?> systemDescriptor) {
+    super(streamId, new NoOpSerde<>(), systemDescriptor);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
new file mode 100644
index 0000000..92b23ef
--- /dev/null
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/system/InMemorySystemDescriptor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.test.framework.system;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.InMemorySystemConfig;
+import org.apache.samza.operators.descriptors.base.system.OutputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SimpleInputDescriptorProvider;
+import org.apache.samza.operators.descriptors.base.system.SystemDescriptor;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.inmemory.InMemorySystemFactory;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.test.framework.TestRunner;
+
+
+/**
+ * A descriptor for an in-memory system backed by {@link InMemorySystemFactory}.
+ * System properties configured using a descriptor override corresponding properties provided in configuration.
+ * <p>
+ * The following system level configs are set by default:
+ * <ol>
+ *   <li>"systems.%s.default.stream.samza.offset.default" = "oldest"</li>
+ *   <li>"systems.%s.samza.factory" = {@link InMemorySystemFactory}</li>
+ *   <li>{@link InMemorySystemConfig#INMEMORY_SCOPE} = the scope id set via {@link #withInMemoryScope(String)},
+ *       generated to isolate this system in memory</li>
+ * </ol>
+ * The factory and offset-default configs are additionally emitted with a {@code jobs.<job-name>.} prefix, using
+ * {@link TestRunner#JOB_NAME} as the job name.
+ */
+public class InMemorySystemDescriptor extends SystemDescriptor<InMemorySystemDescriptor>
+    implements SimpleInputDescriptorProvider, OutputDescriptorProvider {
+  private static final String FACTORY_CLASS_NAME = InMemorySystemFactory.class.getName();
+
+  /**
+   * Prefix that scopes a config to a single job.
+   * <p>
+   * The {@code systems.*} configs are required since the planner uses the system to get metadata about streams during
+   * planning. The {@code jobs.<job-name>.systems.*} configs are required since configs generated from user provided
+   * system/stream descriptors override configs originally supplied to the planner. Configs in the
+   * {@code jobs.<job-name>.*} scope have the highest precedence.
+   * <p>
+   * For this system, {@link #toConfig()} uses it to generate the following overridden configs:
+   * <ol>
+   *   <li>{@code jobs.<job-name>.systems.%s.default.stream.samza.offset.default} = "oldest"</li>
+   *   <li>{@code jobs.<job-name>.systems.%s.samza.factory} = {@link InMemorySystemFactory}</li>
+   * </ol>
+   */
+  private static final String CONFIG_OVERRIDE_PREFIX = "jobs.%s.";
+  private static final String DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY = "systems.%s.default.stream.samza.offset.default";
+
+  private String inMemoryScope;
+
+  /**
+   * Constructs a new InMemorySystemDescriptor from specified components.
+   * <p>
+   * Every {@link InMemorySystemDescriptor} is configured to consume from the oldest offset, since stream is in memory and
+   * is used for testing purpose. System uses {@link InMemorySystemFactory} to initialize in memory streams.
+   *
+   * @param systemName unique name of the system
+   */
+  public InMemorySystemDescriptor(String systemName) {
+    super(systemName, FACTORY_CLASS_NAME, null, null);
+    this.withDefaultStreamOffsetDefault(SystemStreamMetadata.OffsetType.OLDEST);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The supplied {@code serde} is not used; the returned descriptor always uses a {@code NoOpSerde}.
+   */
+  @Override
+  public <StreamMessageType> InMemoryInputDescriptor<StreamMessageType> getInputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new InMemoryInputDescriptor<>(streamId, this);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The supplied {@code serde} is not used; the returned descriptor always uses a {@code NoOpSerde}.
+   */
+  @Override
+  public <StreamMessageType> InMemoryOutputDescriptor<StreamMessageType> getOutputDescriptor(
+      String streamId, Serde<StreamMessageType> serde) {
+    return new InMemoryOutputDescriptor<>(streamId, this);
+  }
+
+  /**
+   * Sets the {@code inMemoryScope} that identifies the unique instance of the in-memory system this system uses.
+   * This method is for framework use only; users are not supposed to call it.
+   *
+   * @param inMemoryScope acts as a unique global identifier for this instance of InMemorySystem
+   * @return this system descriptor
+   */
+  public InMemorySystemDescriptor withInMemoryScope(String inMemoryScope) {
+    this.inMemoryScope = inMemoryScope;
+    return this;
+  }
+
+  @Override
+  public Map<String, String> toConfig() {
+    Map<String, String> configs = new HashMap<>(super.toConfig());
+    // NOTE(review): inMemoryScope is null until withInMemoryScope is called, in which case the key maps to null.
+    configs.put(InMemorySystemConfig.INMEMORY_SCOPE, this.inMemoryScope);
+    configs.put(String.format(CONFIG_OVERRIDE_PREFIX + JavaSystemConfig.SYSTEM_FACTORY_FORMAT, TestRunner.JOB_NAME,
+        getSystemName()), FACTORY_CLASS_NAME);
+    configs.put(String.format(CONFIG_OVERRIDE_PREFIX + DEFAULT_STREAM_OFFSET_DEFAULT_CONFIG_KEY, TestRunner.JOB_NAME,
+        getSystemName()), SystemStreamMetadata.OffsetType.OLDEST.toString());
+    return configs;
+  }
+
+}
+
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
index 3a1eba0..581b1c3 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/AsyncStreamTaskIntegrationTest.java
@@ -27,7 +27,10 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.samza.operators.KV;
-import org.apache.samza.test.framework.stream.CollectionStream;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Assert;
import org.junit.Test;
@@ -40,16 +43,21 @@ public class AsyncStreamTaskIntegrationTest {
List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);
- CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList);
- CollectionStream output = CollectionStream.empty("async-test", "ints-out");
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
+
+ InMemoryInputDescriptor<Integer> imid = isd
+ .getInputDescriptor("ints", new NoOpSerde<Integer>());
+
+ InMemoryOutputDescriptor imod = isd
+ .getOutputDescriptor("ints-out", new NoOpSerde<>());
TestRunner
.of(MyAsyncStreamTask.class)
- .addInputStream(input)
- .addOutputStream(output)
+ .addInputStream(imid, inputList)
+ .addOutputStream(imod, 1)
.run(Duration.ofSeconds(2));
- Assert.assertThat(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(0),
+ Assert.assertThat(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(0),
IsIterableContainingInOrder.contains(outputList.toArray()));
}
@@ -58,49 +66,70 @@ public class AsyncStreamTaskIntegrationTest {
List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> outputList = Arrays.asList(50, 10, 20, 30, 40);
- CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList);
- CollectionStream output = CollectionStream.empty("async-test", "ints-out");
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
+
+ InMemoryInputDescriptor<Integer> imid = isd
+ .getInputDescriptor("ints", new NoOpSerde<Integer>());
+
+ InMemoryOutputDescriptor imod = isd
+ .getOutputDescriptor("ints-out", new NoOpSerde<>());
TestRunner
.of(MyAsyncStreamTask.class)
- .addInputStream(input)
- .addOutputStream(output)
+ .addInputStream(imid, inputList)
+ .addOutputStream(imod, 1)
.run(Duration.ofSeconds(2));
- StreamAssert.containsInAnyOrder(output, outputList, Duration.ofMillis(1000));
+ StreamAssert.containsInAnyOrder(outputList, imod, Duration.ofMillis(1000));
}
+ /**
+ * Runs MyAsyncStreamTask over a 5-partition stream of KV messages and asserts that each output partition
+ * contains the expected messages in order.
+ */
@Test
public void testAsyncTaskWithMultiplePartition() throws Exception {
Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
- List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
- List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList());
- for (int i = 0; i < 5; i++) {
- List<KV> keyedPartition = new ArrayList<>();
- for (Integer val : partition) {
- keyedPartition.add(KV.of(i, val));
- }
- inputPartitionData.put(i, keyedPartition);
- expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition));
- }
+ // genData fills 5 partitions with the values 1..5 as input and the same values multiplied by 10 as expected output.
+ genData(inputPartitionData, expectedOutputPartitionData);
- CollectionStream<KV> inputStream = CollectionStream.of("async-test", "ints", inputPartitionData);
- CollectionStream outputStream = CollectionStream.empty("async-test", "ints-out", 5);
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
+
+ InMemoryInputDescriptor<KV> imid = isd
+ .getInputDescriptor("ints", new NoOpSerde<KV>());
+ InMemoryOutputDescriptor imod = isd
+ .getOutputDescriptor("ints-out", new NoOpSerde<>());
TestRunner
.of(MyAsyncStreamTask.class)
- .addInputStream(inputStream)
- .addOutputStream(outputStream)
+ .addInputStream(imid, inputPartitionData)
+ // The output stream is created with 5 partitions, matching the input.
+ .addOutputStream(imod, 5)
.run(Duration.ofSeconds(2));
- StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000));
+ StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
}
@Test
public void testAsyncTaskWithMultiplePartitionMultithreaded() throws Exception {
Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
+ genData(inputPartitionData, expectedOutputPartitionData);
+
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
+
+ InMemoryInputDescriptor<KV> imid = isd
+ .getInputDescriptor("ints", new NoOpSerde<>());
+
+ InMemoryOutputDescriptor imod = isd
+ .getOutputDescriptor("ints-out", new NoOpSerde<>());
+
+ TestRunner
+ .of(MyAsyncStreamTask.class)
+ .addInputStream(imid, inputPartitionData)
+ .addOutputStream(imod, 5)
+ .addOverrideConfig("task.max.concurrency", "4")
+ .run(Duration.ofSeconds(2));
+
+ StreamAssert.containsInAnyOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
+ }
+
+ public void genData(Map<Integer, List<KV>> inputPartitionData, Map<Integer, List<Integer>> expectedOutputPartitionData) {
List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList());
for (int i = 0; i < 5; i++) {
@@ -111,18 +140,6 @@ public class AsyncStreamTaskIntegrationTest {
inputPartitionData.put(i, keyedPartition);
expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition));
}
-
- CollectionStream<KV> inputStream = CollectionStream.of("async-test", "ints", inputPartitionData);
- CollectionStream outputStream = CollectionStream.empty("async-test", "ints-out", 5);
-
- TestRunner
- .of(MyAsyncStreamTask.class)
- .addInputStream(inputStream)
- .addOutputStream(outputStream)
- .addOverrideConfig("task.max.concurrency", "4")
- .run(Duration.ofSeconds(2));
-
- StreamAssert.containsInAnyOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000));
}
/**
@@ -130,15 +147,20 @@ public class AsyncStreamTaskIntegrationTest {
*/
@Test(expected = AssertionError.class)
public void testSamzaJobTimeoutFailureForAsyncTask() {
- List<Integer> inputList = Arrays.asList(1, 2, 3, 4);
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("async-test");
+
+ InMemoryInputDescriptor<Integer> imid = isd
+ .getInputDescriptor("ints", new NoOpSerde<>());
- CollectionStream<Integer> input = CollectionStream.of("async-test", "ints", inputList);
- CollectionStream output = CollectionStream.empty("async-test", "ints-out");
+ InMemoryOutputDescriptor imod = isd
+ .getOutputDescriptor("ints-out", new NoOpSerde<>());
TestRunner
.of(MyAsyncStreamTask.class)
- .addInputStream(input)
- .addOutputStream(output)
+ .addInputStream(imid, Arrays.asList(1, 2, 3, 4))
+ .addOutputStream(imod, 1)
.run(Duration.ofMillis(1));
}
+
+
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
index 1000f22..6dd9159 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTest.java
@@ -36,7 +36,9 @@ import org.apache.samza.system.SystemStream;
import org.apache.samza.system.kafka.KafkaInputDescriptor;
import org.apache.samza.system.kafka.KafkaSystemDescriptor;
import org.apache.samza.test.controlmessages.TestData;
-import org.apache.samza.test.framework.stream.CollectionStream;
+import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
import org.junit.Assert;
import org.junit.Test;
@@ -82,17 +84,22 @@ public class StreamApplicationIntegrationTest {
pageviews.add(pv);
}
- CollectionStream<PageView> input = CollectionStream.of("test", "PageView", pageviews);
- CollectionStream output = CollectionStream.empty("test", "Output", 10);
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+ InMemoryInputDescriptor<PageView> imid = isd
+ .getInputDescriptor("PageView", new NoOpSerde<PageView>());
+
+ InMemoryOutputDescriptor<PageView> imod = isd
+ .getOutputDescriptor("Output", new NoOpSerde<PageView>());
TestRunner
.of(pageViewRepartition)
- .addInputStream(input)
- .addOutputStream(output)
+ .addInputStream(imid, pageviews)
+ .addOutputStream(imod, 10)
.addOverrideConfig("job.default.system", "test")
.run(Duration.ofMillis(1500));
- Assert.assertEquals(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1);
+ Assert.assertEquals(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(random.nextInt(count)).size(), 1);
}
public static final class Values {
@@ -107,13 +114,18 @@ public class StreamApplicationIntegrationTest {
@Test(expected = SamzaException.class)
public void testSamzaJobStartMissingConfigFailureForStreamApplication() {
- CollectionStream<TestData.PageView> input = CollectionStream.of("test", "PageView", new ArrayList<>());
- CollectionStream output = CollectionStream.empty("test", "Output", 10);
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+ InMemoryInputDescriptor<PageView> imid = isd
+ .getInputDescriptor("PageView", new NoOpSerde<PageView>());
+
+ InMemoryOutputDescriptor<PageView> imod = isd
+ .getOutputDescriptor("Output", new NoOpSerde<PageView>());
TestRunner
.of(pageViewRepartition)
- .addInputStream(input)
- .addOutputStream(output)
+ .addInputStream(imid, new ArrayList<>())
+ .addOutputStream(imod, 10)
.run(Duration.ofMillis(1000));
}
@@ -131,12 +143,17 @@ public class StreamApplicationIntegrationTest {
pageviews.add(new TestData.PageView(null, memberId));
}
- CollectionStream<TestData.PageView> input = CollectionStream.of("test", "PageView", pageviews);
- CollectionStream output = CollectionStream.empty("test", "Output", 1);
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+ InMemoryInputDescriptor<PageView> imid = isd
+ .getInputDescriptor("PageView", new NoOpSerde<PageView>());
+
+ InMemoryOutputDescriptor<PageView> imod = isd
+ .getOutputDescriptor("Output", new NoOpSerde<PageView>());
TestRunner.of(pageViewFilter)
- .addInputStream(input)
- .addOutputStream(output)
+ .addInputStream(imid, pageviews)
+ .addOutputStream(imod, 10)
.addOverrideConfig("job.default.system", "test")
.run(Duration.ofMillis(1000));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
index f888b4a..0580598 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamTaskIntegrationTest.java
@@ -28,7 +28,10 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.operators.KV;
-import org.apache.samza.test.framework.stream.CollectionStream;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
import org.hamcrest.collection.IsIterableContainingInOrder;
import org.junit.Assert;
import org.junit.Test;
@@ -41,13 +44,23 @@ public class StreamTaskIntegrationTest {
List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);
- CollectionStream<Integer> input = CollectionStream.of("test", "input", inputList);
- CollectionStream output = CollectionStream.empty("test", "output");
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
- TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1));
+ InMemoryInputDescriptor<Integer> imid = isd
+ .getInputDescriptor("input", new NoOpSerde<Integer>());
- Assert.assertThat(TestRunner.consumeStream(output, Duration.ofMillis(1000)).get(0),
+ InMemoryOutputDescriptor<Integer> imod = isd
+ .getOutputDescriptor("output", new NoOpSerde<Integer>());
+
+ TestRunner
+ .of(MyStreamTestTask.class)
+ .addInputStream(imid, inputList)
+ .addOutputStream(imod, 1)
+ .run(Duration.ofSeconds(1));
+
+ Assert.assertThat(TestRunner.consumeStream(imod, Duration.ofMillis(1000)).get(0),
IsIterableContainingInOrder.contains(outputList.toArray()));
+
}
/**
@@ -57,10 +70,19 @@ public class StreamTaskIntegrationTest {
public void testSamzaJobFailureForSyncTask() {
List<Double> inputList = Arrays.asList(1.2, 2.3, 3.33, 4.5);
- CollectionStream<Double> input = CollectionStream.of("test", "doubles", inputList);
- CollectionStream output = CollectionStream.empty("test", "output");
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
- TestRunner.of(MyStreamTestTask.class).addInputStream(input).addOutputStream(output).run(Duration.ofSeconds(1));
+ InMemoryInputDescriptor<Double> imid = isd
+ .getInputDescriptor("doubles", new NoOpSerde<Double>());
+
+ InMemoryOutputDescriptor imod = isd
+ .getOutputDescriptor("output", new NoOpSerde<>());
+
+ TestRunner
+ .of(MyStreamTestTask.class)
+ .addInputStream(imid, inputList)
+ .addOutputStream(imod, 1)
+ .run(Duration.ofSeconds(1));
}
@Test
@@ -68,50 +90,72 @@ public class StreamTaskIntegrationTest {
List<Integer> inputList = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> outputList = Arrays.asList(10, 20, 30, 40, 50);
- CollectionStream<Integer> input = CollectionStream.of("test", "input", inputList);
- CollectionStream output = CollectionStream.empty("test", "output");
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+ InMemoryInputDescriptor<Integer> imid = isd
+ .getInputDescriptor("input", new NoOpSerde<Integer>());
+
+ InMemoryOutputDescriptor<Integer> imod = isd
+ .getOutputDescriptor("output", new NoOpSerde<Integer>());
TestRunner
.of(MyStreamTestTask.class)
- .addInputStream(input)
- .addOutputStream(output)
+ .addInputStream(imid, inputList)
+ .addOutputStream(imod, 1)
.addOverrideConfig("job.container.thread.pool.size", "4")
.run(Duration.ofSeconds(1));
- StreamAssert.containsInOrder(output, outputList, Duration.ofMillis(1000));
+ StreamAssert.containsInOrder(outputList, imod, Duration.ofMillis(1000));
}
@Test
public void testSyncTaskWithMultiplePartition() throws Exception {
Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
- List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
- List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList());
- for (int i = 0; i < 5; i++) {
- List<KV> keyedPartition = new ArrayList<>();
- for (Integer val : partition) {
- keyedPartition.add(KV.of(i, val));
- }
- inputPartitionData.put(i, keyedPartition);
- expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition));
- }
+ genData(inputPartitionData, expectedOutputPartitionData);
+
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+ InMemoryInputDescriptor<KV> imid = isd
+ .getInputDescriptor("input", new NoOpSerde<KV>());
- CollectionStream<KV> inputStream = CollectionStream.of("test", "input", inputPartitionData);
- CollectionStream outputStream = CollectionStream.empty("test", "output", 5);
+ InMemoryOutputDescriptor<Integer> imod = isd
+ .getOutputDescriptor("output", new NoOpSerde<Integer>());
TestRunner
.of(MyStreamTestTask.class)
- .addInputStream(inputStream)
- .addOutputStream(outputStream)
+ .addInputStream(imid, inputPartitionData)
+ .addOutputStream(imod, 5)
.run(Duration.ofSeconds(2));
- StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000));
+ StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
}
@Test
public void testSyncTaskWithMultiplePartitionMultithreaded() throws Exception {
Map<Integer, List<KV>> inputPartitionData = new HashMap<>();
Map<Integer, List<Integer>> expectedOutputPartitionData = new HashMap<>();
+ genData(inputPartitionData, expectedOutputPartitionData);
+
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor("test");
+
+ InMemoryInputDescriptor<KV> imid = isd
+ .getInputDescriptor("input", new NoOpSerde<KV>());
+
+ InMemoryOutputDescriptor<Integer> imod = isd
+ .getOutputDescriptor("output", new NoOpSerde<Integer>());
+
+ TestRunner
+ .of(MyStreamTestTask.class)
+ .addInputStream(imid, inputPartitionData)
+ .addOutputStream(imod, 5)
+ .addOverrideConfig("job.container.thread.pool.size", "4")
+ .run(Duration.ofSeconds(2));
+
+ StreamAssert.containsInOrder(expectedOutputPartitionData, imod, Duration.ofMillis(1000));
+ }
+
+ public void genData(Map<Integer, List<KV>> inputPartitionData, Map<Integer, List<Integer>> expectedOutputPartitionData) {
List<Integer> partition = Arrays.asList(1, 2, 3, 4, 5);
List<Integer> outputPartition = partition.stream().map(x -> x * 10).collect(Collectors.toList());
for (int i = 0; i < 5; i++) {
@@ -122,17 +166,5 @@ public class StreamTaskIntegrationTest {
inputPartitionData.put(i, keyedPartition);
expectedOutputPartitionData.put(i, new ArrayList<Integer>(outputPartition));
}
-
- CollectionStream<KV> inputStream = CollectionStream.of("test", "input", inputPartitionData);
- CollectionStream outputStream = CollectionStream.empty("test", "output", 5);
-
- TestRunner
- .of(MyStreamTestTask.class)
- .addInputStream(inputStream)
- .addOutputStream(outputStream)
- .addOverrideConfig("job.container.thread.pool.size", "4")
- .run(Duration.ofSeconds(2));
-
- StreamAssert.containsInOrder(outputStream, expectedOutputPartitionData, Duration.ofMillis(1000));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1755268c/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
index 5c067ad..adcea48 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTableWithSideInputs.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.JobConfig;
@@ -44,7 +45,9 @@ import org.apache.samza.storage.kv.inmemory.InMemoryTableDescriptor;
import org.apache.samza.system.kafka.KafkaSystemDescriptor;
import org.apache.samza.table.Table;
import org.apache.samza.test.framework.TestRunner;
-import org.apache.samza.test.framework.stream.CollectionStream;
+import org.apache.samza.test.framework.system.InMemoryInputDescriptor;
+import org.apache.samza.test.framework.system.InMemoryOutputDescriptor;
+import org.apache.samza.test.framework.system.InMemorySystemDescriptor;
import org.apache.samza.test.harness.AbstractIntegrationTestHarness;
import org.junit.Test;
@@ -84,25 +87,28 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);
configs.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName);
- CollectionStream<PageView> pageViewStream =
- CollectionStream.of(systemName, PAGEVIEW_STREAM, pageViews);
- CollectionStream<Profile> profileStream =
- CollectionStream.of(systemName, PROFILE_STREAM, profiles);
+ InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
- CollectionStream<EnrichedPageView> outputStream =
- CollectionStream.empty(systemName, ENRICHED_PAGEVIEW_STREAM);
+ InMemoryInputDescriptor<PageView> pageViewStreamDesc = isd
+ .getInputDescriptor(PAGEVIEW_STREAM, new NoOpSerde<PageView>());
+
+ InMemoryInputDescriptor<Profile> profileStreamDesc = isd
+ .getInputDescriptor(PROFILE_STREAM, new NoOpSerde<Profile>());
+
+ InMemoryOutputDescriptor<EnrichedPageView> outputStreamDesc = isd
+ .getOutputDescriptor(ENRICHED_PAGEVIEW_STREAM, new NoOpSerde<EnrichedPageView>());
TestRunner
.of(app)
- .addInputStream(pageViewStream)
- .addInputStream(profileStream)
- .addOutputStream(outputStream)
+ .addInputStream(pageViewStreamDesc, pageViews)
+ .addInputStream(profileStreamDesc, profiles)
+ .addOutputStream(outputStreamDesc, 1)
.addConfigs(new MapConfig(configs))
.addOverrideConfig(ClusterManagerConfig.CLUSTER_MANAGER_HOST_AFFINITY_ENABLED, Boolean.FALSE.toString())
.run(Duration.ofMillis(100000));
try {
- Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStream, Duration.ofMillis(1000));
+ Map<Integer, List<EnrichedPageView>> result = TestRunner.consumeStream(outputStreamDesc, Duration.ofMillis(1000));
List<EnrichedPageView> results = result.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
@@ -117,7 +123,7 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
assertEquals("Mismatch between the expected and actual join count", results.size(),
expectedEnrichedPageviews.size());
assertTrue("Pageview profile join did not succeed for all inputs", successfulJoin);
- } catch (InterruptedException e) {
+ } catch (SamzaException e) {
e.printStackTrace();
}
}
@@ -163,4 +169,4 @@ public class TestLocalTableWithSideInputs extends AbstractIntegrationTestHarness
});
}
}
-}
+}
\ No newline at end of file