You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2020/02/07 23:58:26 UTC

[samza] branch 1.4.0 updated: Revert "SAMZA-2441: Update ApplicationRunnerMain#ApplicationRunnerCommandLine not to load local file (#1265)" (#1268)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch 1.4.0
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/1.4.0 by this push:
     new bbd5429  Revert "SAMZA-2441: Update ApplicationRunnerMain#ApplicationRunnerCommandLine not to load local file (#1265)" (#1268)
bbd5429 is described below

commit bbd5429eb14821867dcddc2775a8787c3ef31b81
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Fri Feb 7 15:58:17 2020 -0800

    Revert "SAMZA-2441: Update ApplicationRunnerMain#ApplicationRunnerCommandLine not to load local file (#1265)" (#1268)
    
    This reverts commit 5a58877897fae4a0e819c70ee48ee2d7edcb01a2.
    
    Reverting this change for the 1.4.0 branch since it is backwards
    incompatible and we don't want to release this for 1.4.
---
 .../samza/runtime/ApplicationRunnerMain.java       | 32 +-------------------
 .../scala/org/apache/samza/job/JobRunner.scala     | 24 +++++----------
 .../samza/runtime/TestApplicationRunnerMain.java   | 35 +++-------------------
 3 files changed, 13 insertions(+), 78 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
index 4eff4ec..c1aa0a1 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunnerMain.java
@@ -19,15 +19,9 @@
 
 package org.apache.samza.runtime;
 
-import java.util.Collections;
-import java.util.Map;
-import java.util.stream.Collectors;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigLoaderFactory;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.util.CommandLine;
 
 
@@ -49,33 +43,9 @@ public class ApplicationRunnerMain {
       String rawOp = options.valueOf(operationOpt);
       return ApplicationRunnerOperation.fromString(rawOp);
     }
-
-    @Override
-    public Config loadConfig(OptionSet options) {
-      // Set up the job parameters.
-      String configLoaderFactoryClassName = options.valueOf(configLoaderFactoryOpt());
-      Map<String, String> configLoaderProperties =
-          options.valuesOf(configLoaderPropertiesOpt())
-          .stream()
-          .collect(Collectors.toMap(
-              kv -> ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + kv.key,
-              kv -> kv.value));
-
-      Map<String, String> configOverrides = options.valuesOf(configOverrideOpt())
-          .stream()
-          .collect(Collectors.toMap(
-              kv -> kv.key,
-              kv -> kv.value));
-
-      // ConfigLoader is not supposed to be invoked to load full job config during job submission.
-      return new MapConfig(
-          Collections.singletonMap(JobConfig.CONFIG_LOADER_FACTORY, configLoaderFactoryClassName),
-          configLoaderProperties,
-          configOverrides);
-    }
   }
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws Exception {
     ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerCommandLine();
     OptionSet options = cmdLine.parser().parse(args);
     Config orgConfig = cmdLine.loadConfig(options);
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index b76ec95..52b7faa 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -20,33 +20,25 @@
 package org.apache.samza.job
 
 
-import joptsimple.{OptionSet, OptionSpec}
 import org.apache.samza.SamzaException
 import org.apache.samza.config._
+import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer}
+import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
+import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.runtime.ApplicationRunnerMain.ApplicationRunnerCommandLine
 import org.apache.samza.runtime.ApplicationRunnerOperation
+import org.apache.samza.system.{StreamSpec, SystemAdmins}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.util._
 
+import scala.collection.JavaConverters._
+
 
 object JobRunner extends Logging {
   val SOURCE = "job-runner"
 
-  class JobRunnerCommandLine extends CommandLine {
-    var operationOpt: OptionSpec[String] =
-      parser.accepts("operation", "The operation to perform; run, status, kill.")
-        .withRequiredArg
-        .ofType(classOf[String])
-        .describedAs("operation=run")
-        .defaultsTo("run")
-
-    def getOperation(options: OptionSet): ApplicationRunnerOperation = {
-      val rawOp = options.valueOf(operationOpt)
-      ApplicationRunnerOperation.fromString(rawOp)
-    }
-  }
-
   def main(args: Array[String]) {
-    val cmdline = new JobRunnerCommandLine
+    val cmdline = new ApplicationRunnerCommandLine
     val options = cmdline.parser.parse(args: _*)
     val config = cmdline.loadConfig(options)
     val operation = cmdline.getOperation(options)
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index ef8dd34..2b13409 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -18,16 +18,11 @@
  */
 package org.apache.samza.runtime;
 
-import com.google.common.collect.ImmutableMap;
 import java.time.Duration;
-import joptsimple.OptionSet;
-import org.apache.samza.application.MockStreamApplication;
 import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.application.MockStreamApplication;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.ConfigLoaderFactory;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.ExternalContext;
 import org.apache.samza.job.ApplicationStatus;
 import org.junit.Test;
@@ -38,7 +33,7 @@ import static org.junit.Assert.assertEquals;
 public class TestApplicationRunnerMain {
 
   @Test
-  public void TestRunOperation() {
+  public void TestRunOperation() throws Exception {
     assertEquals(0, TestApplicationRunnerInvocationCounts.runCount);
     ApplicationRunnerMain.main(new String[]{
         "--config-loader-factory",
@@ -53,7 +48,7 @@ public class TestApplicationRunnerMain {
   }
 
   @Test
-  public void TestKillOperation() {
+  public void TestKillOperation() throws Exception {
     assertEquals(0, TestApplicationRunnerInvocationCounts.killCount);
     ApplicationRunnerMain.main(new String[]{
         "--config-loader-factory",
@@ -69,7 +64,7 @@ public class TestApplicationRunnerMain {
   }
 
   @Test
-  public void TestStatusOperation() {
+  public void TestStatusOperation() throws Exception {
     assertEquals(0, TestApplicationRunnerInvocationCounts.statusCount);
     ApplicationRunnerMain.main(new String[]{
         "--config-loader-factory",
@@ -84,28 +79,6 @@ public class TestApplicationRunnerMain {
     assertEquals(1, TestApplicationRunnerInvocationCounts.statusCount);
   }
 
-  @Test
-  public void TestLoadConfig() {
-    ApplicationRunnerMain.ApplicationRunnerCommandLine cmdLine = new ApplicationRunnerMain.ApplicationRunnerCommandLine();
-    OptionSet options = cmdLine.parser().parse(
-        "--config-loader-factory",
-        "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-        "--config-loader-properties",
-        "path=" + getClass().getResource("/test.properties").getPath(),
-        "-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
-        "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()));
-
-    Config actual = cmdLine.loadConfig(options);
-
-    Config expected = new MapConfig(ImmutableMap.of(
-        JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
-        ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path", getClass().getResource("/test.properties").getPath(),
-        ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName(),
-        "app.runner.class", TestApplicationRunnerInvocationCounts.class.getName()));
-
-    assertEquals(expected, actual);
-  }
-
   public static class TestApplicationRunnerInvocationCounts implements ApplicationRunner {
     protected static int runCount = 0;
     protected static int killCount = 0;