You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:37 UTC

[21/50] [abbrv] samza git commit: SAMZA-793 static config rewriter should be invoked in JobRunner

SAMZA-793 static config rewriter should be invoked in JobRunner


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bc7a07a1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bc7a07a1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bc7a07a1

Branch: refs/heads/samza-sql
Commit: bc7a07a1a0b8d610db2aec47d8c8363263478274
Parents: 8c7e2eb
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Authored: Sun Oct 11 22:27:43 2015 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Nov 17 10:19:23 2015 -0800

----------------------------------------------------------------------
 .../samza/coordinator/JobCoordinator.scala      | 29 ++----------------
 .../scala/org/apache/samza/job/JobRunner.scala  | 31 +++++++++++++++++---
 2 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/bc7a07a1/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index ef40c35..112ec1c 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -21,8 +21,8 @@ package org.apache.samza.coordinator
 
 
 import org.apache.samza.config.StorageConfig
-import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
-import org.apache.samza.config.{Config, ConfigRewriter}
+import org.apache.samza.job.model.{JobModel, TaskModel}
+import org.apache.samza.config.Config
 import org.apache.samza.SamzaException
 import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
@@ -90,7 +90,7 @@ object JobCoordinator extends Logging {
 
     val streamMetadataCache = new StreamMetadataCache(systemAdmins)
 
-    val jobCoordinator = getJobCoordinator(rewriteConfig(config), changelogManager, localityManager, streamMetadataCache)
+    val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache)
     createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache)
 
     jobCoordinator
@@ -141,29 +141,6 @@ object JobCoordinator extends Logging {
   }
 
   /**
-   * Re-writes configuration using a ConfigRewriter, if one is defined. If
-   * there is no ConfigRewriter defined for the job, then this method is a
-   * no-op.
-   *
-   * @param config The config to re-write.
-   */
-  def rewriteConfig(config: Config): Config = {
-    def rewrite(c: Config, rewriterName: String): Config = {
-      val klass = config
-        .getConfigRewriterClass(rewriterName)
-        .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
-      val rewriter = Util.getObj[ConfigRewriter](klass)
-      info("Re-writing config with " + rewriter)
-      rewriter.rewrite(rewriterName, c)
-    }
-
-    config.getConfigRewriters match {
-      case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
-      case _ => config
-    }
-  }
-
-  /**
    * The method intializes the jobModel and creates a JobModel generator which can be used to generate new JobModels
    * which catchup with the latest content from the coordinator stream.
    */

http://git-wip-us.apache.org/repos/asf/samza/blob/bc7a07a1/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index d6109ec..a3613ff 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,7 +20,7 @@
 package org.apache.samza.job
 
 import org.apache.samza.SamzaException
-import org.apache.samza.config.Config
+import org.apache.samza.config.{ConfigRewriter, Config}
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
 import org.apache.samza.job.ApplicationStatus.Running
@@ -30,17 +30,40 @@ import org.apache.samza.util.Logging
 import org.apache.samza.util.Util
 import scala.collection.JavaConversions._
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemProducer, CoordinatorStreamSystemFactory}
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 
 
-object JobRunner {
+object JobRunner extends Logging {
   val SOURCE = "job-runner"
 
+  /**
+   * Re-writes configuration using a ConfigRewriter, if one is defined. If
+   * there is no ConfigRewriter defined for the job, then this method is a
+   * no-op.
+   *
+   * @param config The config to re-write.
+   */
+  def rewriteConfig(config: Config): Config = {
+    def rewrite(c: Config, rewriterName: String): Config = {
+      val klass = config
+        .getConfigRewriterClass(rewriterName)
+        .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+      val rewriter = Util.getObj[ConfigRewriter](klass)
+      info("Re-writing config with " + rewriter)
+      rewriter.rewrite(rewriterName, c)
+    }
+
+    config.getConfigRewriters match {
+      case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
+      case _ => config
+    }
+  }
+
   def main(args: Array[String]) {
     val cmdline = new CommandLine
     val options = cmdline.parser.parse(args: _*)
     val config = cmdline.loadConfig(options)
-    new JobRunner(config).run()
+    new JobRunner(rewriteConfig(config)).run()
   }
 }