You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/02/27 21:08:18 UTC
samza git commit: Make execution environment use api's configs
Repository: samza
Updated Branches:
refs/heads/samza-fluent-api-v1 a9b213c13 -> 8c1f56d6d
Make execution environment use api's configs
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8c1f56d6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8c1f56d6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8c1f56d6
Branch: refs/heads/samza-fluent-api-v1
Commit: 8c1f56d6d0b596ed455d215b00439da9098049c4
Parents: a9b213c
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Mon Feb 27 13:07:23 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon Feb 27 13:07:23 2017 -0800
----------------------------------------------------------------------
.../main/scala/org/apache/samza/config/JobConfig.scala | 4 ----
.../src/main/scala/org/apache/samza/job/JobRunner.scala | 11 +++--------
2 files changed, 3 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/8c1f56d6/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 6b1473c..1c58293 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -196,8 +196,4 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
case Some(mode) => mode.toBoolean
case _ => false
}
-
- def getExecutionEnv = getOrElse(JobConfig.EXECUTION_ENV, "")
-
- def getStreamGraphBuilder = getOrElse(JobConfig.STREAM_GRAPH_BUILDER, "")
}
http://git-wip-us.apache.org/repos/asf/samza/blob/8c1f56d6/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index a34cedb..61bfafb 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -70,16 +70,11 @@ object JobRunner extends Logging {
val config = cmdline.loadConfig(options)
// start execution env if it's defined
- val envClass: String = config.getExecutionEnv
+ val envClass: String = config.get(ExecutionEnvironment.ENVIRONMENT_CONFIG, "")
if (!envClass.isEmpty) {
val env: ExecutionEnvironment = ClassLoaderHelper.fromClassName(envClass)
- val streamGraphBuilderClass: String = config.getStreamGraphBuilder
- if (!streamGraphBuilderClass.isEmpty) {
- val streamGraphBuilder: StreamGraphBuilder = ClassLoaderHelper.fromClassName(streamGraphBuilderClass)
- env.run(streamGraphBuilder, config)
- } else {
- throw new SamzaException("No stream graph builder defined")
- }
+ val graphBuilder: StreamGraphBuilder = Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance.asInstanceOf[StreamGraphBuilder]
+ env.run(graphBuilder, config)
} else {
new JobRunner(rewriteConfig(config)).run()
}