You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/01 01:07:25 UTC

samza git commit: SAMZA-2018: State restore improvements

Repository: samza
Updated Branches:
  refs/heads/master 4dbc2c6de -> fb293bb8c


SAMZA-2018: State restore improvements

This PR makes the following changes:

* Consumer consolidation to ensure one storeConsumer per system; previously there was one consumer per SSP per store.
* Refactoring stores to use ContainerStorageManager, with parallelized restoration and serial execution of system consumer start, stop, register, etc.

Author: Ray Matharu <rm...@linkedin.com>

Reviewers: Jagadish<ja...@apache.org>

Closes #823 from rmatharu/consumerConsolidate


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fb293bb8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fb293bb8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fb293bb8

Branch: refs/heads/master
Commit: fb293bb8cb2c9f379a29a0702acd574d62f5a2f1
Parents: 4dbc2c6
Author: Ray Matharu <rm...@linkedin.com>
Authored: Fri Nov 30 17:07:23 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Nov 30 17:07:23 2018 -0800

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala |  55 ++++---
 .../apache/samza/container/TaskInstance.scala   |  20 +--
 .../samza/storage/ContainerStorageManager.java  | 159 +++++++++++++++++++
 .../samza/storage/TaskStorageManager.scala      |  16 +-
 .../samza/container/TestSamzaContainer.scala    |   9 +-
 .../processor/StreamProcessorTestUtils.scala    |   3 +-
 .../storage/TestContainerStorageManager.java    | 125 +++++++++++++++
 7 files changed, 330 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 7b4410e..ed50719 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -502,6 +502,22 @@ object SamzaContainer extends Logging {
 
     val timerExecutor = Executors.newSingleThreadScheduledExecutor
 
+    // Create exactly one SystemConsumer per changelog system, keyed by system name; fail fast if a system has no factory
+    val storeSystemConsumers: Map[String, SystemConsumer] = changeLogSystemStreams.mapValues {
+      case (changeLogSystemStream) => (changeLogSystemStream.getSystem)
+    }.values.toSet.map {
+      systemName: String =>
+        (systemName, systemFactories
+          .getOrElse(systemName,
+            throw new SamzaException("Changelog system %s does not exist in the config." format (systemName)))
+          .getConsumer(systemName, config, samzaContainerMetrics.registry))
+    }.toMap
+
+    info("Created store system consumers: %s" format storeSystemConsumers)
+
+    var taskStorageManagers : Map[TaskInstance, TaskStorageManager] = Map()
+
+    // Create taskInstances
     val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => {
       debug("Setting up task instance: %s" format taskModel)
 
@@ -516,14 +532,11 @@ object SamzaContainer extends Logging {
 
       val collector = new TaskInstanceCollector(producerMultiplexer, taskInstanceMetrics)
 
-      val storeConsumers = changeLogSystemStreams
+      // Re-use the storeConsumers, stored in storeSystemConsumers
+      val storeConsumers : Map[String, SystemConsumer] = changeLogSystemStreams
         .map {
           case (storeName, changeLogSystemStream) =>
-            val systemConsumer = systemFactories
-              .getOrElse(changeLogSystemStream.getSystem,
-                throw new SamzaException("Changelog system %s for store %s does not " +
-                  "exist in the config." format (changeLogSystemStream, storeName)))
-              .getConsumer(changeLogSystemStream.getSystem, config, taskInstanceMetrics.registry)
+            val systemConsumer = storeSystemConsumers.get(changeLogSystemStream.getSystem).get
             samzaContainerMetrics.addStoreRestorationGauge(taskName, storeName)
             (storeName, systemConsumer)
         }
@@ -666,9 +679,14 @@ object SamzaContainer extends Logging {
 
       val taskInstance = createTaskInstance(task)
 
+      taskStorageManagers += taskInstance -> storageManager
       (taskName, taskInstance)
     }).toMap
 
+
+    val containerStorageManager = new ContainerStorageManager(taskStorageManagers.asJava, storeSystemConsumers.asJava,
+      samzaContainerMetrics)
+
     val maxThrottlingDelayMs = config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1))
 
     val runLoop = RunLoopFactory.createRunLoop(
@@ -734,7 +752,8 @@ object SamzaContainer extends Logging {
       taskThreadPool = taskThreadPool,
       timerExecutor = timerExecutor,
       containerContext = containerContext,
-      applicationContainerContextOption = applicationContainerContextOption)
+      applicationContainerContextOption = applicationContainerContextOption,
+      containerStorageManager = containerStorageManager)
   }
 
   /**
@@ -769,7 +788,8 @@ class SamzaContainer(
   taskThreadPool: ExecutorService = null,
   timerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor,
   containerContext: ContainerContext,
-  applicationContainerContextOption: Option[ApplicationContainerContext]) extends Runnable with Logging {
+  applicationContainerContextOption: Option[ApplicationContainerContext],
+  containerStorageManager: ContainerStorageManager) extends Runnable with Logging {
 
   val shutdownMs = config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
   var shutdownHookThread: Thread = null
@@ -1003,16 +1023,13 @@ class SamzaContainer(
   }
 
   def startStores {
+    info("Starting container storage manager.")
+    containerStorageManager.start()
+
     taskInstances.values.foreach(taskInstance => {
       val startTime = System.currentTimeMillis()
-      info("Starting stores in task instance %s" format taskInstance.taskName)
-      taskInstance.startStores
-      // Measuring the time to restore the stores
-      val timeToRestore = System.currentTimeMillis() - startTime
-      val taskGauge = metrics.taskStoreRestorationMetrics.asScala.getOrElse(taskInstance.taskName, null)
-      if (taskGauge != null) {
-        taskGauge.set(timeToRestore)
-      }
+      info("Starting side inputs in task instance %s" format taskInstance.taskName)
+      taskInstance.startSideInputs
     })
   }
 
@@ -1152,9 +1169,11 @@ class SamzaContainer(
   }
 
   def shutdownStores {
-    info("Shutting down task instance stores.")
+    info("Shutting down container storage manager.")
+    containerStorageManager.shutdown()
 
-    taskInstances.values.foreach(_.shutdownStores)
+    info("Shutting down task instance side inputs.")
+    taskInstances.values.foreach(_.shutdownSideInputs)
   }
 
   def shutdownTableManager: Unit = {

http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index f8e9c63..53e5af7 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -120,15 +120,7 @@ class TaskInstance(
     offsetManager.register(taskName, sspsToRegister)
   }
 
-  def startStores {
-    if (storageManager != null) {
-      debug("Starting storage manager for taskName: %s" format taskName)
-
-      storageManager.init
-    } else {
-      debug("Skipping storage manager initialization for taskName: %s" format taskName)
-    }
-
+  def startSideInputs {
     if (sideInputStorageManager != null) {
       debug("Starting side input storage manager for taskName: %s" format taskName)
       sideInputStorageManager.init()
@@ -298,15 +290,7 @@ class TaskInstance(
     }
   }
 
-  def shutdownStores {
-    if (storageManager != null) {
-      debug("Shutting down storage manager for taskName: %s" format taskName)
-
-      storageManager.stop
-    } else {
-      debug("Skipping storage manager shutdown for taskName: %s" format taskName)
-    }
-
+  def shutdownSideInputs {
     if (sideInputStorageManager != null) {
       debug("Shutting down side input storage manager for taskName: %s" format taskName)
       sideInputStorageManager.stop()

http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
new file mode 100644
index 0000000..5fc5573
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.container.TaskInstance;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.system.SystemConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ *  ContainerStorageManager is a per-container object that manages
+ *  the restore of per-task partitions.
+ *
+ *  It is responsible for
+ *  a) performing all container-level actions for restore such as, initializing and shutting down
+ *  taskStorage managers, starting, registering and stopping consumers, etc.
+ *
+ *  b) performing individual taskStorageManager restores in parallel.
+ *
+ */
+public class ContainerStorageManager {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
+  private final Map<TaskInstance, TaskStorageManager> taskStorageManagers;
+  private final SamzaContainerMetrics samzaContainerMetrics;
+
+  // Mapping from store system names to SystemConsumers
+  private final Map<String, SystemConsumer> systemConsumers;
+
+  // Size of thread-pool to be used for parallel restores
+  private final int parallelRestoreThreadPoolSize;
+
+  // Naming convention to be used for restore threads
+  private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
+
+  public ContainerStorageManager(Map<TaskInstance, TaskStorageManager> taskStorageManagers,
+      Map<String, SystemConsumer> systemConsumers, SamzaContainerMetrics samzaContainerMetrics) {
+    this.taskStorageManagers = taskStorageManagers;
+    this.systemConsumers = systemConsumers;
+    this.samzaContainerMetrics = samzaContainerMetrics;
+
+    // Setting thread pool size equal to the number of tasks
+    this.parallelRestoreThreadPoolSize = taskStorageManagers.size();
+  }
+
+  public void start() throws SamzaException {
+    LOG.info("Restore started");
+
+    // initialize each TaskStorageManager
+    this.taskStorageManagers.values().forEach(taskStorageManager -> taskStorageManager.init());
+
+    // Start consumers
+    this.systemConsumers.values().forEach(systemConsumer -> systemConsumer.start());
+
+    // Create a thread pool for parallel restores
+    ExecutorService executorService = Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
+        new ThreadFactoryBuilder().setNameFormat(RESTORE_THREAD_NAME).build());
+
+    List<Future> taskRestoreFutures = new ArrayList<>(this.taskStorageManagers.entrySet().size());
+
+    // Submit restore callable for each taskInstance
+    this.taskStorageManagers.forEach((taskInstance, taskStorageManager) -> {
+        taskRestoreFutures.add(
+            executorService.submit(new TaskRestoreCallable(this.samzaContainerMetrics, taskInstance, taskStorageManager)));
+      });
+
+    // loop-over the future list to wait for each thread to finish, catch any exceptions during restore and throw
+    // as samza exceptions
+    for (Future future : taskRestoreFutures) {
+      try {
+        future.get();
+      } catch (Exception e) {
+        LOG.error("Exception when restoring ", e);
+        throw new SamzaException("Exception when restoring ", e);
+      }
+    }
+
+    executorService.shutdown();
+
+    // Stop consumers
+    this.systemConsumers.values().forEach(systemConsumer -> systemConsumer.stop());
+
+    LOG.info("Restore complete");
+  }
+
+  public void shutdown() {
+    this.taskStorageManagers.forEach((taskInstance, taskStorageManager) -> {
+        if (taskStorageManager != null) {
+          LOG.debug("Shutting down task storage manager for taskName: {} ", taskInstance);
+          taskStorageManager.stop();
+        } else {
+          LOG.debug("Skipping task storage manager shutdown for taskName: {}", taskInstance);
+        }
+      });
+
+    LOG.info("Shutdown complete");
+  }
+
+  /** Callable for performing the restoreStores on a taskStorage manager and emitting task-restoration metric.
+   *
+   */
+  private class TaskRestoreCallable implements Callable<Void> {
+
+    private TaskInstance taskInstance;
+    private TaskStorageManager taskStorageManager;
+    private SamzaContainerMetrics samzaContainerMetrics;
+
+    public TaskRestoreCallable(SamzaContainerMetrics samzaContainerMetrics, TaskInstance taskInstance,
+        TaskStorageManager taskStorageManager) {
+      this.samzaContainerMetrics = samzaContainerMetrics;
+      this.taskInstance = taskInstance;
+      this.taskStorageManager = taskStorageManager;
+    }
+
+    @Override
+    public Void call() {
+      long startTime = System.currentTimeMillis();
+      LOG.info("Starting stores in task instance {}", this.taskInstance.taskName().getTaskName());
+      taskStorageManager.restoreStores();
+      long timeToRestore = System.currentTimeMillis() - startTime;
+      Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics()
+          .getOrDefault(this.taskInstance.taskName().getTaskName(), null);
+
+      if (taskGauge != null) {
+        taskGauge.set(timeToRestore);
+      }
+
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index deb69e1..4bcf2d3 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -74,9 +74,7 @@ class TaskStorageManager(
     cleanBaseDirs()
     setupBaseDirs()
     validateChangelogStreams()
-    startConsumers()
-    restoreStores()
-    stopConsumers()
+    registerSSPs()
   }
 
   private def cleanBaseDirs() {
@@ -159,7 +157,7 @@ class TaskStorageManager(
     info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets))
   }
 
-  private def startConsumers() {
+  private def registerSSPs() {
     debug("Starting consumers for stores.")
 
     for ((storeName, systemStream) <- changeLogSystemStreams) {
@@ -176,8 +174,6 @@ class TaskStorageManager(
         taskStoresToRestore -= storeName
       }
     }
-
-    storeConsumers.values.foreach(_.start)
   }
 
   /**
@@ -202,7 +198,7 @@ class TaskStorageManager(
     StorageManagerUtil.getStartingOffset(systemStreamPartition, admin, fileOffset, oldestOffset)
   }
 
-  private def restoreStores() {
+  def restoreStores() {
     debug("Restoring stores for task: %s." format taskName.getTaskName)
 
     for ((storeName, store) <- taskStoresToRestore) {
@@ -216,12 +212,6 @@ class TaskStorageManager(
     }
   }
 
-  private def stopConsumers() {
-    debug("Stopping consumers for stores.")
-
-    storeConsumers.values.foreach(_.stop)
-  }
-
   def flush() {
     debug("Flushing stores.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a35366d..eca4673 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -157,12 +157,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     when(this.taskInstance.taskName).thenReturn(TASK_NAME)
     val restoreGauge = mock[Gauge[Long]]
     when(this.metrics.taskStoreRestorationMetrics).thenReturn(Map(TASK_NAME -> restoreGauge))
-    when(this.taskInstance.startStores).thenAnswer(new Answer[Void] {
-      override def answer(invocation: InvocationOnMock): Void = {
-        Thread.sleep(1)
-        null
-      }
-    })
+
     this.samzaContainer.startStores
     val restoreGaugeValueCaptor = ArgumentCaptor.forClass(classOf[Long])
     verify(restoreGauge).set(restoreGaugeValueCaptor.capture())
@@ -283,7 +278,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       this.producerMultiplexer,
       metrics,
       containerContext = this.containerContext,
-      applicationContainerContextOption = applicationContainerContext)
+      applicationContainerContextOption = applicationContainerContext, containerStorageManager = null)
     this.samzaContainer.setContainerListener(this.samzaContainerListener)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 59f8662..d7c71fa 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -67,7 +67,8 @@ object StreamProcessorTestUtils {
       producerMultiplexer = producerMultiplexer,
       metrics = new SamzaContainerMetrics,
       containerContext = containerContext,
-      applicationContainerContextOption = None)
+      applicationContainerContextOption = None,
+      containerStorageManager = null)
     container
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
new file mode 100644
index 0000000..dba8678
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.container.TaskInstance;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemConsumer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+public class TestContainerStorageManager {
+
+  private ContainerStorageManager containerStorageManager;
+  private Map<String, SystemConsumer> systemConsumers;
+  private Map<TaskInstance, TaskStorageManager> taskStorageManagers;
+  private SamzaContainerMetrics samzaContainerMetrics;
+
+  private CountDownLatch taskStorageManagersRestoreStoreCount;
+  private CountDownLatch taskStorageManagersInitCount;
+  private CountDownLatch taskStorageManagersRestoreStopCount;
+
+  private CountDownLatch systemConsumerStartCount;
+  private CountDownLatch systemConsumerStopCount;
+
+  /**
+   * Utility method for creating a mocked taskInstance and taskStorageManager and adding it to the map.
+   * @param taskname the desired taskname.
+   */
+  private void addMockedTask(String taskname) {
+    TaskInstance mockTaskInstance = Mockito.mock(TaskInstance.class);
+    Mockito.doAnswer(invocation -> {
+        return new TaskName(taskname);
+      }).when(mockTaskInstance).taskName();
+
+    TaskStorageManager mockTaskStorageManager = Mockito.mock(TaskStorageManager.class);
+    Mockito.doAnswer(invocation -> {
+        taskStorageManagersInitCount.countDown();
+        return null;
+      }).when(mockTaskStorageManager).init();
+
+    Mockito.doAnswer(invocation -> {
+        taskStorageManagersRestoreStopCount.countDown();
+        return null;
+      }).when(mockTaskStorageManager).stop();
+
+    Mockito.doAnswer(invocation -> {
+        taskStorageManagersRestoreStoreCount.countDown();
+        return null;
+      }).when(mockTaskStorageManager).restoreStores();
+
+    taskStorageManagers.put(mockTaskInstance, mockTaskStorageManager);
+  }
+
+  @Before
+  public void setUp() {
+    systemConsumers = new HashMap<>();
+    taskStorageManagers = new HashMap<>();
+
+    // add two mocked tasks
+    addMockedTask("task 1");
+    addMockedTask("task 2");
+
+    // define the expected number of invocations on taskStorageManagers' init, stop and restore count
+    // and the expected number of sysConsumer start and stop
+    this.taskStorageManagersInitCount = new CountDownLatch(2);
+    this.taskStorageManagersRestoreStoreCount = new CountDownLatch(2);
+    this.taskStorageManagersRestoreStopCount = new CountDownLatch(2);
+    this.systemConsumerStartCount = new CountDownLatch(1);
+    this.systemConsumerStopCount = new CountDownLatch(1);
+
+    // mock container metrics
+    samzaContainerMetrics = Mockito.mock(SamzaContainerMetrics.class);
+
+    // mock and setup sysconsumers
+    SystemConsumer mockSystemConsumer = Mockito.mock(SystemConsumer.class);
+    Mockito.doAnswer(invocation -> {
+        systemConsumerStartCount.countDown();
+        return null;
+      }).when(mockSystemConsumer).start();
+    Mockito.doAnswer(invocation -> {
+        systemConsumerStopCount.countDown();
+        return null;
+      }).when(mockSystemConsumer).stop();
+
+    systemConsumers.put("kafka", mockSystemConsumer);
+
+    this.containerStorageManager =
+        new ContainerStorageManager(taskStorageManagers, systemConsumers, samzaContainerMetrics);
+  }
+
+  @Test
+  public void testParallelism() {
+    this.containerStorageManager.start();
+    this.containerStorageManager.shutdown();
+    Assert.assertTrue("init count should be 0", this.taskStorageManagersInitCount.getCount() == 0);
+    Assert.assertTrue("Restore count should be 0", this.taskStorageManagersRestoreStoreCount.getCount() == 0);
+    Assert.assertTrue("stop count should be 0", this.taskStorageManagersRestoreStopCount.getCount() == 0);
+
+    Assert.assertTrue("systemConsumerStopCount count should be 0", this.systemConsumerStopCount.getCount() == 0);
+    Assert.assertTrue("systemConsumerStartCount count should be 0", this.systemConsumerStartCount.getCount() == 0);
+  }
+}