You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/02/27 21:08:18 UTC

samza git commit: Make the execution environment use the API's configs

Repository: samza
Updated Branches:
  refs/heads/samza-fluent-api-v1 a9b213c13 -> 8c1f56d6d


Make the execution environment use the API's configs


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/8c1f56d6
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/8c1f56d6
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/8c1f56d6

Branch: refs/heads/samza-fluent-api-v1
Commit: 8c1f56d6d0b596ed455d215b00439da9098049c4
Parents: a9b213c
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Mon Feb 27 13:07:23 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Mon Feb 27 13:07:23 2017 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/samza/config/JobConfig.scala   |  4 ----
 .../src/main/scala/org/apache/samza/job/JobRunner.scala  | 11 +++--------
 2 files changed, 3 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/8c1f56d6/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
index 6b1473c..1c58293 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala
@@ -196,8 +196,4 @@ class JobConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case Some(mode) => mode.toBoolean
     case _ => false
   }
-
-  def getExecutionEnv = getOrElse(JobConfig.EXECUTION_ENV, "")
-
-  def getStreamGraphBuilder = getOrElse(JobConfig.STREAM_GRAPH_BUILDER, "")
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/8c1f56d6/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index a34cedb..61bfafb 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -70,16 +70,11 @@ object JobRunner extends Logging {
     val config = cmdline.loadConfig(options)
 
     // start execution env if it's defined
-    val envClass: String = config.getExecutionEnv
+    val envClass: String = config.get(ExecutionEnvironment.ENVIRONMENT_CONFIG, "")
     if (!envClass.isEmpty) {
       val env: ExecutionEnvironment = ClassLoaderHelper.fromClassName(envClass)
-      val streamGraphBuilderClass: String = config.getStreamGraphBuilder
-      if (!streamGraphBuilderClass.isEmpty) {
-        val streamGraphBuilder: StreamGraphBuilder = ClassLoaderHelper.fromClassName(streamGraphBuilderClass)
-        env.run(streamGraphBuilder, config)
-      } else {
-        throw new SamzaException("No stream graph builder defined")
-      }
+      val graphBuilder: StreamGraphBuilder = Class.forName(config.get(StreamGraphBuilder.BUILDER_CLASS_CONFIG)).newInstance.asInstanceOf[StreamGraphBuilder]
+      env.run(graphBuilder, config)
     } else {
       new JobRunner(rewriteConfig(config)).run()
     }