You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2020/02/07 23:58:26 UTC
[samza] branch 1.4.0 updated: Revert "SAMZA-2441: Update ApplicationRunnerMain#ApplicationRunnerCommandLine not to load local file (#1265)" (#1268)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch 1.4.0
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/1.4.0 by this push:
new bbd5429 Revert "SAMZA-2441: Update ApplicationRunnerMain#ApplicationRunnerCommandLine not to load local file (#1265)" (#1268)
bbd5429 is described below
commit bbd5429eb14821867dcddc2775a8787c3ef31b81
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Fri Feb 7 15:58:17 2020 -0800
Revert "SAMZA-2441: Update ApplicationRunnerMain#ApplicationRunnerCommandLine not to load local file (#1265)" (#1268)
This reverts commit 5a58877897fae4a0e819c70ee48ee2d7edcb01a2.
Reverting this change for the 1.4.0 branch since it is backward-incompatible
and we don't want to release it as part of 1.4.
---
.../samza/runtime/ApplicationRunnerMain.java | 32 +-------------------
.../scala/org/apache/samza/job/JobRunner.scala | 24 +++++----------
.../samza/runtime/TestApplicationRunnerMain.java | 35 +++-------------------
3 files changed, 13 insertions(+), 78 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
index 4eff4ec..c1aa0a1 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
@@ -19,15 +19,9 @@
package org.apache.samza.runtime;
-import java.util.Collections;
-import java.util.Map;
-import java.util.stream.Collectors;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigLoaderFactory;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.util.CommandLine;
@@ -49,33 +43,9 @@ public class ApplicationRunnerMain {
String rawOp = options.valueOf(operationOpt);
return ApplicationRunnerOperation.fromString(rawOp);
}
-
- @Override
- public Config loadConfig(OptionSet options) {
- // Set up the job parameters.
- String configLoaderFactoryClassName = options.valueOf(configLoaderFactoryOpt());
- Map<String, String> configLoaderProperties =
- options.valuesOf(configLoaderPropertiesOpt())
- .stream()
- .collect(Collectors.toMap(
- kv -> ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + kv.key,
- kv -> kv.value));
-
- Map<String, String> configOverrides = options.valuesOf(configOverrideOpt())
- .stream()
- .collect(Collectors.toMap(
- kv -> kv.key,
- kv -> kv.value));
-
- // ConfigLoader is not supposed to be invoked to load full job config during job submission.
- return new MapConfig(
- Collections.singletonMap(JobConfig.CONFIG_LOADER_FACTORY, configLoaderFactoryClassName),
- configLoaderProperties,
- configOverrides);
- }
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerCommandLine();
OptionSet options = cmdLine.parser().parse(args);
Config orgConfig = cmdLine.loadConfig(options);
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index b76ec95..52b7faa 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,33 +20,25 @@
package org.apache.samza.job
-import joptsimple.{OptionSet, OptionSpec}
import org.apache.samza.SamzaException
import org.apache.samza.config._
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer}
+import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine
import org.apache.samza.runtime.ApplicationRunnerOperation
+import org.apache.samza.system.{StreamSpec, SystemAdmins}
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import org.apache.samza.util._
+import scala.collection.JavaConverters._
+
object JobRunner extends Logging {
val SOURCE = "job-runner"
- class JobRunnerCommandLine extends CommandLine {
- var operationOpt: OptionSpec[String] =
- parser.accepts("operation", "The operation to perform; run, status, kill.")
- .withRequiredArg
- .ofType(classOf[String])
- .describedAs("operation=run")
- .defaultsTo("run")
-
- def getOperation(options: OptionSet): ApplicationRunnerOperation = {
- val rawOp = options.valueOf(operationOpt)
- ApplicationRunnerOperation.fromString(rawOp)
- }
- }
-
def main(args: Array[String]) {
- val cmdline = new JobRunnerCommandLine
+ val cmdline = new ApplicationRunnerCommandLine
val options = cmdline.parser.parse(args: _*)
val config = cmdline.loadConfig(options)
val operation = cmdline.getOperation(options)
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index ef8dd34..2b13409 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -18,16 +18,11 @@
*/
package org.apache.samza.runtime;
-import com.google.common.collect.ImmutableMap;
import java.time.Duration;
-import joptsimple.OptionSet;
-import org.apache.samza.application.MockStreamApplication;
import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.MockStreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigLoaderFactory;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ExternalContext;
import org.apache.samza.job.ApplicationStatus;
import org.junit.Test;
@@ -38,7 +33,7 @@ import static org.junit.Assert.assertEquals;
public class TestApplicationRunnerMain {
@Test
- public void TestRunOperation() {
+ public void TestRunOperation() throws Exception {
assertEquals(0, TestApplicationRunnerInvocationCounts.runCount);
ApplicationRunnerMain.main(new String[]{
"--config-loader-factory",
@@ -53,7 +48,7 @@ public class TestApplicationRunnerMain {
}
@Test
- public void TestKillOperation() {
+ public void TestKillOperation() throws Exception {
assertEquals(0, TestApplicationRunnerInvocationCounts.killCount);
ApplicationRunnerMain.main(new String[]{
"--config-loader-factory",
@@ -69,7 +64,7 @@ public class TestApplicationRunnerMain {
}
@Test
- public void TestStatusOperation() {
+ public void TestStatusOperation() throws Exception {
assertEquals(0, TestApplicationRunnerInvocationCounts.statusCount);
ApplicationRunnerMain.main(new String[]{
"--config-loader-factory",
@@ -84,28 +79,6 @@ public class TestApplicationRunnerMain {
assertEquals(1, TestApplicationRunnerInvocationCounts.statusCount);
}
- @Test
- public void TestLoadConfig() {
- ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerMain.ApplicationRunnerCommandLine();
- OptionSet options = cmdLine.parser().parse(
- "--config-loader-factory",
- "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- "--config-loader-properties",
- "path=" + getClass().getResource("/test.properties").getPath(),
- "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
- "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()));
-
- Config actual = cmdLine.loadConfig(options);
-
- Config expected = new MapConfig(ImmutableMap.of(
- JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
- ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path", getClass().getResource("/test.properties").getPath(),
- ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName(),
- "app.runner.class", TestApplicationRunnerInvocationCounts.class.getName()));
-
- assertEquals(expected, actual);
- }
-
public static class TestApplicationRunnerInvocationCounts implements ApplicationRunner {
protected static int runCount = 0;
protected static int killCount = 0;