You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/12 21:19:50 UTC
[09/26] samza git commit: SAMZA-1396 TestZkLocalApplicationRunner
tests fail after SAMZA-1385
SAMZA-1396 TestZkLocalApplicationRunner tests fail after SAMZA-1385
* Fixes ZkPath issues
* Fixes appname / jobname mismatch
Author: Navina Ramesh <na...@apache.org>
Reviewers: Xinyu Liu <xi...@apache.org>, Bharath Kumarasubramanian <co...@gmail.com>
Closes #274 from navina/SAMZA-1396
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/119e2fa0
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/119e2fa0
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/119e2fa0
Branch: refs/heads/0.14.0
Commit: 119e2fa0126a66f949cba8d2e2d8cbb2a36cc1ec
Parents: 89989fd
Author: Navina Ramesh <na...@apache.org>
Authored: Wed Aug 16 14:42:18 2017 -0700
Committer: navina <na...@apache.org>
Committed: Wed Aug 16 14:42:18 2017 -0700
----------------------------------------------------------------------
.../samza/zk/ZkJobCoordinatorFactory.java | 2 +-
.../processor/TestZkLocalApplicationRunner.java | 62 ++++++++++++--------
2 files changed, 39 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/119e2fa0/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index 08d826e..563bf4c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -60,7 +60,7 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
}
- private String getJobCoordinationZkPath(Config config) {
+ public static String getJobCoordinationZkPath(Config config) {
JobConfig jobConfig = new JobConfig(config);
String appId = new ApplicationConfig(config).getGlobalAppId();
String jobName = jobConfig.getName().isDefined()
http://git-wip-us.apache.org/repos/asf/samza/blob/119e2fa0/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index 9ca48ad..76fd046 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -23,15 +23,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
import kafka.admin.AdminUtils;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
@@ -58,6 +49,7 @@ import org.apache.samza.runtime.LocalApplicationRunner;
import org.apache.samza.test.StandaloneIntegrationTestHarness;
import org.apache.samza.test.StandaloneTestUtils;
import org.apache.samza.util.NoOpMetricsRegistry;
+import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.apache.samza.zk.ZkKeyBuilder;
import org.apache.samza.zk.ZkUtils;
import org.junit.Rule;
@@ -68,7 +60,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
-import static org.junit.Assert.*;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Integration tests for {@link LocalApplicationRunner}.
@@ -87,7 +90,6 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
private static final String TEST_TASK_GROUPER_FACTORY = "org.apache.samza.container.grouper.task.GroupByContainerIdsFactory";
private static final String TEST_JOB_COORDINATOR_FACTORY = "org.apache.samza.zk.ZkJobCoordinatorFactory";
private static final String TEST_SYSTEM_FACTORY = "org.apache.samza.system.kafka.KafkaSystemFactory";
- private static final String TEST_JOB_NAME = "test-job";
private static final String TASK_SHUTDOWN_MS = "3000";
private static final String JOB_DEBOUNCE_TIME_MS = "1000";
private static final String[] PROCESSOR_IDS = new String[] {"0000000000", "0000000001", "0000000002"};
@@ -118,16 +120,23 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
testStreamAppId = String.format("test-app-id-%s", uniqueTestId);
inputKafkaTopic = String.format("test-input-topic-%s", uniqueTestId);
outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId);
+
+ // Set up stream application config map with the given testStreamAppName, testStreamAppId and test kafka system
+ // TODO: processorId should typically come from a processorID generator, as processor.id will be deprecated in 0.14.0+
+ Map<String, String> configMap =
+ buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+ applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+ applicationConfig2 = new ApplicationConfig(new MapConfig(configMap));
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+ applicationConfig3 = new ApplicationConfig(new MapConfig(configMap));
+
ZkClient zkClient = new ZkClient(zkConnect());
- ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(String.format("app-%s-%s", testStreamAppName, testStreamAppId));
+ ZkKeyBuilder zkKeyBuilder = new ZkKeyBuilder(ZkJobCoordinatorFactory.getJobCoordinationZkPath(applicationConfig1));
zkUtils = new ZkUtils(zkKeyBuilder, zkClient, ZK_CONNECTION_TIMEOUT_MS, new NoOpMetricsRegistry());
zkUtils.connect();
- // Set up stream application configs with different processorIds and same testStreamAppName, testStreamAppId.
- applicationConfig1 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[0], testStreamAppName, testStreamAppId);
- applicationConfig2 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[1], testStreamAppName, testStreamAppId);
- applicationConfig3 = buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[2], testStreamAppName, testStreamAppId);
-
// Create local application runners.
applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
applicationRunner2 = new LocalApplicationRunner(applicationConfig2);
@@ -164,8 +173,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
}
}
- private ApplicationConfig buildStreamApplicationConfig(String systemName, String inputTopic,
- String processorId, String appName, String appId) {
+ private Map<String, String> buildStreamApplicationConfigMap(String systemName, String inputTopic,
+ String appName, String appId) {
Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder()
.put(TaskConfig.INPUT_STREAMS(), inputTopic)
.put(JobConfig.JOB_DEFAULT_SYSTEM(), systemName)
@@ -174,17 +183,17 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
.put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY)
.put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY)
.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY)
- .put(JobConfig.PROCESSOR_ID(), processorId)
.put(ApplicationConfig.APP_NAME, appName)
.put(ApplicationConfig.APP_ID, appId)
.put(String.format("systems.%s.samza.factory", systemName), TEST_SYSTEM_FACTORY)
- .put(JobConfig.JOB_NAME(), TEST_JOB_NAME)
+ .put(JobConfig.JOB_NAME(), appName)
+ .put(JobConfig.JOB_ID(), appId)
.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
.build();
Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(systemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
- return new ApplicationConfig(new MapConfig(applicationConfig));
+ return applicationConfig;
}
@Test
@@ -354,9 +363,14 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
* are for ZkJobCoordinator output(onNewJobModelConfirmed, onNewJobModelAvailable). Increasing DefaultDebounceTime to make sure that streamApplication dies & rejoins before expiry.
*/
Map<String, String> debounceTimeConfig = ImmutableMap.of(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000");
+ Map<String, String> configMap = buildStreamApplicationConfigMap(TEST_SYSTEM, inputKafkaTopic, testStreamAppName, testStreamAppId);
+ configMap.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), "40000");
+
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+ Config applicationConfig1 = new MapConfig(configMap);
- Config applicationConfig1 = new MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[0], testStreamAppName, testStreamAppId), debounceTimeConfig));
- Config applicationConfig2 = new MapConfig(ImmutableList.of(buildStreamApplicationConfig(TEST_SYSTEM, inputKafkaTopic, PROCESSOR_IDS[1], testStreamAppName, testStreamAppId), debounceTimeConfig));
+ configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+ Config applicationConfig2 = new MapConfig(configMap);
LocalApplicationRunner applicationRunner1 = new LocalApplicationRunner(applicationConfig1);
LocalApplicationRunner applicationRunner2 = new LocalApplicationRunner(applicationConfig2);