Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/02/14 17:57:47 UTC

[GitHub] [samza] kw2542 opened a new pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

kw2542 opened a new pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278
 
 
   Design:
   https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner
   
   Changes:
   1. Update ProcessJobFactory to load the full job config, execute planning, and write the full job config back to the coordinator stream, work that was previously done by RemoteApplicationRunner.
   2. Update ThreadJobFactory to load the full job config, execute planning, and write the full job config back to the coordinator stream, work that was previously done by RemoteApplicationRunner.
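The steps in the list above (load the full job config, plan, and write the result back) can be sketched as a single Scala function. This is a hypothetical model, not the code in this PR: `Config` is a `Map[String, String]` stand-in, and each injected function is a placeholder for the Samza call named in its comment, so the sketch runs without Samza on the classpath.

```scala
object FullConfigFlow {
  // Hypothetical stand-in for org.apache.samza.config.Config: a flat key/value map.
  type Config = Map[String, String]

  // Sketch of the load -> plan -> persist sequence. Every step is injected as a
  // function so the ordering can be shown without Samza on the classpath.
  def prepare(
      submissionConfig: Config,
      loadConfig: Config => Config,             // placeholder for ConfigUtil.loadConfig
      plan: Config => Seq[Config],              // placeholder for RemoteJobPlanner#prepareJobs
      writeToCoordinatorStream: Config => Unit, // placeholder for CoordinatorStreamUtil.writeConfigToCoordinatorStream
      createDiagnosticsStream: Config => Unit   // placeholder for DiagnosticsUtil.createDiagnosticsStream
  ): Config = {
    // 1. Expand the submission config into the original job config.
    val originalConfig = loadConfig(submissionConfig)

    // 2. Plan the application; a local factory can only launch one job.
    val jobConfigs = plan(originalConfig)
    if (jobConfigs.size != 1) {
      // The diff throws SamzaException; IllegalStateException keeps the sketch dependency-free.
      throw new IllegalStateException("Only single stage jobs are supported.")
    }
    val fullConfig = jobConfigs.head

    // 3. Persist the planned (full) config, the step RemoteApplicationRunner used to perform.
    writeToCoordinatorStream(fullConfig)
    createDiagnosticsStream(fullConfig)
    fullConfig
  }
}
```

Injecting the steps makes the ordering easy to check: the config that gets persisted is the planned one, not the submission config.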
   
   API Changes:
   None
   
   Upgrade Instructions:
   None
   
   Usage Instructions:
   None
   
   Tests
   1. Deployed a YARN job locally and verified that it works properly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [samza] prateekm commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379572083
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -22,25 +22,44 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, DiagnosticsUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConversions._
 
 /**
  * Creates a stand alone ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
+    val originalConfig = ConfigUtil.loadConfig(submissionConfig)
 
 Review comment:
   It'll be useful to explain in the PR description why this change is necessary (under Changes), and what has changed from a user perspective (in the API changes section).

[GitHub] [samza] cameronlee314 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379694597
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
 ##########
 @@ -45,20 +47,36 @@ import scala.collection.mutable
   * Creates a new Thread job with the given config
   */
 class ThreadJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
+    var config = submissionConfig
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+      val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+      // Execute planning
+      val planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig))
 
 Review comment:
   Similar to the comment above about `RemoteJobPlanner`.

[GitHub] [samza] cameronlee314 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379694707
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
 ##########
 @@ -45,20 +47,36 @@ import scala.collection.mutable
   * Creates a new Thread job with the given config
   */
 class ThreadJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
+    var config = submissionConfig
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
 
 Review comment:
   This block seems duplicated with `ProcessJobFactory`. Any benefit in sharing code?
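One way to share the duplicated block is a small template trait that owns the config-loader guard, leaving each factory to build only its own job. The sketch below is hypothetical: `LocalJobFactory`, the `*Sketch` classes, and the `job.config.loader.factory` key are illustrative assumptions, and `getJob` returns a `String` instead of a `StreamJob`.

```scala
object SharedLocalFactory {
  // Hypothetical stand-in for org.apache.samza.config.Config.
  type Config = Map[String, String]

  // Assumed key behind JobConfig#getConfigLoaderFactory; illustrative only.
  val ConfigLoaderFactoryKey = "job.config.loader.factory"

  // Template for both local factories: the duplicated guard lives here once.
  trait LocalJobFactory {
    // Shared load -> plan -> persist step described in the PR.
    protected def prepare: Config => Config
    // The only piece that differs between thread and process jobs.
    protected def createJob(config: Config): String

    final def getJob(submissionConfig: Config): String = {
      // Mirrors `new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent`.
      val config =
        if (submissionConfig.contains(ConfigLoaderFactoryKey)) prepare(submissionConfig)
        else submissionConfig // no loader configured: use the submitted config as-is
      createJob(config)
    }
  }

  // Hypothetical simplified factories; the real ones return a StreamJob.
  class ThreadJobFactorySketch(protected val prepare: Config => Config) extends LocalJobFactory {
    protected def createJob(config: Config): String = s"thread:${config("job.name")}"
  }

  class ProcessJobFactorySketch(protected val prepare: Config => Config) extends LocalJobFactory {
    protected def createJob(config: Config): String = s"process:${config("job.name")}"
  }
}
```

A shared helper object that takes the preparation function would work equally well; the trait simply keeps the guard in one place so the two factories cannot drift apart.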

[GitHub] [samza] cameronlee314 merged pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
cameronlee314 merged pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278
 
 
   

[GitHub] [samza] kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379695208
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -22,25 +22,48 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, DiagnosticsUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConversions._
 
 /**
- * Creates a stand alone ProcessJob with the specified config.
+ * Creates a ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
+    var config = submissionConfig
+
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+      val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+      // Execute planning
+      val planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig))
 
 Review comment:
   Agreed. Interestingly, StreamJobFactory is only used in JobRunner, and JobRunner is only used in RemoteApplicationRunner; that is, StreamJobFactory is only intended for remote jobs, so RemoteJobPlanner technically fits here. I agree that we have excessive layers in our structure, which should be simplified to avoid future confusion.

[GitHub] [samza] prateekm commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379571147
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -22,25 +22,44 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, DiagnosticsUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConversions._
 
 /**
  * Creates a stand alone ProcessJob with the specified config.
 
 Review comment:
   Minor: Let's remove "stand alone"

[GitHub] [samza] kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379695208
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -22,25 +22,48 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, DiagnosticsUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConversions._
 
 /**
- * Creates a stand alone ProcessJob with the specified config.
+ * Creates a ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
+    var config = submissionConfig
+
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+      val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+      // Execute planning
+      val planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig))
 
 Review comment:
   Agreed. Interestingly, StreamJobFactory is only used in JobRunner, and JobRunner is only used in RemoteApplicationRunner. It looks like we have excessive layers in our structure, which should be simplified to avoid future confusion.

[GitHub] [samza] prateekm commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379572585
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -22,25 +22,44 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, DiagnosticsUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConversions._
 
 /**
  * Creates a stand alone ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
+    val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+    // Execute planning
+    val planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig))
+    val jobConfigs = planner.prepareJobs
+
+    if (jobConfigs.size != 1) {
+      throw new SamzaException("Only single process job is supported.")
 
 Review comment:
   "Only single stage jobs are supported with ProcessJobFactory."
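A pattern match expresses the single-stage requirement directly and carries the suggested message. This is a sketch under assumptions: `Config` is a map stand-in, the planned job list is assumed to have been converted to a Scala `Seq` first (the diff calls `jobConfigs.get(0)`, which suggests a Java list), and `IllegalArgumentException` replaces `SamzaException`.

```scala
object SingleStageCheck {
  // Hypothetical stand-in for org.apache.samza.config.Config.
  type Config = Map[String, String]

  // Returns the only planned job config, failing with the suggested message otherwise.
  def onlyJobConfig(jobConfigs: Seq[Config]): Config = jobConfigs match {
    case Seq(single) => single
    case _ =>
      // The factory throws SamzaException; IllegalArgumentException keeps this dependency-free.
      throw new IllegalArgumentException("Only single stage jobs are supported with ProcessJobFactory.")
  }
}
```

Matching on `Seq(single)` rejects both the empty and the multi-stage case in one branch, so no separate size check is needed.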

[GitHub] [samza] cameronlee314 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379694518
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -87,13 +106,13 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
     val commandBuilder = ReflectionUtil.getObj(commandBuilderClass, classOf[CommandBuilder])
 
     // JobCoordinator is stopped by ProcessJob when it exits
 
 Review comment:
   Could you please update this comment? It looks like it was out-of-date before you made your changes, but would be nice to clarify it now.

[GitHub] [samza] kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379579249
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -51,15 +70,11 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
     val coordinatorStreamStore: CoordinatorStreamStore = new CoordinatorStreamStore(config, new MetricsRegistryMap())
     coordinatorStreamStore.init()
 
-    val configFromCoordinatorStream: Config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
-
     val changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE))
-
-    val coordinator = JobModelManager(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(),
-      coordinatorStreamStore, metricsRegistry)
+    val coordinator = JobModelManager(config, changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, metricsRegistry)
 
 Review comment:
   Updated, nice catch. Not sure why it was named coordinator before.

[GitHub] [samza] kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379581912
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -22,25 +22,44 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, DiagnosticsUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConversions._
 
 /**
  * Creates a stand alone ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
+    val originalConfig = ConfigUtil.loadConfig(submissionConfig)
 
 Review comment:
   Updated.

[GitHub] [samza] cameronlee314 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379694087
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -22,25 +22,48 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, DiagnosticsUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConversions._
 
 /**
- * Creates a stand alone ProcessJob with the specified config.
+ * Creates a ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
+    var config = submissionConfig
+
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+      val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+      // Execute planning
+      val planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig))
+      val jobConfigs = planner.prepareJobs
+
+      if (jobConfigs.size != 1) {
+        throw new SamzaException("Only single process job is supported.")
+      }
+
+      // This is the full job config
+      config = jobConfigs.get(0)
+      // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run
+      CoordinatorStreamUtil.writeConfigToCoordinatorStream(config)
+      DiagnosticsUtil.createDiagnosticsStream(config)
 
 Review comment:
   Similar to above: does this only need to be consistent with `RemoteApplicationRunner`?

[GitHub] [samza] kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379695319
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -22,25 +22,48 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, DiagnosticsUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConversions._
 
 /**
- * Creates a stand alone ProcessJob with the specified config.
+ * Creates a ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
+    var config = submissionConfig
+
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+      val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+      // Execute planning
+      val planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig))
+      val jobConfigs = planner.prepareJobs
+
+      if (jobConfigs.size != 1) {
+        throw new SamzaException("Only single process job is supported.")
+      }
+
+      // This is the full job config
+      config = jobConfigs.get(0)
+      // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run
+      CoordinatorStreamUtil.writeConfigToCoordinatorStream(config)
+      DiagnosticsUtil.createDiagnosticsStream(config)
 
 Review comment:
   Yes. StreamJobFactory is only used by JobRunner, which is only used by RemoteApplicationRunner.

[GitHub] [samza] prateekm commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

Posted by GitBox <gi...@apache.org>.
prateekm commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379570977
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -51,15 +70,11 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
     val coordinatorStreamStore: CoordinatorStreamStore = new CoordinatorStreamStore(config, new MetricsRegistryMap())
     coordinatorStreamStore.init()
 
-    val configFromCoordinatorStream: Config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
-
     val changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE))
-
-    val coordinator = JobModelManager(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(),
-      coordinatorStreamStore, metricsRegistry)
+    val coordinator = JobModelManager(config, changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, metricsRegistry)
 
 Review comment:
   Let's just call this `jobModelManager`.

[GitHub] [samza] kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

kw2542 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379695518
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
 ##########
 @@ -45,20 +47,36 @@ import scala.collection.mutable
   * Creates a new Thread job with the given config
   */
 class ThreadJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
     info("Creating a ThreadJob, which is only meant for debugging.")
+    var config = submissionConfig
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
 
 Review comment:
   I am planning to simplify and refactor all of them after I introduce Beam-related planning.
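Both factories follow the same pattern. They start from the submission config and replace it with the loaded and planned config only when a config loader factory is configured. The sketch below writes that branch as a single expression, so the `var` is not needed. It is hedged: `ConfigLoaderFactoryKey` is an assumed key name. `loadConfig` and `plan` are hypothetical stand-ins for `ConfigUtil.loadConfig` and the planning step. The key check stands in for `JobConfig#getConfigLoaderFactory.isPresent`.

```scala
object ConfigResolution {
  // Hypothetical model: Samza's Config is represented here as an immutable Map.
  type Config = Map[String, String]

  // Assumed key name; the real check goes through JobConfig#getConfigLoaderFactory.
  val ConfigLoaderFactoryKey = "job.config.loader.factory"

  // Returns the loaded and planned config when a loader is configured,
  // otherwise the submission config unchanged.
  def resolve(submissionConfig: Config,
              loadConfig: Config => Config,
              plan: Config => Config): Config =
    if (submissionConfig.contains(ConfigLoaderFactoryKey)) {
      // Load the original config through the loader, then run planning on it.
      plan(loadConfig(submissionConfig))
    } else {
      // No loader configured: use the submission config as-is (legacy path).
      submissionConfig
    }
}
```

Writing the branch as an `if`/`else` expression lets the result be bound to a `val`. Later code therefore cannot observe a half-resolved config.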

[GitHub] [samza] cameronlee314 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config

cameronlee314 commented on a change in pull request #1278: SAMZA-2458: Update ProcessJobFactory and ThreadJobFactory to load full job config
URL: https://github.com/apache/samza/pull/1278#discussion_r379693905
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -22,25 +22,48 @@ package org.apache.samza.job.local
 import java.util
 
 import org.apache.samza.SamzaException
+import org.apache.samza.application.ApplicationUtil
+import org.apache.samza.application.descriptors.ApplicationDescriptorUtil
 import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping
 import org.apache.samza.coordinator.{JobModelManager, MetadataResourceUtil}
+import org.apache.samza.execution.RemoteJobPlanner
 import org.apache.samza.job.model.JobModelUtil
 import org.apache.samza.job.{CommandBuilder, ShellCommandBuilder, StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.startpoint.StartpointManager
 import org.apache.samza.storage.ChangelogStreamManager
-import org.apache.samza.util.{CoordinatorStreamUtil, Logging, ReflectionUtil}
+import org.apache.samza.util.{ConfigUtil, CoordinatorStreamUtil, DiagnosticsUtil, Logging, ReflectionUtil}
 
 import scala.collection.JavaConversions._
 
 /**
- * Creates a stand alone ProcessJob with the specified config.
+ * Creates a ProcessJob with the specified config.
  */
 class ProcessJobFactory extends StreamJobFactory with Logging {
-  def getJob(config: Config): StreamJob = {
+  def getJob(submissionConfig: Config): StreamJob = {
+    var config = submissionConfig
+
+    if (new JobConfig(submissionConfig).getConfigLoaderFactory.isPresent) {
+      val originalConfig = ConfigUtil.loadConfig(submissionConfig)
+
+      // Execute planning
+      val planner = new RemoteJobPlanner(ApplicationDescriptorUtil.getAppDescriptor(ApplicationUtil.fromConfig(originalConfig), originalConfig))
 
 Review comment:
   From the naming of `RemoteJobPlanner`, this seems out of place. I believe `RemoteJobPlanner` was intended for use in `RemoteApplicationRunner`, but theoretically, any `ApplicationRunner` can be used with any `StreamJobFactory`.
   I guess the layers of abstraction are changing with the simplification of the job runners, but maybe we should rename some classes/interfaces and make sure the logic is in the right place as well.
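Here is one possible way to act on this suggestion, sketched under assumptions. The shared "plan, then persist" logic moves behind a neutrally named trait that both `ProcessJobFactory` and `ThreadJobFactory` could call. Then neither factory depends on a class named after `RemoteApplicationRunner`. `JobConfigPlanner`, `FullConfigResolver` and `IdentityPlanner` are hypothetical names that do not exist in Samza, and `Config` is again modeled as a `Map`.

```scala
object PlannerAbstraction {
  // Hypothetical model: Samza's Config is represented here as an immutable Map.
  type Config = Map[String, String]

  // Hypothetical runner-agnostic planning interface; not a Samza API.
  trait JobConfigPlanner {
    def prepareJobs(config: Config): Seq[Config]
  }

  // Hypothetical shared helper that both local factories could delegate to.
  final class FullConfigResolver(planner: JobConfigPlanner, persist: Config => Unit) {
    def resolve(config: Config): Config = planner.prepareJobs(config) match {
      case Seq(single) =>
        persist(single) // e.g. write the full config to the coordinator stream
        single
      case other =>
        throw new IllegalStateException(s"Expected exactly one job config, got ${other.size}")
    }
  }

  // Trivial planner used only for illustration: one job, config unchanged.
  object IdentityPlanner extends JobConfigPlanner {
    def prepareJobs(config: Config): Seq[Config] = Seq(config)
  }
}
```

With this shape, the choice of planner is injected, so a factory no longer implies which `ApplicationRunner` was used to submit the job.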
