You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2020/02/26 21:29:48 UTC

[samza] branch master updated: SAMZA-2471: Simplify CommandLine (#1291)

This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new e52aecd  SAMZA-2471: Simplify CommandLine (#1291)
e52aecd is described below

commit e52aecd770d9c6e046034420758e17b867f00313
Author: Ke Wu <ke...@icloud.com>
AuthorDate: Wed Feb 26 13:29:38 2020 -0800

    SAMZA-2471: Simplify CommandLine (#1291)
---
 bin/setup-int-test.sh                              |  2 +-
 .../stream/CoordinatorStreamWriter.java            |  2 +-
 .../samza/runtime/ApplicationRunnerMain.java       | 29 ++---------------
 .../scala/org/apache/samza/util/CommandLine.scala  | 36 +++++-----------------
 .../samza/runtime/TestApplicationRunnerMain.java   | 24 +++++----------
 .../scala/org/apache/samza/job/TestJobRunner.scala | 18 ++++-------
 6 files changed, 27 insertions(+), 84 deletions(-)

diff --git a/bin/setup-int-test.sh b/bin/setup-int-test.sh
index b33ff27..130be5d 100755
--- a/bin/setup-int-test.sh
+++ b/bin/setup-int-test.sh
@@ -43,7 +43,7 @@ $KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 1 --repli
 # Start the jobs
 for job in checker joiner emitter watcher
 do
-    $SAMZA_DIR/bin/run-job.sh --config-loader-factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory --config-loader-properties path=$SAMZA_DIR/config/join/$job.samza --config job.foo=$job
+    $SAMZA_DIR/bin/run-job.sh --config job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory --config job.config.loader.properties.path=$SAMZA_DIR/config/join/$job.samza --config job.foo=$job
 done
 
 
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
index 903f99a..3d837ac 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -103,7 +103,7 @@ public class CoordinatorStreamWriter {
    * Main function for using the CoordinatorStreamWriter. The main function starts a CoordinatorStreamWriter
    * and sends control messages.
    * To run the code use the following command:
-   * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh  --config-loader-factory={config--loader-factory} --config-loader-properties={properties needed for config loader to load config} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message}
+   * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh  --config job.config.loader.factory={config-loader-factory} --config job.config.loader.properties.{property key}={property value needed for config loader to load config} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message}
    *
    * @param args input arguments for running the writer. These arguments are:
    *             "config-factory" = The config file factory
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
index 4eff4ec..220b68b 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
@@ -19,14 +19,9 @@
 
 package org.apache.samza.runtime;
 
-import java.util.Collections;
-import java.util.Map;
-import java.util.stream.Collectors;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigLoaderFactory;
-import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.util.CommandLine;
 
@@ -38,40 +33,22 @@ import org.apache.samza.util.CommandLine;
 public class ApplicationRunnerMain {
 
   public static class ApplicationRunnerCommandLine extends CommandLine {
-    public OptionSpec<String> operationOpt =
+    OptionSpec<String> operationOpt =
         parser().accepts("operation", "The operation to perform; run, status, kill.")
             .withRequiredArg()
             .ofType(String.class)
             .describedAs("operation=run")
             .defaultsTo("run");
 
-    public ApplicationRunnerOperation getOperation(OptionSet options) {
+    ApplicationRunnerOperation getOperation(OptionSet options) {
       String rawOp = options.valueOf(operationOpt);
       return ApplicationRunnerOperation.fromString(rawOp);
     }
 
     @Override
     public Config loadConfig(OptionSet options) {
-      // Set up the job parameters.
-      String configLoaderFactoryClassName = options.valueOf(configLoaderFactoryOpt());
-      Map<String, String> configLoaderProperties =
-          options.valuesOf(configLoaderPropertiesOpt())
-          .stream()
-          .collect(Collectors.toMap(
-              kv -> ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + kv.key,
-              kv -> kv.value));
-
-      Map<String, String> configOverrides = options.valuesOf(configOverrideOpt())
-          .stream()
-          .collect(Collectors.toMap(
-              kv -> kv.key,
-              kv -> kv.value));
-
       // ConfigLoader is not supposed to be invoked to load full job config during job submission.
-      return new MapConfig(
-          Collections.singletonMap(JobConfig.CONFIG_LOADER_FACTORY, configLoaderFactoryClassName),
-          configLoaderProperties,
-          configOverrides);
+      return new MapConfig(getConfigOverrides(options));
     }
   }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
index 5ab5e99..cfd2425 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
@@ -19,13 +19,11 @@
 
 package org.apache.samza.util
 
-import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser, OptionSet}
 import joptsimple.util.KeyValuePair
-import org.apache.samza.config.{Config, ConfigLoaderFactory, JobConfig, MapConfig}
-import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory
+import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser, OptionSet}
+import org.apache.samza.config.{Config, ConfigLoaderFactory, MapConfig}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 /**
  * Defines a basic set of command-line options for Samza tasks. Tools can use this
@@ -33,18 +31,6 @@ import scala.collection.mutable
  */
 class CommandLine {
   val parser = new OptionParser()
-  val configLoaderFactoryOpt: ArgumentAcceptingOptionSpec[String] =
-    parser.accepts("config-loader-factory", "The config loader factory to use to read full job config file.")
-          .withRequiredArg
-          .ofType(classOf[java.lang.String])
-          .describedAs("com.foo.bar.ClassName")
-          .defaultsTo(classOf[PropertiesConfigLoaderFactory].getName)
-  val configLoaderPropertiesOpt: ArgumentAcceptingOptionSpec[KeyValuePair] =
-    parser.accepts("config-loader-properties", "A config loader property in the form key=value. Config loader properties will be passed to " +
-                                               "designated config loader factory to load full job config.")
-          .withRequiredArg
-          .ofType(classOf[KeyValuePair])
-          .describedAs("key=value")
   val configOverrideOpt: ArgumentAcceptingOptionSpec[KeyValuePair] =
     parser.accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.")
           .withRequiredArg
@@ -54,19 +40,13 @@ class CommandLine {
   var configLoaderFactory: ConfigLoaderFactory = _
 
   def loadConfig(options: OptionSet): Config = {
-    // Set up the job parameters.
-    val configLoaderFactoryClassName = options.valueOf(configLoaderFactoryOpt)
-    val configLoaderProperties = options.valuesOf(configLoaderPropertiesOpt).asScala
-      .map(kv => (ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + kv.key, kv.value))
-      .toMap
-    val configOverrides = options.valuesOf(configOverrideOpt).asScala
+    ConfigUtil.loadConfig(new MapConfig(getConfigOverrides(options)))
+  }
+
+  def getConfigOverrides(options: OptionSet): Config = {
+    new MapConfig(options.valuesOf(configOverrideOpt).asScala
       .map(kv => (kv.key, kv.value))
       .toMap
-    val original = mutable.HashMap[String, String]()
-    original += JobConfig.CONFIG_LOADER_FACTORY -> configLoaderFactoryClassName
-    original ++= configLoaderProperties
-    original ++= configOverrides
-
-    ConfigUtil.loadConfig(new MapConfig(original.asJava))
+      .asJava)
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index ef8dd34..6176c5f 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -41,10 +41,8 @@ public class TestApplicationRunnerMain {
   public void TestRunOperation() {
     assertEquals(0, TestApplicationRunnerInvocationCounts.runCount);
     ApplicationRunnerMain.main(new String[]{
-        "--config-loader-factory",
-        "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-        "--config-loader-properties",
-        "path=" + getClass().getResource("/test.properties").getPath(),
+        "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+        "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
         "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
         "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
     });
@@ -56,10 +54,8 @@ public class TestApplicationRunnerMain {
   public void TestKillOperation() {
     assertEquals(0, TestApplicationRunnerInvocationCounts.killCount);
     ApplicationRunnerMain.main(new String[]{
-        "--config-loader-factory",
-        "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-        "--config-loader-properties",
-        "path=" + getClass().getResource("/test.properties").getPath(),
+        "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+        "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
         "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
         "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
         "--operation=kill"
@@ -72,10 +68,8 @@ public class TestApplicationRunnerMain {
   public void TestStatusOperation() {
     assertEquals(0, TestApplicationRunnerInvocationCounts.statusCount);
     ApplicationRunnerMain.main(new String[]{
-        "--config-loader-factory",
-        "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-        "--config-loader-properties",
-        "path=" + getClass().getResource("/test.properties").getPath(),
+        "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+        "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
         "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
         "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
         "--operation=status"
@@ -88,10 +82,8 @@ public class TestApplicationRunnerMain {
   public void TestLoadConfig() {
     ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerMain.ApplicationRunnerCommandLine();
     OptionSet options = cmdLine.parser().parse(
-        "--config-loader-factory",
-        "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-        "--config-loader-properties",
-        "path=" + getClass().getResource("/test.properties").getPath(),
+        "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+        "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
         "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
         "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()));
 
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index fd3c6ce..4a508f0 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -43,10 +43,8 @@ class TestJobRunner {
 
     assertEquals(0, TestJobRunner.processCount)
     JobRunner.main(Array(
-      "--config-loader-factory",
-      "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-      "--config-loader-properties",
-      "path=" + getClass.getResource("/test.properties").getPath))
+      "--config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+      "--config", "job.config.loader.properties.path=" + getClass.getResource("/test.properties").getPath))
     assertEquals(1, TestJobRunner.processCount)
   }
 
@@ -56,10 +54,8 @@ class TestJobRunner {
 
     assertEquals(0, TestJobRunner.killCount)
     JobRunner.main(Array(
-      "--config-loader-factory",
-      "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-      "--config-loader-properties",
-      "path=" + getClass.getResource("/test.properties").getPath,
+      "--config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+      "--config", "job.config.loader.properties.path=" + getClass.getResource("/test.properties").getPath,
       "--operation=kill"))
     assertEquals(1, TestJobRunner.killCount)
   }
@@ -70,10 +66,8 @@ class TestJobRunner {
 
     assertEquals(0, TestJobRunner.getStatusCount)
     JobRunner.main(Array(
-      "--config-loader-factory",
-      "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-      "--config-loader-properties",
-      "path=" + getClass.getResource("/test.properties").getPath,
+      "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+      "-config", "job.config.loader.properties.path=" + getClass.getResource("/test.properties").getPath,
       "--operation=status"))
     assertEquals(1, TestJobRunner.getStatusCount)
   }