You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by lh...@apache.org on 2019/11/14 22:27:49 UTC

[samza] branch master updated: SAMZA-2381: Refactor JobRunner#run and split into multiple util methods. (#1217)

This is an automated email from the ASF dual-hosted git repository.

lhaiesp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new f6295aa  SAMZA-2381: Refactor JobRunner#run and split into multiple util methods. (#1217)
f6295aa is described below

commit f6295aa1d5552011bbb607fb4dd8789945f7efc6
Author: Ke Wu <ke...@icloud.com>
AuthorDate: Thu Nov 14 14:27:41 2019 -0800

    SAMZA-2381: Refactor JobRunner#run and split into multiple util methods. (#1217)
    
    1. Move coordinator stream related logic to CoordinatorStreamUtil#writeConfigToCoordinatorStream
    2. Move diagnostics stream related logic to DiagnosticsUtil#createDiagnosticsStream
    3. Move job submission related logic to JobRunner#submit()
---
 .../org/apache/samza/util/DiagnosticsUtil.java     | 35 ++++++++++
 .../scala/org/apache/samza/job/JobRunner.scala     | 81 +++++-----------------
 .../apache/samza/util/CoordinatorStreamUtil.scala  | 58 +++++++++++++---
 .../samza/util/TestCoordinatorStreamUtil.scala     | 21 +++++-
 4 files changed, 116 insertions(+), 79 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 2f5d74b..4d2a3bc 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -30,6 +30,7 @@ import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.config.SystemConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.diagnostics.DiagnosticsManager;
@@ -42,6 +43,9 @@ import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
 import org.apache.samza.runtime.LocalContainerRunner;
 import org.apache.samza.serializers.JsonSerde;
 import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemFactory;
 import org.apache.samza.system.SystemProducer;
 import org.apache.samza.system.SystemStream;
@@ -52,6 +56,7 @@ import scala.Option;
 
 public class DiagnosticsUtil {
   private static final Logger log = LoggerFactory.getLogger(DiagnosticsUtil.class);
+  private static final String DIAGNOSTICS_STREAM_ID = "samza-diagnostics-stream-id";
 
   // Write a file in the samza.log.dir named {exec-env-container-id}.metadata that contains
   // metadata about the container such as containerId, jobName, jobId, hostname, timestamp, version info, and others.
@@ -148,4 +153,34 @@ public class DiagnosticsUtil {
 
     return diagnosticsManagerReporterPair;
   }
+
+  public static void createDiagnosticsStream(Config config) {
+    if (!new JobConfig(config).getDiagnosticsEnabled()) {
+      return;
+    }
+    // if diagnostics is enabled, create diagnostics stream if it doesn't exist
+
+    SystemAdmins systemAdmins = new SystemAdmins(config);
+    String diagnosticsSystemStreamName = new MetricsConfig(config)
+        .getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)
+        .orElseThrow(() -> new ConfigException("Missing required config: " +
+            String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM,
+                MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)));
+
+    SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName);
+    SystemAdmin diagnosticsSysAdmin = systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem());
+    StreamSpec diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, diagnosticsSystemStream.getStream(),
+        diagnosticsSystemStream.getSystem(), new StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID));
+
+    log.info("Creating diagnostics stream {}", diagnosticsSystemStream.getStream());
+    diagnosticsSysAdmin.start();
+
+    if (diagnosticsSysAdmin.createStream(diagnosticsStreamSpec)) {
+      log.info("Created diagnostics stream {}", diagnosticsSystemStream.getStream());
+    } else {
+      log.info("Diagnostics stream {} already exists", diagnosticsSystemStream.getStream());
+    }
+
+    diagnosticsSysAdmin.stop();
+  }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index b9241d1..52b7faa 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -66,79 +66,30 @@ object JobRunner extends Logging {
 class JobRunner(config: Config) extends Logging {
 
   /**
-   * This function submits the samza job.
-   * @param resetJobConfig This flag indicates whether or not to reset the job configurations when submitting the job.
+   * This function persists config in coordinator stream, creates diagnostics stream if applicable and
+   * then submits the samza job.
+   * @param resetJobConfig This flag indicates whether or not to reset the job configurations in coordinator stream
+   *                       when submitting the job.
    *                       If this value is set to true, all previously written configs to coordinator stream will be
    *                       deleted, and only the configs in the input config file will have an affect. Otherwise, any
    *                       config that is not deleted will have an affect.
    *                       By default this value is set to true.
    * @return The job submitted
    */
-  def run(resetJobConfig: Boolean = true) = {
-    debug("config: %s" format (config))
-    val jobFactory: StreamJobFactory = getJobFactory
-    val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
-    val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
-    val systemAdmins = new SystemAdmins(config)
-
-    // Create the coordinator stream if it doesn't exist
-    info("Creating coordinator stream")
-    val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
-    val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
-    coordinatorSystemAdmin.start()
-    CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
-    coordinatorSystemAdmin.stop()
-
-    if (resetJobConfig) {
-      info("Storing config in coordinator stream.")
-      coordinatorSystemProducer.register(JobRunner.SOURCE)
-      coordinatorSystemProducer.start()
-      coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
-    }
-    info("Loading old config from coordinator stream.")
-    coordinatorSystemConsumer.register()
-    coordinatorSystemConsumer.start()
-    coordinatorSystemConsumer.bootstrap()
-    coordinatorSystemConsumer.stop()
-
-    val oldConfig = coordinatorSystemConsumer.getConfig
-    if (resetJobConfig) {
-      val keysToRemove = oldConfig.keySet.asScala.toSet.diff(config.keySet.asScala)
-      info("Deleting old configs that are no longer defined: %s".format(keysToRemove))
-      keysToRemove.foreach(key => { coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) })
-    }
-    coordinatorSystemProducer.stop()
-
-
-    // if diagnostics is enabled, create diagnostics stream if it doesnt exist
-    if (new JobConfig(config).getDiagnosticsEnabled) {
-      val DIAGNOSTICS_STREAM_ID = "samza-diagnostics-stream-id"
-      val diagnosticsSystemStreamName = JavaOptionals.toRichOptional(
-        new MetricsConfig(config)
-          .getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS))
-        .toOption
-        .getOrElse(throw new ConfigException("Missing required config: " +
-          String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM,
-            MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)))
-
-      val diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName)
-      val diagnosticsSysAdmin = systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem)
-      val diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, diagnosticsSystemStream.getStream,
-        diagnosticsSystemStream.getSystem, new StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID))
-
-      info("Creating diagnostics stream %s" format diagnosticsSystemStream.getStream)
-      diagnosticsSysAdmin.start()
-      if (diagnosticsSysAdmin.createStream(diagnosticsStreamSpec)) {
-        info("Created diagnostics stream %s" format diagnosticsSystemStream.getStream)
-      } else {
-        info("Diagnostics stream %s already exists" format diagnosticsSystemStream.getStream)
-      }
-      diagnosticsSysAdmin.stop()
-    }
-
+  def run(resetJobConfig: Boolean = true): StreamJob = {
+    CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, resetJobConfig)
+    DiagnosticsUtil.createDiagnosticsStream(config)
+    submit()
+  }
 
+  /**
+   * This function submits the samza job.
+   *
+   * @return The job submitted
+   */
+  def submit(): StreamJob = {
     // Create the actual job, and submit it.
-    val job = jobFactory.getJob(config)
+    val job = getJobFactory.getJob(config)
 
     job.submit()
 
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index f6636e7..810345e 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -25,9 +25,11 @@ import java.util
 import org.apache.samza.SamzaException
 import org.apache.samza.config._
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
-import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde
-import org.apache.samza.coordinator.stream.messages.SetConfig
-import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemFactory, SystemStream}
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamValueSerde}
+import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
+import org.apache.samza.job.JobRunner
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemAdmins, SystemFactory, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 
 import scala.collection.JavaConverters._
@@ -37,13 +39,12 @@ object CoordinatorStreamUtil extends Logging {
     * Given a job's full config object, build a subset config which includes
     * only the job name, job id, and system config for the coordinator stream.
     */
-  def buildCoordinatorStreamConfig(config: Config) = {
+  def buildCoordinatorStreamConfig(config: Config): MapConfig = {
     val jobConfig = new JobConfig(config)
-    val buildConfigFactory = jobConfig.getCoordinatorStreamFactory();
+    val buildConfigFactory = jobConfig.getCoordinatorStreamFactory()
     val coordinatorSystemConfig = Class.forName(buildConfigFactory).newInstance().asInstanceOf[CoordinatorStreamConfigFactory].buildCoordinatorStreamConfig(config)
 
     new MapConfig(coordinatorSystemConfig);
-
   }
 
   /**
@@ -64,10 +65,10 @@ object CoordinatorStreamUtil extends Logging {
 
   /**
     * Get the coordinator system stream from the configuration
-    * @param config
+    * @param config Configuration to get coordinator system stream from.
     * @return
     */
-  def getCoordinatorSystemStream(config: Config) = {
+  def getCoordinatorSystemStream(config: Config): SystemStream = {
     val jobConfig = new JobConfig(config)
     val systemName = jobConfig.getCoordinatorSystemName
     val (jobName, jobId) = getJobNameAndId(jobConfig)
@@ -77,10 +78,10 @@ object CoordinatorStreamUtil extends Logging {
 
   /**
     * Get the coordinator system factory from the configuration
-    * @param config
+    * @param config Configuration to get coordinator system factory from.
     * @return
     */
-  def getCoordinatorSystemFactory(config: Config) = {
+  def getCoordinatorSystemFactory(config: Config): SystemFactory = {
     val systemName = new JobConfig(config).getCoordinatorSystemName
     val systemConfig = new SystemConfig(config)
     val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
@@ -93,7 +94,7 @@ object CoordinatorStreamUtil extends Logging {
     * for the job. The format of the stream name will be:
     * &#95;&#95;samza_coordinator_&lt;JOBNAME&gt;_&lt;JOBID&gt;.
     */
-  def getCoordinatorStreamName(jobName: String, jobId: String) = {
+  def getCoordinatorStreamName(jobName: String, jobId: String): String = {
     "__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
   }
 
@@ -133,4 +134,39 @@ object CoordinatorStreamUtil extends Logging {
     }
     new MapConfig(configMap)
   }
+
+  def writeConfigToCoordinatorStream(config: Config, resetJobConfig: Boolean = true) {
+    debug("config: %s" format (config))
+    val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+    val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+    val systemAdmins = new SystemAdmins(config)
+
+    // Create the coordinator stream if it doesn't exist
+    info("Creating coordinator stream")
+    val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
+    val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
+    coordinatorSystemAdmin.start()
+    CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
+    coordinatorSystemAdmin.stop()
+
+    if (resetJobConfig) {
+      info("Storing config in coordinator stream.")
+      coordinatorSystemProducer.register(JobRunner.SOURCE)
+      coordinatorSystemProducer.start()
+      coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
+    }
+    info("Loading old config from coordinator stream.")
+    coordinatorSystemConsumer.register()
+    coordinatorSystemConsumer.start()
+    coordinatorSystemConsumer.bootstrap()
+    coordinatorSystemConsumer.stop()
+
+    val oldConfig = coordinatorSystemConsumer.getConfig
+    if (resetJobConfig) {
+      val keysToRemove = oldConfig.keySet.asScala.toSet.diff(config.keySet.asScala)
+      info("Deleting old configs that are no longer defined: %s".format(keysToRemove))
+      keysToRemove.foreach(key => { coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) })
+    }
+    coordinatorSystemProducer.stop()
+  }
 }
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
index dac1fe0..f8a9f40 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
@@ -32,7 +32,7 @@ import org.apache.samza.config.MapConfig
 class TestCoordinatorStreamUtil {
 
   @Test
-  def testCreateCoordinatorStream  {
+  def testCreateCoordinatorStream() {
     val systemStream = Mockito.spy(new SystemStream("testSystem", "testStream"))
     val systemAdmin = Mockito.mock(classOf[SystemAdmin])
 
@@ -42,7 +42,7 @@ class TestCoordinatorStreamUtil {
   }
 
   @Test
-  def testBuildCoordinatorStreamConfig: Unit = {
+  def testBuildCoordinatorStreamConfig() {
     val addConfig = new util.HashMap[String, String]
     addConfig.put("job.name", "test-job-name")
     addConfig.put("job.id", "i001")
@@ -57,7 +57,7 @@ class TestCoordinatorStreamUtil {
   }
 
   @Test
-  def testReadConfigFromCoordinatorStream {
+  def testReadConfigFromCoordinatorStream() {
     val keyForNonBlankVal = "app.id"
     val nonBlankVal = "1"
     val keyForEmptyVal = "task.opt"
@@ -84,4 +84,19 @@ class TestCoordinatorStreamUtil {
     Assert.assertEquals(configFromCoordinatorStream.get(keyForEmptyVal), emptyVal)
     Assert.assertFalse(configFromCoordinatorStream.containsKey(keyForNullVal))
   }
+
+  @Test
+  def testWriteConfigToCoordinatorStream() {
+    val addConfig = new util.HashMap[String, String]
+    addConfig.put("job.name", "test-job-name")
+    addConfig.put("job.id", "i001")
+    addConfig.put("job.coordinator.system", "samzatest")
+    addConfig.put("systems.samzatest.test","test")
+    addConfig.put("test.only","nothing")
+    addConfig.put("systems.samzatest.samza.factory", "org.apache.samza.system.MockSystemFactory")
+    val config = new MapConfig(addConfig)
+    val configMap = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)
+
+    CoordinatorStreamUtil.writeConfigToCoordinatorStream(configMap)
+  }
 }