You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2020/02/26 21:29:48 UTC
[samza] branch master updated: SAMZA-2471: Simplify CommandLine
(#1291)
This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new e52aecd SAMZA-2471: Simplify CommandLine (#1291)
e52aecd is described below
commit e52aecd770d9c6e046034420758e17b867f00313
Author: Ke Wu <ke...@icloud.com>
AuthorDate: Wed Feb 26 13:29:38 2020 -0800
SAMZA-2471: Simplify CommandLine (#1291)
---
bin/setup-int-test.sh | 2 +-
.../stream/CoordinatorStreamWriter.java | 2 +-
.../samza/runtime/ApplicationRunnerMain.java | 29 ++---------------
.../scala/org/apache/samza/util/CommandLine.scala | 36 +++++-----------------
.../samza/runtime/TestApplicationRunnerMain.java | 24 +++++----------
.../scala/org/apache/samza/job/TestJobRunner.scala | 18 ++++-------
6 files changed, 27 insertions(+), 84 deletions(-)
diff --git a/bin/setup-int-test.sh b/bin/setup-int-test.sh
index b33ff27..130be5d 100755
--- a/bin/setup-int-test.sh
+++ b/bin/setup-int-test.sh
@@ -43,7 +43,7 @@ $KAFKA_DIR/bin/kafka-topics.sh --zookeeper localhost:2181 --partitions 1 --repli
# Start the jobs
for job in checker joiner emitter watcher
do
- $SAMZA_DIR/bin/run-job.sh --config-loader-factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory --config-loader-properties path=$SAMZA_DIR/config/join/$job.samza --config job.foo=$job
+ $SAMZA_DIR/bin/run-job.sh --config job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory --config job.config.loader.properties.path=$SAMZA_DIR/config/join/$job.samza --config job.foo=$job
done
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
index 903f99a..3d837ac 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamWriter.java
@@ -103,7 +103,7 @@ public class CoordinatorStreamWriter {
* Main function for using the CoordinatorStreamWriter. The main function starts a CoordinatorStreamWriter
* and sends control messages.
* To run the code use the following command:
- * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh --config-loader-factory={config--loader-factory} --config-loader-properties={properties needed for config loader to load config} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message}
+ * {path to samza deployment}/samza/bin/run-coordinator-stream-writer.sh --config job.config.loader.factory={config--loader-factory} --config job.config.loader.properties{properties needed for config loader to load config} --type={type of the message} --key={[optional] key of the message} --value={[optional] value of the message}
*
* @param args input arguments for running the writer. These arguments are:
* "config-factory" = The config file factory
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
index 4eff4ec..220b68b 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
@@ -19,14 +19,9 @@
package org.apache.samza.runtime;
-import java.util.Collections;
-import java.util.Map;
-import java.util.stream.Collectors;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigLoaderFactory;
-import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.util.CommandLine;
@@ -38,40 +33,22 @@ import org.apache.samza.util.CommandLine;
public class ApplicationRunnerMain {
public static class ApplicationRunnerCommandLine extends CommandLine {
- public OptionSpec<String> operationOpt =
+ OptionSpec<String> operationOpt =
parser().accepts("operation", "The operation to perform; run, status, kill.")
.withRequiredArg()
.ofType(String.class)
.describedAs("operation=run")
.defaultsTo("run");
- public ApplicationRunnerOperation getOperation(OptionSet options) {
+ ApplicationRunnerOperation getOperation(OptionSet options) {
String rawOp = options.valueOf(operationOpt);
return ApplicationRunnerOperation.fromString(rawOp);
}
@Override
public Config loadConfig(OptionSet options) {
- // Set up the job parameters.
- String configLoaderFactoryClassName = options.valueOf(configLoaderFactoryOpt());
- Map<String, String> configLoaderProperties =
- options.valuesOf(configLoaderPropertiesOpt())
- .stream()
- .collect(Collectors.toMap(
- kv -> ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + kv.key,
- kv -> kv.value));
-
- Map<String, String> configOverrides = options.valuesOf(configOverrideOpt())
- .stream()
- .collect(Collectors.toMap(
- kv -> kv.key,
- kv -> kv.value));
-
// ConfigLoader is not supposed to be invoked to load full job config during job submission.
- return new MapConfig(
- Collections.singletonMap(JobConfig.CONFIG_LOADER_FACTORY, configLoaderFactoryClassName),
- configLoaderProperties,
- configOverrides);
+ return new MapConfig(getConfigOverrides(options));
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
index 5ab5e99..cfd2425 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CommandLine.scala
@@ -19,13 +19,11 @@
package org.apache.samza.util
-import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser, OptionSet}
import joptsimple.util.KeyValuePair
-import org.apache.samza.config.{Config, ConfigLoaderFactory, JobConfig, MapConfig}
-import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory
+import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser, OptionSet}
+import org.apache.samza.config.{Config, ConfigLoaderFactory, MapConfig}
import scala.collection.JavaConverters._
-import scala.collection.mutable
/**
* Defines a basic set of command-line options for Samza tasks. Tools can use this
@@ -33,18 +31,6 @@ import scala.collection.mutable
*/
class CommandLine {
val parser = new OptionParser()
- val configLoaderFactoryOpt: ArgumentAcceptingOptionSpec[String] =
- parser.accepts("config-loader-factory", "The config loader factory to use to read full job config file.")
- .withRequiredArg
- .ofType(classOf[java.lang.String])
- .describedAs("com.foo.bar.ClassName")
- .defaultsTo(classOf[PropertiesConfigLoaderFactory].getName)
- val configLoaderPropertiesOpt: ArgumentAcceptingOptionSpec[KeyValuePair] =
- parser.accepts("config-loader-properties", "A config loader property in the form key=value. Config loader properties will be passed to " +
- "designated config loader factory to load full job config.")
- .withRequiredArg
- .ofType(classOf[KeyValuePair])
- .describedAs("key=value")
val configOverrideOpt: ArgumentAcceptingOptionSpec[KeyValuePair] =
parser.accepts("config", "A configuration value in the form key=value. Command line properties override any configuration values given.")
.withRequiredArg
@@ -54,19 +40,13 @@ class CommandLine {
var configLoaderFactory: ConfigLoaderFactory = _
def loadConfig(options: OptionSet): Config = {
- // Set up the job parameters.
- val configLoaderFactoryClassName = options.valueOf(configLoaderFactoryOpt)
- val configLoaderProperties = options.valuesOf(configLoaderPropertiesOpt).asScala
- .map(kv => (ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + kv.key, kv.value))
- .toMap
- val configOverrides = options.valuesOf(configOverrideOpt).asScala
+ ConfigUtil.loadConfig(new MapConfig(getConfigOverrides(options)))
+ }
+
+ def getConfigOverrides(options: OptionSet): Config = {
+ new MapConfig(options.valuesOf(configOverrideOpt).asScala
.map(kv => (kv.key, kv.value))
.toMap
- val original = mutable.HashMap[String, String]()
- original += JobConfig.CONFIG_LOADER_FACTORY -> configLoaderFactoryClassName
- original ++= configLoaderProperties
- original ++= configOverrides
-
- ConfigUtil.loadConfig(new MapConfig(original.asJava))
+ .asJava)
}
}
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index ef8dd34..6176c5f 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -41,10 +41,8 @@ public class TestApplicationRunnerMain {
public void TestRunOperation() {
assertEquals(0, TestApplicationRunnerInvocationCounts.runCount);
ApplicationRunnerMain.main(new String[]{
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass().getResource("/test.properties").getPath(),
+ "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+ "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
"-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
});
@@ -56,10 +54,8 @@ public class TestApplicationRunnerMain {
public void TestKillOperation() {
assertEquals(0, TestApplicationRunnerInvocationCounts.killCount);
ApplicationRunnerMain.main(new String[]{
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass().getResource("/test.properties").getPath(),
+ "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+ "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
"-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
"--operation=kill"
@@ -72,10 +68,8 @@ public class TestApplicationRunnerMain {
public void TestStatusOperation() {
assertEquals(0, TestApplicationRunnerInvocationCounts.statusCount);
ApplicationRunnerMain.main(new String[]{
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass().getResource("/test.properties").getPath(),
+ "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+ "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
"-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
"--operation=status"
@@ -88,10 +82,8 @@ public class TestApplicationRunnerMain {
public void TestLoadConfig() {
ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerMain.ApplicationRunnerCommandLine();
OptionSet options = cmdLine.parser().parse(
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass().getResource("/test.properties").getPath(),
+ "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+ "-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
"-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()));
diff --git a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
index fd3c6ce..4a508f0 100644
--- a/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
+++ b/samza-core/src/test/scala/org/apache/samza/job/TestJobRunner.scala
@@ -43,10 +43,8 @@ class TestJobRunner {
assertEquals(0, TestJobRunner.processCount)
JobRunner.main(Array(
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass.getResource("/test.properties").getPath))
+ "--config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+ "--config", "job.config.loader.properties.path=" + getClass.getResource("/test.properties").getPath))
assertEquals(1, TestJobRunner.processCount)
}
@@ -56,10 +54,8 @@ class TestJobRunner {
assertEquals(0, TestJobRunner.killCount)
JobRunner.main(Array(
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass.getResource("/test.properties").getPath,
+ "--config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+ "--config", "job.config.loader.properties.path=" + getClass.getResource("/test.properties").getPath,
"--operation=kill"))
assertEquals(1, TestJobRunner.killCount)
}
@@ -70,10 +66,8 @@ class TestJobRunner {
assertEquals(0, TestJobRunner.getStatusCount)
JobRunner.main(Array(
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass.getResource("/test.properties").getPath,
+ "-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
+ "-config", "job.config.loader.properties.path=" + getClass.getResource("/test.properties").getPath,
"--operation=status"))
assertEquals(1, TestJobRunner.getStatusCount)
}