You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/07/25 22:32:32 UTC

samza git commit: SAMZA-1282: Spinning up more containers than number of tasks.

Repository: samza
Updated Branches:
  refs/heads/master 35143b676 -> 57758615b


SAMZA-1282: Spinning up more containers than number of tasks.

Changes

* Stop the StreamProcessor in the onNewJobModelAvailable event handler (instead of the onNewJobModelConfirmed
  event handler) when it is not part of the group, and prevent it from joining the barrier.
* When numContainerIds > numTaskModels, generate the JobModel by choosing the lexicographically
  least `x` containerIds (where x = numTaskModels).
* Added unit and integration tests in appropriate classes to verify the expected behavior.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Boris Shkolnik <bo...@apache.org>, Navina Ramesh <na...@apache.org>

Closes #244 from shanthoosh/more_processor_than_tasks


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/57758615
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/57758615
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/57758615

Branch: refs/heads/master
Commit: 57758615b3f8713364ac1afecab4f5355f64d1d4
Parents: 35143b6
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Tue Jul 25 15:32:22 2017 -0700
Committer: navina <na...@apache.org>
Committed: Tue Jul 25 15:32:22 2017 -0700

----------------------------------------------------------------------
 .../grouper/task/GroupByContainerIds.java       | 18 +++-
 .../apache/samza/processor/StreamProcessor.java | 91 +++++++++-----------
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 52 +++++------
 .../samza/zk/ZkJobCoordinatorFactory.java       | 22 ++++-
 .../grouper/task/TestGroupByContainerIds.java   | 31 +++++--
 .../apache/samza/zk/TestZkJobCoordinator.java   | 47 ++++++++++
 .../processor/TestZkLocalApplicationRunner.java | 78 ++++++++++++++++-
 7 files changed, 248 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
index 651dca7..f5a5a86 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerIds.java
@@ -20,6 +20,7 @@
 package org.apache.samza.container.grouper.task;
 
 import java.util.Arrays;
+import java.util.stream.Collectors;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
@@ -31,6 +32,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -39,6 +42,8 @@ import java.util.Set;
  * IDs as an argument. Please note - this first implementation ignores locality information.
  */
 public class GroupByContainerIds implements TaskNameGrouper {
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByContainerIds.class);
+
   private final int startContainerCount;
   public GroupByContainerIds(int count) {
     this.startContainerCount = count;
@@ -64,8 +69,17 @@ public class GroupByContainerIds implements TaskNameGrouper {
       throw new IllegalArgumentException("cannot group an empty set. containersIds=" + Arrays
           .toString(containersIds.toArray()));
 
-    if (containersIds.size() > tasks.size())
-      throw new IllegalArgumentException("number of containers "  + containersIds.size() + " is bigger than number of tasks " + tasks.size());
+    if (containersIds.size() > tasks.size()) {
+      LOG.warn("Number of containers: {} is greater than number of tasks: {}.",  containersIds.size(), tasks.size());
+      /**
+       * Choose lexicographically least `x` containerIds(where x = tasks.size()).
+       */
+      containersIds = containersIds.stream()
+                                   .sorted()
+                                   .limit(tasks.size())
+                                   .collect(Collectors.toList());
+      LOG.info("Generating containerModel with containers: {}.", containersIds);
+    }
 
     int containerCount = containersIds.size();
 

http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 590fa11..415111f 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -244,64 +244,59 @@ public class StreamProcessor {
 
       @Override
       public void onNewJobModel(String processorId, JobModel jobModel) {
-        if (!jobModel.getContainers().containsKey(processorId)) {
-          LOGGER.warn("JobModel does not contain the processorId: " + processorId + ". Stopping the processor.");
-          stop();
-        } else {
-          jcContainerShutdownLatch = new CountDownLatch(1);
-
-          SamzaContainerListener containerListener = new SamzaContainerListener() {
-            @Override
-            public void onContainerStart() {
-              if (!processorOnStartCalled) {
-                // processorListener is called on start only the first time the container starts.
-                // It is not called after every re-balance of partitions among the processors
-                processorOnStartCalled = true;
-                if (processorListener != null) {
-                  processorListener.onStart();
-                }
-              } else {
-                LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
-              }
-            }
+        jcContainerShutdownLatch = new CountDownLatch(1);
 
-            @Override
-            public void onContainerStop(boolean pauseByJm) {
-              if (pauseByJm) {
-                LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
-                if (jcContainerShutdownLatch != null) {
-                  jcContainerShutdownLatch.countDown();
-                }
-              } else {  // sp.stop was called or container stopped by itself
-                LOGGER.info("Container " + container.toString() + " stopped.");
-                container = null; // this guarantees that stop() doesn't try to stop container again
-                stop();
+        SamzaContainerListener containerListener = new SamzaContainerListener() {
+          @Override
+          public void onContainerStart() {
+            if (!processorOnStartCalled) {
+              // processorListener is called on start only the first time the container starts.
+              // It is not called after every re-balance of partitions among the processors
+              processorOnStartCalled = true;
+              if (processorListener != null) {
+                processorListener.onStart();
               }
+            } else {
+              LOGGER.debug("StreamProcessorListener was notified of container start previously. Hence, skipping this time.");
             }
+          }
 
-            @Override
-            public void onContainerFailed(Throwable t) {
+          @Override
+          public void onContainerStop(boolean pauseByJm) {
+            if (pauseByJm) {
+              LOGGER.info("Container " + container.toString() + " stopped due to a request from JobCoordinator.");
               if (jcContainerShutdownLatch != null) {
                 jcContainerShutdownLatch.countDown();
-              } else {
-                LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
               }
-              containerException = t;
-              LOGGER.error("Container failed. Stopping the processor.", containerException);
-              container = null;
+            } else {  // sp.stop was called or container stopped by itself
+              LOGGER.info("Container " + container.toString() + " stopped.");
+              container = null; // this guarantees that stop() doesn't try to stop container again
               stop();
             }
-          };
+          }
 
-          container = createSamzaContainer(
-              jobModel.getContainers().get(processorId),
-              jobModel.maxChangeLogStreamPartitions);
-          container.setContainerListener(containerListener);
-          LOGGER.info("Starting container " + container.toString());
-          executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
-              .setNameFormat("p-" + processorId + "-container-thread-%d").build());
-          executorService.submit(container::run);
-        }
+          @Override
+          public void onContainerFailed(Throwable t) {
+            if (jcContainerShutdownLatch != null) {
+              jcContainerShutdownLatch.countDown();
+            } else {
+              LOGGER.warn("JobCoordinatorLatch was null. It is possible for some component to be waiting.");
+            }
+            containerException = t;
+            LOGGER.error("Container failed. Stopping the processor.", containerException);
+            container = null;
+            stop();
+          }
+        };
+
+        container = createSamzaContainer(
+            jobModel.getContainers().get(processorId),
+            jobModel.maxChangeLogStreamPartitions);
+        container.setContainerListener(containerListener);
+        LOGGER.info("Starting container " + container.toString());
+        executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+            .setNameFormat("p-" + processorId + "-container-thread-%d").build());
+        executorService.submit(container::run);
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index e973099..2204240 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -25,7 +25,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.ZkClient;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
@@ -77,31 +76,29 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private JobModel newJobModel;
   private int debounceTimeMs;
 
-  public ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry) {
+  ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
     this.config = config;
-    ZkConfig zkConfig = new ZkConfig(config);
-    ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
-    ZkClient zkClient = ZkCoordinationServiceFactory
-        .createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
-
-    // setup a listener for a session state change
-    // we are mostly interested in "session closed" and "new session created" events
-    zkClient.subscribeStateChanges(new ZkSessionStateChangedListener());
 
     this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
-    this.zkUtils = new ZkUtils(
-        keyBuilder,
-        zkClient,
-        zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
 
     this.processorId = createProcessorId(config);
+    this.zkUtils = zkUtils;
+    // setup a listener for a session state change
+    // we are mostly interested in "session closed" and "new session created" events
+    zkUtils.getZkClient().subscribeStateChanges(new ZkSessionStateChangedListener());
     LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
     leaderElector.setLeaderElectorListener(new LeaderElectorListenerImpl());
     this.zkController = new ZkControllerImpl(processorId, zkUtils, this, leaderElector);
-    this.barrier = new ZkBarrierForVersionUpgrade(keyBuilder.getJobModelVersionBarrierPrefix(), zkUtils,
+    this.barrier =  new ZkBarrierForVersionUpgrade(
+        zkUtils.getKeyBuilder().getJobModelVersionBarrierPrefix(),
+        zkUtils,
         new ZkBarrierListenerImpl());
     this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
     this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
+    debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
+        LOG.error("Received exception from in JobCoordinator Processing!", throwable);
+        stop();
+      });
   }
 
   @Override
@@ -109,12 +106,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     startMetrics();
     streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
 
-    debounceTimer = new ScheduleAfterDebounceTime(throwable ->
-      {
-        LOG.error("Received exception from in JobCoordinator Processing!", throwable);
-        stop();
-      });
-
     zkController.register();
   }
 
@@ -212,18 +203,21 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () ->
       {
         LOG.info("pid=" + processorId + "new JobModel available");
-
-        // stop current work
-        if (coordinatorListener != null) {
-          coordinatorListener.onJobModelExpired();
-        }
         // get the new job model from ZK
         newJobModel = zkUtils.getJobModel(version);
-
         LOG.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
 
-        // update ZK and wait for all the processors to get this new version
-        barrier.join(version, processorId);
+        if (!newJobModel.getContainers().containsKey(processorId)) {
+          LOG.info("JobModel: {} does not contain the processorId: {}. Stopping the processor.", newJobModel, processorId);
+          stop();
+        } else {
+          // stop current work
+          if (coordinatorListener != null) {
+            coordinatorListener.onJobModelExpired();
+          }
+          // update ZK and wait for all the processors to get this new version
+          barrier.join(version, processorId);
+        }
       });
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index c077f94..85e3b4a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -19,13 +19,21 @@
 
 package org.apache.samza.zk;
 
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.metrics.MetricsRegistryMap;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ZkJobCoordinatorFactory.class);
+
   /**
    * Method to instantiate an implementation of JobCoordinator
    *
@@ -34,6 +42,16 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
    */
   @Override
   public JobCoordinator getJobCoordinator(Config config) {
-    return new ZkJobCoordinator(config, new MetricsRegistryMap());
+    MetricsRegistry metricsRegistry = new MetricsRegistryMap();
+    ZkUtils zkUtils = getZkUtils(config, metricsRegistry);
+    LOG.debug("Creating ZkJobCoordinator instance with config: {}.", config);
+    return new ZkJobCoordinator(config, metricsRegistry, zkUtils);
+  }
+
+  private ZkUtils getZkUtils(Config config, MetricsRegistry metricsRegistry) {
+    ZkConfig zkConfig = new ZkConfig(config);
+    ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
+    ZkClient zkClient = ZkCoordinationServiceFactory.createZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+    return new ZkUtils(keyBuilder, zkClient, zkConfig.getZkConnectionTimeoutMs(), metricsRegistry);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
index cd2cc3d..13afeef 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerIds.java
@@ -19,6 +19,8 @@
 
 package org.apache.samza.container.grouper.task;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -26,16 +28,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.container.LocalityManager;
+import org.apache.samza.container.TaskName;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.TaskModel;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.samza.container.mock.ContainerMocks.generateTaskModels;
-import static org.apache.samza.container.mock.ContainerMocks.getTaskModel;
 import static org.apache.samza.container.mock.ContainerMocks.getTaskName;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -73,13 +76,6 @@ public class TestGroupByContainerIds {
     buildSimpleGrouper(1).group(new HashSet());
   }
 
-  @Test(expected = IllegalArgumentException.class)
-  public void testGroupFewerTasksThanContainers() {
-    Set<TaskModel> taskModels = new HashSet<>();
-    taskModels.add(getTaskModel(1));
-    buildSimpleGrouper(2).group(taskModels);
-  }
-
   @Test(expected = UnsupportedOperationException.class)
   public void testGrouperResultImmutable() {
     Set<TaskModel> taskModels = generateTaskModels(3);
@@ -237,4 +233,23 @@ public class TestGroupByContainerIds {
     assertTrue(container1.getTasks().containsKey(getTaskName(6)));
     assertTrue(container1.getTasks().containsKey(getTaskName(8)));
   }
+
+  @Test
+  public void testFewerTasksThanContainers() {
+    final String testContainerId1 = "1";
+    final String testContainerId2 = "2";
+    final int testProcessorId = 1;
+
+    Set<TaskModel> taskModels = generateTaskModels(1);
+    List<String> containerIds = ImmutableList.of(testContainerId1, testContainerId2);
+
+    Map<TaskName, TaskModel> expectedTasks = taskModels.stream()
+                                                       .collect(Collectors.toMap(TaskModel::getTaskName, x -> x));
+    ContainerModel expectedContainerModel = new ContainerModel(testContainerId1, testProcessorId, expectedTasks);
+
+    Set<ContainerModel> actualContainerModels = buildSimpleGrouper().group(taskModels, containerIds);
+
+    assertEquals(1, actualContainerModels.size());
+    assertEquals(ImmutableSet.of(expectedContainerModel), actualContainerModels);
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
new file mode 100644
index 0000000..9b5210f
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.HashMap;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestZkJobCoordinator {
+  private static final String TEST_BARRIER_ROOT = "/testBarrierRoot";
+  private static final String TEST_JOB_MODEL_VERSION = "1";
+
+  @Test
+  public void testFollowerShouldStopWhenNotPartOfGeneratedJobModel() {
+    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
+    Mockito.when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+    // getZkClient() is stubbed because the constructor subscribes to session events on the returned client.
+    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
+    Mockito.when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    Mockito.when(zkUtils.getZkClient()).thenReturn(Mockito.mock(org.I0Itec.zkclient.ZkClient.class));
+    Mockito.when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator(new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+    // Stub stop() first; the handler is scheduled on the debounce timer, so wait (atMost(1) passes on zero calls).
+    Mockito.doNothing().when(zkJobCoordinator).stop();
+    zkJobCoordinator.onNewJobModelAvailable(TEST_JOB_MODEL_VERSION);
+    Mockito.verify(zkJobCoordinator, Mockito.timeout(5000).times(1)).stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/57758615/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index ebbe07b..cf0a242 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import kafka.admin.AdminUtils;
 import kafka.utils.TestUtils;
 import org.I0Itec.zkclient.ZkClient;
@@ -96,6 +97,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   private LocalApplicationRunner applicationRunner1;
   private LocalApplicationRunner applicationRunner2;
   private LocalApplicationRunner applicationRunner3;
+  private String testStreamAppName;
+  private String testStreamAppId;
 
   // Set 90 seconds as max execution time for each test.
   @Rule
@@ -108,8 +111,8 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   public void setUp() {
     super.setUp();
     String uniqueTestId = UUID.randomUUID().toString();
-    String testStreamAppName = String.format("test-app-name-%s", uniqueTestId);
-    String testStreamAppId = String.format("test-app-id-%s", uniqueTestId);
+    testStreamAppName = String.format("test-app-name-%s", uniqueTestId);
+    testStreamAppId = String.format("test-app-id-%s", uniqueTestId);
     inputKafkaTopic = String.format("test-input-topic-%s", uniqueTestId);
     outputKafkaTopic = String.format("test-output-topic-%s", uniqueTestId);
     ZkClient zkClient = new ZkClient(zkConnect());
@@ -179,6 +182,77 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
   }
 
   @Test
+  public void shouldStopNewProcessorsJoiningGroupWhenNumContainersIsGreaterThanNumTasks() throws InterruptedException {
+    /**
+     * sspGrouper is set to AllSspToSingleTaskGrouperFactory for this test case(All ssp's from input kafka topic are mapped to a single task).
+     * Run a stream application(streamApp1) consuming messages from input topic(effectively one container).
+     *
+     * In the callback triggered by streamApp1 after processing a message, bring up an another stream application(streamApp2).
+     *
+     * Assertions:
+     *           A) JobModel generated before and after the addition of streamApp2 should be equal.
+     *           B) Second stream application(streamApp2) should not join the group and process any message.
+     */
+
+    // Set up kafka topics.
+    publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS * 2, PROCESSOR_IDS[0]);
+
+    // Configuration, verification variables
+    MapConfig testConfig = new MapConfig(ImmutableMap.of(JobConfig.SSP_GROUPER_FACTORY(), "org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory", JobConfig.JOB_DEBOUNCE_TIME_MS(), "10"));
+    // Declared as final array to update it from streamApplication callback(Variable should be declared final to access in lambda block).
+    final JobModel[] previousJobModel = new JobModel[1];
+    final String[] previousJobModelVersion = new String[1];
+    AtomicBoolean hasSecondProcessorJoined = new AtomicBoolean(false);
+    final CountDownLatch secondProcessorRegistered = new CountDownLatch(1);
+
+    zkUtils.subscribeToProcessorChange((parentPath, currentChilds) -> {
+        // When streamApp2 with id: PROCESSOR_IDS[1] is registered, start processing message in streamApp1.
+        if (currentChilds.contains(PROCESSOR_IDS[1])) {
+          secondProcessorRegistered.countDown();
+        }
+      });
+
+    // Set up stream app 2.
+    CountDownLatch processedMessagesLatch = new CountDownLatch(NUM_KAFKA_EVENTS);
+    LocalApplicationRunner localApplicationRunner2 = new LocalApplicationRunner(new MapConfig(applicationConfig2, testConfig));
+    StreamApplication streamApp2 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, processedMessagesLatch, null, null);
+
+    // Callback handler for streamApp1.
+    StreamApplicationCallback streamApplicationCallback = message -> {
+      if (hasSecondProcessorJoined.compareAndSet(false, true)) {
+        previousJobModelVersion[0] = zkUtils.getJobModelVersion();
+        previousJobModel[0] = zkUtils.getJobModel(previousJobModelVersion[0]);
+        localApplicationRunner2.run(streamApp2);
+        try {
+          // Wait for streamApp2 to register with zookeeper.
+          secondProcessorRegistered.await();
+        } catch (InterruptedException e) {
+        }
+      }
+    };
+
+    CountDownLatch kafkaEventsConsumedLatch = new CountDownLatch(NUM_KAFKA_EVENTS * 2);
+
+    // Set up stream app 1.
+    LocalApplicationRunner localApplicationRunner1 = new LocalApplicationRunner(new MapConfig(applicationConfig1, testConfig));
+    StreamApplication streamApp1 = new TestStreamApplication(inputKafkaTopic, outputKafkaTopic, null, streamApplicationCallback, kafkaEventsConsumedLatch);
+    localApplicationRunner1.run(streamApp1);
+
+    kafkaEventsConsumedLatch.await();
+
+    String currentJobModelVersion = zkUtils.getJobModelVersion();
+    JobModel updatedJobModel = zkUtils.getJobModel(currentJobModelVersion);
+
+    // JobModelVersion check to verify that leader publishes new jobModel.
+    assertTrue(Integer.parseInt(previousJobModelVersion[0]) < Integer.parseInt(currentJobModelVersion));
+    // Job model before and after the addition of second stream processor should be the same.
+    assertEquals(previousJobModel[0], updatedJobModel);
+    // TODO: After SAMZA-1364 add assertion for localApplicationRunner2.status(streamApp)
+    // ProcessedMessagesLatch shouldn't have changed. Should retain it's initial value.
+    assertEquals(NUM_KAFKA_EVENTS, processedMessagesLatch.getCount());
+  }
+
+  @Test
   public void shouldReElectLeaderWhenLeaderDies() throws InterruptedException {
     // Set up kafka topics.
     publishKafkaEvents(inputKafkaTopic, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);