You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/08/30 23:56:39 UTC

samza git commit: SAMZA-1413: Config for CoordinationUtilsFactory class name.

Repository: samza
Updated Branches:
  refs/heads/master dd07e0742 -> fb39a5142


SAMZA-1413: Config for CoordinationUtilsFactory class name.

Author: Boris Shkolnik <bo...@apache.org>
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>

Reviewers: Navina Ramesh <na...@apache.org>, Fred Ji <fj...@linkedin.com>

Closes #290 from sborya/configUtilsFactoryClassName


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fb39a514
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fb39a514
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fb39a514

Branch: refs/heads/master
Commit: fb39a5142283df4dd2c097063d8d85c92a564392
Parents: dd07e07
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed Aug 30 16:56:28 2017 -0700
Committer: navina <na...@apache.org>
Committed: Wed Aug 30 16:56:28 2017 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 14 ++++-
 .../coordinator/AzureCoordinationUtils.java     |  1 -
 .../samza/config/JobCoordinatorConfig.java      | 20 +++----
 .../samza/config/TestJobCoordinatorConfig.java  | 58 ++++++++++++++++++++
 4 files changed, 80 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fb39a514/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index dc1df30..9b4e279 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -429,7 +429,19 @@
                         Required only for non-cluster-managed applications. Please see the required value for <a href=#task-name-grouper-factory>task-name-grouper-factory </a>
                     </td>
                 </tr>
-
+                <tr>
+                    <td class="property" id="job.coordination.utils.factory">job.coordination.utils.factory</td>
+                    <td class="default">org.apache.samza.zk.ZkCoordinationUtilsFactory</td>
+                    <td class="description">
+                        Class to use to create CoordinationUtils. Currently available values are:
+                        <dl>
+                            <dt><code>org.apache.samza.zk.ZkCoordinationUtilsFactory</code></dt>
+                            <dd>ZooKeeper based coordination utils.</dd>
+                            <dt><code>org.apache.samza.coordinator.AzureCoordinationUtilsFactory</code></dt>
+                            <dd>Azure based coordination utils.</dd>
+                        These coordination utils are currently used for intermediate stream creation.
+                    </td>
+                </tr>
                 <tr>
                                               <!-- change link to StandAlone design/tutorial doc. SAMZA-1299 -->
                 <th colspan="3" class="section" id="ZkBasedJobCoordination"><a href="../index.html">Zookeeper-based job configuration</a></th>

http://git-wip-us.apache.org/repos/asf/samza/blob/fb39a514/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
index 2a42514..b689f3e 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
@@ -54,6 +54,5 @@ public class AzureCoordinationUtils implements CoordinationUtils {
 
   @Override
   public void close() {
-    
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/fb39a514/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
index a04038a..700a107 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -22,8 +22,6 @@ package org.apache.samza.config;
 import com.google.common.base.Strings;
 import org.apache.samza.SamzaException;
 import org.apache.samza.zk.ZkCoordinationUtilsFactory;
-import org.apache.samza.zk.ZkJobCoordinatorFactory;
-
 
 public class JobCoordinatorConfig extends MapConfig {
   public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
@@ -35,20 +33,20 @@ public class JobCoordinatorConfig extends MapConfig {
   }
 
   public String getJobCoordinationUtilsFactoryClassName() {
-    String jobCoordinatorFactoryClassName = get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "");
-
-    String className = get(JOB_COORDINATION_UTILS_FACTORY, "");
+    String className = get(JOB_COORDINATION_UTILS_FACTORY, DEFAULT_COORDINATION_UTILS_FACTORY);
 
-    if (!Strings.isNullOrEmpty(className)) {
-      return className;
+    if (Strings.isNullOrEmpty(className)) {
+      throw new SamzaException("Empty config for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className);
     }
 
-    // TODO: we will need a better way to package the configs with application runner
-    if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
-      return DEFAULT_COORDINATION_UTILS_FACTORY;
+    try {
+      Class.forName(className);
+    } catch (ClassNotFoundException e) {
+      throw new SamzaException(
+          "Failed to validate config value for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className, e);
     }
 
-    throw new SamzaException("Cannot determine which CoordinationUtilsFactory to load");
+    return className;
   }
 
   public String getJobCoordinatorFactoryClassName() {

http://git-wip-us.apache.org/repos/asf/samza/blob/fb39a514/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java b/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
new file mode 100644
index 0000000..2ef92b5
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import junit.framework.Assert;
+import org.apache.samza.SamzaException;
+import org.apache.samza.zk.ZkCoordinationUtilsFactory;
+import org.junit.Test;
+
+
+public class TestJobCoordinatorConfig {
+
+  private final static String NONEXISTING_FACTORY_CLASS = "AnotherFactory";
+  private final static String ANOTHER_FACTORY_CLASS = TestJobCoordinatorConfig.class.getName(); // any valid name
+
+  @Test
+  public void testJobCoordinationUtilsFactoryConfig() {
+
+    Map<String, String> map = new HashMap<>();
+    JobCoordinatorConfig jConfig = new JobCoordinatorConfig(new MapConfig(map));
+
+    // test default value
+    Assert.assertEquals(ZkCoordinationUtilsFactory.class.getName(), jConfig.getJobCoordinationUtilsFactoryClassName());
+
+    map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, ANOTHER_FACTORY_CLASS);
+    jConfig = new JobCoordinatorConfig(new MapConfig(map));
+    Assert.assertEquals(ANOTHER_FACTORY_CLASS, jConfig.getJobCoordinationUtilsFactoryClassName());
+
+    // failure case
+    map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, NONEXISTING_FACTORY_CLASS);
+    jConfig = new JobCoordinatorConfig(new MapConfig(map));
+    try {
+      jConfig.getJobCoordinationUtilsFactoryClassName();
+      Assert.fail("Failed to validate loading of fake class: " + NONEXISTING_FACTORY_CLASS);
+    } catch (SamzaException e) {
+      // expected
+    }
+  }
+}