You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/05/14 23:03:08 UTC

[GitHub] [samza] rmatharu commented on a change in pull request #1001: SAMZA-2168: Remove redundant SystemAdmin creation in ApplicationMaster

rmatharu commented on a change in pull request #1001: SAMZA-2168: Remove redundant SystemAdmin creation in ApplicationMaster
URL: https://github.com/apache/samza/pull/1001#discussion_r284033079
 
 

 ##########
 File path: samza-core/src/main/scala/org/apache/samza/job/local/ProcessJobFactory.scala
 ##########
 @@ -52,54 +53,62 @@ class ProcessJobFactory extends StreamJobFactory with Logging {
 
     val configFromCoordinatorStream: Config = CoordinatorStreamUtil.readConfigFromCoordinatorStream(coordinatorStreamStore)
 
-    val changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE))
+    val systemAdmins = new SystemAdmins(configFromCoordinatorStream)
 
-    val coordinator = JobModelManager(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, metricsRegistry)
-    val jobModel = coordinator.jobModel
+    try {
+      systemAdmins.start()
+      val changelogStreamManager = new ChangelogStreamManager(new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetChangelogMapping.TYPE))
 
-    val taskPartitionMappings: util.Map[TaskName, Integer] = new util.HashMap[TaskName, Integer]
-    for (containerModel <- jobModel.getContainers.values) {
-      for (taskModel <- containerModel.getTasks.values) {
-        taskPartitionMappings.put(taskModel.getTaskName, taskModel.getChangelogPartition.getPartitionId)
-      }
-    }
 
-    changelogStreamManager.writePartitionMapping(taskPartitionMappings)
+      val coordinator = JobModelManager(configFromCoordinatorStream, changelogStreamManager.readPartitionMapping(), coordinatorStreamStore, metricsRegistry, systemAdmins)
+      val jobModel = coordinator.jobModel
 
-    //create necessary checkpoint and changelog streams
-    val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
-    if (checkpointManager != null) {
-      checkpointManager.createResources()
-    }
-    ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions)
-
-    val containerModel = coordinator.jobModel.getContainers.get(0)
+      val taskPartitionMappings: util.Map[TaskName, Integer] = new util.HashMap[TaskName, Integer]
+      for (containerModel <- jobModel.getContainers.values) {
+        for (taskModel <- containerModel.getTasks.values) {
+          taskPartitionMappings.put(taskModel.getTaskName, taskModel.getChangelogPartition.getPartitionId)
+        }
+      }
 
-    val fwkPath = JobConfig.getFwkPath(config) // see if split deployment is configured
-    info("Process job. using fwkPath = " + fwkPath)
+      changelogStreamManager.writePartitionMapping(taskPartitionMappings)
 
-    val commandBuilder = {
-      config.getCommandClass match {
-        case Some(cmdBuilderClassName) => {
-          // A command class was specified, so we need to use a process job to
-          // execute the command in its own process.
-          Util.getObj(cmdBuilderClassName, classOf[CommandBuilder])
-        }
-        case _ => {
-          info("Defaulting to ShellCommandBuilder")
-          new ShellCommandBuilder
+      //create necessary checkpoint and changelog streams
+      val checkpointManager = new TaskConfigJava(jobModel.getConfig).getCheckpointManager(metricsRegistry)
+      if (checkpointManager != null) {
+        checkpointManager.createResources()
+      }
+      ChangelogStreamManager.createChangelogStreams(jobModel.getConfig, jobModel.maxChangeLogStreamPartitions, systemAdmins)
 
 Review comment:
   We don't seem to have a test for this method, and it seems to be called from quite a few critical places.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services