You are viewing a plain-text version of this content. The canonical link to it (shown as "here" in the original page) was not preserved in this plain-text rendering.
Posted to commits@samza.apache.org by lh...@apache.org on 2019/11/14 22:27:49 UTC
[samza] branch master updated: SAMZA-2381: Refactor JobRunner#run
and split into multiple util methods. (#1217)
This is an automated email from the ASF dual-hosted git repository.
lhaiesp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new f6295aa SAMZA-2381: Refactor JobRunner#run and split into multiple util methods. (#1217)
f6295aa is described below
commit f6295aa1d5552011bbb607fb4dd8789945f7efc6
Author: Ke Wu <ke...@icloud.com>
AuthorDate: Thu Nov 14 14:27:41 2019 -0800
SAMZA-2381: Refactor JobRunner#run and split into multiple util methods. (#1217)
1. Move coordinator stream related logic to CoordinatorStreamUtil#writeConfigToCoordinatorStream
2. Move diagnostics stream related logic to DiagnosticsUtil#createDiagnosticsStream
3. Move job submission related logic to JobRunner#submit()
---
.../org/apache/samza/util/DiagnosticsUtil.java | 35 ++++++++++
.../scala/org/apache/samza/job/JobRunner.scala | 81 +++++-----------------
.../apache/samza/util/CoordinatorStreamUtil.scala | 58 +++++++++++++---
.../samza/util/TestCoordinatorStreamUtil.scala | 21 +++++-
4 files changed, 116 insertions(+), 79 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
index 2f5d74b..4d2a3bc 100644
--- a/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/util/DiagnosticsUtil.java
@@ -30,6 +30,7 @@ import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.StorageConfig;
+import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.SystemConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.diagnostics.DiagnosticsManager;
@@ -42,6 +43,9 @@ import org.apache.samza.metrics.reporter.MetricsSnapshotReporter;
import org.apache.samza.runtime.LocalContainerRunner;
import org.apache.samza.serializers.JsonSerde;
import org.apache.samza.serializers.MetricsSnapshotSerdeV2;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStream;
@@ -52,6 +56,7 @@ import scala.Option;
public class DiagnosticsUtil {
private static final Logger log = LoggerFactory.getLogger(DiagnosticsUtil.class);
+ private static final String DIAGNOSTICS_STREAM_ID = "samza-diagnostics-stream-id";
// Write a file in the samza.log.dir named {exec-env-container-id}.metadata that contains
// metadata about the container such as containerId, jobName, jobId, hostname, timestamp, version info, and others.
@@ -148,4 +153,34 @@ public class DiagnosticsUtil {
return diagnosticsManagerReporterPair;
}
+
+ public static void createDiagnosticsStream(Config config) {
+ if (!new JobConfig(config).getDiagnosticsEnabled()) {
+ return;
+ }
+ // if diagnostics is enabled, create diagnostics stream if it doesnt exist
+
+ SystemAdmins systemAdmins = new SystemAdmins(config);
+ String diagnosticsSystemStreamName = new MetricsConfig(config)
+ .getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)
+ .orElseThrow(() -> new ConfigException("Missing required config: " +
+ String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM,
+ MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)));
+
+ SystemStream diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName);
+ SystemAdmin diagnosticsSysAdmin = systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem());
+ StreamSpec diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, diagnosticsSystemStream.getStream(),
+ diagnosticsSystemStream.getSystem(), new StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID));
+
+ log.info("Creating diagnostics stream {}", diagnosticsSystemStream.getStream());
+ diagnosticsSysAdmin.start();
+
+ if (diagnosticsSysAdmin.createStream(diagnosticsStreamSpec)) {
+ log.info("Created diagnostics stream {}", diagnosticsSystemStream.getStream());
+ } else {
+ log.info("Diagnostics stream {} already exists", diagnosticsSystemStream.getStream());
+ }
+
+ diagnosticsSysAdmin.stop();
+ }
}
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index b9241d1..52b7faa 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -66,79 +66,30 @@ object JobRunner extends Logging {
class JobRunner(config: Config) extends Logging {
/**
- * This function submits the samza job.
- * @param resetJobConfig This flag indicates whether or not to reset the job configurations when submitting the job.
+ * This function persist config in coordinator stream, create diagnostics stream if applicable and
+ * then submits the samza job.
+ * @param resetJobConfig This flag indicates whether or not to reset the job configurations in coordinator stream
+ * when submitting the job.
* If this value is set to true, all previously written configs to coordinator stream will be
* deleted, and only the configs in the input config file will have an affect. Otherwise, any
* config that is not deleted will have an affect.
* By default this value is set to true.
* @return The job submitted
*/
- def run(resetJobConfig: Boolean = true) = {
- debug("config: %s" format (config))
- val jobFactory: StreamJobFactory = getJobFactory
- val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
- val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
- val systemAdmins = new SystemAdmins(config)
-
- // Create the coordinator stream if it doesn't exist
- info("Creating coordinator stream")
- val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
- val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
- coordinatorSystemAdmin.start()
- CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
- coordinatorSystemAdmin.stop()
-
- if (resetJobConfig) {
- info("Storing config in coordinator stream.")
- coordinatorSystemProducer.register(JobRunner.SOURCE)
- coordinatorSystemProducer.start()
- coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
- }
- info("Loading old config from coordinator stream.")
- coordinatorSystemConsumer.register()
- coordinatorSystemConsumer.start()
- coordinatorSystemConsumer.bootstrap()
- coordinatorSystemConsumer.stop()
-
- val oldConfig = coordinatorSystemConsumer.getConfig
- if (resetJobConfig) {
- val keysToRemove = oldConfig.keySet.asScala.toSet.diff(config.keySet.asScala)
- info("Deleting old configs that are no longer defined: %s".format(keysToRemove))
- keysToRemove.foreach(key => { coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) })
- }
- coordinatorSystemProducer.stop()
-
-
- // if diagnostics is enabled, create diagnostics stream if it doesnt exist
- if (new JobConfig(config).getDiagnosticsEnabled) {
- val DIAGNOSTICS_STREAM_ID = "samza-diagnostics-stream-id"
- val diagnosticsSystemStreamName = JavaOptionals.toRichOptional(
- new MetricsConfig(config)
- .getMetricsSnapshotReporterStream(MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS))
- .toOption
- .getOrElse(throw new ConfigException("Missing required config: " +
- String.format(MetricsConfig.METRICS_SNAPSHOT_REPORTER_STREAM,
- MetricsConfig.METRICS_SNAPSHOT_REPORTER_NAME_FOR_DIAGNOSTICS)))
-
- val diagnosticsSystemStream = StreamUtil.getSystemStreamFromNames(diagnosticsSystemStreamName)
- val diagnosticsSysAdmin = systemAdmins.getSystemAdmin(diagnosticsSystemStream.getSystem)
- val diagnosticsStreamSpec = new StreamSpec(DIAGNOSTICS_STREAM_ID, diagnosticsSystemStream.getStream,
- diagnosticsSystemStream.getSystem, new StreamConfig(config).getStreamProperties(DIAGNOSTICS_STREAM_ID))
-
- info("Creating diagnostics stream %s" format diagnosticsSystemStream.getStream)
- diagnosticsSysAdmin.start()
- if (diagnosticsSysAdmin.createStream(diagnosticsStreamSpec)) {
- info("Created diagnostics stream %s" format diagnosticsSystemStream.getStream)
- } else {
- info("Diagnostics stream %s already exists" format diagnosticsSystemStream.getStream)
- }
- diagnosticsSysAdmin.stop()
- }
-
+ def run(resetJobConfig: Boolean = true): StreamJob = {
+ CoordinatorStreamUtil.writeConfigToCoordinatorStream(config, resetJobConfig)
+ DiagnosticsUtil.createDiagnosticsStream(config)
+ submit()
+ }
+ /**
+ * This function submits the samza job.
+ *
+ * @return The job submitted
+ */
+ def submit(): StreamJob = {
// Create the actual job, and submit it.
- val job = jobFactory.getJob(config)
+ val job = getJobFactory.getJob(config)
job.submit()
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index f6636e7..810345e 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -25,9 +25,11 @@ import java.util
import org.apache.samza.SamzaException
import org.apache.samza.config._
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
-import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde
-import org.apache.samza.coordinator.stream.messages.SetConfig
-import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemFactory, SystemStream}
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamValueSerde}
+import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
+import org.apache.samza.job.JobRunner
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemAdmins, SystemFactory, SystemStream}
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import scala.collection.JavaConverters._
@@ -37,13 +39,12 @@ object CoordinatorStreamUtil extends Logging {
* Given a job's full config object, build a subset config which includes
* only the job name, job id, and system config for the coordinator stream.
*/
- def buildCoordinatorStreamConfig(config: Config) = {
+ def buildCoordinatorStreamConfig(config: Config): MapConfig = {
val jobConfig = new JobConfig(config)
- val buildConfigFactory = jobConfig.getCoordinatorStreamFactory();
+ val buildConfigFactory = jobConfig.getCoordinatorStreamFactory()
val coordinatorSystemConfig = Class.forName(buildConfigFactory).newInstance().asInstanceOf[CoordinatorStreamConfigFactory].buildCoordinatorStreamConfig(config)
new MapConfig(coordinatorSystemConfig);
-
}
/**
@@ -64,10 +65,10 @@ object CoordinatorStreamUtil extends Logging {
/**
* Get the coordinator system stream from the configuration
- * @param config
+ * @param config Configuration to get coordinator system stream from.
* @return
*/
- def getCoordinatorSystemStream(config: Config) = {
+ def getCoordinatorSystemStream(config: Config): SystemStream = {
val jobConfig = new JobConfig(config)
val systemName = jobConfig.getCoordinatorSystemName
val (jobName, jobId) = getJobNameAndId(jobConfig)
@@ -77,10 +78,10 @@ object CoordinatorStreamUtil extends Logging {
/**
* Get the coordinator system factory from the configuration
- * @param config
+ * @param config Configuration to get coordinator system factory from.
* @return
*/
- def getCoordinatorSystemFactory(config: Config) = {
+ def getCoordinatorSystemFactory(config: Config): SystemFactory = {
val systemName = new JobConfig(config).getCoordinatorSystemName
val systemConfig = new SystemConfig(config)
val systemFactoryClassName = JavaOptionals.toRichOptional(systemConfig.getSystemFactory(systemName)).toOption
@@ -93,7 +94,7 @@ object CoordinatorStreamUtil extends Logging {
* for the job. The format of the stream name will be:
* __samza_coordinator_<JOBNAME>_<JOBID>.
*/
- def getCoordinatorStreamName(jobName: String, jobId: String) = {
+ def getCoordinatorStreamName(jobName: String, jobId: String): String = {
"__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-"))
}
@@ -133,4 +134,39 @@ object CoordinatorStreamUtil extends Logging {
}
new MapConfig(configMap)
}
+
+ def writeConfigToCoordinatorStream(config: Config, resetJobConfig: Boolean = true) {
+ debug("config: %s" format (config))
+ val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
+ val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
+ val systemAdmins = new SystemAdmins(config)
+
+ // Create the coordinator stream if it doesn't exist
+ info("Creating coordinator stream")
+ val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
+ val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
+ coordinatorSystemAdmin.start()
+ CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
+ coordinatorSystemAdmin.stop()
+
+ if (resetJobConfig) {
+ info("Storing config in coordinator stream.")
+ coordinatorSystemProducer.register(JobRunner.SOURCE)
+ coordinatorSystemProducer.start()
+ coordinatorSystemProducer.writeConfig(JobRunner.SOURCE, config)
+ }
+ info("Loading old config from coordinator stream.")
+ coordinatorSystemConsumer.register()
+ coordinatorSystemConsumer.start()
+ coordinatorSystemConsumer.bootstrap()
+ coordinatorSystemConsumer.stop()
+
+ val oldConfig = coordinatorSystemConsumer.getConfig
+ if (resetJobConfig) {
+ val keysToRemove = oldConfig.keySet.asScala.toSet.diff(config.keySet.asScala)
+ info("Deleting old configs that are no longer defined: %s".format(keysToRemove))
+ keysToRemove.foreach(key => { coordinatorSystemProducer.send(new Delete(JobRunner.SOURCE, key, SetConfig.TYPE)) })
+ }
+ coordinatorSystemProducer.stop()
+ }
}
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
index dac1fe0..f8a9f40 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
@@ -32,7 +32,7 @@ import org.apache.samza.config.MapConfig
class TestCoordinatorStreamUtil {
@Test
- def testCreateCoordinatorStream {
+ def testCreateCoordinatorStream() {
val systemStream = Mockito.spy(new SystemStream("testSystem", "testStream"))
val systemAdmin = Mockito.mock(classOf[SystemAdmin])
@@ -42,7 +42,7 @@ class TestCoordinatorStreamUtil {
}
@Test
- def testBuildCoordinatorStreamConfig: Unit = {
+ def testBuildCoordinatorStreamConfig() {
val addConfig = new util.HashMap[String, String]
addConfig.put("job.name", "test-job-name")
addConfig.put("job.id", "i001")
@@ -57,7 +57,7 @@ class TestCoordinatorStreamUtil {
}
@Test
- def testReadConfigFromCoordinatorStream {
+ def testReadConfigFromCoordinatorStream() {
val keyForNonBlankVal = "app.id"
val nonBlankVal = "1"
val keyForEmptyVal = "task.opt"
@@ -84,4 +84,19 @@ class TestCoordinatorStreamUtil {
Assert.assertEquals(configFromCoordinatorStream.get(keyForEmptyVal), emptyVal)
Assert.assertFalse(configFromCoordinatorStream.containsKey(keyForNullVal))
}
+
+ @Test
+ def testWriteConfigToCoordinatorStream() {
+ val addConfig = new util.HashMap[String, String]
+ addConfig.put("job.name", "test-job-name")
+ addConfig.put("job.id", "i001")
+ addConfig.put("job.coordinator.system", "samzatest")
+ addConfig.put("systems.samzatest.test","test")
+ addConfig.put("test.only","nothing")
+ addConfig.put("systems.samzatest.samza.factory", "org.apache.samza.system.MockSystemFactory")
+ val config = new MapConfig(addConfig)
+ val configMap = CoordinatorStreamUtil.buildCoordinatorStreamConfig(config)
+
+ CoordinatorStreamUtil.writeConfigToCoordinatorStream(configMap)
+ }
}