You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/12 21:19:50 UTC

[09/26] samza git commit: SAMZA-1396 TestZkLocalApplicationRunner tests fail after SAMZA-1385

SAMZA-1396 TestZkLocalApplicationRunner tests fail after SAMZA-1385

* Fixes ZkPath issues
* Fixes appname / jobname mismatch

Author: Navina Ramesh <na...@apache.org>

Reviewers: Xinyu Liu <xi...@apache.org>, Bharath Kumarasubramanian <co...@gmail.com>

Closes #274 from navina/SAMZA-1396


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/119e2fa0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/119e2fa0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/119e2fa0

Branch: refs/heads/0.14.0
Commit: 119e2fa0126a66f949cba8d2e2d8cbb2a36cc1ec
Parents: 89989fd
Author: Navina Ramesh <na...@apache.org>
Authored: Wed Aug 16 14:42:18 2017 -0700
Committer: navina <na...@apache.org>
Committed: Wed Aug 16 14:42:18 2017 -0700

----------------------------------------------------------------------
 .../samza/zk/ZkJobCoordinatorFactory.java       |  2 +-
 .../processor/TestZkLocalApplicationRunner.java | 62 ++++++++++++--------
 2 files changed, 39 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/119e2fa0/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 08d826e..563bf4c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -60,7 +60,7 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
     return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
   }
 
-  private String getJobCoordinationZkPath(Config config) {
+  public static String getJobCoordinationZkPath(Config config) {
     JobConfig jobConfig = new JobConfig(config);
     String appId = new ApplicationConfig(config).getGlobalAppId();
     String jobName = jobConfig.getName().isDefined()

http://git-wip-us.apache.org/repos/asf/samza/blob/119e2fa0/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 9ca48ad..76fd046 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -23,15 +23,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
 import kafka.admin.AdminUtils;
 import kafka.server.KafkaServer;
 import kafka.utils.TestUtils;
@@ -58,6 +49,7 @@ import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.test.StandaloneIntegrationTestHarness;
 import org.apache.samza.test.StandaloneTestUtils;
 import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.zk.ZkJobCoordinatorFactory;
 import org.apache.samza.zk.ZkKeyBuilder;
 import org.apache.samza.zk.ZkUtils;
 import org.junit.Rule;
@@ -68,7 +60,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
 
-import static org.junit.Assert.*;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Integration tests for {@link LocalApplicationRunner}.
@@ -87,7 +90,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
   private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
   private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
-  private static final String TEST_JOB_NAME = "test-job";
   private static final String TASK_SHUTDOWN_MS = "3000";
   private static final String JOB_DEBOUNCE_TIME_MS = "1000";
   private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};
@@ -118,16 +120,23 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     testStreamAppId = String.format("test-app-id-%s", uniqueTestId);
     inputKafkaTopic = String.format("test-input-topic-%s", uniqueTestId);
     outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId);
+
+    // Set up stream application config map with the given testStreamAppName, testStreamAppId and test kafka system
+    // TODO: processorId should typically come from a processorId generator, as processor.id will be deprecated in 0.14.0+
+    Map<String, String> configMap =
+        buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+    applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+    applicationConfig2 = new ApplicationConfig(new MapConfig(configMap));
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+    applicationConfig3 = new ApplicationConfig(new MapConfig(configMap));
+
     ZkClient zkClient = new ZkClient(zkConnect());
-    ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(String.format("app-%s-%s", testStreamAppName, testStreamAppId));
+    ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(ZkJobCoordinatorFactory.getJobCoordinationZkPath(applicationConfig1));
     zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
     zkUtils.connect();
 
-    // Set up stream application configs with different processorIds and same testStreamAppName, testStreamAppId.
-    applicationConfig1 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[0], testStreamAppName, testStreamAppId);
-    applicationConfig2 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[1], testStreamAppName, testStreamAppId);
-    applicationConfig3 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[2], testStreamAppName, testStreamAppId);
-
     // Create local application runners.
     applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
     applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
@@ -164,8 +173,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
     }
   }
 
-  private ApplicationConfig buildStreamApplicationConfig(String systemName, String inputTopic,
-                                                         String processorId, String appName, String appId) {
+  private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic,
+                                                              String appName, String appId) {
     Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder()
         .put(TaskConfig.INPUT_STREAMS(), inputTopic)
         .put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName)
@@ -174,17 +183,17 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
         .put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY)
         .put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY)
         .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY)
-        .put(JobConfig.PROCESSOR_ID(), processorId)
         .put(ApplicationConfig.APP_NAME, appName)
         .put(ApplicationConfig.APP_ID, appId)
         .put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY)
-        .put(JobConfig.JOB_NAME(), TEST_JOB_NAME)
+        .put(JobConfig.JOB_NAME(), appName)
+        .put(JobConfig.JOB_ID(), appId)
         .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
         .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
         .build();
     Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
     applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
-    return new ApplicationConfig(new MapConfig(applicationConfig));
+    return applicationConfig;
   }
 
   @Test
@@ -354,9 +363,14 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
      * are for ZkJobCoordinator output(onNewJobModelConfirmed, onNewJobModelAvailable). Increasing DefaultDebounceTime to make sure that streamApplication dies & rejoins before expiry.
      */
     Map<String, String> debounceTimeConfig = ImmutableMap.of(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000");
+    Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
+    configMap.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000");
+
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+    Config applicationConfig1 = new MapConfig(configMap);
 
-    Config applicationConfig1 = new MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[0], testStreamAppName, testStreamAppId), debounceTimeConfig));
-    Config applicationConfig2 = new MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[1], testStreamAppName, testStreamAppId), debounceTimeConfig));
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+    Config applicationConfig2 = new MapConfig(configMap);
 
     LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
     LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);