You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:37 UTC
[21/50] [abbrv] samza git commit: SAMZA-793 static config rewriter
should be invoked in JobRunner
SAMZA-793 static config rewriter should be invoked in JobRunner
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bc7a07a1
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bc7a07a1
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bc7a07a1
Branch: refs/heads/samza-sql
Commit: bc7a07a1a0b8d610db2aec47d8c8363263478274
Parents: 8c7e2eb
Author: Yi Pan (Data Infrastructure) <yi...@linkedin.com>
Authored: Sun Oct 11 22:27:43 2015 -0700
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Nov 17 10:19:23 2015 -0800
----------------------------------------------------------------------
.../samza/coordinator/JobCoordinator.scala | 29 ++----------------
.../scala/org/apache/samza/job/JobRunner.scala | 31 +++++++++++++++++---
2 files changed, 30 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/bc7a07a1/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index ef40c35..112ec1c 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -21,8 +21,8 @@ package org.apache.samza.coordinator
import org.apache.samza.config.StorageConfig
-import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
-import org.apache.samza.config.{Config, ConfigRewriter}
+import org.apache.samza.job.model.{JobModel, TaskModel}
+import org.apache.samza.config.Config
import org.apache.samza.SamzaException
import org.apache.samza.container.grouper.task.TaskNameGrouperFactory
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
@@ -90,7 +90,7 @@ object JobCoordinator extends Logging {
val streamMetadataCache = new StreamMetadataCache(systemAdmins)
- val jobCoordinator = getJobCoordinator(rewriteConfig(config), changelogManager, localityManager, streamMetadataCache)
+ val jobCoordinator = getJobCoordinator(config, changelogManager, localityManager, streamMetadataCache)
createChangeLogStreams(config, jobCoordinator.jobModel.maxChangeLogStreamPartitions, streamMetadataCache)
jobCoordinator
@@ -141,29 +141,6 @@ object JobCoordinator extends Logging {
}
/**
- * Re-writes configuration using a ConfigRewriter, if one is defined. If
- * there is no ConfigRewriter defined for the job, then this method is a
- * no-op.
- *
- * @param config The config to re-write.
- */
- def rewriteConfig(config: Config): Config = {
- def rewrite(c: Config, rewriterName: String): Config = {
- val klass = config
- .getConfigRewriterClass(rewriterName)
- .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
- val rewriter = Util.getObj[ConfigRewriter](klass)
- info("Re-writing config with " + rewriter)
- rewriter.rewrite(rewriterName, c)
- }
-
- config.getConfigRewriters match {
- case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
- case _ => config
- }
- }
-
- /**
* The method intializes the jobModel and creates a JobModel generator which can be used to generate new JobModels
* which catchup with the latest content from the coordinator stream.
*/
http://git-wip-us.apache.org/repos/asf/samza/blob/bc7a07a1/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index d6109ec..a3613ff 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,7 +20,7 @@
package org.apache.samza.job
import org.apache.samza.SamzaException
-import org.apache.samza.config.Config
+import org.apache.samza.config.{ConfigRewriter, Config}
import org.apache.samza.config.JobConfig.Config2Job
import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
import org.apache.samza.job.ApplicationStatus.Running
@@ -30,17 +30,40 @@ import org.apache.samza.util.Logging
import org.apache.samza.util.Util
import scala.collection.JavaConversions._
import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemProducer, CoordinatorStreamSystemFactory}
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
-object JobRunner {
+object JobRunner extends Logging {
val SOURCE = "job-runner"
+ /**
+ * Re-writes configuration using a ConfigRewriter, if one is defined. If
+ * there is no ConfigRewriter defined for the job, then this method is a
+ * no-op.
+ *
+ * @param config The config to re-write.
+ */
+ def rewriteConfig(config: Config): Config = {
+ def rewrite(c: Config, rewriterName: String): Config = {
+ val klass = config
+ .getConfigRewriterClass(rewriterName)
+ .getOrElse(throw new SamzaException("Unable to find class config for config rewriter %s." format rewriterName))
+ val rewriter = Util.getObj[ConfigRewriter](klass)
+ info("Re-writing config with " + rewriter)
+ rewriter.rewrite(rewriterName, c)
+ }
+
+ config.getConfigRewriters match {
+ case Some(rewriters) => rewriters.split(",").foldLeft(config)(rewrite(_, _))
+ case _ => config
+ }
+ }
+
def main(args: Array[String]) {
val cmdline = new CommandLine
val options = cmdline.parser.parse(args: _*)
val config = cmdline.loadConfig(options)
- new JobRunner(config).run()
+ new JobRunner(rewriteConfig(config)).run()
}
}