You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/08/30 23:56:39 UTC
samza git commit: SAMZA-1413: Config for CoordinationUtilsFactory class name.
Repository: samza
Updated Branches:
refs/heads/master dd07e0742 -> fb39a5142
SAMZA-1413: Config for CoordinationUtilsFactory class name.
Author: Boris Shkolnik <bo...@apache.org>
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Reviewers: Navina Ramesh <na...@apache.org>, Fred Ji <fj...@linkedin.com>
Closes #290 from sborya/configUtilsFactoryClassName
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fb39a514
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fb39a514
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fb39a514
Branch: refs/heads/master
Commit: fb39a5142283df4dd2c097063d8d85c92a564392
Parents: dd07e07
Author: Boris Shkolnik <bo...@apache.org>
Authored: Wed Aug 30 16:56:28 2017 -0700
Committer: navina <na...@apache.org>
Committed: Wed Aug 30 16:56:28 2017 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 14 ++++-
.../coordinator/AzureCoordinationUtils.java | 1 -
.../samza/config/JobCoordinatorConfig.java | 20 +++----
.../samza/config/TestJobCoordinatorConfig.java | 58 ++++++++++++++++++++
4 files changed, 80 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fb39a514/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index dc1df30..9b4e279 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -429,7 +429,19 @@
Required only for non-cluster-managed applications. Please see the required value for <a href=#task-name-grouper-factory>task-name-grouper-factory </a>
</td>
</tr>
-
+ <tr>
+ <td class="property" id="job.coordination.utils.factory">job.coordination.utils.factory</td>
+ <td class="default">org.apache.samza.zk.ZkCoordinationUtilsFactory</td>
+ <td class="description">
+ Class to use to create CoordinationUtils. Currently available values are:
+ <dl>
+ <dt><code>org.apache.samza.zk.ZkCoordinationUtilsFactory</code></dt>
+ <dd>ZooKeeper based coordination utils.</dd>
+ <dt><code>org.apache.samza.coordinator.AzureCoordinationUtilsFactory</code></dt>
+ <dd>Azure based coordination utils.</dd>
+ </dl> These coordination utils are currently used for intermediate stream creation.
+ </td>
+ </tr>
<tr>
<!-- change link to StandAlone design/tutorial doc. SAMZA-1299 -->
<th colspan="3" class="section" id="ZkBasedJobCoordination"><a href="../index.html">Zookeeper-based job configuration</a></th>
http://git-wip-us.apache.org/repos/asf/samza/blob/fb39a514/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
index 2a42514..b689f3e 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
@@ -54,6 +54,5 @@ public class AzureCoordinationUtils implements CoordinationUtils {
@Override
public void close() {
-
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fb39a514/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
index a04038a..700a107 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -22,8 +22,6 @@ package org.apache.samza.config;
import com.google.common.base.Strings;
import org.apache.samza.SamzaException;
import org.apache.samza.zk.ZkCoordinationUtilsFactory;
-import org.apache.samza.zk.ZkJobCoordinatorFactory;
-
public class JobCoordinatorConfig extends MapConfig {
public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
@@ -35,20 +33,20 @@ public class JobCoordinatorConfig extends MapConfig {
}
public String getJobCoordinationUtilsFactoryClassName() {
- String jobCoordinatorFactoryClassName = get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "");
-
- String className = get(JOB_COORDINATION_UTILS_FACTORY, "");
+ String className = get(JOB_COORDINATION_UTILS_FACTORY, DEFAULT_COORDINATION_UTILS_FACTORY);
- if (!Strings.isNullOrEmpty(className)) {
- return className;
+ if (Strings.isNullOrEmpty(className)) {
+ throw new SamzaException("Empty config for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className);
}
- // TODO: we will need a better way to package the configs with application runner
- if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
- return DEFAULT_COORDINATION_UTILS_FACTORY;
+ try {
+ Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new SamzaException(
+ "Failed to validate config value for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className, e);
}
- throw new SamzaException("Cannot determine which CoordinationUtilsFactory to load");
+ return className;
}
public String getJobCoordinatorFactoryClassName() {
http://git-wip-us.apache.org/repos/asf/samza/blob/fb39a514/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java b/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
new file mode 100644
index 0000000..2ef92b5
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import junit.framework.Assert;
+import org.apache.samza.SamzaException;
+import org.apache.samza.zk.ZkCoordinationUtilsFactory;
+import org.junit.Test;
+
+
+public class TestJobCoordinatorConfig {
+
+ private final static String NONEXISTING_FACTORY_CLASS = "AnotherFactory";
+ private final static String ANOTHER_FACTORY_CLASS = TestJobCoordinatorConfig.class.getName(); // any valid name
+
+ @Test
+ public void testJobCoordinationUtilsFactoryConfig() {
+
+ Map<String, String> map = new HashMap<>();
+ JobCoordinatorConfig jConfig = new JobCoordinatorConfig(new MapConfig(map));
+
+ // test default value
+ Assert.assertEquals(ZkCoordinationUtilsFactory.class.getName(), jConfig.getJobCoordinationUtilsFactoryClassName());
+
+ map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, ANOTHER_FACTORY_CLASS);
+ jConfig = new JobCoordinatorConfig(new MapConfig(map));
+ Assert.assertEquals(ANOTHER_FACTORY_CLASS, jConfig.getJobCoordinationUtilsFactoryClassName());
+
+ // failure case
+ map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, NONEXISTING_FACTORY_CLASS);
+ jConfig = new JobCoordinatorConfig(new MapConfig(map));
+ try {
+ jConfig.getJobCoordinationUtilsFactoryClassName();
+ Assert.fail("Failed to validate loading of fake class: " + NONEXISTING_FACTORY_CLASS);
+ } catch (SamzaException e) {
+ // expected
+ }
+ }
+}