You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/01/22 20:42:32 UTC
samza git commit: SAMZA-2059: Persist configuration in coordinator
stream for standalone.
Repository: samza
Updated Branches:
refs/heads/master e753e3312 -> 4301c2b4f
SAMZA-2059: Persist configuration in coordinator stream for standalone.
Prior to Samza 1.0, users plugged in the properties of an I/O system through a configuration file. Samza employed rewriters in the user-defined order to compute the configuration of a job.
Since Samza 1.0, we have introduced new abstractions, viz. `StreamDescriptor` and `SystemDescriptor`, in Samza to perform configuration expansion for predefined systems at run-time.
Configuration computed at run-time is not persisted to centralized storage in Samza standalone. This breaks tools such as checkpoint-tool and coordinator-stream-writer in Samza standalone. This patch addresses the problem by storing the configuration in the coordinator stream for standalone.
In follow-up PRs:
1. We'll switch the JobModel storage layer in standalone from ZooKeeper to the coordinator stream.
2. Samza tools (e.g. checkpoint-tool) will be migrated to read the configuration from the coordinator stream rather than from disk.
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Author: shanthoosh <sv...@linkedin.com>
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Prateek Maheshwari <pm...@apache.org>
Closes #879 from shanthoosh/standalone-coordinator-stream-for-config and squashes the following commits:
c989a59b [shanthoosh] Merge branch 'master' into standalone-coordinator-stream-for-config
d6290f9e [Shanthoosh Venkataraman] Addressing review comments.
8d0ae13d [Shanthoosh Venkataraman] Fix typo in java doc.
f194a04e [Shanthoosh Venkataraman] SAMZA-2059: Storing configuration in coordinator stream for standalone.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4301c2b4
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4301c2b4
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4301c2b4
Branch: refs/heads/master
Commit: 4301c2b4f3d922405e4dccb1c65bae819ed36bb0
Parents: e753e33
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Tue Jan 22 12:42:11 2019 -0800
Committer: Shanthoosh Venkataraman <sp...@usc.edu>
Committed: Tue Jan 22 12:42:11 2019 -0800
----------------------------------------------------------------------
.../stream/CoordinatorStreamValueSerde.java | 7 +++
.../org/apache/samza/zk/ZkJobCoordinator.java | 49 ++++++++++++++++++++
.../processor/TestZkStreamProcessorBase.java | 2 +
.../processor/TestZkLocalApplicationRunner.java | 33 +++++++++++++
4 files changed, 91 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4301c2b4/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
index ddde105..82dcf81 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamValueSerde.java
@@ -24,6 +24,7 @@ import org.apache.samza.coordinator.stream.messages.CoordinatorStreamMessage;
import org.apache.samza.coordinator.stream.messages.SetChangelogMapping;
import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping;
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
import org.apache.samza.serializers.JsonSerde;
@@ -58,6 +59,9 @@ public class CoordinatorStreamValueSerde implements Serde<String> {
} else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
SetChangelogMapping changelogMapping = new SetChangelogMapping(message);
return String.valueOf(changelogMapping.getPartition());
+ } else if (type.equalsIgnoreCase(SetConfig.TYPE)) {
+ SetConfig setConfig = new SetConfig(message);
+ return setConfig.getConfigValue();
} else if (type.equalsIgnoreCase(SetTaskModeMapping.TYPE)) {
SetTaskModeMapping setTaskModeMapping = new SetTaskModeMapping(message);
return String.valueOf(setTaskModeMapping.getTaskMode());
@@ -80,6 +84,9 @@ public class CoordinatorStreamValueSerde implements Serde<String> {
} else if (type.equalsIgnoreCase(SetChangelogMapping.TYPE)) {
SetChangelogMapping changelogMapping = new SetChangelogMapping(SOURCE, "", Integer.valueOf(value));
return messageSerde.toBytes(changelogMapping.getMessageMap());
+ } else if (type.equalsIgnoreCase(SetConfig.TYPE)) {
+ SetConfig setConfig = new SetConfig(SOURCE, "", value);
+ return messageSerde.toBytes(setConfig.getMessageMap());
} else {
throw new SamzaException(String.format("Unknown coordinator stream message type: %s", type));
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4301c2b4/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 8371070..cedca3c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -44,18 +44,25 @@ import org.apache.samza.coordinator.JobCoordinatorListener;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
+import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.runtime.LocationId;
import org.apache.samza.runtime.LocationIdProvider;
import org.apache.samza.runtime.LocationIdProviderFactory;
import org.apache.samza.storage.ChangelogStreamManager;
import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.CoordinatorStreamUtil;
import org.apache.samza.util.SystemClock;
import org.apache.samza.util.Util;
import org.apache.samza.zk.ZkUtils.ProcessorNode;
@@ -258,6 +265,7 @@ public class ZkJobCoordinator implements JobCoordinator {
// Pass in null Coordinator consumer and producer because ZK doesn't have coordinator streams.
ChangelogStreamManager.createChangelogStreams(config, jobModel.maxChangeLogStreamPartitions);
+ storeConfigInCoordinatorStream();
hasCreatedStreams = true;
}
@@ -281,6 +289,47 @@ public class ZkJobCoordinator implements JobCoordinator {
}
/**
+ * Stores the configuration of the job in the coordinator stream.
+ */
+ private void storeConfigInCoordinatorStream() {
+ MetadataStore metadataStore = null;
+ try {
+ // Create the coordinator stream first if it does not already exist; the config entries are written to it below.
+ createCoordinatorStream();
+
+ MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
+ metadataStore = metadataStoreFactory.getMetadataStore(SetConfig.TYPE, config, metrics.getMetricsRegistry()); // NOTE(review): SetConfig.TYPE presumably namespaces the store by message type — confirm against MetadataStoreFactory
+ metadataStore.init();
+ CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE); // encodes each value as a SetConfig coordinator-stream message
+ for (Map.Entry<String, String> entry : config.entrySet()) { // one store entry per config key, keyed by the config key as-is
+ byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
+ metadataStore.put(entry.getKey(), serializedValue);
+ }
+ } finally { // close the metadata store even if a write fails
+ if (metadataStore != null) {
+ LOG.info("Stopping the coordinator system producer."); // NOTE(review): message says "producer" but what is closed here is the MetadataStore
+ metadataStore.close();
+ }
+ }
+ }
+
+ /**
+ * Creates the job's coordinator stream through the coordinator system's {@link SystemAdmin} (not necessarily a Kafka topic), logging whether it was newly created or already existed.
+ */
+ private void createCoordinatorStream() {
+ SystemAdmin coordinatorSystemAdmin = null; // NOTE(review): redundant null initialization; the variable is assigned unconditionally two lines below
+ SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
+ coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
+ String streamName = coordinatorSystemStream.getStream();
+ StreamSpec coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem());
+ if (coordinatorSystemAdmin.createStream(coordinatorSpec)) { // a false return is treated as "stream already exists"
+ LOG.info("Created coordinator stream: {}.", streamName);
+ } else {
+ LOG.info("Coordinator stream: {} already exists.", streamName);
+ }
+ }
+
+ /**
* Generate new JobModel when becoming a leader or the list of processor changed.
*/
private JobModel generateNewJobModel(List<ProcessorNode> processorNodes) {
http://git-wip-us.apache.org/repos/asf/samza/blob/4301c2b4/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
index 3760edb..7e236d1 100644
--- a/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
+++ b/samza-test/src/test/java/org/apache/samza/processor/TestZkStreamProcessorBase.java
@@ -188,6 +188,8 @@ public class TestZkStreamProcessorBase extends StandaloneIntegrationTestHarness
configs.put("app.messageCount", String.valueOf(messageCount));
configs.put("app.outputTopic", outputTopic);
configs.put("app.outputSystem", testSystem);
+ configs.put("job.coordinator.system", testSystem);
+ configs.put("job.coordinator.replication.factor", "1");
configs.put(ZkConfig.ZK_CONNECT, zkConnect());
configs.put("job.systemstreampartition.grouper.factory",
http://git-wip-us.apache.org/repos/asf/samza/blob/4301c2b4/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index e2b458e..298abae 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -25,6 +25,7 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -51,15 +52,20 @@ import org.apache.samza.config.TaskConfigJava;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.SamzaException;
import org.apache.samza.container.TaskName;
+import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.test.StandaloneIntegrationTestHarness;
import org.apache.samza.test.StandaloneTestUtils;
import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.util.Util;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.apache.samza.zk.ZkKeyBuilder;
import org.apache.samza.zk.ZkUtils;
@@ -191,6 +197,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic,
String appName, String appId) {
+ String coordinatorSystemName = "coordinatorSystem";
Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder()
.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS)
.put(TaskConfig.INPUT_STREAMS(), inputTopic)
@@ -211,10 +218,13 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
.put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
.put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true")
+ .put("job.coordinator.system", coordinatorSystemName)
+ .put("job.coordinator.replication.factor", "1")
.build();
Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
+ applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(coordinatorSystemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
return applicationConfig;
}
@@ -700,6 +710,29 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
// Validate that the input partition count is 100 in the new JobModel.
Assert.assertEquals(100, ssps.size());
+
+ // Validate that configuration is stored in coordinator stream.
+ MapConfig config = getConfigFromCoordinatorStream(applicationConfig1);
+
+ // Execution plan and serialized DAG of a samza job is stored in the config of coordinator stream. Thus, direct equals comparison between
+ // the application configuration and the coordinator config will fail. Iterating through the entire configuration bag and verify that expected
+ // configuration is present in the coordinator configuration.
+ for (Map.Entry<String, String> entry : applicationConfig1.entrySet()) {
+ Assert.assertTrue(config.containsKey(entry.getKey()));
+ }
+ }
+
+ private MapConfig getConfigFromCoordinatorStream(Config config) { // reads every entry of the "set-config" metadata store back into a MapConfig
+ MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
+ MetadataStore metadataStore = metadataStoreFactory.getMetadataStore("set-config", config, new MetricsRegistryMap());
+ metadataStore.init(); // NOTE(review): this store is never closed; consider closing it (e.g. in a finally block) after all() has been read
+ Map<String, String> configMap = new HashMap<>();
+ CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde("set-config"); // NOTE(review): "set-config" presumably equals SetConfig.TYPE used by ZkJobCoordinator; prefer the constant if so
+ metadataStore.all().forEach((key, value) -> {
+ String deserializedValue = jsonSerde.fromBytes(value);
+ configMap.put(key, deserializedValue);
+ });
+ return new MapConfig(configMap);
}
private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) {