Posted to commits@samza.apache.org by ca...@apache.org on 2020/02/18 22:06:10 UTC

[samza] branch master updated: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config (#1278)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 93c1dd5  SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config (#1278)
93c1dd5 is described below

commit 93c1dd5d460113df66800e61642b7f34da2bae59
Author: Ke Wu <ke...@icloud.com>
AuthorDate: Tue Feb 18 14:06:00 2020 -0800

    SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config (#1278)
    
    Design:
    https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner
    
    Changes:
    1. Update ProcessJobFactory to load the full job config, execute planning, and write the full job config back to the coordinator stream, which was previously done by RemoteApplicationRunner
    2. Update ThreadJobFactory to load the full job config, execute planning, and write the full job config back to the coordinator stream, which was previously done by RemoteApplicationRunner (the shared flow is sketched below)
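    
    Both changes share the same pre-submission flow. The sketch below is
    distilled from the diff and is illustrative only; it assumes a
    submissionConfig whose config loader factory is set, which is the
    condition under which the new branch runs:
    
        // Shared flow added to both factories (see the diff below).
        val originalConfig = ConfigUtil.loadConfig(submissionConfig)
        // Execute planning on the fully loaded config.
        val planner = new RemoteJobPlanner(
          ApplicationDescriptorUtil.getAppDescriptor(
            ApplicationUtil.fromConfig(originalConfig), originalConfig))
        val jobConfigs = planner.prepareJobs
        // Only a single job is supported by these factories.
        val config = jobConfigs.get(0)
        // Persist the full config, as RemoteApplicationRunner#run used to do.
        CoordinatorStreamUtil.writeConfigToCoordinatorStream(config)
        DiagnosticsUtil.createDiagnosticsStream(config)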
---
 .../apache/samza/job/local/ProcessJobFactory.scala | 47 +++++++++++++++-------
 .../apache/samza/job/local/ThreadJobFactory.scala  | 38 ++++++++++++-----
 2 files changed, 61 insertions(+), 24 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
index 36f1457..ca82892 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
@@ -22,25 +22,48 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, DiagnosticsUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConversions._
 
 /**
- * Creates a stand alone ProcessJob with the specified config.
+ * Creates a ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
+    var config = submissionConfig
+
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+      val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+      // Execute planning
+      val planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig))
+      val jobConfigs = planner.prepareJobs
+
+      if (jobConfigs.size != 1) {
+        throw new SamzaException("Only single process job is supported.")
+      }
+
+      // This is the full job config
+      config = jobConfigs.get(0)
+      // This needs to be consistent with RemoteApplicationRunner#run, where JobRunner#submit is called instead of JobRunner#run
+      CoordinatorStreamUtil.writeConfigToCoordinatorStream(config)
+      DiagnosticsUtil.createDiagnosticsStream(config)
+    }
+
     val containerCount = new JobConfig(config).getContainerCount
 
     if (containerCount > 1) {
@@ -51,15 +74,11 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
     val coordinatorStreamStore: CoordinatorStreamStore = new CoordinatorStreamStore(config, new MetricsRegistryMap())
     coordinatorStreamStore.init()
 
-    val configFromCoordinatorStream: Config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
-
     val changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE))
-
-    val coordinator = JobModelManager(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(),
-      coordinatorStreamStore, metricsRegistry)
-    val jobModel = coordinator.jobModel
-
+    val jobModelManager = JobModelManager(config, changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, metricsRegistry)
+    val jobModel = jobModelManager.jobModel
     val taskPartitionMappings: util.Map[TaskName, Integer] = new util.HashMap[TaskName, Integer]
+
     for (containerModel <- jobModel.getContainers.values) {
       for (taskModel <- containerModel.getTasks.values) {
         taskPartitionMappings.put(taskModel.getTaskName, taskModel.getChangelogPartition.getPartitionId)
@@ -86,14 +105,14 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
     info("Using command builder class %s" format commandBuilderClass)
     val commandBuilder = ReflectionUtil.getObj(commandBuilderClass, classOf[CommandBuilder])
 
-    // JobCoordinator is stopped by ProcessJob when it exits
-    coordinator.start
+    // Start JobModelManager which will be stopped by ProcessJob when it exits
+    jobModelManager.start
 
     commandBuilder
       .setConfig(config)
       .setId("0")
-      .setUrl(coordinator.server.getUrl)
+      .setUrl(jobModelManager.server.getUrl)
 
-    new ProcessJob(commandBuilder, coordinator)
+    new ProcessJob(commandBuilder, jobModelManager)
   }
 }
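
For illustration, here is a submission config that would take the new branch in
ProcessJobFactory#getJob. This is a hypothetical sketch: the loader keys and the
PropertiesConfigLoaderFactory class name are assumptions based on SEP-23, not
part of this patch.

    import java.util.HashMap
    import org.apache.samza.config.MapConfig

    // Hypothetical submission config; loader key names assumed from SEP-23.
    val props = new HashMap[String, String]()
    props.put("job.factory.class", "org.apache.samza.job.local.ProcessJobFactory")
    props.put("job.config.loader.factory",
      "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory")
    props.put("job.config.loader.properties.path", "/path/to/full-job.properties")
    val submissionConfig = new MapConfig(props)
    // getConfigLoaderFactory.isPresent is now true, so getJob(submissionConfig)
    // loads the full config and runs planning before launching the job.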
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 068499c..deea95a 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -19,16 +19,18 @@
 
 package org.apache.samza.job.local
 
+import org.apache.samza.SamzaException
 import org.apache.samza.application.ApplicationUtil
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.JobConfig._
 import org.apache.samza.config.ShellCommandConfig._
-import org.apache.samza.config.{Config, JobConfig, TaskConfig}
+import org.apache.samza.config.{Config, JobConfig}
 import org.apache.samza.container.{SamzaContainer, SamzaContainerListener, TaskName}
 import org.apache.samza.context.{ExternalContext, JobContextImpl}
-import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
+import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, MetricsReporter}
@@ -36,7 +38,7 @@ import org.apache.samza.runtime.ProcessorContext
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.task.{TaskFactory, TaskFactoryUtil}
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, DiagnosticsUtil, Logging}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable
@@ -45,20 +47,36 @@ import scala.collection.mutable
   * Creates a new Thread job with the given config
   */
 class ThreadJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
+    var config = submissionConfig
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+      val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+      // Execute planning
+      val planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig))
+      val jobConfigs = planner.prepareJobs
+
+      if (jobConfigs.size != 1) {
+        throw new SamzaException("Only single stage job is supported.")
+      }
+
+      // This is the full job config
+      config = jobConfigs.get(0)
+      // This needs to be consistent with RemoteApplicationRunner#run, where JobRunner#submit is called instead of JobRunner#run
+      CoordinatorStreamUtil.writeConfigToCoordinatorStream(config)
+      DiagnosticsUtil.createDiagnosticsStream(config)
+    }
 
     val metricsRegistry = new MetricsRegistryMap()
     val coordinatorStreamStore: CoordinatorStreamStore = new CoordinatorStreamStore(config, new MetricsRegistryMap())
     coordinatorStreamStore.init()
 
-    val configFromCoordinatorStream: Config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
-
     val changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE))
 
-    val coordinator = JobModelManager(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(),
+    val jobModelManager = JobModelManager(config, changelogStreamManager.readPartitionMapping(),
       coordinatorStreamStore, metricsRegistry)
-    val jobModel = coordinator.jobModel
+    val jobModel = jobModelManager.jobModel
 
     val taskPartitionMappings: mutable.Map[TaskName, Integer] = mutable.Map[TaskName, Integer]()
     for (containerModel <- jobModel.getContainers.values) {
@@ -122,7 +140,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     }
 
     try {
-      coordinator.start
+      jobModelManager.start
       val container = SamzaContainer(
         containerId,
         jobModel,
@@ -138,7 +156,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
       val threadJob = new ThreadJob(container)
       threadJob
     } finally {
-      coordinator.stop
+      jobModelManager.stop
       if (jmxServer != null) {
         jmxServer.stop
       }
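
As a usage sketch (reusing the hypothetical submissionConfig above), either
factory can now be handed the thin submission config directly; planning and the
coordinator-stream write happen inside getJob:

    // Minimal sketch; StreamJob#submit launches the planned job.
    val job = new ThreadJobFactory().getJob(submissionConfig)
    job.submit()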