You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2020/09/11 00:24:34 UTC
[samza] branch master updated: SAMZA-2558: Refactor app.runner.class
This is an automated email from the ASF dual-hosted git repository.
nickpan47 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 3fcced3 SAMZA-2558: Refactor app.runner.class
3fcced3 is described below
commit 3fcced3fc2c7b9b344958fe6a1a59ba65aa88d69
Author: Ke Wu <kw...@linkedin.com>
AuthorDate: Thu Sep 10 17:24:14 2020 -0700
SAMZA-2558: Refactor app.runner.class
Issues: app.runner.class config is hard coded across multple places in Samza.
Changes:
1. Move ApplicationRunners.java from samza-api to samza-core module while the package name stays.
2. Add app.runner.class and getAppRunnerClass method to ApplicationConfig.
3. Update occurrence of app.runner.class to ApplicationConfig.APP_RUNNER_CLASS.
API Changes:
Usage of ApplicationRunners need to depend on samza-core module instead of samza-api.
Upgrade Instructions:
Depend on samza-core instead of samza-api module.
Usage Instructions:
N/A
Author: Ke Wu <kw...@linkedin.com>
Reviewers: Yi Pan <ni...@gmail.com>
Closes #1400 from kw2542/SAMZA-2558
---
.../org/apache/samza/config/ApplicationConfig.java | 6 ++++++
.../org/apache/samza/runtime/ApplicationRunners.java | 18 +++++-------------
.../samza/runtime/TestApplicationRunnerMain.java | 10 +++++-----
.../apache/samza/runtime/TestApplicationRunners.java | 3 ++-
.../samza/sql/runner/SamzaSqlApplicationRunner.java | 6 +++---
.../sql/runner/TestSamzaSqlApplicationRunner.java | 7 ++++---
.../controlmessages/EndOfStreamIntegrationTest.java | 3 ++-
.../test/controlmessages/WatermarkIntegrationTest.java | 3 ++-
.../StreamApplicationIntegrationTestHarness.java | 7 ++++---
.../test/processor/TestZkLocalApplicationRunner.java | 2 +-
.../apache/samza/test/startpoint/TestStartpoint.java | 2 +-
.../tools/benchmark/SystemConsumerWithSamzaBench.java | 2 +-
12 files changed, 36 insertions(+), 33 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index 7fda7e1..ea2e943 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -59,6 +59,9 @@ public class ApplicationConfig extends MapConfig {
public static final String APP_RUN_ID = "app.run.id";
public static final String APP_MAIN_CLASS = "app.main.class";
public static final String APP_MAIN_ARGS = "app.main.args";
+ public static final String APP_RUNNER_CLASS = "app.runner.class";
+
+ private static final String DEFAULT_APP_RUNNER = "org.apache.samza.runtime.RemoteApplicationRunner";
public ApplicationConfig(Config config) {
super(config);
@@ -108,4 +111,7 @@ public class ApplicationConfig extends MapConfig {
return Optional.ofNullable(get(APP_MAIN_CLASS));
}
+ public String getAppRunnerClass() {
+ return get(APP_RUNNER_CLASS, DEFAULT_APP_RUNNER);
+ }
}
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
similarity index 84%
rename from samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
rename to samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
index cd1d06b..974c494 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/ApplicationRunners.java
@@ -20,6 +20,7 @@ package org.apache.samza.runtime;
import java.lang.reflect.Constructor;
import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
@@ -40,14 +41,7 @@ import org.apache.samza.config.ConfigException;
* }
* }</pre>
*/
-public class ApplicationRunners {
-
- private static final String APP_RUNNER_CFG = "app.runner.class";
- private static final String DEFAULT_APP_RUNNER = "org.apache.samza.runtime.RemoteApplicationRunner";
-
- private ApplicationRunners() {
-
- }
+public final class ApplicationRunners {
/**
* Get the {@link ApplicationRunner} that runs the {@code userApp}
@@ -56,8 +50,8 @@ public class ApplicationRunners {
* @param config the configuration for this application
* @return the {@link ApplicationRunner} object that will run the {@code userApp}
*/
- public static final ApplicationRunner getApplicationRunner(SamzaApplication userApp, Config config) {
- String appRunnerClassName = getAppRunnerClass(config);
+ public static ApplicationRunner getApplicationRunner(SamzaApplication<?> userApp, Config config) {
+ String appRunnerClassName = new ApplicationConfig(config).getAppRunnerClass();
try {
Class<?> runnerClass = Class.forName(appRunnerClassName);
if (!ApplicationRunner.class.isAssignableFrom(runnerClass)) {
@@ -75,8 +69,6 @@ public class ApplicationRunners {
}
}
- private static String getAppRunnerClass(Config config) {
- return config.getOrDefault(APP_RUNNER_CFG, DEFAULT_APP_RUNNER);
+ private ApplicationRunners() {
}
-
}
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index 6176c5f..28656db 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -44,7 +44,7 @@ public class TestApplicationRunnerMain {
"-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
"-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
- "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
+ "-config", String.format("%s=%s", ApplicationConfig.APP_RUNNER_CLASS, TestApplicationRunnerInvocationCounts.class.getName()),
});
assertEquals(1, TestApplicationRunnerInvocationCounts.runCount);
@@ -57,7 +57,7 @@ public class TestApplicationRunnerMain {
"-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
"-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
- "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
+ "-config", String.format("%s=%s", ApplicationConfig.APP_RUNNER_CLASS, TestApplicationRunnerInvocationCounts.class.getName()),
"--operation=kill"
});
@@ -71,7 +71,7 @@ public class TestApplicationRunnerMain {
"-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
"-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
- "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()),
+ "-config", String.format("%s=%s", ApplicationConfig.APP_RUNNER_CLASS, TestApplicationRunnerInvocationCounts.class.getName()),
"--operation=status"
});
@@ -85,7 +85,7 @@ public class TestApplicationRunnerMain {
"-config", "job.config.loader.factory=org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
"-config", "job.config.loader.properties.path=" + getClass().getResource("/test.properties").getPath(),
"-config", String.format("%s=%s", ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName()),
- "-config", String.format("app.runner.class=%s", TestApplicationRunnerInvocationCounts.class.getName()));
+ "-config", String.format("%s=%s", ApplicationConfig.APP_RUNNER_CLASS, TestApplicationRunnerInvocationCounts.class.getName()));
Config actual = cmdLine.loadConfig(options);
@@ -93,7 +93,7 @@ public class TestApplicationRunnerMain {
JobConfig.CONFIG_LOADER_FACTORY, "org.apache.samza.config.loaders.PropertiesConfigLoaderFactory",
ConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path", getClass().getResource("/test.properties").getPath(),
ApplicationConfig.APP_CLASS, MockStreamApplication.class.getName(),
- "app.runner.class", TestApplicationRunnerInvocationCounts.class.getName()));
+ ApplicationConfig.APP_RUNNER_CLASS, TestApplicationRunnerInvocationCounts.class.getName()));
assertEquals(expected, actual);
}
diff --git a/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
similarity index 94%
rename from samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
rename to samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
index 139ed69..1f0f284 100644
--- a/samza-api/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunners.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.samza.application.SamzaApplication;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ExternalContext;
@@ -41,7 +42,7 @@ public class TestApplicationRunners {
@Test
public void testGetAppRunner() {
Map<String, String> configMap = new HashMap<>();
- configMap.put("app.runner.class", MockApplicationRunner.class.getName());
+ configMap.put(ApplicationConfig.APP_RUNNER_CLASS, MockApplicationRunner.class.getName());
Config config = new MapConfig(configMap);
StreamApplication app = mock(StreamApplication.class);
ApplicationRunner appRunner = ApplicationRunners.getApplicationRunner(app, config);
diff --git a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
index 23d7be5..b1b9001 100644
--- a/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
+++ b/samza-sql/src/main/java/org/apache/samza/sql/runner/SamzaSqlApplicationRunner.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.Validate;
import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.context.ExternalContext;
@@ -54,7 +55,6 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
private final ApplicationRunner runner;
- public static final String RUNNER_CONFIG = "app.runner.class";
public static final String CFG_FMT_SAMZA_STREAM_SYSTEM = "streams.%s.samza.system";
/**
@@ -110,9 +110,9 @@ public class SamzaSqlApplicationRunner implements ApplicationRunner {
newConfig.putAll(config);
if (localRunner) {
- newConfig.put(RUNNER_CONFIG, LocalApplicationRunner.class.getName());
+ newConfig.put(ApplicationConfig.APP_RUNNER_CLASS, LocalApplicationRunner.class.getName());
} else {
- newConfig.put(RUNNER_CONFIG, RemoteApplicationRunner.class.getName());
+ newConfig.put(ApplicationConfig.APP_RUNNER_CLASS, RemoteApplicationRunner.class.getName());
}
LOG.info("New Samza configs: " + newConfig);
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
index 272ca4a..2dda4fe 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/runner/TestSamzaSqlApplicationRunner.java
@@ -20,6 +20,7 @@
package org.apache.samza.sql.runner;
import java.util.Map;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.runtime.LocalApplicationRunner;
@@ -36,15 +37,15 @@ public class TestSamzaSqlApplicationRunner {
Map<String, String> configs = SamzaSqlTestConfig.fetchStaticConfigsWithFactories(10);
String sql1 = "Insert into testavro.outputTopic(id,long_value) select id, MyTest(id) as long_value from testavro.SIMPLE1";
configs.put(SamzaSqlApplicationConfig.CFG_SQL_STMT, sql1);
- configs.put(SamzaSqlApplicationRunner.RUNNER_CONFIG, SamzaSqlApplicationRunner.class.getName());
+ configs.put(ApplicationConfig.APP_RUNNER_CLASS, SamzaSqlApplicationRunner.class.getName());
MapConfig samzaConfig = new MapConfig(configs);
Config newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(true, samzaConfig);
- Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), LocalApplicationRunner.class.getName());
+ Assert.assertEquals(newConfigs.get(ApplicationConfig.APP_RUNNER_CLASS), LocalApplicationRunner.class.getName());
// Check whether five new configs added.
Assert.assertEquals(newConfigs.size(), configs.size() + 5);
newConfigs = SamzaSqlApplicationRunner.computeSamzaConfigs(false, samzaConfig);
- Assert.assertEquals(newConfigs.get(SamzaSqlApplicationRunner.RUNNER_CONFIG), RemoteApplicationRunner.class.getName());
+ Assert.assertEquals(newConfigs.get(ApplicationConfig.APP_RUNNER_CLASS), RemoteApplicationRunner.class.getName());
// Check whether five new configs added.
Assert.assertEquals(newConfigs.size(), configs.size() + 5);
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index b0630b2..e53f701 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.Random;
import org.apache.samza.application.descriptors.StreamApplicationDescriptor;
import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
@@ -72,7 +73,7 @@ public class EndOfStreamIntegrationTest extends IntegrationTestHarness {
int partitionCount = 4;
Map<String, String> configs = new HashMap<>();
- configs.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
+ configs.put(ApplicationConfig.APP_RUNNER_CLASS, "org.apache.samza.runtime.LocalApplicationRunner");
configs.put("systems.test.samza.factory", ArraySystemFactory.class.getName());
configs.put("streams.PageView.samza.system", "test");
configs.put("streams.PageView.source", Base64Serializer.serialize(pageviews));
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 25d31ea..849f64c 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -19,6 +19,7 @@
package org.apache.samza.test.controlmessages;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.TaskConfig;
import scala.collection.JavaConverters;
@@ -123,7 +124,7 @@ public class WatermarkIntegrationTest extends IntegrationTestHarness {
@Test
public void testWatermark() throws Exception {
Map<String, String> configs = new HashMap<>();
- configs.put("app.runner.class", MockLocalApplicationRunner.class.getName());
+ configs.put(ApplicationConfig.APP_RUNNER_CLASS, MockLocalApplicationRunner.class.getName());
configs.put("systems.test.samza.factory", TestSystemFactory.class.getName());
configs.put("streams.PageView.samza.system", "test");
configs.put("streams.PageView.partitionCount", String.valueOf(PARTITION_COUNT));
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 2bcc661..0d1e49a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.samza.application.SamzaApplication;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.config.MapConfig;
@@ -176,9 +177,9 @@ public class StreamApplicationIntegrationTestHarness extends IntegrationTestHarn
String appName,
Map<String, String> overriddenConfigs) {
Map<String, String> configMap = new HashMap<>();
- configMap.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
- configMap.put("app.name", appName);
- configMap.put("app.class", streamApplication.getClass().getCanonicalName());
+ configMap.put(ApplicationConfig.APP_RUNNER_CLASS, "org.apache.samza.runtime.LocalApplicationRunner");
+ configMap.put(ApplicationConfig.APP_NAME, appName);
+ configMap.put(ApplicationConfig.APP_CLASS, streamApplication.getClass().getCanonicalName());
configMap.put("serializers.registry.json.class", "org.apache.samza.serializers.JsonSerdeFactory");
configMap.put("serializers.registry.string.class", "org.apache.samza.serializers.StringSerdeFactory");
configMap.put("systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory");
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index fef4836..4e69372 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -220,7 +220,7 @@ public class TestZkLocalApplicationRunner extends IntegrationTestHarness {
config.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY);
config.put(ApplicationConfig.APP_NAME, appName);
config.put(ApplicationConfig.APP_ID, appId);
- config.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
+ config.put(ApplicationConfig.APP_RUNNER_CLASS, "org.apache.samza.runtime.LocalApplicationRunner");
config.put(String.format("systems.%s.samza.factory", TestZkLocalApplicationRunner.TEST_SYSTEM), TEST_SYSTEM_FACTORY);
config.put(JobConfig.JOB_NAME, appName);
config.put(JobConfig.JOB_ID, appId);
diff --git a/samza-test/src/test/java/org/apache/samza/test/startpoint/TestStartpoint.java b/samza-test/src/test/java/org/apache/samza/test/startpoint/TestStartpoint.java
index d71028d..876690a 100644
--- a/samza-test/src/test/java/org/apache/samza/test/startpoint/TestStartpoint.java
+++ b/samza-test/src/test/java/org/apache/samza/test/startpoint/TestStartpoint.java
@@ -449,7 +449,7 @@ public class TestStartpoint extends IntegrationTestHarness {
config.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY);
config.put(ApplicationConfig.APP_NAME, appName);
config.put(ApplicationConfig.APP_ID, appId);
- config.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
+ config.put(ApplicationConfig.APP_RUNNER_CLASS, "org.apache.samza.runtime.LocalApplicationRunner");
config.put(String.format("systems.%s.samza.factory", TEST_SYSTEM), TEST_SYSTEM_FACTORY);
config.put(JobConfig.JOB_NAME, appName);
config.put(JobConfig.JOB_ID, appId);
diff --git a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
index bdb8fce..5b6a274 100644
--- a/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
+++ b/samza-tools/src/main/java/org/apache/samza/tools/benchmark/SystemConsumerWithSamzaBench.java
@@ -63,7 +63,7 @@ public class SystemConsumerWithSamzaBench extends AbstractSamzaBench {
@Override
public void addMoreSystemConfigs(Properties props) {
- props.put("app.runner.class", LocalApplicationRunner.class.getName());
+ props.put(ApplicationConfig.APP_RUNNER_CLASS, LocalApplicationRunner.class.getName());
List<Integer> partitions = IntStream.range(startPartition, endPartition).boxed().collect(Collectors.toList());
props.put(ApplicationConfig.APP_NAME, "SamzaBench");
props.put(JobConfig.PROCESSOR_ID, "1");