You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ra...@apache.org on 2020/05/05 00:05:39 UTC

[samza] branch master updated: SAMZA-2518: Update JobCoordinatorLaunchUtil to fetch launch config from metadata store. (#1354)

This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 10915fb  SAMZA-2518: Update JobCoordinatorLaunchUtil to fetch launch config from metadata store. (#1354)
10915fb is described below

commit 10915fb3cda40ae3733d82f1290bd854af113af6
Author: Ke Wu <ke...@icloud.com>
AuthorDate: Mon May 4 17:04:21 2020 -0700

    SAMZA-2518: Update JobCoordinatorLaunchUtil to fetch launch config from metadata store. (#1354)
    
    * SAMZA-2518: Update JobCoordinatorLaunchUtil to fetch launch config from metadata store.
    
    Design:
    https://cwiki.apache.org/confluence/display/SAMZA/SEP-23%3A+Simplify+Job+Runner
    
    Changes:
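    1. Add readLaunchConfigFromCoordinatorStream() to CoordinatorStreamUtil. It returns only the autosizing configs persisted in the coordinator stream, or an empty config when autosizing is disabled.
    2. Add a step in JobCoordinatorLaunchUtil that invokes readLaunchConfigFromCoordinatorStream() and merges the returned launch config with the planned job config before that config is written to the coordinator stream.
    3. Make JobConfig.isAutosizingConfig(String) static, and change CoordinatorStreamUtil.readConfigFromCoordinatorStream() to accept any MetadataStore.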
    
    API Changes:
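    JobConfig.isAutosizingConfig(String) is now a static method.
    CoordinatorStreamUtil.readConfigFromCoordinatorStream() now takes a MetadataStore instead of a CoordinatorStreamStore.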
    
    Upgrade Instructions:
    None
    
    Usage Instructions:
    None
    
    Tests:
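    Unit tests: added TestCoordinatorStreamUtil.testReadLaunchConfigFromCoordinatorStream and updated TestJobCoordinatorLaunchUtil to cover the launch-config merge.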
    
    * Update to address comments
    
    * Simplify implementation
    
    Co-authored-by: Ke Wu <kw...@linkedin.com>
---
 .../clustermanager/JobCoordinatorLaunchUtil.java   | 12 +++++---
 .../java/org/apache/samza/config/JobConfig.java    |  2 +-
 .../apache/samza/util/CoordinatorStreamUtil.scala  | 32 +++++++++++++++++-----
 .../TestJobCoordinatorLaunchUtil.java              | 12 +++++---
 .../samza/util/TestCoordinatorStreamUtil.scala     | 27 +++++++++++++++++-
 5 files changed, 68 insertions(+), 17 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
index fc1d34e..6915614 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/JobCoordinatorLaunchUtil.java
@@ -26,6 +26,7 @@ import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.application.descriptors.ApplicationDescriptorUtil;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
 import org.apache.samza.execution.RemoteJobPlanner;
 import org.apache.samza.metadatastore.MetadataStore;
@@ -57,14 +58,17 @@ public class JobCoordinatorLaunchUtil {
       throw new SamzaException("Only support single remote job is supported.");
     }
 
-    Config finalConfig = jobConfigs.get(0);
+    Config fullConfig = jobConfigs.get(0);
+    MetricsRegistryMap metrics = new MetricsRegistryMap();
+    MetadataStore
+        metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(fullConfig), metrics);
+    // Reads extra launch config from metadata store.
+    Config launchConfig = CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(fullConfig, metadataStore);
+    Config finalConfig = new MapConfig(launchConfig, fullConfig);
 
     // This needs to be consistent with RemoteApplicationRunner#run where JobRunner#submit to be called instead of JobRunner#run
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(finalConfig, true);
     DiagnosticsUtil.createDiagnosticsStream(finalConfig);
-    MetricsRegistryMap metrics = new MetricsRegistryMap();
-    MetadataStore
-        metadataStore = new CoordinatorStreamStore(CoordinatorStreamUtil.buildCoordinatorStreamConfig(finalConfig), metrics);
     // MetadataStore will be closed in ClusterBasedJobCoordinator#onShutDown
     // initialization of MetadataStore can be moved to ClusterBasedJobCoordinator after we clean up
     // ClusterBasedJobCoordinator#createFromMetadataStore
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index 93b4b60..dc03644 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -351,7 +351,7 @@ public class JobConfig extends MapConfig {
    * @param configParam the config param to determine
    * @return true if the config is related to autosizing, false otherwise
    */
-  public boolean isAutosizingConfig(String configParam) {
+  public static boolean isAutosizingConfig(String configParam) {
     return configParam.startsWith(JOB_AUTOSIZING_CONFIG_PREFIX);
   }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 518639f..bf6aeb3 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -24,10 +24,11 @@ import java.util
 
 import org.apache.samza.SamzaException
 import org.apache.samza.config._
-import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore
 import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamValueSerde}
 import org.apache.samza.coordinator.stream.messages.{Delete, SetConfig}
 import org.apache.samza.job.JobRunner
+import org.apache.samza.metadatastore.MetadataStore
 import org.apache.samza.metrics.MetricsRegistryMap
 import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemAdmins, SystemFactory, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
@@ -44,7 +45,7 @@ object CoordinatorStreamUtil extends Logging {
     val buildConfigFactory = jobConfig.getCoordinatorStreamFactory
     val coordinatorSystemConfig = Class.forName(buildConfigFactory).newInstance().asInstanceOf[CoordinatorStreamConfigFactory].buildCoordinatorStreamConfig(config)
 
-    new MapConfig(coordinatorSystemConfig);
+    new MapConfig(coordinatorSystemConfig)
   }
 
   /**
@@ -111,12 +112,29 @@ object CoordinatorStreamUtil extends Logging {
   }
 
   /**
+   * Reads and returns launch config persisted in coordinator stream. Only job.auto sizing configs are currently supported.
+   * @param config full job config
+   * @param metadataStore an instance of the instantiated MetadataStore
+   * @return empty config if auto sizing is disabled, otherwise auto sizing related configs.
+   */
+  def readLaunchConfigFromCoordinatorStream(config: Config, metadataStore: MetadataStore): Config = {
+    if (!config.getBoolean(JobConfig.JOB_AUTOSIZING_ENABLED, false)) {
+      new MapConfig()
+    } else {
+      val config = readConfigFromCoordinatorStream(metadataStore)
+      val launchConfig = config.asScala.filterKeys(key => JobConfig.isAutosizingConfig(key)).asJava
+
+      new MapConfig(launchConfig)
+    }
+  }
+
+  /**
     * Reads and returns the complete configuration stored in the coordinator stream.
-    * @param coordinatorStreamStore an instance of the instantiated {@link CoordinatorStreamStore}.
+    * @param metadataStore an instance of the instantiated {@link CoordinatorStreamStore}.
     * @return the configuration read from the coordinator stream.
     */
-  def readConfigFromCoordinatorStream(coordinatorStreamStore: CoordinatorStreamStore): Config = {
-    val namespaceAwareCoordinatorStreamStore: NamespaceAwareCoordinatorStreamStore = new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE)
+  def readConfigFromCoordinatorStream(metadataStore: MetadataStore): Config = {
+    val namespaceAwareCoordinatorStreamStore: NamespaceAwareCoordinatorStreamStore = new NamespaceAwareCoordinatorStreamStore(metadataStore, SetConfig.TYPE)
     val configFromCoordinatorStream: util.Map[String, Array[Byte]] = namespaceAwareCoordinatorStreamStore.all
     val configMap: util.Map[String, String] = new util.HashMap[String, String]
     for ((key: String, valueAsBytes: Array[Byte]) <- configFromCoordinatorStream.asScala) {
@@ -136,7 +154,7 @@ object CoordinatorStreamUtil extends Logging {
   }
 
   def writeConfigToCoordinatorStream(config: Config, resetJobConfig: Boolean = true) {
-    debug("config: %s" format (config))
+    debug("config: %s" format config)
     val coordinatorSystemConsumer = new CoordinatorStreamSystemConsumer(config, new MetricsRegistryMap)
     val coordinatorSystemProducer = new CoordinatorStreamSystemProducer(config, new MetricsRegistryMap)
     val systemAdmins = new SystemAdmins(config)
@@ -168,7 +186,7 @@ object CoordinatorStreamUtil extends Logging {
       val jobConfig = new JobConfig(config)
       if (jobConfig.getAutosizingEnabled) {
         // If autosizing is enabled, we retain auto-sizing related configs
-        keysToRemove = keysToRemove.filter(configKey => !jobConfig.isAutosizingConfig(configKey))
+        keysToRemove = keysToRemove.filter(configKey => !JobConfig.isAutosizingConfig(configKey))
       }
 
       info("Deleting old configs that are no longer defined: %s".format(keysToRemove))
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
index 827e312..4bf0aaa 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestJobCoordinatorLaunchUtil.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.application.MockStreamApplication;
+import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.loaders.PropertiesConfigLoaderFactory;
@@ -57,8 +58,10 @@ public class TestJobCoordinatorLaunchUtil {
     config.put(JobConfig.CONFIG_LOADER_FACTORY, PropertiesConfigLoaderFactory.class.getName());
     config.put(PropertiesConfigLoaderFactory.CONFIG_LOADER_PROPERTIES_PREFIX + "path",
         getClass().getResource("/test.properties").getPath());
-    JobConfig originalConfig = new JobConfig(ConfigUtil.loadConfig(new MapConfig(config)));
-    JobConfig fullJobConfig = new JobConfig(new MapConfig(originalConfig, Collections.singletonMap("isAfterPlanning", "true")));
+    Config originalConfig = new JobConfig(ConfigUtil.loadConfig(new MapConfig(config)));
+    Config fullConfig = new MapConfig(originalConfig, Collections.singletonMap("isAfterPlanning", "true"));
+    Config autoSizingConfig = new MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT, "10"));
+    Config finalConfig = new MapConfig(autoSizingConfig, fullConfig);
 
     RemoteJobPlanner mockJobPlanner = mock(RemoteJobPlanner.class);
     CoordinatorStreamStore mockCoordinatorStreamStore = mock(CoordinatorStreamStore.class);
@@ -66,14 +69,15 @@ public class TestJobCoordinatorLaunchUtil {
 
     PowerMockito.mockStatic(CoordinatorStreamUtil.class);
     PowerMockito.doReturn(new MapConfig()).when(CoordinatorStreamUtil.class, "buildCoordinatorStreamConfig", any());
+    PowerMockito.doReturn(autoSizingConfig).when(CoordinatorStreamUtil.class, "readLaunchConfigFromCoordinatorStream", any(), any());
     PowerMockito.whenNew(CoordinatorStreamStore.class).withAnyArguments().thenReturn(mockCoordinatorStreamStore);
     PowerMockito.whenNew(RemoteJobPlanner.class).withAnyArguments().thenReturn(mockJobPlanner);
     PowerMockito.whenNew(ClusterBasedJobCoordinator.class).withAnyArguments().thenReturn(mockJC);
-    when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(fullJobConfig));
+    when(mockJobPlanner.prepareJobs()).thenReturn(Collections.singletonList(new JobConfig(fullConfig)));
 
     JobCoordinatorLaunchUtil.run(new MockStreamApplication(), originalConfig);
 
-    verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(fullJobConfig));
+    verifyNew(ClusterBasedJobCoordinator.class).withArguments(any(MetricsRegistryMap.class), eq(mockCoordinatorStreamStore), eq(finalConfig));
     verify(mockJC, times(1)).run();
   }
 }
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
index f8a9f40..f520c6d 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
@@ -19,6 +19,7 @@
 package org.apache.samza.util
 
 import java.util
+import java.util.Collections
 
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde
@@ -27,7 +28,8 @@ import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemStream}
 import org.junit.{Assert, Test}
 import org.mockito.Matchers.any
 import org.mockito.Mockito
-import org.apache.samza.config.MapConfig
+import org.apache.samza.config.{JobConfig, MapConfig}
+import org.apache.samza.metadatastore.MetadataStore
 
 class TestCoordinatorStreamUtil {
 
@@ -99,4 +101,27 @@ class TestCoordinatorStreamUtil {
 
     CoordinatorStreamUtil.writeConfigToCoordinatorStream(configMap)
   }
+
+  @Test
+  def testReadLaunchConfigFromCoordinatorStream() {
+    // Empty config when auto sizing is disabled.
+    Assert.assertEquals(new MapConfig(),  CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(new MapConfig(), null))
+
+    val valueSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE)
+    val config = new MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_ENABLED, "true"))
+    val expected = new MapConfig(Collections.singletonMap(JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT, "20"))
+    val mockMetadataStore = Mockito.mock(classOf[MetadataStore])
+    val configMap = new util.HashMap[String, Array[Byte]]() {
+      put(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE,
+        JobConfig.JOB_ID),
+        valueSerde.toBytes("321"))
+      put(CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE,
+        JobConfig.JOB_AUTOSIZING_CONTAINER_COUNT),
+        valueSerde.toBytes("20"))
+    }
+    Mockito.when(mockMetadataStore.all()).thenReturn(configMap)
+
+    // Verify the launch config is expected
+    Assert.assertEquals(expected, CoordinatorStreamUtil.readLaunchConfigFromCoordinatorStream(config, mockMetadataStore))
+  }
 }