You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/03/07 23:02:43 UTC
samza git commit: SAMZA-1122;
Rename ExecutionEnvironment to ApplicationRunner
Repository: samza
Updated Branches:
refs/heads/master e6c1eed4f -> 1956dac94
SAMZA-1122; Rename ExecutionEnvironment to ApplicationRunner
Some refactoring/cleanup:
- rename ExecutionEnvironment to ApplicationRunner, including all the subclasses.
- rename the package to be org.apache.samza.runtime
- rename the StandalondApplicationRunner to be LocalApplicationRunner
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Reviewers: Prateek Maheshwari <pm...@linkedin.com>
Closes #76 from xinyuiscool/SAMZA-1122 and squashes the following commits:
cff5206 [Xinyu Liu] Merge branch 'SAMZA-1122' of https://github.com/xinyuiscool/samza into SAMZA-1122
c341d3d [Xinyu Liu] SAMZA-1122: Rename ExecutionEnvironment to ApplicationRunner
6a71205 [Xinyu Liu] SAMZA-1122: Rename ExecutionEnvironment to ApplicationRunner
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1956dac9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1956dac9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1956dac9
Branch: refs/heads/master
Commit: 1956dac94b2c41e3e99a0e47d1f5704882b10bbd
Parents: e6c1eed
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Tue Mar 7 15:02:30 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Tue Mar 7 15:02:30 2017 -0800
----------------------------------------------------------------------
.../apache/samza/runtime/ApplicationRunner.java | 103 ++++++
.../samza/system/ExecutionEnvironment.java | 101 ------
.../runtime/AbstractApplicationRunner.java | 89 +++++
.../samza/runtime/LocalApplicationRunner.java | 54 +++
.../samza/runtime/RemoteApplicationRunner.java | 42 +++
.../system/AbstractExecutionEnvironment.java | 88 -----
.../system/RemoteExecutionEnvironment.java | 41 ---
.../system/StandaloneExecutionEnvironment.java | 54 ---
.../samza/example/KeyValueStoreExample.java | 17 +-
.../samza/example/NoContextStreamExample.java | 15 +-
.../samza/example/OrderShipmentJoinExample.java | 15 +-
.../samza/example/PageViewCounterExample.java | 6 +-
.../samza/example/RepartitionExample.java | 19 +-
.../TestAbstractExecutionEnvironment.java | 333 +++++++++++++++++++
.../TestAbstractExecutionEnvironment.java | 331 ------------------
15 files changed, 663 insertions(+), 645 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
new file mode 100644
index 0000000..ff31eff
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.lang.reflect.Constructor;
+import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.config.ConfigException;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.system.StreamSpec;
+
+
+/**
+ * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
+ *
+ * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
+ * to support the {@link ApplicationRunner#fromConfig(Config)} static constructor.
+ */
+@InterfaceStability.Unstable
+public interface ApplicationRunner {
+
+ String RUNNER_CONFIG = "job.runner.class";
+ String DEFAULT_RUNNER_CLASS = "org.apache.samza.runtime.RemoteApplicationRunner";
+
+ /**
+ * Static method to create the local {@link ApplicationRunner}.
+ *
+ * @param config configuration passed in to initialize the Samza local process
+ * @return the local {@link ApplicationRunner} to run the user-defined stream applications
+ */
+ static ApplicationRunner getLocalRunner(Config config) {
+ return null;
+ }
+
+ /**
+ * Static method to load the {@link ApplicationRunner}
+ *
+ * Requires the implementation class to define a constructor with a single {@link Config} as the argument.
+ *
+ * @param config configuration passed in to initialize the Samza processes
+ * @return the configure-driven {@link ApplicationRunner} to run the user-defined stream applications
+ */
+ static ApplicationRunner fromConfig(Config config) {
+ try {
+ Class<?> runnerClass = Class.forName(config.get(RUNNER_CONFIG, DEFAULT_RUNNER_CLASS));
+ if (ApplicationRunner.class.isAssignableFrom(runnerClass)) {
+ Constructor<?> constructor = runnerClass.getConstructor(Config.class); // *sigh*
+ return (ApplicationRunner) constructor.newInstance(config);
+ }
+ } catch (Exception e) {
+ throw new ConfigException(String.format("Problem in loading ApplicationRunner class %s", config.get(
+ RUNNER_CONFIG)), e);
+ }
+ throw new ConfigException(String.format(
+ "Class %s does not implement interface ApplicationRunner properly",
+ config.get(RUNNER_CONFIG)));
+ }
+
+ /**
+ * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph}
+ *
+ * @param graphBuilder the user-defined {@link StreamGraphBuilder} object
+ * @param config the {@link Config} object for this job
+ */
+ void run(StreamGraphBuilder graphBuilder, Config config);
+
+ /**
+ * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+ *
+ * The stream configurations are read from the following properties in the config:
+ * {@code streams.{$streamId}.*}
+ * <br>
+ * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two
+ * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system.
+ *
+ * <ul>
+ * <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
+ * the stream will be associated with the System defined in {@code job.default.system}</li>
+ * <li>samza.physical.name - The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
+ * If this property isn't defined the physical.name will be set to the streamId</li>
+ * </ul>
+ *
+ * @param streamId The logical identifier for the stream in Samza.
+ * @return The {@link StreamSpec} instance.
+ */
+ StreamSpec streamFromConfig(String streamId);
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java b/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
deleted file mode 100644
index 8444c91..0000000
--- a/samza-api/src/main/java/org/apache/samza/system/ExecutionEnvironment.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import java.lang.reflect.Constructor;
-import org.apache.samza.annotation.InterfaceStability;
-import org.apache.samza.config.ConfigException;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-
-
-/**
- * Interface to be implemented by physical execution engine to deploy the config and jobs to run the {@link org.apache.samza.operators.StreamGraph}
- *
- * Implementations of this interface must define a constructor with a single {@link Config} as the argument in order
- * to support the {@link ExecutionEnvironment#fromConfig(Config)} static constructor.
- */
-@InterfaceStability.Unstable
-public interface ExecutionEnvironment {
-
- String ENVIRONMENT_CONFIG = "job.execution.environment.class";
- String DEFAULT_ENVIRONMENT_CLASS = "org.apache.samza.system.StandaloneExecutionEnvironment";
-
- /**
- * Static method to load the local standalone environment
- *
- * @param config configuration passed in to initialize the Samza standalone process
- * @return the standalone {@link ExecutionEnvironment} to run the user-defined stream applications
- */
- static ExecutionEnvironment getLocalEnvironment(Config config) {
- return null;
- }
-
- /**
- * Static method to load the non-standalone environment.
- *
- * Requires the implementation class to define a constructor with a single {@link Config} as the argument.
- *
- * @param config configuration passed in to initialize the Samza processes
- * @return the configure-driven {@link ExecutionEnvironment} to run the user-defined stream applications
- */
- static ExecutionEnvironment fromConfig(Config config) {
- try {
- Class<?> environmentClass = Class.forName(config.get(ENVIRONMENT_CONFIG, DEFAULT_ENVIRONMENT_CLASS));
- if (ExecutionEnvironment.class.isAssignableFrom(environmentClass)) {
- Constructor<?> constructor = environmentClass.getConstructor(Config.class); // *sigh*
- return (ExecutionEnvironment) constructor.newInstance(config);
- }
- } catch (Exception e) {
- throw new ConfigException(String.format("Problem in loading ExecutionEnvironment class %s", config.get(ENVIRONMENT_CONFIG)), e);
- }
- throw new ConfigException(String.format(
- "Class %s does not implement interface ExecutionEnvironment properly",
- config.get(ENVIRONMENT_CONFIG)));
- }
-
- /**
- * Method to be invoked to deploy and run the actual Samza jobs to execute {@link org.apache.samza.operators.StreamGraph}
- *
- * @param graphBuilder the user-defined {@link StreamGraphBuilder} object
- * @param config the {@link Config} object for this job
- */
- void run(StreamGraphBuilder graphBuilder, Config config);
-
- /**
- * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
- *
- * The stream configurations are read from the following properties in the config:
- * {@code streams.{$streamId}.*}
- * <br>
- * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two
- * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system.
- *
- * <ul>
- * <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
- * the stream will be associated with the System defined in {@code job.default.system}</li>
- * <li>samza.physical.name - The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
- * If this property isn't defined the physical.name will be set to the streamId</li>
- * </ul>
- *
- * @param streamId The logical identifier for the stream in Samza.
- * @return The {@link StreamSpec} instance.
- */
- StreamSpec streamFromConfig(String streamId);
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
new file mode 100644
index 0000000..8e21ec2
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.system.StreamSpec;
+
+
+public abstract class AbstractApplicationRunner implements ApplicationRunner {
+
+ private final Config config;
+
+ public AbstractApplicationRunner(Config config) {
+ if (config == null) {
+ throw new NullPointerException("Parameter 'config' cannot be null.");
+ }
+
+ this.config = config;
+ }
+
+ @Override
+ public StreamSpec streamFromConfig(String streamId) {
+ StreamConfig streamConfig = new StreamConfig(config);
+ String physicalName = streamConfig.getPhysicalName(streamId, streamId);
+
+ return streamFromConfig(streamId, physicalName);
+ }
+
+ /**
+ * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+ *
+ * The stream configurations are read from the following properties in the config:
+ * {@code streams.{$streamId}.*}
+ * <br>
+ * All properties matching this pattern are assumed to be system-specific with one exception. The following
+ * property is a Samza property which is used to bind the stream to a system.
+ *
+ * <ul>
+ * <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
+ * the stream will be associated with the System defined in {@code job.default.system}</li>
+ * </ul>
+ *
+ * @param streamId The logical identifier for the stream in Samza.
+ * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
+ * @return The {@link StreamSpec} instance.
+ */
+ /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) {
+ StreamConfig streamConfig = new StreamConfig(config);
+ String system = streamConfig.getSystem(streamId);
+
+ return streamFromConfig(streamId, physicalName, system);
+ }
+
+ /**
+ * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
+ *
+ * The stream configurations are read from the following properties in the config:
+ * {@code streams.{$streamId}.*}
+ *
+ * @param streamId The logical identifier for the stream in Samza.
+ * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
+ * @param system The name of the System on which this stream will be used.
+ * @return The {@link StreamSpec} instance.
+ */
+ /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) {
+ StreamConfig streamConfig = new StreamConfig(config);
+ Map<String, String> properties = streamConfig.getStreamProperties(streamId);
+
+ return new StreamSpec(streamId, physicalName, system, properties);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
new file mode 100644
index 0000000..c936553
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.runtime;
+
+import org.apache.samza.operators.StreamGraph;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.StreamGraphImpl;
+
+
+/**
+ * This class implements the {@link ApplicationRunner} that runs the applications in standalone environment
+ */
+public class LocalApplicationRunner extends AbstractApplicationRunner {
+
+ public LocalApplicationRunner(Config config) {
+ super(config);
+ }
+
+ // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
+ StreamGraph createGraph(StreamGraphBuilder app, Config config) {
+ StreamGraphImpl graph = new StreamGraphImpl();
+ app.init(graph, config);
+ return graph;
+ }
+
+ @Override public void run(StreamGraphBuilder app, Config config) {
+ // 1. get logic graph for optimization
+ // StreamGraph logicGraph = this.createGraph(app, config);
+ // 2. potential optimization....
+ // 3. create new instance of StreamGraphBuilder that would generate the optimized graph
+ // 4. create all input/output/intermediate topics
+ // 5. create the configuration for StreamProcessor
+ // 6. start the StreamProcessor w/ optimized instance of StreamGraphBuilder
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
new file mode 100644
index 0000000..6e33fe8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/runtime/RemoteApplicationRunner.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.config.Config;
+
+
+/**
+ * This class implements the {@link ApplicationRunner} that runs the applications in a remote cluster
+ */
+public class RemoteApplicationRunner extends AbstractApplicationRunner {
+
+ public RemoteApplicationRunner(Config config) {
+ super(config);
+ }
+
+ @Override public void run(StreamGraphBuilder app, Config config) {
+ // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
+ // TODO: actually instantiate the tasks and run the job, i.e.
+ // 1. create all input/output/intermediate topics
+ // 2. create the single job configuration
+ // 3. execute JobRunner to submit the single job for the whole graph
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
deleted file mode 100644
index 64d60b7..0000000
--- a/samza-core/src/main/java/org/apache/samza/system/AbstractExecutionEnvironment.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import java.util.Map;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.StreamConfig;
-
-
-public abstract class AbstractExecutionEnvironment implements ExecutionEnvironment {
-
- private final Config config;
-
- public AbstractExecutionEnvironment(Config config) {
- if (config == null) {
- throw new NullPointerException("Parameter 'config' cannot be null.");
- }
-
- this.config = config;
- }
-
- @Override
- public StreamSpec streamFromConfig(String streamId) {
- StreamConfig streamConfig = new StreamConfig(config);
- String physicalName = streamConfig.getPhysicalName(streamId, streamId);
-
- return streamFromConfig(streamId, physicalName);
- }
-
- /**
- * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
- *
- * The stream configurations are read from the following properties in the config:
- * {@code streams.{$streamId}.*}
- * <br>
- * All properties matching this pattern are assumed to be system-specific with one exception. The following
- * property is a Samza property which is used to bind the stream to a system.
- *
- * <ul>
- * <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
- * the stream will be associated with the System defined in {@code job.default.system}</li>
- * </ul>
- *
- * @param streamId The logical identifier for the stream in Samza.
- * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
- * @return The {@link StreamSpec} instance.
- */
- /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName) {
- StreamConfig streamConfig = new StreamConfig(config);
- String system = streamConfig.getSystem(streamId);
-
- return streamFromConfig(streamId, physicalName, system);
- }
-
- /**
- * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
- *
- * The stream configurations are read from the following properties in the config:
- * {@code streams.{$streamId}.*}
- *
- * @param streamId The logical identifier for the stream in Samza.
- * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
- * @param system The name of the System on which this stream will be used.
- * @return The {@link StreamSpec} instance.
- */
- /*package private*/ StreamSpec streamFromConfig(String streamId, String physicalName, String system) {
- StreamConfig streamConfig = new StreamConfig(config);
- Map<String, String> properties = streamConfig.getStreamProperties(streamId);
-
- return new StreamSpec(streamId, physicalName, system, properties);
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
deleted file mode 100644
index e592e66..0000000
--- a/samza-core/src/main/java/org/apache/samza/system/RemoteExecutionEnvironment.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-
-/**
- * This class implements the {@link ExecutionEnvironment} that runs the applications in YARN environment
- */
-public class RemoteExecutionEnvironment extends AbstractExecutionEnvironment {
-
- public RemoteExecutionEnvironment(Config config) {
- super(config);
- }
-
- @Override public void run(StreamGraphBuilder app, Config config) {
- // TODO: add description of ProcessContext that is going to create a sub-DAG of the {@code graph}
- // TODO: actually instantiate the tasks and run the job, i.e.
- // 1. create all input/output/intermediate topics
- // 2. create the single job configuration
- // 3. execute JobRunner to submit the single job for the whole graph
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java b/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
deleted file mode 100644
index 71d60ef..0000000
--- a/samza-core/src/main/java/org/apache/samza/system/StandaloneExecutionEnvironment.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.system;
-
-import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.apache.samza.config.Config;
-import org.apache.samza.operators.StreamGraphImpl;
-
-
-/**
- * This class implements the {@link ExecutionEnvironment} that runs the applications in standalone environment
- */
-public class StandaloneExecutionEnvironment extends AbstractExecutionEnvironment {
-
- public StandaloneExecutionEnvironment(Config config) {
- super(config);
- }
-
- // TODO: may want to move this to a common base class for all {@link ExecutionEnvironment}
- StreamGraph createGraph(StreamGraphBuilder app, Config config) {
- StreamGraphImpl graph = new StreamGraphImpl();
- app.init(graph, config);
- return graph;
- }
-
- @Override public void run(StreamGraphBuilder app, Config config) {
- // 1. get logic graph for optimization
- // StreamGraph logicGraph = this.createGraph(app, config);
- // 2. potential optimization....
- // 3. create new instance of StreamGraphBuilder that would generate the optimized graph
- // 4. create all input/output/intermediate topics
- // 5. create the configuration for StreamProcessor
- // 6. start the StreamProcessor w/ optimized instance of StreamGraphBuilder
- }
-
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index 4a0681e..e54d01a 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -31,7 +31,7 @@ import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.KeyValueStore;
-import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.CommandLine;
@@ -43,15 +43,14 @@ import org.apache.samza.util.CommandLine;
public class KeyValueStoreExample implements StreamGraphBuilder {
/**
- * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
- * invoking context as in standalone:
+ * used by remote application runner to launch the job in remote program. The remote program should follow the similar
+ * invoking context as in local runner:
*
* public static void main(String args[]) throws Exception {
* CommandLine cmdLine = new CommandLine();
* Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
- * UserMainExample runnableApp = new UserMainExample();
- * runnableApp.run(remoteEnv, config);
+ * ApplicationRunner runner = ApplicationRunner.fromConfig(config);
+ * runner.run(new UserMainExample(), config)
* }
*
*/
@@ -67,12 +66,12 @@ public class KeyValueStoreExample implements StreamGraphBuilder {
}
- // standalone local program model
+ // local program model
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
- standaloneEnv.run(new KeyValueStoreExample(), config);
+ ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ localRunner.run(new KeyValueStoreExample(), config);
}
class MyStatsCounter implements FlatMapFunction<PageViewEvent, StatsOutput> {
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
index 320680c..ef58c8b 100644
--- a/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/NoContextStreamExample.java
@@ -27,7 +27,7 @@ import org.apache.samza.operators.data.Offset;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.CommandLine;
@@ -89,15 +89,16 @@ public class NoContextStreamExample implements StreamGraphBuilder {
}
}
+
/**
- * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
- * invoking context as in standalone:
+ * used by remote application runner to launch the job in remote program. The remote program should follow the similar
+ * invoking context as in local:
*
* public static void main(String args[]) throws Exception {
* CommandLine cmdLine = new CommandLine();
* Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ExecutionEnvironment remoteEnv = ExecutionEnvironment.fromConfig(config);
- * remoteEnv.run(new NoContextStreamExample(), config);
+ * ApplicationRunner runner = ApplicationRunner.fromConfig(config);
+ * runner.run(new NoContextStreamExample(), config);
* }
*
*/
@@ -119,8 +120,8 @@ public class NoContextStreamExample implements StreamGraphBuilder {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
- standaloneEnv.run(new NoContextStreamExample(), config);
+ ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ localRunner.run(new NoContextStreamExample(), config);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index 30ce7d2..0e60c2f 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -27,7 +27,7 @@ import org.apache.samza.operators.data.MessageEnvelope;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;
@@ -38,15 +38,14 @@ import org.apache.samza.util.CommandLine;
public class OrderShipmentJoinExample implements StreamGraphBuilder {
/**
- * used by remote execution environment to launch the job in remote program. The remote program should follow the similar
- * invoking context as in standalone:
+ * used by remote application runner to launch the job in remote program. The remote program should follow the similar
+ * invoking context as in local runner:
*
* public static void main(String args[]) throws Exception {
* CommandLine cmdLine = new CommandLine();
* Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- * ExecutionEnvironment remoteEnv = ExecutionEnvironment.getRemoteEnvironment(config);
- * UserMainExample runnableApp = new UserMainExample();
- * runnableApp.run(remoteEnv, config);
+ * ApplicationRunner runner = ApplicationRunner.fromConfig(config);
+ * runner.run(new UserMainExample(), config);
* }
*
*/
@@ -64,8 +63,8 @@ public class OrderShipmentJoinExample implements StreamGraphBuilder {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
- standaloneEnv.run(new OrderShipmentJoinExample(), config);
+ ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ localRunner.run(new OrderShipmentJoinExample(), config);
}
StreamSpec input1 = new StreamSpec("orderStream", "OrderEvent", "kafka");
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index fcf67a7..886bf1c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -28,7 +28,7 @@ import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;
@@ -57,8 +57,8 @@ public class PageViewCounterExample implements StreamGraphBuilder {
public static void main(String[] args) {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
- standaloneEnv.run(new PageViewCounterExample(), config);
+ ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ localRunner.run(new PageViewCounterExample(), config);
}
StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index 228668c..b8abacf 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -26,7 +26,7 @@ import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.StringSerde;
-import org.apache.samza.system.ExecutionEnvironment;
+import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.CommandLine;
@@ -39,6 +39,19 @@ import java.time.Duration;
public class RepartitionExample implements StreamGraphBuilder {
/**
+ * used by remote application runner to launch the job in remote program. The remote program should follow the similar
+ * invoking context as in local runner:
+ *
+ * public static void main(String args[]) throws Exception {
+ * CommandLine cmdLine = new CommandLine();
+ * Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
+ * ApplicationRunner runner = ApplicationRunner.fromConfig(config);
+ * runner.run(new UserMainExample(), config);
+ * }
+ *
+ */
+
+ /**
* used by remote execution environment to launch the job in remote program. The remote program should follow the similar
* invoking context as in standalone:
*
@@ -68,8 +81,8 @@ public class RepartitionExample implements StreamGraphBuilder {
public static void main(String[] args) throws Exception {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
- ExecutionEnvironment standaloneEnv = ExecutionEnvironment.getLocalEnvironment(config);
- standaloneEnv.run(new RepartitionExample(), config);
+ ApplicationRunner localRunner = ApplicationRunner.getLocalRunner(config);
+ localRunner.run(new RepartitionExample(), config);
}
StreamSpec input1 = new StreamSpec("pageViewEventStream", "PageViewEvent", "kafka");
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java
new file mode 100644
index 0000000..4942ed6
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestAbstractExecutionEnvironment.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.runtime;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.operators.StreamGraphBuilder;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+public class TestAbstractExecutionEnvironment {
+ private static final String STREAM_ID = "t3st-Stream_Id";
+ private static final String STREAM_ID_INVALID = "test#Str3amId!";
+
+ private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name";
+ private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2";
+ private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?";
+
+ private static final String TEST_SYSTEM = "t3st-System_Name";
+ private static final String TEST_SYSTEM2 = "testSystemName2";
+ private static final String TEST_SYSTEM_INVALID = "test:System!Name@";
+
+ private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName";
+
+
+ @Test(expected = NullPointerException.class)
+ public void testConfigValidation() {
+ new TestAbstractApplicationRunnerImpl(null);
+ }
+
+ // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value.
+ @Test
+ public void testStreamFromConfigWithPhysicalNameInConfig() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+ StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+ assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+ }
+
+ // The streamId should be used as the physicalName when the physical name is not specified.
+ // NOTE: its either this, set to null, or exception. This seems better for backward compatibility and API brevity.
+ @Test
+ public void testStreamFromConfigWithoutPhysicalNameInConfig() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+ assertEquals(STREAM_ID, spec.getPhysicalName());
+ }
+
+ // If the system is specified at the stream scope, use it
+ @Test
+ public void testStreamFromConfigWithSystemAtStreamScopeInConfig() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+ StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+ assertEquals(TEST_SYSTEM, spec.getSystemName());
+ }
+
+ // If system isn't specified at stream scope, use the default system
+ @Test
+ public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() {
+ Config config = addConfigs(buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
+ JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+ assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
+ }
+
+ // Stream scope should override default scope
+ @Test
+ public void testStreamFromConfigWithSystemAtBothScopesInConfig() {
+ Config config = addConfigs(buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+ StreamConfig.SYSTEM(), TEST_SYSTEM),
+ JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+ assertEquals(TEST_SYSTEM, spec.getSystemName());
+ }
+
+ // System is required. Throw if it cannot be determined.
+ @Test(expected = Exception.class)
+ public void testStreamFromConfigWithOutSystemInConfig() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+ assertEquals(TEST_SYSTEM, spec.getSystemName());
+ }
+
+ // The properties in the config "streams.{streamId}.*" should be passed through to the spec.
+ @Test
+ public void testStreamFromConfigPropertiesPassthrough() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+ StreamConfig.SYSTEM(), TEST_SYSTEM,
+ "systemProperty1", "systemValue1",
+ "systemProperty2", "systemValue2",
+ "systemProperty3", "systemValue3");
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+ Map<String, String> properties = spec.getConfig();
+ assertEquals(3, properties.size());
+ assertEquals("systemValue1", properties.get("systemProperty1"));
+ assertEquals("systemValue2", properties.get("systemProperty2"));
+ assertEquals("systemValue3", properties.get("systemProperty3"));
+ assertEquals("systemValue1", spec.get("systemProperty1"));
+ assertEquals("systemValue2", spec.get("systemProperty2"));
+ assertEquals("systemValue3", spec.get("systemProperty3"));
+ }
+
+ // The samza properties (which are invalid for the underlying system) should be filtered out.
+ @Test
+ public void testStreamFromConfigSamzaPropertiesOmitted() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
+ StreamConfig.SYSTEM(), TEST_SYSTEM,
+ "systemProperty1", "systemValue1",
+ "systemProperty2", "systemValue2",
+ "systemProperty3", "systemValue3");
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID);
+
+ Map<String, String> properties = spec.getConfig();
+ assertEquals(3, properties.size());
+ assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
+ assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
+ assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
+ assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
+ }
+
+ // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config
+ @Test
+ public void testStreamFromConfigPhysicalNameArgSimple() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
+ StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME);
+
+ assertEquals(STREAM_ID, spec.getId());
+ assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+ assertEquals(TEST_SYSTEM, spec.getSystemName());
+ }
+
+ // Special characters are allowed for the physical name
+ @Test
+ public void testStreamFromConfigPhysicalNameArgSpecialCharacters() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+ StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
+ assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
+ }
+
+ // Null is allowed for the physical name
+ @Test
+ public void testStreamFromConfigPhysicalNameArgNull() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+ StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID, null);
+ assertNull(spec.getPhysicalName());
+ }
+
+ // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config
+ @Test
+ public void testStreamFromConfigSystemNameArgValid() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
+ StreamConfig.SYSTEM(), TEST_SYSTEM2); // This too
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
+
+ assertEquals(STREAM_ID, spec.getId());
+ assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
+ assertEquals(TEST_SYSTEM, spec.getSystemName());
+ }
+
+ // Special characters are NOT allowed for system name, because it's used as an identifier in the config.
+ @Test(expected = IllegalArgumentException.class)
+ public void testStreamFromConfigSystemNameArgInvalid() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+ StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
+ }
+
+ // Empty strings are NOT allowed for system name, because it's used as an identifier in the config.
+ @Test(expected = IllegalArgumentException.class)
+ public void testStreamFromConfigSystemNameArgEmpty() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+ StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, "");
+ }
+
+ // Null is not allowed for system name.
+ @Test(expected = NullPointerException.class)
+ public void testStreamFromConfigSystemNameArgNull() {
+ Config config = buildStreamConfig(STREAM_ID,
+ StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
+ StreamConfig.SYSTEM(), TEST_SYSTEM2);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null);
+ }
+
+ // Special characters are NOT allowed for streamId, because it's used as an identifier in the config.
+ @Test(expected = IllegalArgumentException.class)
+ public void testStreamFromConfigStreamIdInvalid() {
+ Config config = buildStreamConfig(STREAM_ID_INVALID,
+ StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ env.streamFromConfig(STREAM_ID_INVALID);
+ }
+
+ // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config.
+ @Test(expected = IllegalArgumentException.class)
+ public void testStreamFromConfigStreamIdEmpty() {
+ Config config = buildStreamConfig("",
+ StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ env.streamFromConfig("");
+ }
+
+ // Null is not allowed for streamId.
+ @Test(expected = NullPointerException.class)
+ public void testStreamFromConfigStreamIdNull() {
+ Config config = buildStreamConfig(null,
+ StreamConfig.SYSTEM(), TEST_SYSTEM);
+
+ AbstractApplicationRunner env = new TestAbstractApplicationRunnerImpl(config);
+ env.streamFromConfig(null);
+ }
+
+
+ // Helper methods
+
+ private Config buildStreamConfig(String streamId, String... kvs) {
+ // inject streams.x. into each key
+ for (int i = 0; i < kvs.length - 1; i += 2) {
+ kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + kvs[i];
+ }
+ return buildConfig(kvs);
+ }
+
+ private Config buildConfig(String... kvs) {
+ if (kvs.length % 2 != 0) {
+ throw new IllegalArgumentException("There must be parity between the keys and values");
+ }
+
+ Map<String, String> configMap = new HashMap<>();
+ for (int i = 0; i < kvs.length - 1; i += 2) {
+ configMap.put(kvs[i], kvs[i + 1]);
+ }
+ return new MapConfig(configMap);
+ }
+
+ private Config addConfigs(Config original, String... kvs) {
+ Map<String, String> result = new HashMap<>();
+ result.putAll(original);
+ result.putAll(buildConfig(kvs));
+ return new MapConfig(result);
+ }
+
+ private class TestAbstractApplicationRunnerImpl extends AbstractApplicationRunner {
+
+ public TestAbstractApplicationRunnerImpl(Config config) {
+ super(config);
+ }
+
+ @Override
+ public void run(StreamGraphBuilder graphBuilder, Config config) {
+ // do nothing
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/1956dac9/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java b/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
deleted file mode 100644
index 861f049..0000000
--- a/samza-core/src/test/java/org/apache/samza/system/TestAbstractExecutionEnvironment.java
+++ /dev/null
@@ -1,331 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.system;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
-import org.apache.samza.config.StreamConfig;
-import org.apache.samza.operators.StreamGraphBuilder;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-public class TestAbstractExecutionEnvironment {
- private static final String STREAM_ID = "t3st-Stream_Id";
- private static final String STREAM_ID_INVALID = "test#Str3amId!";
-
- private static final String TEST_PHYSICAL_NAME = "t3st-Physical_Name";
- private static final String TEST_PHYSICAL_NAME2 = "testPhysicalName2";
- private static final String TEST_PHYSICAL_NAME_SPECIAL_CHARS = "test://Physical.Name?";
-
- private static final String TEST_SYSTEM = "t3st-System_Name";
- private static final String TEST_SYSTEM2 = "testSystemName2";
- private static final String TEST_SYSTEM_INVALID = "test:System!Name@";
-
- private static final String TEST_DEFAULT_SYSTEM = "testDefaultSystemName";
-
-
- @Test(expected = NullPointerException.class)
- public void testConfigValidation() {
- new TestAbstractExecutionEnvironmentImpl(null);
- }
-
- // The physical name should be pulled from the StreamConfig.PHYSICAL_NAME property value.
- @Test
- public void testStreamFromConfigWithPhysicalNameInConfig() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
- StreamConfig.SYSTEM(), TEST_SYSTEM);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
- assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
- }
-
- // The streamId should be used as the physicalName when the physical name is not specified.
- // NOTE: its either this, set to null, or exception. This seems better for backward compatibility and API brevity.
- @Test
- public void testStreamFromConfigWithoutPhysicalNameInConfig() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.SYSTEM(), TEST_SYSTEM);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
- assertEquals(STREAM_ID, spec.getPhysicalName());
- }
-
- // If the system is specified at the stream scope, use it
- @Test
- public void testStreamFromConfigWithSystemAtStreamScopeInConfig() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
- StreamConfig.SYSTEM(), TEST_SYSTEM);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
- assertEquals(TEST_SYSTEM, spec.getSystemName());
- }
-
- // If system isn't specified at stream scope, use the default system
- @Test
- public void testStreamFromConfigWithSystemAtDefaultScopeInConfig() {
- Config config = addConfigs(buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME),
- JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
- assertEquals(TEST_DEFAULT_SYSTEM, spec.getSystemName());
- }
-
- // Stream scope should override default scope
- @Test
- public void testStreamFromConfigWithSystemAtBothScopesInConfig() {
- Config config = addConfigs(buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
- StreamConfig.SYSTEM(), TEST_SYSTEM),
- JobConfig.JOB_DEFAULT_SYSTEM(), TEST_DEFAULT_SYSTEM);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
- assertEquals(TEST_SYSTEM, spec.getSystemName());
- }
-
- // System is required. Throw if it cannot be determined.
- @Test(expected = Exception.class)
- public void testStreamFromConfigWithOutSystemInConfig() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
- assertEquals(TEST_SYSTEM, spec.getSystemName());
- }
-
- // The properties in the config "streams.{streamId}.*" should be passed through to the spec.
- @Test
- public void testStreamFromConfigPropertiesPassthrough() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
- StreamConfig.SYSTEM(), TEST_SYSTEM,
- "systemProperty1", "systemValue1",
- "systemProperty2", "systemValue2",
- "systemProperty3", "systemValue3");
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
- Map<String, String> properties = spec.getConfig();
- assertEquals(3, properties.size());
- assertEquals("systemValue1", properties.get("systemProperty1"));
- assertEquals("systemValue2", properties.get("systemProperty2"));
- assertEquals("systemValue3", properties.get("systemProperty3"));
- assertEquals("systemValue1", spec.get("systemProperty1"));
- assertEquals("systemValue2", spec.get("systemProperty2"));
- assertEquals("systemValue3", spec.get("systemProperty3"));
- }
-
- // The samza properties (which are invalid for the underlying system) should be filtered out.
- @Test
- public void testStreamFromConfigSamzaPropertiesOmitted() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME,
- StreamConfig.SYSTEM(), TEST_SYSTEM,
- "systemProperty1", "systemValue1",
- "systemProperty2", "systemValue2",
- "systemProperty3", "systemValue3");
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID);
-
- Map<String, String> properties = spec.getConfig();
- assertEquals(3, properties.size());
- assertNull(properties.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
- assertNull(properties.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
- assertNull(spec.get(String.format(StreamConfig.PHYSICAL_NAME_FOR_STREAM_ID(), STREAM_ID)));
- assertNull(spec.get(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), STREAM_ID)));
- }
-
- // When the physicalName argument is passed explicitly it should be used, regardless of whether it is also in the config
- @Test
- public void testStreamFromConfigPhysicalNameArgSimple() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
- StreamConfig.SYSTEM(), TEST_SYSTEM);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME);
-
- assertEquals(STREAM_ID, spec.getId());
- assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
- assertEquals(TEST_SYSTEM, spec.getSystemName());
- }
-
- // Special characters are allowed for the physical name
- @Test
- public void testStreamFromConfigPhysicalNameArgSpecialCharacters() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
- StreamConfig.SYSTEM(), TEST_SYSTEM);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME_SPECIAL_CHARS);
- assertEquals(TEST_PHYSICAL_NAME_SPECIAL_CHARS, spec.getPhysicalName());
- }
-
- // Null is allowed for the physical name
- @Test
- public void testStreamFromConfigPhysicalNameArgNull() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
- StreamConfig.SYSTEM(), TEST_SYSTEM);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID, null);
- assertNull(spec.getPhysicalName());
- }
-
- // When the system name is provided explicitly, it should be used, regardless of whether it's also in the config
- @Test
- public void testStreamFromConfigSystemNameArgValid() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2, // This should be ignored because of the explicit arg
- StreamConfig.SYSTEM(), TEST_SYSTEM2); // This too
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- StreamSpec spec = env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM);
-
- assertEquals(STREAM_ID, spec.getId());
- assertEquals(TEST_PHYSICAL_NAME, spec.getPhysicalName());
- assertEquals(TEST_SYSTEM, spec.getSystemName());
- }
-
- // Special characters are NOT allowed for system name, because it's used as an identifier in the config.
- @Test(expected = IllegalArgumentException.class)
- public void testStreamFromConfigSystemNameArgInvalid() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
- StreamConfig.SYSTEM(), TEST_SYSTEM2);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, TEST_SYSTEM_INVALID);
- }
-
- // Empty strings are NOT allowed for system name, because it's used as an identifier in the config.
- @Test(expected = IllegalArgumentException.class)
- public void testStreamFromConfigSystemNameArgEmpty() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
- StreamConfig.SYSTEM(), TEST_SYSTEM2);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, "");
- }
-
- // Null is not allowed for system name.
- @Test(expected = NullPointerException.class)
- public void testStreamFromConfigSystemNameArgNull() {
- Config config = buildStreamConfig(STREAM_ID,
- StreamConfig.PHYSICAL_NAME(), TEST_PHYSICAL_NAME2,
- StreamConfig.SYSTEM(), TEST_SYSTEM2);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- env.streamFromConfig(STREAM_ID, TEST_PHYSICAL_NAME, null);
- }
-
- // Special characters are NOT allowed for streamId, because it's used as an identifier in the config.
- @Test(expected = IllegalArgumentException.class)
- public void testStreamFromConfigStreamIdInvalid() {
- Config config = buildStreamConfig(STREAM_ID_INVALID,
- StreamConfig.SYSTEM(), TEST_SYSTEM);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- env.streamFromConfig(STREAM_ID_INVALID);
- }
-
- // Empty strings are NOT allowed for streamId, because it's used as an identifier in the config.
- @Test(expected = IllegalArgumentException.class)
- public void testStreamFromConfigStreamIdEmpty() {
- Config config = buildStreamConfig("",
- StreamConfig.SYSTEM(), TEST_SYSTEM);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- env.streamFromConfig("");
- }
-
- // Null is not allowed for streamId.
- @Test(expected = NullPointerException.class)
- public void testStreamFromConfigStreamIdNull() {
- Config config = buildStreamConfig(null,
- StreamConfig.SYSTEM(), TEST_SYSTEM);
-
- AbstractExecutionEnvironment env = new TestAbstractExecutionEnvironmentImpl(config);
- env.streamFromConfig(null);
- }
-
-
- // Helper methods
-
- private Config buildStreamConfig(String streamId, String... kvs) {
- // inject streams.x. into each key
- for (int i = 0; i < kvs.length - 1; i += 2) {
- kvs[i] = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId) + kvs[i];
- }
- return buildConfig(kvs);
- }
-
- private Config buildConfig(String... kvs) {
- if (kvs.length % 2 != 0) {
- throw new IllegalArgumentException("There must be parity between the keys and values");
- }
-
- Map<String, String> configMap = new HashMap<>();
- for (int i = 0; i < kvs.length - 1; i += 2) {
- configMap.put(kvs[i], kvs[i + 1]);
- }
- return new MapConfig(configMap);
- }
-
- private Config addConfigs(Config original, String... kvs) {
- Map<String, String> result = new HashMap<>();
- result.putAll(original);
- result.putAll(buildConfig(kvs));
- return new MapConfig(result);
- }
-
- private class TestAbstractExecutionEnvironmentImpl extends AbstractExecutionEnvironment {
-
- public TestAbstractExecutionEnvironmentImpl(Config config) {
- super(config);
- }
-
- @Override
- public void run(StreamGraphBuilder graphBuilder, Config config) {
- // do nothing
- }
- }
-}