You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original page and is not preserved in this plain-text rendering.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/08 16:26:22 UTC
[samza] branch master updated: SAMZA-2181: Ensure consistency of coordinator store creation and initialization (#1017)
This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d931c34 SAMZA-2181: Ensure consistency of coordinator store creation and initialization (#1017)
d931c34 is described below
commit d931c3433347e37b38c8d033c186fde6add040bd
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Wed May 8 09:26:17 2019 -0700
SAMZA-2181: Ensure consistency of coordinator store creation and initialization (#1017)
* SAMZA-2181: Ensure consistency of coordinator store creation and initialization
---
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 36 ++++++++-------------
.../scala/org/apache/samza/job/JobRunner.scala | 8 +----
.../apache/samza/util/CoordinatorStreamUtil.scala | 23 ++++++++++++--
.../samza/util/TestCoordinatorStreamUtil.scala | 37 ++++++++++++++++++++++
4 files changed, 71 insertions(+), 33 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index aa72d1c..e27d615 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -19,10 +19,9 @@
package org.apache.samza.zk;
import com.google.common.annotations.VisibleForTesting;
-
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -45,19 +44,17 @@ import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.coordinator.MetadataResourceUtil;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
import org.apache.samza.coordinator.stream.messages.SetConfig;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.runtime.LocationId;
import org.apache.samza.runtime.LocationIdProvider;
import org.apache.samza.runtime.LocationIdProviderFactory;
import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
@@ -289,28 +286,28 @@ public class ZkJobCoordinator implements JobCoordinator {
* Stores the configuration of the job in the coordinator stream.
*/
private void loadMetadataResources(JobModel jobModel) {
- MetadataStore metadataStore = null;
+ CoordinatorStreamStore coordinatorStreamStore = null;
try {
// Creates the coordinator stream if it does not exists.
createCoordinatorStream();
- MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
- metadataStore = metadataStoreFactory.getMetadataStore(SetConfig.TYPE, config, metrics.getMetricsRegistry());
- metadataStore.init();
+ coordinatorStreamStore = new CoordinatorStreamStore(config, metrics.getMetricsRegistry());
+ coordinatorStreamStore.init();
MetadataResourceUtil metadataResourceUtil =
new MetadataResourceUtil(jobModel, metrics.getMetricsRegistry());
metadataResourceUtil.createResources();
CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+ NamespaceAwareCoordinatorStreamStore configStore =
+ new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE);
for (Map.Entry<String, String> entry : config.entrySet()) {
byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
- String configKey = CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, entry.getKey());
- metadataStore.put(configKey, serializedValue);
+ configStore.put(entry.getKey(), serializedValue);
}
} finally {
- if (metadataStore != null) {
- LOG.info("Stopping the coordinator system producer.");
- metadataStore.close();
+ if (coordinatorStreamStore != null) {
+ LOG.info("Stopping the coordinator stream metadata store.");
+ coordinatorStreamStore.close();
}
}
}
@@ -319,16 +316,9 @@ public class ZkJobCoordinator implements JobCoordinator {
* Creates a coordinator stream kafka topic.
*/
private void createCoordinatorStream() {
- SystemAdmin coordinatorSystemAdmin = null;
SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
- coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
- String streamName = coordinatorSystemStream.getStream();
- StreamSpec coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem());
- if (coordinatorSystemAdmin.createStream(coordinatorSpec)) {
- LOG.info("Created coordinator stream: {}.", streamName);
- } else {
- LOG.info("Coordinator stream: {} already exists.", streamName);
- }
+ SystemAdmin coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
+ CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin);
}
/**
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 5f4338c..1e489ed 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -85,14 +85,8 @@ class JobRunner(config: Config) extends Logging {
info("Creating coordinator stream")
val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
- val streamName = coordinatorSystemStream.getStream
- val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem)
coordinatorSystemAdmin.start()
- if (coordinatorSystemAdmin.createStream(coordinatorSpec)) {
- info("Created coordinator stream %s." format streamName)
- } else {
- info("Coordinator stream %s already exists." format streamName)
- }
+ CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
coordinatorSystemAdmin.stop()
if (resetJobConfig) {
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 485a55c..60955ca 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -22,18 +22,19 @@
package org.apache.samza.util
import java.util
+
import org.apache.commons.lang3.StringUtils
import org.apache.samza.SamzaException
-import org.apache.samza.config._
-import org.apache.samza.system.{SystemFactory, SystemStream}
import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config._
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde
import org.apache.samza.coordinator.stream.messages.SetConfig
+import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemFactory, SystemStream}
import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
-import scala.collection.immutable.Map
import scala.collection.JavaConverters._
+import scala.collection.immutable.Map
object CoordinatorStreamUtil extends Logging {
/**
@@ -53,6 +54,22 @@ object CoordinatorStreamUtil extends Logging {
}
/**
+ * Creates a coordinator stream.
+ * @param coordinatorSystemStream the {@see SystemStream} that describes the stream to create.
+ * @param coordinatorSystemAdmin the {@see SystemAdmin} used to create the stream.
+ */
+ def createCoordinatorStream(coordinatorSystemStream: SystemStream, coordinatorSystemAdmin: SystemAdmin): Unit = {
+ // TODO: This logic should be part of the final coordinator stream metadata store abstraction. See SAMZA-2182
+ val streamName = coordinatorSystemStream.getStream
+ val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem)
+ if (coordinatorSystemAdmin.createStream(coordinatorSpec)) {
+ info("Created coordinator stream: %s." format streamName)
+ } else {
+ info("Coordinator stream: %s already exists." format streamName)
+ }
+ }
+
+ /**
* Get the coordinator system stream from the configuration
* @param config
* @return
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
new file mode 100644
index 0000000..76b077a
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util
+
+import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemStream}
+import org.junit.Test
+import org.mockito.Matchers.any
+import org.mockito.Mockito
+
+class TestCoordinatorStreamUtil {
+
+ @Test
+ def testCreateCoordinatorStream {
+ val systemStream = Mockito.spy(new SystemStream("testSystem", "testStream"))
+ val systemAdmin = Mockito.mock(classOf[SystemAdmin])
+
+ CoordinatorStreamUtil.createCoordinatorStream(systemStream, systemAdmin)
+ Mockito.verify(systemStream).getStream
+ Mockito.verify(systemAdmin).createStream(any(classOf[StreamSpec]))
+ }
+}