You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/01/22 20:42:32 UTC

samza git commit: SAMZA-2059: Persist configuration in coordinator stream for standalone.

Repository: samza
Updated Branches:
  refs/heads/master e753e3312 -> 4301c2b4f


SAMZA-2059: Persist configuration in coordinator stream for standalone.

Prior to Samza 1.0, users plugged in the properties of an I/O system through a configuration file. Samza then applied config rewriters, in the user-defined order, to compute the job's configuration.

Since Samza 1.0, we have introduced new abstractions in Samza, viz. `StreamDescriptor` and `SystemDescriptor`, whose purpose is to perform configuration expansion for predefined systems at run-time.

Configuration computed at run-time is not persisted to any centralized storage in Samza standalone. This breaks tools such as checkpoint-tool and coordinator-stream-writer in Samza standalone. This patch addresses the problem by storing the configuration in the coordinator stream for standalone jobs.

In follow-up PRs:
1. We'll switch the JobModel storage layer in standalone from ZooKeeper to the coordinator stream.
2. Samza tools (e.g. checkpoint-tool) will be migrated to read the configuration from the coordinator stream rather than from disk.

Author: Shanthoosh Venkataraman <sp...@usc.edu>
Author: shanthoosh <sv...@linkedin.com>
Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Prateek Maheshwari <pm...@apache.org>

Closes #879 from shanthoosh/standalone-coordinator-stream-for-config and squashes the following commits:

c989a59b [shanthoosh] Merge branch 'master' into standalone-coordinator-stream-for-config
d6290f9e [Shanthoosh Venkataraman] Addressing review comments.
8d0ae13d [Shanthoosh Venkataraman] Fix typo in java doc.
f194a04e [Shanthoosh Venkataraman] SAMZA-2059: Storing configuration in coordinator stream for standalone.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4301c2b4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4301c2b4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4301c2b4

Branch: refs/heads/master
Commit: 4301c2b4f3d922405e4dccb1c65bae819ed36bb0
Parents: e753e33
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Tue Jan 22 12:42:11 2019 -0800
Committer: Shanthoosh Venkataraman <sp...@usc.edu>
Committed: Tue Jan 22 12:42:11 2019 -0800

----------------------------------------------------------------------
 .../stream/CoordinatorStreamValueSerde.java     |  7 +++
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 49 ++++++++++++++++++++
 .../processor/TestZkStreamProcessorBase.java    |  2 +
 .../processor/TestZkLocalApplicationRunner.java | 33 +++++++++++++
 4 files changed, 91 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4301c2b4/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
index ddde105..82dcf81 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
@@ -24,6 +24,7 @@ import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
 import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
 import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
 import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.SamzaException;
 import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
 import org.apache.samza.serializers.JsonSerde;
@@ -58,6 +59,9 @@ public class CoordinatorStreamValueSerde implements Serde<String> {
     } else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
       SetChangelogMapping changelogMapping = new SetChangelogMapping(message);
       return String.valueOf(changelogMapping.getPartition());
+    } else if (type.equalsIgnoreCase(SetConfig.TYPE)) {
+      SetConfig setConfig = new SetConfig(message);
+      return setConfig.getConfigValue();
     } else if (type.equalsIgnoreCase(SetTaskModeMapping.TYPE)) {
       SetTaskModeMapping setTaskModeMapping = new SetTaskModeMapping(message);
       return String.valueOf(setTaskModeMapping.getTaskMode());
@@ -80,6 +84,9 @@ public class CoordinatorStreamValueSerde implements Serde<String> {
     } else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
       SetChangelogMapping changelogMapping = new SetChangelogMapping(SOURCE, "", Integer.valueOf(value));
       return messageSerde.toBytes(changelogMapping.getMessageMap());
+    } else if (type.equalsIgnoreCase(SetConfig.TYPE)) {
+      SetConfig setConfig = new SetConfig(SOURCE, "", value);
+      return messageSerde.toBytes(setConfig.getMessageMap());
     } else {
       throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/4301c2b4/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 8371070..cedca3c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -44,18 +44,25 @@ import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.runtime.LocationId;
 import org.apache.samza.runtime.LocationIdProvider;
 import org.apache.samza.runtime.LocationIdProviderFactory;
 import org.apache.samza.storage.ChangelogStreamManager;
 import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.CoordinatorStreamUtil;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
 import org.apache.samza.zk.ZkUtils.ProcessorNode;
@@ -258,6 +265,7 @@ public class ZkJobCoordinator implements JobCoordinator {
 
       // Pass in null Coordinator consumer and producer because ZK doesn't have coordinator streams.
       ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions);
+      storeConfigInCoordinatorStream();
       hasCreatedStreams = true;
     }
 
@@ -281,6 +289,47 @@ public class ZkJobCoordinator implements JobCoordinator {
   }
 
   /**
+   * Stores the configuration of the job in the coordinator stream.
+   */
+  private void storeConfigInCoordinatorStream() {
+    MetadataStore metadataStore = null;
+    try {
+      // Creates the coordinator stream if it does not exists.
+      createCoordinatorStream();
+
+      MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
+      metadataStore = metadataStoreFactory.getMetadataStore(SetConfig.TYPE, config, metrics.getMetricsRegistry());
+      metadataStore.init();
+      CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+      for (Map.Entry<String, String> entry : config.entrySet()) {
+        byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
+        metadataStore.put(entry.getKey(), serializedValue);
+      }
+    } finally {
+      if (metadataStore != null) {
+        LOG.info("Stopping the coordinator system producer.");
+        metadataStore.close();
+      }
+    }
+  }
+
+  /**
+   * Creates a coordinator stream kafka topic.
+   */
+  private void createCoordinatorStream() {
+    SystemAdmin coordinatorSystemAdmin = null;
+    SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
+    coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
+    String streamName = coordinatorSystemStream.getStream();
+    StreamSpec coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem());
+    if (coordinatorSystemAdmin.createStream(coordinatorSpec)) {
+      LOG.info("Created coordinator stream: {}.", streamName);
+    } else {
+      LOG.info("Coordinator stream: {} already exists.", streamName);
+    }
+  }
+
+  /**
    * Generate new JobModel when becoming a leader or the list of processor changed.
    */
   private JobModel generateNewJobModel(List<ProcessorNode> processorNodes) {

http://git-wip-us.apache.org/repos/asf/samza/blob/4301c2b4/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index 3760edb..7e236d1 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -188,6 +188,8 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
     configs.put("app.messageCount", String.valueOf(messageCount));
     configs.put("app.outputTopic", outputTopic);
     configs.put("app.outputSystem", testSystem);
+    configs.put("job.coordinator.system", testSystem);
+    configs.put("job.coordinator.replication.factor", "1");
     configs.put(ZkConfig.ZK_CONNECT, zkConnect());
 
     configs.put("job.systemstreampartition.grouper.factory",

http://git-wip-us.apache.org/repos/asf/samza/blob/4301c2b4/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index e2b458e..298abae 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -51,15 +52,20 @@ import org.apache.samza.config.TaskConfigJava;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
 import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.util.Util;
 import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.apache.samza.zk.ZkKeyBuilder;
 import org.apache.samza.zk.ZkUtils;
@@ -191,6 +197,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
   private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic,
       String appName, String appId) {
+    String coordinatorSystemName = "coordinatorSystem";
     Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder()
         .put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS)
         .put(TaskConfig.INPUT_STREAMS(), inputTopic)
@@ -211,10 +218,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
         .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
         .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
         .put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true")
+        .put("job.coordinator.system", coordinatorSystemName)
+        .put("job.coordinator.replication.factor", "1")
         .build();
     Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
 
     applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
+    applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(coordinatorSystemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
     return applicationConfig;
   }
 
@@ -700,6 +710,29 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
 
     // Validate that the input partition count is 100 in the new JobModel.
     Assert.assertEquals(100, ssps.size());
+
+    // Validate that configuration is stored in coordinator stream.
+    MapConfig config = getConfigFromCoordinatorStream(applicationConfig1);
+
+    // Execution plan and serialized DAG of a samza job is stored in the config of coordinator stream. Thus, direct equals comparison between
+    // the application configuration and the coordinator config will fail. Iterating through the entire configuration bag and verify that expected
+    // configuration is present in the coordinator configuration.
+    for (Map.Entry<String, String> entry : applicationConfig1.entrySet()) {
+      Assert.assertTrue(config.containsKey(entry.getKey()));
+    }
+  }
+
+  private MapConfig getConfigFromCoordinatorStream(Config config) {
+    MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
+    MetadataStore metadataStore = metadataStoreFactory.getMetadataStore("set-config", config, new MetricsRegistryMap());
+    metadataStore.init();
+    Map<String, String> configMap = new HashMap<>();
+    CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde("set-config");
+    metadataStore.all().forEach((key, value) -> {
+        String deserializedValue = jsonSerde.fromBytes(value);
+        configMap.put(key, deserializedValue);
+      });
+    return new MapConfig(configMap);
   }
 
   private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) {