You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/05 21:32:13 UTC

samza git commit: SAMZA-1926: Fix standalone configurations in configuration table.

Repository: samza
Updated Branches:
  refs/heads/master 4baaddbbb -> 531b35e9f


SAMZA-1926: Fix standalone configurations in configuration table.

Changes:
* Fix the default value of debounce time configuration.
* Remove the `coordination.utils.factory` configuration from the table(Infer that based upon job.coordinator.factory configuration).  Remove the definition of `coordination.utils.factory` from the configuration in unit tests.
* Default the configuration `job.coordinator.factory` to `ZkJobCoordinatorFactory` if it is not defined by the user.

Author: Shanthoosh Venkataraman <sp...@usc.edu>

Reviewers: Jagadish<ja...@apache.org>, Prateek M<pm...@linkedin.com>

Closes #675 from shanthoosh/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/531b35e9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/531b35e9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/531b35e9

Branch: refs/heads/master
Commit: 531b35e9f0f2a26933bb445f2b5a1301580093de
Parents: 4baaddb
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Fri Oct 5 14:30:42 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Oct 5 14:30:42 2018 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 22 ++------
 .../samza/config/JobCoordinatorConfig.java      | 34 ++++++++----
 .../samza/config/TestJobCoordinatorConfig.java  | 58 --------------------
 .../samza/sql/testutil/SamzaSqlTestConfig.java  |  1 -
 .../apache/samza/test/framework/TestRunner.java |  3 -
 .../EndOfStreamIntegrationTest.java             |  2 -
 .../WatermarkIntegrationTest.java               |  2 -
 .../samza/test/framework/SchedulingTest.java    |  1 -
 .../operator/TestRepartitionJoinWindowApp.java  |  3 -
 .../test/operator/TestRepartitionWindowApp.java |  1 -
 .../apache/samza/test/table/TestLocalTable.java |  2 -
 11 files changed, 27 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 26b4661..35ddcab 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -448,9 +448,11 @@
                 </tr>
                  <tr>
                     <td class="property" id="job.coordinator.factory">job.coordinator.factory</td>
-                    <td class="default"></td>
+                    <td class="default">org.apache.samza.zk.ZkJobCoordinatorFactory</td>
                     <td class="description">
-			Class to use for job coordination. Currently available values are:
+                        The fully-qualified name of the Java class which determines the factory class which will build the JobCoordinator.
+                        The user can specify a custom implementation of the JobCoordinatorFactory where a custom logic is implemented for distributed coordination of stream processors. <br>
+                        Samza supports the following coordination modes out of the box.
                        <dl>
                             <dt><code>org.apache.samza.standalone.PassthroughJobCoordinatorFactory</code></dt>
                             <dd>Fixed partition mapping. No Zoookeeper. </dd>
@@ -461,20 +463,6 @@
                         Required only for non-cluster-managed applications. Please see the required value for <a href=#task-name-grouper-factory>task-name-grouper-factory </a>
                     </td>
                 </tr>
-                <tr>
-                    <td class="property" id="job.coordination.utils.factory">job.coordination.utils.factory</td>
-                    <td class="default">org.apache.samza.zk.ZkCoordinationUtilsFactory</td>
-                    <td class="description">
-                        Class to use to create CoordinationUtils. Currently available values are:
-                        <dl>
-                            <dt><code>org.apache.samza.zk.ZkCoordinationUtilsFactory</code></dt>
-                            <dd>ZooKeeper based coordination utils.</dd>
-                            <dt><code>org.apache.samza.coordinator.AzureCoordinationUtilsFactory</code></dt>
-                            <dd>Azure based coordination utils.</dd>
-                        These coordination utils are currently used for intermediate stream creation.
-                        </dl>
-                    </td>
-                </tr>
 
                 <tr>
                     <td class="property" id="job.logged.store.base.dir">job.logged.store.base.dir</td>
@@ -539,7 +527,7 @@
                 </tr>
                 <tr>
                     <td class="property" id="job.debounce.time.ms">job.debounce.time.ms</td>
-                    <td class="default"> 2000 </td>
+                    <td class="default"> 20000 </td>
                     <td class="description">
                         How long the Leader processor will wait before recalculating the JobModel on change of registered processors.
                     </td>

http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
index 2322727..60c43c3 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -22,33 +22,44 @@ package org.apache.samza.config;
 import com.google.common.base.Strings;
 import org.apache.samza.SamzaException;
 import org.apache.samza.coordinator.CoordinationUtilsFactory;
+import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.util.Util;
 import org.apache.samza.zk.ZkCoordinationUtilsFactory;
+import org.apache.samza.zk.ZkJobCoordinatorFactory;
 
 public class JobCoordinatorConfig extends MapConfig {
   public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
-  public static final String JOB_COORDINATION_UTILS_FACTORY = "job.coordination.utils.factory";
-  public final static String DEFAULT_COORDINATION_UTILS_FACTORY = ZkCoordinationUtilsFactory.class.getName();
+  public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName();
+  private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory";
+  private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory";
 
   public JobCoordinatorConfig(Config config) {
     super(config);
   }
 
   public String getJobCoordinationUtilsFactoryClassName() {
-    String className = get(JOB_COORDINATION_UTILS_FACTORY, DEFAULT_COORDINATION_UTILS_FACTORY);
+    String coordinatorFactory = get(JOB_COORDINATOR_FACTORY, DEFAULT_COORDINATOR_FACTORY);
 
-    if (Strings.isNullOrEmpty(className)) {
-      throw new SamzaException("Empty config for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className);
+    String coordinationUtilsFactory;
+    if (AZURE_COORDINATOR_FACTORY.equals(coordinatorFactory)) {
+      coordinationUtilsFactory = AZURE_COORDINATION_UTILS_FACTORY;
+    } else if (PassthroughJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) {
+      coordinationUtilsFactory = PassthroughCoordinationUtilsFactory.class.getName();
+    } else if (ZkJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) {
+      coordinationUtilsFactory = ZkCoordinationUtilsFactory.class.getName();
+    } else {
+      throw new SamzaException(String.format("Coordination factory: %s defined by the config: %s is invalid.", coordinatorFactory, JOB_COORDINATOR_FACTORY));
     }
 
     try {
-      Class.forName(className);
+      Class.forName(coordinationUtilsFactory);
     } catch (ClassNotFoundException e) {
       throw new SamzaException(
-          "Failed to validate config value for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className, e);
+          "Failed to validate config value for " + JOB_COORDINATOR_FACTORY + " = " + coordinationUtilsFactory, e);
     }
 
-    return className;
+    return coordinationUtilsFactory;
   }
 
   public CoordinationUtilsFactory getCoordinationUtilsFactory() {
@@ -61,10 +72,9 @@ public class JobCoordinatorConfig extends MapConfig {
   public String getJobCoordinatorFactoryClassName() {
     String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY);
     if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) {
-      throw new ConfigException(
-          String.format("Missing config - %s. Cannot start StreamProcessor!", JOB_COORDINATOR_FACTORY));
+      return ZkJobCoordinatorFactory.class.getName();
+    } else {
+      return jobCoordinatorFactoryClassName;
     }
-
-    return jobCoordinatorFactoryClassName;
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java b/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
deleted file mode 100644
index 2ef92b5..0000000
--- a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config;
-
-import java.util.HashMap;
-import java.util.Map;
-import junit.framework.Assert;
-import org.apache.samza.SamzaException;
-import org.apache.samza.zk.ZkCoordinationUtilsFactory;
-import org.junit.Test;
-
-
-public class TestJobCoordinatorConfig {
-
-  private final static String NONEXISTING_FACTORY_CLASS = "AnotherFactory";
-  private final static String ANOTHER_FACTORY_CLASS = TestJobCoordinatorConfig.class.getName(); // any valid name
-
-  @Test
-  public void testJobCoordinationUtilsFactoryConfig() {
-
-    Map<String, String> map = new HashMap<>();
-    JobCoordinatorConfig jConfig = new JobCoordinatorConfig(new MapConfig(map));
-
-    // test default value
-    Assert.assertEquals(ZkCoordinationUtilsFactory.class.getName(), jConfig.getJobCoordinationUtilsFactoryClassName());
-
-    map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, ANOTHER_FACTORY_CLASS);
-    jConfig = new JobCoordinatorConfig(new MapConfig(map));
-    Assert.assertEquals(ANOTHER_FACTORY_CLASS, jConfig.getJobCoordinationUtilsFactoryClassName());
-
-    // failure case
-    map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, NONEXISTING_FACTORY_CLASS);
-    jConfig = new JobCoordinatorConfig(new MapConfig(map));
-    try {
-      jConfig.getJobCoordinationUtilsFactoryClassName();
-      Assert.fail("Failed to validate loading of fake class: " + NONEXISTING_FACTORY_CLASS);
-    } catch (SamzaException e) {
-      // expected
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index a96fd08..97168d8 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -74,7 +74,6 @@ public class SamzaSqlTestConfig {
     staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
     staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
     staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
-    staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
     staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
 
     staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");

http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index a1103dd..add3bf6 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -44,7 +44,6 @@ import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.operators.KV;
 import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.system.EndOfStreamMessage;
 import org.apache.samza.system.IncomingMessageEnvelope;
@@ -71,7 +70,6 @@ import org.junit.Assert;
  *
  * The following configs are set by default
  *  <ol>
- *    <li>"job.coordination.utils.factory" = {@link PassthroughCoordinationUtilsFactory}</li>
  *    <li>"job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}</li>
  *    <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li>
  *    <li>"job.name" = "test-samza"</li>
@@ -98,7 +96,6 @@ public class TestRunner {
     this.inMemoryScope = RandomStringUtils.random(10, true, true);
     configs.put(JobConfig.JOB_NAME(), JOB_NAME);
     configs.put(JobConfig.PROCESSOR_ID(), "1");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
     addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);

http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index abdba01..4c8884d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -38,7 +38,6 @@ import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.test.controlmessages.TestData.PageView;
 import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
@@ -80,7 +79,6 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
 
     configs.put(JobConfig.JOB_NAME(), "test-eos-job");
     configs.put(JobConfig.PROCESSOR_ID(), "1");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
 

http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 3c86a37..e0097bd 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -58,7 +58,6 @@ import org.apache.samza.serializers.IntegerSerdeFactory;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
 import org.apache.samza.serializers.StringSerdeFactory;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemAdmin;
@@ -133,7 +132,6 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
 
     configs.put(JobConfig.JOB_NAME(), "test-watermark-job");
     configs.put(JobConfig.PROCESSOR_ID(), "1");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
 

http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java
index 658492a..7e89fa9 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java
@@ -52,7 +52,6 @@ public class SchedulingTest extends StreamApplicationIntegrationTestHarness {
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
     configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory");
     configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
     configs.put(JobConfig.PROCESSOR_ID(), "0");
 
     runApplication(new TestSchedulingApp(), "SchedulingTest", configs);

http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index 144f125..340f0e7 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -82,7 +82,6 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
     String appName = "UserPageAdClickCounter";
     Map<String, String> configs = new HashMap<>();
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
     configs.put(JobConfig.PROCESSOR_ID(), "0");
     configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
     configs.put("systems.kafka.samza.delete.committed.messages", "false");
@@ -112,7 +111,6 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
     final String appName = "UserPageAdClickCounter2";
     Map<String, String> configs = new HashMap<>();
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
     configs.put(JobConfig.PROCESSOR_ID(), "0");
     configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
     configs.put("systems.kafka.samza.delete.committed.messages", "true");
@@ -160,7 +158,6 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
     String outputTopicName = "user-ad-click-counts";
     Map<String, String> configs = new HashMap<>();
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
     configs.put(JobConfig.PROCESSOR_ID(), "0");
     configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
     configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, inputTopicName1);

http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
index 2e1de96..2f08fed 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -61,7 +61,6 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa
 
     Map<String, String> configs = new HashMap<>();
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
     configs.put(JobConfig.PROCESSOR_ID(), "0");
     configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
 

http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index 4d0d83a..da8af9e 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -48,7 +48,6 @@ import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.serializers.IntegerSerde;
 import org.apache.samza.serializers.KVSerde;
 import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
 import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
 import org.apache.samza.storage.kv.Entry;
 import org.apache.samza.storage.kv.KeyValueStore;
@@ -308,7 +307,6 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
 
     configs.put(JobConfig.JOB_NAME(), "test-table-job");
     configs.put(JobConfig.PROCESSOR_ID(), "1");
-    configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
     configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
     configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());