You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/10/05 21:32:13 UTC
samza git commit: SAMZA-1926: Fix standalone configurations in
configuration table.
Repository: samza
Updated Branches:
refs/heads/master 4baaddbbb -> 531b35e9f
SAMZA-1926: Fix standalone configurations in configuration table.
Changes:
* Fix the default value of debounce time configuration.
* Remove the `coordination.utils.factory` configuration from the table(Infer that based upon job.coordinator.factory configuration). Remove the definition of `coordination.utils.factory` from the configuration in unit tests.
* Default the configuration `job.coordinator.factory` to `ZkJobCoordinatorFactory` if it is not defined by the user.
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Reviewers: Jagadish<ja...@apache.org>, Prateek M<pm...@linkedin.com>
Closes #675 from shanthoosh/master
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/531b35e9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/531b35e9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/531b35e9
Branch: refs/heads/master
Commit: 531b35e9f0f2a26933bb445f2b5a1301580093de
Parents: 4baaddb
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Fri Oct 5 14:30:42 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Oct 5 14:30:42 2018 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 22 ++------
.../samza/config/JobCoordinatorConfig.java | 34 ++++++++----
.../samza/config/TestJobCoordinatorConfig.java | 58 --------------------
.../samza/sql/testutil/SamzaSqlTestConfig.java | 1 -
.../apache/samza/test/framework/TestRunner.java | 3 -
.../EndOfStreamIntegrationTest.java | 2 -
.../WatermarkIntegrationTest.java | 2 -
.../samza/test/framework/SchedulingTest.java | 1 -
.../operator/TestRepartitionJoinWindowApp.java | 3 -
.../test/operator/TestRepartitionWindowApp.java | 1 -
.../apache/samza/test/table/TestLocalTable.java | 2 -
11 files changed, 27 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 26b4661..35ddcab 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -448,9 +448,11 @@
</tr>
<tr>
<td class="property" id="job.coordinator.factory">job.coordinator.factory</td>
- <td class="default"></td>
+ <td class="default">org.apache.samza.zk.ZkJobCoordinatorFactory</td>
<td class="description">
- Class to use for job coordination. Currently available values are:
+ The fully-qualified name of the Java class which determines the factory class which will build the JobCoordinator.
+ The user can specify a custom implementation of the JobCoordinatorFactory where a custom logic is implemented for distributed coordination of stream processors. <br>
+ Samza supports the following coordination modes out of the box.
<dl>
<dt><code>org.apache.samza.standalone.PassthroughJobCoordinatorFactory</code></dt>
<dd>Fixed partition mapping. No Zoookeeper. </dd>
@@ -461,20 +463,6 @@
Required only for non-cluster-managed applications. Please see the required value for <a href=#task-name-grouper-factory>task-name-grouper-factory </a>
</td>
</tr>
- <tr>
- <td class="property" id="job.coordination.utils.factory">job.coordination.utils.factory</td>
- <td class="default">org.apache.samza.zk.ZkCoordinationUtilsFactory</td>
- <td class="description">
- Class to use to create CoordinationUtils. Currently available values are:
- <dl>
- <dt><code>org.apache.samza.zk.ZkCoordinationUtilsFactory</code></dt>
- <dd>ZooKeeper based coordination utils.</dd>
- <dt><code>org.apache.samza.coordinator.AzureCoordinationUtilsFactory</code></dt>
- <dd>Azure based coordination utils.</dd>
- These coordination utils are currently used for intermediate stream creation.
- </dl>
- </td>
- </tr>
<tr>
<td class="property" id="job.logged.store.base.dir">job.logged.store.base.dir</td>
@@ -539,7 +527,7 @@
</tr>
<tr>
<td class="property" id="job.debounce.time.ms">job.debounce.time.ms</td>
- <td class="default"> 2000 </td>
+ <td class="default"> 20000 </td>
<td class="description">
How long the Leader processor will wait before recalculating the JobModel on change of registered processors.
</td>
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
index 2322727..60c43c3 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobCoordinatorConfig.java
@@ -22,33 +22,44 @@ package org.apache.samza.config;
import com.google.common.base.Strings;
import org.apache.samza.SamzaException;
import org.apache.samza.coordinator.CoordinationUtilsFactory;
+import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
+import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.util.Util;
import org.apache.samza.zk.ZkCoordinationUtilsFactory;
+import org.apache.samza.zk.ZkJobCoordinatorFactory;
public class JobCoordinatorConfig extends MapConfig {
public static final String JOB_COORDINATOR_FACTORY = "job.coordinator.factory";
- public static final String JOB_COORDINATION_UTILS_FACTORY = "job.coordination.utils.factory";
- public final static String DEFAULT_COORDINATION_UTILS_FACTORY = ZkCoordinationUtilsFactory.class.getName();
+ public final static String DEFAULT_COORDINATOR_FACTORY = ZkJobCoordinatorFactory.class.getName();
+ private static final String AZURE_COORDINATION_UTILS_FACTORY = "org.apache.samza.coordinator.AzureCoordinationUtilsFactory";
+ private static final String AZURE_COORDINATOR_FACTORY = "org.apache.samza.coordinator.AzureJobCoordinatorFactory";
public JobCoordinatorConfig(Config config) {
super(config);
}
public String getJobCoordinationUtilsFactoryClassName() {
- String className = get(JOB_COORDINATION_UTILS_FACTORY, DEFAULT_COORDINATION_UTILS_FACTORY);
+ String coordinatorFactory = get(JOB_COORDINATOR_FACTORY, DEFAULT_COORDINATOR_FACTORY);
- if (Strings.isNullOrEmpty(className)) {
- throw new SamzaException("Empty config for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className);
+ String coordinationUtilsFactory;
+ if (AZURE_COORDINATOR_FACTORY.equals(coordinatorFactory)) {
+ coordinationUtilsFactory = AZURE_COORDINATION_UTILS_FACTORY;
+ } else if (PassthroughJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) {
+ coordinationUtilsFactory = PassthroughCoordinationUtilsFactory.class.getName();
+ } else if (ZkJobCoordinatorFactory.class.getName().equals(coordinatorFactory)) {
+ coordinationUtilsFactory = ZkCoordinationUtilsFactory.class.getName();
+ } else {
+ throw new SamzaException(String.format("Coordination factory: %s defined by the config: %s is invalid.", coordinatorFactory, JOB_COORDINATOR_FACTORY));
}
try {
- Class.forName(className);
+ Class.forName(coordinationUtilsFactory);
} catch (ClassNotFoundException e) {
throw new SamzaException(
- "Failed to validate config value for " + JOB_COORDINATION_UTILS_FACTORY + " = " + className, e);
+ "Failed to validate config value for " + JOB_COORDINATOR_FACTORY + " = " + coordinationUtilsFactory, e);
}
- return className;
+ return coordinationUtilsFactory;
}
public CoordinationUtilsFactory getCoordinationUtilsFactory() {
@@ -61,10 +72,9 @@ public class JobCoordinatorConfig extends MapConfig {
public String getJobCoordinatorFactoryClassName() {
String jobCoordinatorFactoryClassName = get(JOB_COORDINATOR_FACTORY);
if (Strings.isNullOrEmpty(jobCoordinatorFactoryClassName)) {
- throw new ConfigException(
- String.format("Missing config - %s. Cannot start StreamProcessor!", JOB_COORDINATOR_FACTORY));
+ return ZkJobCoordinatorFactory.class.getName();
+ } else {
+ return jobCoordinatorFactoryClassName;
}
-
- return jobCoordinatorFactoryClassName;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java b/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
deleted file mode 100644
index 2ef92b5..0000000
--- a/samza-core/src/test/scala/org/apache/samza/config/TestJobCoordinatorConfig.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.config;
-
-import java.util.HashMap;
-import java.util.Map;
-import junit.framework.Assert;
-import org.apache.samza.SamzaException;
-import org.apache.samza.zk.ZkCoordinationUtilsFactory;
-import org.junit.Test;
-
-
-public class TestJobCoordinatorConfig {
-
- private final static String NONEXISTING_FACTORY_CLASS = "AnotherFactory";
- private final static String ANOTHER_FACTORY_CLASS = TestJobCoordinatorConfig.class.getName(); // any valid name
-
- @Test
- public void testJobCoordinationUtilsFactoryConfig() {
-
- Map<String, String> map = new HashMap<>();
- JobCoordinatorConfig jConfig = new JobCoordinatorConfig(new MapConfig(map));
-
- // test default value
- Assert.assertEquals(ZkCoordinationUtilsFactory.class.getName(), jConfig.getJobCoordinationUtilsFactoryClassName());
-
- map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, ANOTHER_FACTORY_CLASS);
- jConfig = new JobCoordinatorConfig(new MapConfig(map));
- Assert.assertEquals(ANOTHER_FACTORY_CLASS, jConfig.getJobCoordinationUtilsFactoryClassName());
-
- // failure case
- map.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, NONEXISTING_FACTORY_CLASS);
- jConfig = new JobCoordinatorConfig(new MapConfig(map));
- try {
- jConfig.getJobCoordinationUtilsFactoryClassName();
- Assert.fail("Failed to validate loading of fake class: " + NONEXISTING_FACTORY_CLASS);
- } catch (SamzaException e) {
- // expected
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
----------------------------------------------------------------------
diff --git a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
index a96fd08..97168d8 100644
--- a/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
+++ b/samza-sql/src/test/java/org/apache/samza/sql/testutil/SamzaSqlTestConfig.java
@@ -74,7 +74,6 @@ public class SamzaSqlTestConfig {
staticConfigs.put(JobConfig.JOB_NAME(), "sql-job");
staticConfigs.put(JobConfig.PROCESSOR_ID(), "1");
staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
- staticConfigs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
staticConfigs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
staticConfigs.put(SamzaSqlApplicationConfig.CFG_IO_RESOLVER, "config");
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
index a1103dd..add3bf6 100644
--- a/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
+++ b/samza-test/src/main/java/org/apache/samza/test/framework/TestRunner.java
@@ -44,7 +44,6 @@ import org.apache.samza.container.grouper.task.SingleContainerGrouperFactory;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.operators.KV;
import org.apache.samza.runtime.LocalApplicationRunner;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.EndOfStreamMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -71,7 +70,6 @@ import org.junit.Assert;
*
* The following configs are set by default
* <ol>
- * <li>"job.coordination.utils.factory" = {@link PassthroughCoordinationUtilsFactory}</li>
* <li>"job.coordination.factory" = {@link PassthroughJobCoordinatorFactory}</li>
* <li>"task.name.grouper.factory" = {@link SingleContainerGrouperFactory}</li>
* <li>"job.name" = "test-samza"</li>
@@ -98,7 +96,6 @@ public class TestRunner {
this.inMemoryScope = RandomStringUtils.random(10, true, true);
configs.put(JobConfig.JOB_NAME(), JOB_NAME);
configs.put(JobConfig.PROCESSOR_ID(), "1");
- configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
addConfig(JobConfig.JOB_DEFAULT_SYSTEM(), JOB_DEFAULT_SYSTEM);
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
index abdba01..4c8884d 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/EndOfStreamIntegrationTest.java
@@ -38,7 +38,6 @@ import org.apache.samza.runtime.ApplicationRunner;
import org.apache.samza.runtime.ApplicationRunners;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.test.controlmessages.TestData.PageView;
import org.apache.samza.test.controlmessages.TestData.PageViewJsonSerdeFactory;
@@ -80,7 +79,6 @@ public class EndOfStreamIntegrationTest extends AbstractIntegrationTestHarness {
configs.put(JobConfig.JOB_NAME(), "test-eos-job");
configs.put(JobConfig.PROCESSOR_ID(), "1");
- configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
index 3c86a37..e0097bd 100644
--- a/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/controlmessages/WatermarkIntegrationTest.java
@@ -58,7 +58,6 @@ import org.apache.samza.serializers.IntegerSerdeFactory;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.serializers.StringSerdeFactory;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
@@ -133,7 +132,6 @@ public class WatermarkIntegrationTest extends AbstractIntegrationTestHarness {
configs.put(JobConfig.JOB_NAME(), "test-watermark-job");
configs.put(JobConfig.PROCESSOR_ID(), "1");
- configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java
index 658492a..7e89fa9 100644
--- a/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java
+++ b/samza-test/src/test/java/org/apache/samza/test/framework/SchedulingTest.java
@@ -52,7 +52,6 @@ public class SchedulingTest extends StreamApplicationIntegrationTestHarness {
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
configs.put("job.systemstreampartition.grouper.factory", "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory");
configs.put("task.name.grouper.factory", "org.apache.samza.container.grouper.task.SingleContainerGrouperFactory");
- configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
runApplication(new TestSchedulingApp(), "SchedulingTest", configs);
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
index 144f125..340f0e7 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionJoinWindowApp.java
@@ -82,7 +82,6 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
String appName = "UserPageAdClickCounter";
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
- configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put("systems.kafka.samza.delete.committed.messages", "false");
@@ -112,7 +111,6 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
final String appName = "UserPageAdClickCounter2";
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
- configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put("systems.kafka.samza.delete.committed.messages", "true");
@@ -160,7 +158,6 @@ public class TestRepartitionJoinWindowApp extends StreamApplicationIntegrationTe
String outputTopicName = "user-ad-click-counts";
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
- configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
configs.put(BroadcastAssertApp.INPUT_TOPIC_NAME_PROP, inputTopicName1);
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
index 2e1de96..2f08fed 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -61,7 +61,6 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa
Map<String, String> configs = new HashMap<>();
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
- configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, "org.apache.samza.standalone.PassthroughCoordinationUtilsFactory");
configs.put(JobConfig.PROCESSOR_ID(), "0");
configs.put(TaskConfig.GROUPER_FACTORY(), "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory");
http://git-wip-us.apache.org/repos/asf/samza/blob/531b35e9/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
index 4d0d83a..da8af9e 100644
--- a/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
+++ b/samza-test/src/test/java/org/apache/samza/test/table/TestLocalTable.java
@@ -48,7 +48,6 @@ import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.serializers.IntegerSerde;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.NoOpSerde;
-import org.apache.samza.standalone.PassthroughCoordinationUtilsFactory;
import org.apache.samza.standalone.PassthroughJobCoordinatorFactory;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueStore;
@@ -308,7 +307,6 @@ public class TestLocalTable extends AbstractIntegrationTestHarness {
configs.put(JobConfig.JOB_NAME(), "test-table-job");
configs.put(JobConfig.PROCESSOR_ID(), "1");
- configs.put(JobCoordinatorConfig.JOB_COORDINATION_UTILS_FACTORY, PassthroughCoordinationUtilsFactory.class.getName());
configs.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, PassthroughJobCoordinatorFactory.class.getName());
configs.put(TaskConfig.GROUPER_FACTORY(), SingleContainerGrouperFactory.class.getName());