You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/07/25 22:32:32 UTC
samza git commit: SAMZA-1282: Spinning up more containers than number
of tasks.
Repository: samza
Updated Branches:
refs/heads/master 35143b676 -> 57758615b
SAMZA-1282: Spinning up more containers than number of tasks.
Changes
* Stop the StreamProcessor in the onNewJobModelAvailable event handler (instead of the onNewJobModelConfirmed
event handler) when it is not part of the group, and prevent it from joining the barrier.
* When numContainerIds > numTaskModels, generate the JobModel from the lexicographically smallest
`x` containerIds (where x = numTaskModels) instead of throwing an IllegalArgumentException.
* Added unit and integration tests in the appropriate classes to verify the expected behavior.
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Reviewers: Boris Shkolnik <bo...@apache.org>, Navina Ramesh <na...@apache.org>
Closes #244 from shanthoosh/more_processor_than_tasks
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/57758615
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/57758615
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/57758615
Branch: refs/heads/master
Commit: 57758615b3f8713364ac1afecab4f5355f64d1d4
Parents: 35143b6
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Tue Jul 25 15:32:22 2017 -0700
Committer: navina <na...@apache.org>
Committed: Tue Jul 25 15:32:22 2017 -0700
----------------------------------------------------------------------
.../grouper/task/GroupByContainerIds.java | 18 +++-
.../apache/samza/processor/StreamProcessor.java | 91 +++++++++-----------
.../org/apache/samza/zk/ZkJobCoordinator.java | 52 +++++------
.../samza/zk/ZkJobCoordinatorFactory.java | 22 ++++-
.../grouper/task/TestGroupByContainerIds.java | 31 +++++--
.../apache/samza/zk/TestZkJobCoordinator.java | 47 ++++++++++
.../processor/TestZkLocalApplicationRunner.java | 78 ++++++++++++++++-
7 files changed, 248 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
index 651dca7..f5a5a86 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
@@ -20,6 +20,7 @@
package org.apache.samza.container.grouper.task;
import java.util.Arrays;
+import java.util.stream.Collectors;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
@@ -31,6 +32,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -39,6 +42,8 @@ import java.util.Set;
* IDs as an argument. Please note - this first implementation ignores locality information.
*/
public class GroupByContainerIds implements TaskNameGrouper {
+ private static final Logger LOG = LoggerFactory.getLogger(GroupByContainerIds.class);
+
private final int startContainerCount;
public GroupByContainerIds(int count) {
this.startContainerCount = count;
@@ -64,8 +69,17 @@ public class GroupByContainerIds implements TaskNameGrouper {
throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays
.toString(containersIds.toArray()));
- if (containersIds.size() > tasks.size())
- throw new IllegalArgumentException("number of containers " + containersIds.size() + " is bigger than number of tasks " + tasks.size());
+ if (containersIds.size() > tasks.size()) {
+ LOG.warn("Number of containers: {} is greater than number of tasks: {}.", containersIds.size(), tasks.size());
+ /**
+ * Choose lexicographically least `x` containerIds(where x = tasks.size()).
+ */
+ containersIds = containersIds.stream()
+ .sorted()
+ .limit(tasks.size())
+ .collect(Collectors.toList());
+ LOG.info("Generating containerModel with containers: {}.", containersIds);
+ }
int containerCount = containersIds.size();
http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 590fa11..415111f 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -244,64 +244,59 @@ public class StreamProcessor {
@Override
public void onNewJobModel(String processorId, JobModel jobModel) {
- if (!jobModel.getContainers().containsKey(processorId)) {
- LOGGER.warn("JobModel does not contain the processorId: " + processorId + ". Stopping the processor.");
- stop();
- } else {
- jcContainerShutdownLatch = new CountDownLatch(1);
-
- SamzaContainerListener containerListener = new SamzaContainerListener() {
- @Override
- public void onContainerStart() {
- if (!processorOnStartCalled) {
- // processorListener is called on start only the first time the container starts.
- // It is not called after every re-balance of partitions among the processors
- processorOnStartCalled = true;
- if (processorListener != null) {
- processorListener.onStart();
- }
- } else {
- LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
- }
- }
+ jcContainerShutdownLatch = new CountDownLatch(1);
- @Override
- public void onContainerStop(boolean pauseByJm) {
- if (pauseByJm) {
- LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
- if (jcContainerShutdownLatch != null) {
- jcContainerShutdownLatch.countDown();
- }
- } else { // sp.stop was called or container stopped by itself
- LOGGER.info("Container " + container.toString() + " stopped.");
- container = null; // this guarantees that stop() doesn't try to stop container again
- stop();
+ SamzaContainerListener containerListener = new SamzaContainerListener() {
+ @Override
+ public void onContainerStart() {
+ if (!processorOnStartCalled) {
+ // processorListener is called on start only the first time the container starts.
+ // It is not called after every re-balance of partitions among the processors
+ processorOnStartCalled = true;
+ if (processorListener != null) {
+ processorListener.onStart();
}
+ } else {
+ LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
}
+ }
- @Override
- public void onContainerFailed(Throwable t) {
+ @Override
+ public void onContainerStop(boolean pauseByJm) {
+ if (pauseByJm) {
+ LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
if (jcContainerShutdownLatch != null) {
jcContainerShutdownLatch.countDown();
- } else {
- LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
}
- containerException = t;
- LOGGER.error("Container failed. Stopping the processor.", containerException);
- container = null;
+ } else { // sp.stop was called or container stopped by itself
+ LOGGER.info("Container " + container.toString() + " stopped.");
+ container = null; // this guarantees that stop() doesn't try to stop container again
stop();
}
- };
+ }
- container = createSamzaContainer(
- jobModel.getContainers().get(processorId),
- jobModel.maxChangeLogStreamPartitions);
- container.setContainerListener(containerListener);
- LOGGER.info("Starting container " + container.toString());
- executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
- .setNameFormat("p-" + processorId + "-container-thread-%d").build());
- executorService.submit(container::run);
- }
+ @Override
+ public void onContainerFailed(Throwable t) {
+ if (jcContainerShutdownLatch != null) {
+ jcContainerShutdownLatch.countDown();
+ } else {
+ LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
+ }
+ containerException = t;
+ LOGGER.error("Container failed. Stopping the processor.", containerException);
+ container = null;
+ stop();
+ }
+ };
+
+ container = createSamzaContainer(
+ jobModel.getContainers().get(processorId),
+ jobModel.maxChangeLogStreamPartitions);
+ container.setContainerListener(containerListener);
+ LOGGER.info("Starting container " + container.toString());
+ executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ .setNameFormat("p-" + processorId + "-container-thread-%d").build());
+ executorService.submit(container::run);
}
@Override
http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index e973099..2204240 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkClient;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.ApplicationConfig;
@@ -77,31 +76,29 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
private JobModel newJobModel;
private int debounceTimeMs;
- public ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry) {
+ ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
this.config = config;
- ZkConfig zkConfig = new ZkConfig(config);
- ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
- ZkClient zkClient = ZkCoordinationServiceFactory
- .createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
-
- // setup a listener for a session state change
- // we are mostly interested in "session closed" and "new session created" events
- zkClient.subscribeStateChanges(new ZkSessionStateChangedListener());
this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
- this.zkUtils = new ZkUtils(
- keyBuilder,
- zkClient,
- zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
this.processorId = createProcessorId(config);
+ this.zkUtils = zkUtils;
+ // setup a listener for a session state change
+ // we are mostly interested in "session closed" and "new session created" events
+ zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
- this.barrier = new ZkBarrierForVersionUpgrade(keyBuilder.getJobModelVersionBarrierPrefix(), zkUtils,
+ this.barrier = new ZkBarrierForVersionUpgrade(
+ zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(),
+ zkUtils,
new ZkBarrierListenerImpl());
this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
+ debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
+ LOG.error("Received exception from in JobCoordinator Processing!", throwable);
+ stop();
+ });
}
@Override
@@ -109,12 +106,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
startMetrics();
streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
- debounceTimer = new ScheduleAfterDebounceTime(throwable ->
- {
- LOG.error("Received exception from in JobCoordinator Processing!", throwable);
- stop();
- });
-
zkController.register();
}
@@ -212,18 +203,21 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () ->
{
LOG.info("pid=" + processorId + "new JobModel available");
-
- // stop current work
- if (coordinatorListener != null) {
- coordinatorListener.onJobModelExpired();
- }
// get the new job model from ZK
newJobModel = zkUtils.getJobModel(version);
-
LOG.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
- // update ZK and wait for all the processors to get this new version
- barrier.join(version, processorId);
+ if (!newJobModel.getContainers().containsKey(processorId)) {
+ LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", newJobModel, processorId);
+ stop();
+ } else {
+ // stop current work
+ if (coordinatorListener != null) {
+ coordinatorListener.onJobModelExpired();
+ }
+ // update ZK and wait for all the processors to get this new version
+ barrier.join(version, processorId);
+ }
});
}
http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index c077f94..85e3b4a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -19,13 +19,21 @@
package org.apache.samza.zk;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
+import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.metrics.MetricsRegistryMap;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class);
+
/**
* Method to instantiate an implementation of JobCoordinator
*
@@ -34,6 +42,16 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
*/
@Override
public JobCoordinator getJobCoordinator(Config config) {
- return new ZkJobCoordinator(config, new MetricsRegistryMap());
+ MetricsRegistry metricsRegistry = new MetricsRegistryMap();
+ ZkUtils zkUtils = getZkUtils(config, metricsRegistry);
+ LOG.debug("Creating ZkJobCoordinator instance with config: {}.", config);
+ return new ZkJobCoordinator(config, metricsRegistry, zkUtils);
+ }
+
+ private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) {
+ ZkConfig zkConfig = new ZkConfig(config);
+ ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
+ ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+ return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
index cd2cc3d..13afeef 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
@@ -19,6 +19,8 @@
package org.apache.samza.container.grouper.task;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -26,16 +28,17 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
import org.junit.Before;
import org.junit.Test;
import static org.apache.samza.container.mock.ContainerMocks.generateTaskModels;
-import static org.apache.samza.container.mock.ContainerMocks.getTaskModel;
import static org.apache.samza.container.mock.ContainerMocks.getTaskName;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -73,13 +76,6 @@ public class TestGroupByContainerIds {
buildSimpleGrouper(1).group(new HashSet());
}
- @Test(expected = IllegalArgumentException.class)
- public void testGroupFewerTasksThanContainers() {
- Set<TaskModel> taskModels = new HashSet<>();
- taskModels.add(getTaskModel(1));
- buildSimpleGrouper(2).group(taskModels);
- }
-
@Test(expected = UnsupportedOperationException.class)
public void testGrouperResultImmutable() {
Set<TaskModel> taskModels = generateTaskModels(3);
@@ -237,4 +233,23 @@ public class TestGroupByContainerIds {
assertTrue(container1.getTasks().containsKey(getTaskName(6)));
assertTrue(container1.getTasks().containsKey(getTaskName(8)));
}
+
+ @Test
+ public void testFewerTasksThanContainers() {
+ final String testContainerId1 = "1";
+ final String testContainerId2 = "2";
+ final int testProcessorId = 1;
+
+ Set<TaskModel> taskModels = generateTaskModels(1);
+ List<String> containerIds = ImmutableList.of(testContainerId1, testContainerId2);
+
+ Map<TaskName, TaskModel> expectedTasks = taskModels.stream()
+ .collect(Collectors.toMap(TaskModel::getTaskName, x -> x));
+ ContainerModel expectedContainerModel = new ContainerModel(testContainerId1, testProcessorId, expectedTasks);
+
+ Set<ContainerModel> actualContainerModels = buildSimpleGrouper().group(taskModels, containerIds);
+
+ assertEquals(1, actualContainerModels.size());
+ assertEquals(ImmutableSet.of(expectedContainerModel), actualContainerModels);
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
new file mode 100644
index 0000000..9b5210f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestZkJobCoordinator {
+  private static final String TEST_BARRIER_ROOT = "/testBarrierRoot";
+  private static final String TEST_JOB_MODEL_VERSION = "1";
+  private static final long STOP_TIMEOUT_SECONDS = 30;
+
+  /**
+   * A processor that is handed a JobModel which does not contain its processorId must stop itself
+   * rather than expire its current work and join the job model version barrier.
+   */
+  @Test
+  public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() throws InterruptedException {
+    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
+    Mockito.when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+
+    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
+    Mockito.when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    // The constructor subscribes to session state changes via zkUtils.getZkClient(); left unstubbed,
+    // the mock would return null there and construction would fail with a NullPointerException.
+    Mockito.when(zkUtils.getZkClient()).thenReturn(Mockito.mock(ZkClient.class));
+    // A JobModel without any containers cannot contain this processor's id.
+    Mockito.when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+
+    // Stub stop() before triggering the event so the real stop() never runs and every call is recorded.
+    CountDownLatch stopCalled = new CountDownLatch(1);
+    Mockito.doAnswer(invocation -> {
+        stopCalled.countDown();
+        return null;
+      }).when(zkJobCoordinator).stop();
+
+    zkJobCoordinator.onNewJobModelAvailable(TEST_JOB_MODEL_VERSION);
+
+    // The handler is passed to debounceTimer.scheduleAfterDebounceTime(...) and may run on another
+    // thread, so wait for stop() rather than verifying right away; atMost(1) would also pass with zero calls.
+    assertTrue("Timed out waiting for ZkJobCoordinator.stop()",
+        stopCalled.await(STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+    Mockito.verify(zkJobCoordinator, Mockito.times(1)).stop();
+    Mockito.verify(zkUtils).getJobModel(TEST_JOB_MODEL_VERSION);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index ebbe07b..cf0a242 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import kafka.admin.AdminUtils;
import kafka.utils.TestUtils;
import org.I0Itec.zkclient.ZkClient;
@@ -96,6 +97,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
private LocalApplicationRunner applicationRunner1;
private LocalApplicationRunner applicationRunner2;
private LocalApplicationRunner applicationRunner3;
+ private String testStreamAppName;
+ private String testStreamAppId;
// Set 90 seconds as max execution time for each test.
@Rule
@@ -108,8 +111,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
public void setUp() {
super.setUp();
String uniqueTestId = UUID.randomUUID().toString();
- String testStreamAppName = String.format("test-app-name-%s", uniqueTestId);
- String testStreamAppId = String.format("test-app-id-%s", uniqueTestId);
+ testStreamAppName = String.format("test-app-name-%s", uniqueTestId);
+ testStreamAppId = String.format("test-app-id-%s", uniqueTestId);
inputKafkaTopic = String.format("test-input-topic-%s", uniqueTestId);
outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId);
ZkClient zkClient = new ZkClient(zkConnect());
@@ -179,6 +182,77 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
}
@Test
+  public void shouldStopNewProcessorsJoiningGroupWhenNumContainersIsGreaterThanNumTasks() throws InterruptedException {
+    /**
+     * sspGrouper is set to AllSspToSingleTaskGrouperFactory for this test case (all SSPs of the input Kafka topic are mapped to a single task).
+     * Run a stream application (streamApp1) that consumes messages from the input topic (effectively one container).
+     *
+     * In the callback triggered by streamApp1 after it processes a message, bring up another stream application (streamApp2).
+     *
+     * Assertions:
+     *  A) The JobModel generated before and after the addition of streamApp2 should be equal.
+     *  B) The second stream application (streamApp2) should neither join the group nor process any message.
+     */
+
+    // Set up kafka topics.
+    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS * 2, PROCESSOR_IDS[0]);
+
+    // Configuration, verification variables
+    MapConfig testConfig = new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10"));
+    // Single-element final arrays, so that the streamApplication callback (a lambda) can update them.
+    final JobModel[] previousJobModel = new JobModel[1];
+    final String[] previousJobModelVersion = new String[1];
+    AtomicBoolean hasSecondProcessorJoined = new AtomicBoolean(false);
+    final CountDownLatch secondProcessorRegistered = new CountDownLatch(1);
+
+    zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> {
+      // When streamApp2 with id: PROCESSOR_IDS[1] is registered, unblock the streamApp1 callback so it resumes processing.
+      if (currentChilds.contains(PROCESSOR_IDS[1])) {
+        secondProcessorRegistered.countDown();
+      }
+    });
+
+    // Set up stream app 2.
+    CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
+    LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig));
+    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null);
+
+    // Callback handler for streamApp1.
+    StreamApplicationCallback streamApplicationCallback = message -> {
+      if (hasSecondProcessorJoined.compareAndSet(false, true)) {
+        previousJobModelVersion[0] = zkUtils.getJobModelVersion();
+        previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
+        localApplicationRunner2.run(streamApp2);
+        try {
+          // Wait for streamApp2 to register with zookeeper.
+          secondProcessorRegistered.await();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // Restore the interrupt status instead of swallowing it.
+        }
+      }
+    };
+
+    CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2);
+
+    // Set up stream app 1.
+    LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig));
+    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, streamApplicationCallback, kafkaEventsConsumedLatch);
+    localApplicationRunner1.run(streamApp1);
+    kafkaEventsConsumedLatch.await();
+
+    String currentJobModelVersion = zkUtils.getJobModelVersion();
+    JobModel updatedJobModel = zkUtils.getJobModel(currentJobModelVersion);
+
+    // JobModelVersion check to verify that leader publishes new jobModel.
+    assertTrue(Integer.parseInt(previousJobModelVersion[0]) < Integer.parseInt(currentJobModelVersion));
+    // Job model before and after the addition of second stream processor should be the same.
+    assertEquals(previousJobModel[0], updatedJobModel);
+    // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
+    // ProcessedMessagesLatch shouldn't have changed. It should retain its initial value.
+    assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
+  }
+
+ @Test
public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException {
// Set up kafka topics.
publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);