You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/05/29 23:26:31 UTC

samza git commit: SAMZA-1732: Reduce the coordination timeouts in TestZkLocalApplicationRunner tests.

Repository: samza
Updated Branches:
  refs/heads/master 53d7f2625 -> 9f323c950


SAMZA-1732: Reduce the coordination timeouts in TestZkLocalApplicationRunner tests.

Currently, all the tests in TestZkLocalApplicationRunner together take around 5 minutes to finish. This change reduces the coordination timeouts to shorten the test time.

Changes in TestZkLocalApplicationRunner test timeout values:
* Change debounce timeout from 30 seconds to 2 seconds.
* Change task.shutdown timeout from 20 seconds to 2 seconds.
* Change barrier timeout from 40 seconds to 2 seconds.

After this change, the execution time of the TestZkLocalApplicationRunner tests dropped from `310` seconds to `55` seconds.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Jagadish <ja...@apache.org>

Closes #537 from shanthoosh/reduce_zk_localAppRunnerTestTime


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/9f323c95
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/9f323c95
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/9f323c95

Branch: refs/heads/master
Commit: 9f323c950410ad52cd29a9ac8def65af5b578487
Parents: 53d7f26
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Tue May 29 16:26:25 2018 -0700
Committer: Jagadish <jv...@linkedin.com>
Committed: Tue May 29 16:26:25 2018 -0700

----------------------------------------------------------------------
 .../processor/TestZkLocalApplicationRunner.java     | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/9f323c95/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 0b0a271..ea44052 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -88,8 +88,9 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
   private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
   private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
-  private static final String TASK_SHUTDOWN_MS = "20000";
-  private static final String JOB_DEBOUNCE_TIME_MS = "30000";
+  private static final String TASK_SHUTDOWN_MS = "2000";
+  private static final String JOB_DEBOUNCE_TIME_MS = "2000";
+  private static final String BARRIER_TIMEOUT_MS = "2000";
   private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};
 
   private String inputKafkaTopic;
@@ -185,6 +186,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic,
       String appName, String appId) {
     Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder()
+        .put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS)
         .put(TaskConfig.INPUT_STREAMS(), inputTopic)
         .put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName)
         .put(TaskConfig.IGNORED_EXCEPTIONS(), "*")
@@ -489,11 +491,10 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     processedMessagesLatch1.await();
     processedMessagesLatch2.await();
 
-    LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(applicationConfig2);
+    MapConfig appConfig = new ApplicationConfig(new MapConfig(applicationConfig2, ImmutableMap.of(ZkConfig.ZK_SESSION_TIMEOUT_MS, "10")));
+    LocalApplicationRunner applicationRunner3 = new LocalApplicationRunner(appConfig);
 
     // Create a stream app with same processor id as SP2 and run it. It should fail.
-    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, 2 * NUM_KAFKA_EVENTS, PROCESSOR_IDS[2]);
-    kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
     StreamApplication streamApp3 = TestStreamApplication.getInstance(inputKafkaTopic, outputKafkaTopic, null, null, kafkaEventsConsumedLatch, applicationConfig2);
     // Fail when the duplicate processor joins.
     expectedException.expect(SamzaException.class);
@@ -514,12 +515,7 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     // Set up kafka topics.
     publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
 
-    /**
-     * Custom listeners can't be plugged in for transition events(generatingNewJobModel, waitingForProcessors, waitingForBarrierCompletion etc) from zkJobCoordinator. Only possible listeners
-     * are for ZkJobCoordinator output(onNewJobModelConfirmed, onNewJobModelAvailable). Increasing DefaultDebounceTime to make sure that streamApplication dies & rejoins before expiry.
-     */
     Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
-    configMap.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000");
 
     configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
     Config applicationConfig1 = new MapConfig(configMap);