You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2020/05/05 00:05:39 UTC
[samza] branch master updated: SAMZA-2518: Update
JobCoordinatorLaunchUtil to fetch launch config from metadata store.
(#1354)
This is an automated email from the ASF dual-hosted git repository.
rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 10915fb SAMZA-2518: Update JobCoordinatorLaunchUtil to fetch launch config from metadata store. (#1354)
10915fb is described below
commit 10915fb3cda40ae3733d82f1290bd854af113af6
Author: Ke Wu <ke...@icloud.com>
AuthorDate: Mon May 4 17:04:21 2020 -0700
SAMZA-2518: Update JobCoordinatorLaunchUtil to fetch launch config from metadata store. (#1354)
* SAMZA-2518: Update JobCoordinatorLaunchUtil to fetch launch config from metadata store.
Design:
https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner
Changes:
1. Add readLaunchConfigFromCoordinatorStream() in CoordinatorStreamUtil
2. Add an extra step in JobCoordinatorLaunchUtil to invoke readLaunchConfigFromCoordinatorStream()
API Changes:
None
Upgrade Instructions:
None
Usage Instructions:
None
Tests:
Unit Tests
* Update to address comments
* Simplify implementation
Co-authored-by: Ke Wu <kw...@linkedin.com>
---
.../clustermanager/JobCoordinatorLaunchUtil.java | 12 +++++---
.../java/org/apache/samza/config/JobConfig.java | 2 +-
.../apache/samza/util/CoordinatorStreamUtil.scala | 32 +++++++++++++++++-----
.../TestJobCoordinatorLaunchUtil.java | 12 +++++---
.../samza/util/TestCoordinatorStreamUtil.scala | 27 +++++++++++++++++-
5 files changed, 68 insertions(+), 17 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index fc1d34e..6915614 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -26,6 +26,7 @@ import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.execution.RemoteJobPlanner;
import org.apache.samza.metadatastore.MetadataStore;
@@ -57,14 +58,17 @@ public class JobCoordinatorLaunchUtil {
throw new SamzaException("Only support single remote job is supported.");
}
- Config finalConfig = jobConfigs.get(0);
+ Config fullConfig = jobConfigs.get(0);
+ MetricsRegistryMap metrics = new MetricsRegistryMap();
+ MetadataStore
+ metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(fullConfig), metrics);
+ // Reads extra launch config from metadata store.
+ Config launchConfig = CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(fullConfig, metadataStore);
+ Config finalConfig = new MapConfig(launchConfig, fullConfig);
// This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run
CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
DiagnosticsUtil.createDiagnosticsStream(finalConfig);
- MetricsRegistryMap metrics = new MetricsRegistryMap();
- MetadataStore
- metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(finalConfig), metrics);
// MetadataStore will be closed in ClusterBasedJobCoordinator#onShutDown
// initialization of MetadataStore can be moved to ClusterBasedJobCoordinator after we clean up
// ClusterBasedJobCoordinator#createFromMetadataStore
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 93b4b60..dc03644 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -351,7 +351,7 @@ public class JobConfig extends MapConfig {
* @param configParam the config param to determine
* @return true if the config is related to autosizing, false otherwise
*/
- public boolean isAutosizingConfig(String configParam) {
+ public static boolean isAutosizingConfig(String configParam) {
return configParam.startsWith(JOB_AUTOSIZING_CONFIG_PREFIX);
}
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 518639f..bf6aeb3 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -24,10 +24,11 @@ import java.util
import org.apache.samza.SamzaException
import org.apache.samza.config._
-import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamValueSerde}
import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
import org.apache.samza.job.JobRunner
+import org.apache.samza.metadatastore.MetadataStore
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemAdmins, SystemFactory, SystemStream}
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
@@ -44,7 +45,7 @@ object CoordinatorStreamUtil extends Logging {
val buildConfigFactory = jobConfig.getCoordinatorStreamFactory
val coordinatorSystemConfig = Class.forName(buildConfigFactory).newInstance().asInstanceOf[CoordinatorStreamConfigFactory].buildCoordinatorStreamConfig(config)
- new MapConfig(coordinatorSystemConfig);
+ new MapConfig(coordinatorSystemConfig)
}
/**
@@ -111,12 +112,29 @@ object CoordinatorStreamUtil extends Logging {
}
/**
 * Reads and returns the launch config persisted in the coordinator stream. Only job auto-sizing configs
 * (keys accepted by [[JobConfig.isAutosizingConfig]]) are currently supported.
 *
 * The metadata store is only consulted when auto-sizing is enabled in `config`; when it is disabled the
 * store is never touched, so callers may pass null in that case.
 *
 * @param config full job config, used only to decide whether auto-sizing is enabled
 * @param metadataStore an instance of the instantiated MetadataStore holding the persisted configs
 * @return an empty config if auto-sizing is disabled, otherwise the auto-sizing related configs found in the store
 */
def readLaunchConfigFromCoordinatorStream(config: Config, metadataStore: MetadataStore): Config = {
  if (!config.getBoolean(JobConfig.JOB_AUTOSIZING_ENABLED, false)) {
    new MapConfig()
  } else {
    // Named distinctly from the `config` parameter so the parameter is not shadowed.
    val storedConfig = readConfigFromCoordinatorStream(metadataStore)
    // Strict filter instead of `filterKeys`, which yields a lazy view and is deprecated in Scala 2.13.
    val launchConfig = storedConfig.asScala
      .filter { case (key, _) => JobConfig.isAutosizingConfig(key) }
      .asJava
    new MapConfig(launchConfig)
  }
}
+
+ /**
* Reads and returns the complete configuration stored in the coordinator stream.
- * @param coordinatorStreamStore an instance of the instantiated {@link CoordinatorStreamStore}.
+ * @param metadataStore an instance of the instantiated {@link CoordinatorStreamStore}.
* @return the configuration read from the coordinator stream.
*/
- def readConfigFromCoordinatorStream(coordinatorStreamStore: CoordinatorStreamStore): Config = {
- val namespaceAwareCoordinatorStreamStore: NamespaceAwareCoordinatorStreamStore = new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE)
+ def readConfigFromCoordinatorStream(metadataStore: MetadataStore): Config = {
+ val namespaceAwareCoordinatorStreamStore: NamespaceAwareCoordinatorStreamStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, SetConfig.TYPE)
val configFromCoordinatorStream: util.Map[String, Array[Byte]] = namespaceAwareCoordinatorStreamStore.all
val configMap: util.Map[String, String] = new util.HashMap[String, String]
for ((key: String, valueAsBytes: Array[Byte]) <- configFromCoordinatorStream.asScala) {
@@ -136,7 +154,7 @@ object CoordinatorStreamUtil extends Logging {
}
def writeConfigToCoordinatorStream(config: Config, resetJobConfig: Boolean = true) {
- debug("config: %s" format (config))
+ debug("config: %s" format config)
val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
val systemAdmins = new SystemAdmins(config)
@@ -168,7 +186,7 @@ object CoordinatorStreamUtil extends Logging {
val jobConfig = new JobConfig(config)
if (jobConfig.getAutosizingEnabled) {
// If autosizing is enabled, we retain auto-sizing related configs
- keysToRemove = keysToRemove.filter(configKey => !jobConfig.isAutosizingConfig(configKey))
+ keysToRemove = keysToRemove.filter(configKey => !JobConfig.isAutosizingConfig(configKey))
}
info("Deleting old configs that are no longer defined: %s".format(keysToRemove))
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
index 827e312..4bf0aaa 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.samza.application.MockStreamApplication;
+import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
@@ -57,8 +58,10 @@ public class TestJobCoordinatorLaunchUtil {
config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getName());
config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
getClass().getResource("/test.properties").getPath());
- JobConfig originalConfig = new JobConfig(ConfigUtil.loadConfig(new MapConfig(config)));
- JobConfig fullJobConfig = new JobConfig(new MapConfig(originalConfig, Collections.singletonMap("isAfterPlanning", "true")));
+ Config originalConfig = new JobConfig(ConfigUtil.loadConfig(new MapConfig(config)));
+ Config fullConfig = new MapConfig(originalConfig, Collections.singletonMap("isAfterPlanning", "true"));
+ Config autoSizingConfig = new MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT, "10"));
+ Config finalConfig = new MapConfig(autoSizingConfig, fullConfig);
RemoteJobPlanner mockJobPlanner = mock(RemoteJobPlanner.class);
CoordinatorStreamStore mockCoordinatorStreamStore = mock(CoordinatorStreamStore.class);
@@ -66,14 +69,15 @@ public class TestJobCoordinatorLaunchUtil {
PowerMockito.mockStatic(CoordinatorStreamUtil.class);
PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any());
+ PowerMockito.doReturn(autoSizingConfig).when(CoordinatorStreamUtil.class, "readLaunchConfigFromCoordinatorStream", any(), any());
PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mockJC);
- when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullJobConfig));
+ when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(new JobConfig(fullConfig)));
JobCoordinatorLaunchUtil.run(new MockStreamApplication(), originalConfig);
- verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(fullJobConfig));
+ verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(finalConfig));
verify(mockJC, times(1)).run();
}
}
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
index f8a9f40..f520c6d 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
@@ -19,6 +19,7 @@
package org.apache.samza.util
import java.util
+import java.util.Collections
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde
@@ -27,7 +28,8 @@ import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemStream}
import org.junit.{Assert, Test}
import org.mockito.Matchers.any
import org.mockito.Mockito
-import org.apache.samza.config.MapConfig
+import org.apache.samza.config.{JobConfig, MapConfig}
+import org.apache.samza.metadatastore.MetadataStore
class TestCoordinatorStreamUtil {
@@ -99,4 +101,27 @@ class TestCoordinatorStreamUtil {
CoordinatorStreamUtil.writeConfigToCoordinatorStream(configMap)
}
+
@Test
def testReadLaunchConfigFromCoordinatorStream() {
  // Empty config when auto sizing is disabled; the metadata store is not consulted, so null is safe here.
  Assert.assertEquals(new MapConfig(), CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(new MapConfig(), null))

  val valueSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE)
  val config = new MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_ENABLED, "true"))
  val jobIdKey = CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, JobConfig.JOB_ID)
  val containerCountKey =
    CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT)

  // Store holding one non-autosizing config and one autosizing config.
  val storedConfig = new util.HashMap[String, Array[Byte]]()
  storedConfig.put(jobIdKey, valueSerde.toBytes("321"))
  storedConfig.put(containerCountKey, valueSerde.toBytes("20"))
  val mockMetadataStore = Mockito.mock(classOf[MetadataStore])
  Mockito.when(mockMetadataStore.all()).thenReturn(storedConfig)

  // Only the autosizing config is returned; job.id is filtered out.
  val expected = new MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT, "20"))
  Assert.assertEquals(expected, CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(config, mockMetadataStore))

  // Auto sizing enabled but no autosizing configs persisted: the launch config is empty.
  val nonAutosizingConfig = new util.HashMap[String, Array[Byte]]()
  nonAutosizingConfig.put(jobIdKey, valueSerde.toBytes("321"))
  val storeWithoutAutosizing = Mockito.mock(classOf[MetadataStore])
  Mockito.when(storeWithoutAutosizing.all()).thenReturn(nonAutosizingConfig)
  Assert.assertEquals(new MapConfig(),
    CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(config, storeWithoutAutosizing))
}
}