You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/03/22 14:59:46 UTC
samza git commit: samza-1156: Improve ApplicationRunner method naming
and class structure
Repository: samza
Updated Branches:
refs/heads/master bd71d609c -> e2c35c000
samza-1156: Improve ApplicationRunner method naming and class structure
navina xinyuiscool nickpan47 take a look
Author: Jacob Maes <jm...@linkedin.com>
Reviewers: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Closes #93 from jmakes/app-runner-class-hier
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e2c35c00
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e2c35c00
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e2c35c00
Branch: refs/heads/master
Commit: e2c35c00068f071840332a200d9879e07fdb1001
Parents: bd71d60
Author: Jacob Maes <jm...@linkedin.com>
Authored: Wed Mar 22 07:59:08 2017 -0700
Committer: Jacob Maes <jm...@linkedin.com>
Committed: Wed Mar 22 07:59:08 2017 -0700
----------------------------------------------------------------------
.../apache/samza/runtime/ApplicationRunner.java | 34 +++---
.../apache/samza/operators/StreamGraphImpl.java | 2 +-
.../runtime/AbstractApplicationRunner.java | 23 ++--
.../runtime/TestAbstractApplicationRunner.java | 108 +++++++++----------
4 files changed, 85 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/e2c35c00/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index 6da1242..d148626 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -27,16 +27,15 @@ import org.apache.samza.system.StreamSpec;
/**
- * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
- *
- * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
- * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
+ * A physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
*/
@InterfaceStability.Unstable
-public interface ApplicationRunner {
+public abstract class ApplicationRunner {
+
+ private static final String RUNNER_CONFIG = "app.runner.class";
+ private static final String DEFAULT_RUNNER_CLASS = "org.apache.samza.runtime.RemoteApplicationRunner";
- String RUNNER_CONFIG = "app.runner.class";
- String DEFAULT_RUNNER_CLASS = "org.apache.samza.runtime.RemoteApplicationRunner";
+ protected final Config config;
/**
* Static method to create the local {@link ApplicationRunner}.
@@ -44,19 +43,17 @@ public interface ApplicationRunner {
* @param config configuration passed in to initialize the Samza local process
* @return the local {@link ApplicationRunner} to run the user-defined stream applications
*/
- static ApplicationRunner getLocalRunner(Config config) {
+ public static ApplicationRunner getLocalRunner(Config config) {
return null;
}
/**
* Static method to load the {@link ApplicationRunner}
*
- * Requires the implementation class to define a constructor with a single {@link Config} as the argument.
- *
* @param config configuration passed in to initialize the Samza processes
* @return the configure-driven {@link ApplicationRunner} to run the user-defined stream applications
*/
- static ApplicationRunner fromConfig(Config config) {
+ public static ApplicationRunner fromConfig(Config config) {
try {
Class<?> runnerClass = Class.forName(config.get(RUNNER_CONFIG, DEFAULT_RUNNER_CLASS));
if (ApplicationRunner.class.isAssignableFrom(runnerClass)) {
@@ -68,16 +65,25 @@ public interface ApplicationRunner {
RUNNER_CONFIG)), e);
}
throw new ConfigException(String.format(
- "Class %s does not implement interface ApplicationRunner properly",
+ "Class %s does not extend ApplicationRunner properly",
config.get(RUNNER_CONFIG)));
}
+
+ public ApplicationRunner(Config config) {
+ if (config == null) {
+ throw new NullPointerException("Parameter 'config' cannot be null.");
+ }
+
+ this.config = config;
+ }
+
/**
* Method to be invoked to deploy and run the actual Samza jobs to execute {@link StreamApplication}
*
* @param streamApp the user-defined {@link StreamApplication} object
*/
- void run(StreamApplication streamApp);
+ public abstract void run(StreamApplication streamApp);
/**
* Constructs a {@link StreamSpec} from the configuration for the specified streamId.
@@ -98,5 +104,5 @@ public interface ApplicationRunner {
* @param streamId The logical identifier for the stream in Samza.
* @return The {@link StreamSpec} instance.
*/
- StreamSpec streamFromConfig(String streamId);
+ public abstract StreamSpec getStream(String streamId);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/e2c35c00/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 1b36f76..fe86699 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -255,7 +255,7 @@ public class StreamGraphImpl implements StreamGraph {
config.get(JobConfig.JOB_NAME()),
config.get(JobConfig.JOB_ID(), "1"),
opNameWithId);
- StreamSpec streamSpec = runner.streamFromConfig(streamId);
+ StreamSpec streamSpec = runner.getStream(streamId);
this.inStreams.putIfAbsent(streamSpec.getId(), new IntermediateStreamImpl(this, streamSpec, null, null, parKeyFn));
IntermediateStreamImpl intStream = (IntermediateStreamImpl) this.inStreams.get(streamSpec.getId());
http://git-wip-us.apache.org/repos/asf/samza/blob/e2c35c00/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index a5d784a..5f01ca7 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -24,23 +24,20 @@ import org.apache.samza.config.StreamConfig;
import org.apache.samza.system.StreamSpec;
-public abstract class AbstractApplicationRunner implements ApplicationRunner {
-
- protected final Config config;
+/**
+ * Defines common, core behavior for implementations of the {@link ApplicationRunner} API
+ */
+public abstract class AbstractApplicationRunner extends ApplicationRunner {
public AbstractApplicationRunner(Config config) {
- if (config == null) {
- throw new NullPointerException("Parameter 'config' cannot be null.");
- }
-
- this.config = config;
+ super(config);
}
@Override
- public StreamSpec streamFromConfig(String streamId) {
+ public StreamSpec getStream(String streamId) {
StreamConfig streamConfig = new StreamConfig(config);
String physicalName = streamConfig.getPhysicalName(streamId);
- return streamFromConfig(streamId, physicalName);
+ return getStream(streamId, physicalName);
}
/**
@@ -61,11 +58,11 @@ public abstract class AbstractApplicationRunner implements ApplicationRunner {
* @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifier.
* @return The {@link StreamSpec} instance.
*/
- /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) {
+ /*package private*/ StreamSpec getStream(String streamId, String physicalName) {
StreamConfig streamConfig = new StreamConfig(config);
String system = streamConfig.getSystem(streamId);
- return streamFromConfig(streamId, physicalName, system);
+ return getStream(streamId, physicalName, system);
}
/**
@@ -79,7 +76,7 @@ public abstract class AbstractApplicationRunner implements ApplicationRunner {
* @param system The name of the System on which this stream will be used.
* @return The {@link StreamSpec} instance.
*/
- /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) {
+ /*package private*/ StreamSpec getStream(String streamId, String physicalName, String system) {
StreamConfig streamConfig = new StreamConfig(config);
Map<String, String> properties = streamConfig.getStreamProperties(streamId);
http://git-wip-us.apache.org/repos/asf/samza/blob/e2c35c00/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
index 155a47d..8d7db9f 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractApplicationRunner.java
@@ -52,13 +52,13 @@ public class TestAbstractApplicationRunner {
// The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value.
@Test
- public void testStreamFromConfigWithPhysicalNameInConfig() {
+ public void testgetStreamWithPhysicalNameInConfig() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
StreamConfig.SYSTEM(), TEST_SYSTEM);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID);
assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
}
@@ -66,71 +66,71 @@ public class TestAbstractApplicationRunner {
// The streamId should be used as the physicalName when the physical name is not specified.
// NOTE: it's either this, set to null, or exception. This seems better for backward compatibility and API brevity.
@Test
- public void testStreamFromConfigWithoutPhysicalNameInConfig() {
+ public void testgetStreamWithoutPhysicalNameInConfig() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.SYSTEM(), TEST_SYSTEM);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID);
assertEquals(STREAM_ID, spec.getPhysicalName());
}
// If the system is specified at the stream scope, use it
@Test
- public void testStreamFromConfigWithSystemAtStreamScopeInConfig() {
+ public void testgetStreamWithSystemAtStreamScopeInConfig() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
StreamConfig.SYSTEM(), TEST_SYSTEM);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID);
assertEquals(TEST_SYSTEM, spec.getSystemName());
}
// If system isn't specified at stream scope, use the default system
@Test
- public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() {
+ public void testgetStreamWithSystemAtDefaultScopeInConfig() {
Config config = addConfigs(buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID);
assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
}
// Stream scope should override default scope
@Test
- public void testStreamFromConfigWithSystemAtBothScopesInConfig() {
+ public void testgetStreamWithSystemAtBothScopesInConfig() {
Config config = addConfigs(buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
StreamConfig.SYSTEM(), TEST_SYSTEM),
JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID);
assertEquals(TEST_SYSTEM, spec.getSystemName());
}
// System is required. Throw if it cannot be determined.
@Test(expected = Exception.class)
- public void testStreamFromConfigWithOutSystemInConfig() {
+ public void testgetStreamWithOutSystemInConfig() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID);
assertEquals(TEST_SYSTEM, spec.getSystemName());
}
// The properties in the config "streams.{streamId}.*" should be passed through to the spec.
@Test
- public void testStreamFromConfigPropertiesPassthrough() {
+ public void testgetStreamPropertiesPassthrough() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
StreamConfig.SYSTEM(), TEST_SYSTEM,
@@ -138,8 +138,8 @@ public class TestAbstractApplicationRunner {
"systemProperty2", "systemValue2",
"systemProperty3", "systemValue3");
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID);
Map<String, String> properties = spec.getConfig();
assertEquals(3, properties.size());
@@ -153,7 +153,7 @@ public class TestAbstractApplicationRunner {
// The samza properties (which are invalid for the underlying system) should be filtered out.
@Test
- public void testStreamFromConfigSamzaPropertiesOmitted() {
+ public void testgetStreamSamzaPropertiesOmitted() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
StreamConfig.SYSTEM(), TEST_SYSTEM,
@@ -161,8 +161,8 @@ public class TestAbstractApplicationRunner {
"systemProperty2", "systemValue2",
"systemProperty3", "systemValue3");
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID);
Map<String, String> properties = spec.getConfig();
assertEquals(3, properties.size());
@@ -174,13 +174,13 @@ public class TestAbstractApplicationRunner {
// When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config
@Test
- public void testStreamFromConfigPhysicalNameArgSimple() {
+ public void testgetStreamPhysicalNameArgSimple() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
StreamConfig.SYSTEM(), TEST_SYSTEM);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME);
assertEquals(STREAM_ID, spec.getId());
assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
@@ -189,37 +189,37 @@ public class TestAbstractApplicationRunner {
// Special characters are allowed for the physical name
@Test
- public void testStreamFromConfigPhysicalNameArgSpecialCharacters() {
+ public void testgetStreamPhysicalNameArgSpecialCharacters() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
StreamConfig.SYSTEM(), TEST_SYSTEM);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
}
// Null is allowed for the physical name
@Test
- public void testStreamFromConfigPhysicalNameArgNull() {
+ public void testgetStreamPhysicalNameArgNull() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
StreamConfig.SYSTEM(), TEST_SYSTEM);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID, null);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID, null);
assertNull(spec.getPhysicalName());
}
// When the system name is provided explicitly, it should be used, regardless of whether it's also in the config
@Test
- public void testStreamFromConfigSystemNameArgValid() {
+ public void testgetStreamSystemNameArgValid() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
StreamConfig.SYSTEM(), TEST_SYSTEM2); // This too
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
assertEquals(STREAM_ID, spec.getId());
assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
@@ -228,65 +228,65 @@ public class TestAbstractApplicationRunner {
// Special characters are NOT allowed for system name, because it's used as an identifier in the config.
@Test(expected = IllegalArgumentException.class)
- public void testStreamFromConfigSystemNameArgInvalid() {
+ public void testgetStreamSystemNameArgInvalid() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
StreamConfig.SYSTEM(), TEST_SYSTEM2);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
}
// Empty strings are NOT allowed for system name, because it's used as an identifier in the config.
@Test(expected = IllegalArgumentException.class)
- public void testStreamFromConfigSystemNameArgEmpty() {
+ public void testgetStreamSystemNameArgEmpty() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
StreamConfig.SYSTEM(), TEST_SYSTEM2);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, "");
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, "");
}
// Null is not allowed for system name.
@Test(expected = NullPointerException.class)
- public void testStreamFromConfigSystemNameArgNull() {
+ public void testgetStreamSystemNameArgNull() {
Config config = buildStreamConfig(STREAM_ID,
StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
StreamConfig.SYSTEM(), TEST_SYSTEM2);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ runner.getStream(STREAM_ID, TEST_PHYSICAL_NAME, null);
}
// Special characters are NOT allowed for streamId, because it's used as an identifier in the config.
@Test(expected = IllegalArgumentException.class)
- public void testStreamFromConfigStreamIdInvalid() {
+ public void testgetStreamStreamIdInvalid() {
Config config = buildStreamConfig(STREAM_ID_INVALID,
StreamConfig.SYSTEM(), TEST_SYSTEM);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- env.streamFromConfig(STREAM_ID_INVALID);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ runner.getStream(STREAM_ID_INVALID);
}
// Empty strings are NOT allowed for streamId, because it's used as an identifier in the config.
@Test(expected = IllegalArgumentException.class)
- public void testStreamFromConfigStreamIdEmpty() {
+ public void testgetStreamStreamIdEmpty() {
Config config = buildStreamConfig("",
StreamConfig.SYSTEM(), TEST_SYSTEM);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- env.streamFromConfig("");
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ runner.getStream("");
}
// Null is not allowed for streamId.
@Test(expected = NullPointerException.class)
- public void testStreamFromConfigStreamIdNull() {
+ public void testgetStreamStreamIdNull() {
Config config = buildStreamConfig(null,
StreamConfig.SYSTEM(), TEST_SYSTEM);
- AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
- env.streamFromConfig(null);
+ AbstractApplicationRunner runner = new TestAbstractApplicationRunnerImpl(config);
+ runner.getStream(null);
}