You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/05/08 16:26:22 UTC

[samza] branch master updated: SAMZA-2181: Ensure consistency of coordinator store creation and initialization (#1017)

This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new d931c34  SAMZA-2181: Ensure consistency of coordinator store creation and initialization (#1017)
d931c34 is described below

commit d931c3433347e37b38c8d033c186fde6add040bd
Author: Daniel Nishimura <dn...@gmail.com>
AuthorDate: Wed May 8 09:26:17 2019 -0700

    SAMZA-2181: Ensure consistency of coordinator store creation and initialization (#1017)
    
    * SAMZA-2181: Ensure consistency of coordinator store creation and initialization
---
 .../java/org/apache/samza/zk/ZkJobCoordinator.java | 36 ++++++++-------------
 .../scala/org/apache/samza/job/JobRunner.scala     |  8 +----
 .../apache/samza/util/CoordinatorStreamUtil.scala  | 23 ++++++++++++--
 .../samza/util/TestCoordinatorStreamUtil.scala     | 37 ++++++++++++++++++++++
 4 files changed, 71 insertions(+), 33 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index aa72d1c..e27d615 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -19,10 +19,9 @@
 package org.apache.samza.zk;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -45,19 +44,17 @@ import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.coordinator.MetadataResourceUtil;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
+import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
 import org.apache.samza.coordinator.stream.messages.SetConfig;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.job.model.TaskModel;
-import org.apache.samza.metadatastore.MetadataStore;
-import org.apache.samza.metadatastore.MetadataStoreFactory;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.runtime.LocationId;
 import org.apache.samza.runtime.LocationIdProvider;
 import org.apache.samza.runtime.LocationIdProviderFactory;
 import org.apache.samza.system.StreamMetadataCache;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
@@ -289,28 +286,28 @@ public class ZkJobCoordinator implements JobCoordinator {
    * Stores the configuration of the job in the coordinator stream.
    */
   private void loadMetadataResources(JobModel jobModel) {
-    MetadataStore metadataStore = null;
+    CoordinatorStreamStore coordinatorStreamStore = null;
     try {
       // Creates the coordinator stream if it does not exists.
       createCoordinatorStream();
-      MetadataStoreFactory metadataStoreFactory = Util.getObj(new JobConfig(config).getMetadataStoreFactory(), MetadataStoreFactory.class);
-      metadataStore = metadataStoreFactory.getMetadataStore(SetConfig.TYPE, config, metrics.getMetricsRegistry());
-      metadataStore.init();
+      coordinatorStreamStore = new CoordinatorStreamStore(config, metrics.getMetricsRegistry());
+      coordinatorStreamStore.init();
 
       MetadataResourceUtil metadataResourceUtil =
           new MetadataResourceUtil(jobModel, metrics.getMetricsRegistry());
       metadataResourceUtil.createResources();
 
       CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
+      NamespaceAwareCoordinatorStreamStore configStore =
+          new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE);
       for (Map.Entry<String, String> entry : config.entrySet()) {
         byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
-        String configKey = CoordinatorStreamStore.serializeCoordinatorMessageKeyToJson(SetConfig.TYPE, entry.getKey());
-        metadataStore.put(configKey, serializedValue);
+        configStore.put(entry.getKey(), serializedValue);
       }
     } finally {
-      if (metadataStore != null) {
-        LOG.info("Stopping the coordinator system producer.");
-        metadataStore.close();
+      if (coordinatorStreamStore != null) {
+        LOG.info("Stopping the coordinator stream metadata store.");
+        coordinatorStreamStore.close();
       }
     }
   }
@@ -319,16 +316,9 @@ public class ZkJobCoordinator implements JobCoordinator {
    * Creates a coordinator stream kafka topic.
    */
   private void createCoordinatorStream() {
-    SystemAdmin coordinatorSystemAdmin = null;
     SystemStream coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config);
-    coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
-    String streamName = coordinatorSystemStream.getStream();
-    StreamSpec coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem());
-    if (coordinatorSystemAdmin.createStream(coordinatorSpec)) {
-      LOG.info("Created coordinator stream: {}.", streamName);
-    } else {
-      LOG.info("Coordinator stream: {} already exists.", streamName);
-    }
+    SystemAdmin coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem());
+    CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin);
   }
 
   /**
diff --git a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
index 5f4338c..1e489ed 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/JobRunner.scala
@@ -85,14 +85,8 @@ class JobRunner(config: Config) extends Logging {
     info("Creating coordinator stream")
     val coordinatorSystemStream = CoordinatorStreamUtil.getCoordinatorSystemStream(config)
     val coordinatorSystemAdmin = systemAdmins.getSystemAdmin(coordinatorSystemStream.getSystem)
-    val streamName = coordinatorSystemStream.getStream
-    val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem)
     coordinatorSystemAdmin.start()
-    if (coordinatorSystemAdmin.createStream(coordinatorSpec)) {
-      info("Created coordinator stream %s." format streamName)
-    } else {
-      info("Coordinator stream %s already exists." format streamName)
-    }
+    CoordinatorStreamUtil.createCoordinatorStream(coordinatorSystemStream, coordinatorSystemAdmin)
     coordinatorSystemAdmin.stop()
 
     if (resetJobConfig) {
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 485a55c..60955ca 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -22,18 +22,19 @@
 package org.apache.samza.util
 
 import java.util
+
 import org.apache.commons.lang3.StringUtils
 import org.apache.samza.SamzaException
-import org.apache.samza.config._
-import org.apache.samza.system.{SystemFactory, SystemStream}
 import org.apache.samza.config.JobConfig.Config2Job
+import org.apache.samza.config._
 import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, NamespaceAwareCoordinatorStreamStore}
 import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde
 import org.apache.samza.coordinator.stream.messages.SetConfig
+import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemFactory, SystemStream}
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 
-import scala.collection.immutable.Map
 import scala.collection.JavaConverters._
+import scala.collection.immutable.Map
 
 object CoordinatorStreamUtil extends Logging {
   /**
@@ -53,6 +54,22 @@ object CoordinatorStreamUtil extends Logging {
   }
 
   /**
+    * Creates a coordinator stream.
+    * @param coordinatorSystemStream the {@see SystemStream} that describes the stream to create.
+    * @param coordinatorSystemAdmin the {@see SystemAdmin} used to create the stream.
+    */
+  def createCoordinatorStream(coordinatorSystemStream: SystemStream, coordinatorSystemAdmin: SystemAdmin): Unit = {
+    // TODO: This logic should be part of the final coordinator stream metadata store abstraction. See SAMZA-2182
+    val streamName = coordinatorSystemStream.getStream
+    val coordinatorSpec = StreamSpec.createCoordinatorStreamSpec(streamName, coordinatorSystemStream.getSystem)
+    if (coordinatorSystemAdmin.createStream(coordinatorSpec)) {
+      info("Created coordinator stream: %s." format streamName)
+    } else {
+      info("Coordinator stream: %s already exists." format streamName)
+    }
+  }
+
+  /**
     * Get the coordinator system stream from the configuration
     * @param config
     * @return
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
new file mode 100644
index 0000000..76b077a
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestCoordinatorStreamUtil.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util
+
+import org.apache.samza.system.{StreamSpec, SystemAdmin, SystemStream}
+import org.junit.Test
+import org.mockito.Matchers.any
+import org.mockito.Mockito
+
+class TestCoordinatorStreamUtil {
+
+  @Test
+  def testCreateCoordinatorStream  {
+    val systemStream = Mockito.spy(new SystemStream("testSystem", "testStream"))
+    val systemAdmin = Mockito.mock(classOf[SystemAdmin])
+
+    CoordinatorStreamUtil.createCoordinatorStream(systemStream, systemAdmin)
+    Mockito.verify(systemStream).getStream
+    Mockito.verify(systemAdmin).createStream(any(classOf[StreamSpec]))
+  }
+}