You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2018/12/01 01:07:25 UTC
samza git commit: SAMZA-2018: State restore improvements
Repository: samza
Updated Branches:
refs/heads/master 4dbc2c6de -> fb293bb8c
SAMZA-2018: State restore improvements
This PR makes the following changes:
* Consumer consolidation to ensure 1 storeConsumer per system; earlier it was 1 consumer per SSP per store.
* Refactoring stores to use ContainerStorageManager with parallelization for restoration, and serial execution of sysConsumers start, stop, register, etc.
Author: Ray Matharu <rm...@linkedin.com>
Reviewers: Jagadish<ja...@apache.org>
Closes #823 from rmatharu/consumerConsolidate
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fb293bb8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fb293bb8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fb293bb8
Branch: refs/heads/master
Commit: fb293bb8cb2c9f379a29a0702acd574d62f5a2f1
Parents: 4dbc2c6
Author: Ray Matharu <rm...@linkedin.com>
Authored: Fri Nov 30 17:07:23 2018 -0800
Committer: Jagadish <jv...@linkedin.com>
Committed: Fri Nov 30 17:07:23 2018 -0800
----------------------------------------------------------------------
.../apache/samza/container/SamzaContainer.scala | 55 ++++---
.../apache/samza/container/TaskInstance.scala | 20 +--
.../samza/storage/ContainerStorageManager.java | 159 +++++++++++++++++++
.../samza/storage/TaskStorageManager.scala | 16 +-
.../samza/container/TestSamzaContainer.scala | 9 +-
.../processor/StreamProcessorTestUtils.scala | 3 +-
.../storage/TestContainerStorageManager.java | 125 +++++++++++++++
7 files changed, 330 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 7b4410e..ed50719 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -502,6 +502,22 @@ object SamzaContainer extends Logging {
val timerExecutor = Executors.newSingleThreadScheduledExecutor
+ // We create a map of store SystemName to its respective SystemConsumer
+ val storeSystemConsumers: Map[String, SystemConsumer] = changeLogSystemStreams.mapValues {
+ case (changeLogSystemStream) => (changeLogSystemStream.getSystem)
+ }.values.toSet.map {
+ systemName: String =>
+ (systemName, systemFactories
+ .getOrElse(systemName,
+ throw new SamzaException("Changelog system %s exist in the config." format (systemName)))
+ .getConsumer(systemName, config, samzaContainerMetrics.registry))
+ }.toMap
+
+ info("Created store system consumers: %s" format storeSystemConsumers)
+
+ var taskStorageManagers : Map[TaskInstance, TaskStorageManager] = Map()
+
+ // Create taskInstances
val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.asScala.map(taskModel => {
debug("Setting up task instance: %s" format taskModel)
@@ -516,14 +532,11 @@ object SamzaContainer extends Logging {
val collector = new TaskInstanceCollector(producerMultiplexer, taskInstanceMetrics)
- val storeConsumers = changeLogSystemStreams
+ // Re-use the storeConsumers, stored in storeSystemConsumers
+ val storeConsumers : Map[String, SystemConsumer] = changeLogSystemStreams
.map {
case (storeName, changeLogSystemStream) =>
- val systemConsumer = systemFactories
- .getOrElse(changeLogSystemStream.getSystem,
- throw new SamzaException("Changelog system %s for store %s does not " +
- "exist in the config." format (changeLogSystemStream, storeName)))
- .getConsumer(changeLogSystemStream.getSystem, config, taskInstanceMetrics.registry)
+ val systemConsumer = storeSystemConsumers.get(changeLogSystemStream.getSystem).get
samzaContainerMetrics.addStoreRestorationGauge(taskName, storeName)
(storeName, systemConsumer)
}
@@ -666,9 +679,14 @@ object SamzaContainer extends Logging {
val taskInstance = createTaskInstance(task)
+ taskStorageManagers += taskInstance -> storageManager
(taskName, taskInstance)
}).toMap
+
+ val containerStorageManager = new ContainerStorageManager(taskStorageManagers.asJava, storeSystemConsumers.asJava,
+ samzaContainerMetrics)
+
val maxThrottlingDelayMs = config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1))
val runLoop = RunLoopFactory.createRunLoop(
@@ -734,7 +752,8 @@ object SamzaContainer extends Logging {
taskThreadPool = taskThreadPool,
timerExecutor = timerExecutor,
containerContext = containerContext,
- applicationContainerContextOption = applicationContainerContextOption)
+ applicationContainerContextOption = applicationContainerContextOption,
+ containerStorageManager = containerStorageManager)
}
/**
@@ -769,7 +788,8 @@ class SamzaContainer(
taskThreadPool: ExecutorService = null,
timerExecutor: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor,
containerContext: ContainerContext,
- applicationContainerContextOption: Option[ApplicationContainerContext]) extends Runnable with Logging {
+ applicationContainerContextOption: Option[ApplicationContainerContext],
+ containerStorageManager: ContainerStorageManager) extends Runnable with Logging {
val shutdownMs = config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
var shutdownHookThread: Thread = null
@@ -1003,16 +1023,13 @@ class SamzaContainer(
}
def startStores {
+ info("Starting container storage manager.")
+ containerStorageManager.start()
+
taskInstances.values.foreach(taskInstance => {
val startTime = System.currentTimeMillis()
- info("Starting stores in task instance %s" format taskInstance.taskName)
- taskInstance.startStores
- // Measuring the time to restore the stores
- val timeToRestore = System.currentTimeMillis() - startTime
- val taskGauge = metrics.taskStoreRestorationMetrics.asScala.getOrElse(taskInstance.taskName, null)
- if (taskGauge != null) {
- taskGauge.set(timeToRestore)
- }
+ info("Starting side inputs in task instance %s" format taskInstance.taskName)
+ taskInstance.startSideInputs
})
}
@@ -1152,9 +1169,11 @@ class SamzaContainer(
}
def shutdownStores {
- info("Shutting down task instance stores.")
+ info("Shutting down container storage manager.")
+ containerStorageManager.shutdown()
- taskInstances.values.foreach(_.shutdownStores)
+ info("Shutting down task instance side inputs.")
+ taskInstances.values.foreach(_.shutdownSideInputs)
}
def shutdownTableManager: Unit = {
http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index f8e9c63..53e5af7 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -120,15 +120,7 @@ class TaskInstance(
offsetManager.register(taskName, sspsToRegister)
}
- def startStores {
- if (storageManager != null) {
- debug("Starting storage manager for taskName: %s" format taskName)
-
- storageManager.init
- } else {
- debug("Skipping storage manager initialization for taskName: %s" format taskName)
- }
-
+ def startSideInputs {
if (sideInputStorageManager != null) {
debug("Starting side input storage manager for taskName: %s" format taskName)
sideInputStorageManager.init()
@@ -298,15 +290,7 @@ class TaskInstance(
}
}
- def shutdownStores {
- if (storageManager != null) {
- debug("Shutting down storage manager for taskName: %s" format taskName)
-
- storageManager.stop
- } else {
- debug("Skipping storage manager shutdown for taskName: %s" format taskName)
- }
-
+ def shutdownSideInputs {
if (sideInputStorageManager != null) {
debug("Shutting down side input storage manager for taskName: %s" format taskName)
sideInputStorageManager.stop()
http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
new file mode 100644
index 0000000..5fc5573
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.samza.SamzaException;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.container.TaskInstance;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.system.SystemConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ContainerStorageManager is a per-container object that manages
+ * the restore of per-task partitions.
+ *
+ * It is responsible for
+ * a) performing all container-level actions for restore such as, initializing and shutting down
+ * taskStorage managers, starting, registering and stopping consumers, etc.
+ *
+ * b) performing individual taskStorageManager restores in parallel.
+ *
+ */
+public class ContainerStorageManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ContainerStorageManager.class);
+ private final Map<TaskInstance, TaskStorageManager> taskStorageManagers;
+ private final SamzaContainerMetrics samzaContainerMetrics;
+
+ // Mapping from storeSystemNames to SystemConsumers
+ private final Map<String, SystemConsumer> systemConsumers;
+
+ // Size of thread-pool to be used for parallel restores
+ private final int parallelRestoreThreadPoolSize;
+
+ // Naming convention to be used for restore threads
+ private static final String RESTORE_THREAD_NAME = "Samza Restore Thread-%d";
+
+ public ContainerStorageManager(Map<TaskInstance, TaskStorageManager> taskStorageManagers,
+ Map<String, SystemConsumer> systemConsumers, SamzaContainerMetrics samzaContainerMetrics) {
+ this.taskStorageManagers = taskStorageManagers;
+ this.systemConsumers = systemConsumers;
+ this.samzaContainerMetrics = samzaContainerMetrics;
+
+ // Setting thread pool size equal to the number of tasks
+ this.parallelRestoreThreadPoolSize = taskStorageManagers.size();
+ }
+
+ public void start() throws SamzaException {
+ LOG.info("Restore started");
+
+ // initialize each TaskStorageManager
+ this.taskStorageManagers.values().forEach(taskStorageManager -> taskStorageManager.init());
+
+ // Start consumers
+ this.systemConsumers.values().forEach(systemConsumer -> systemConsumer.start());
+
+ // Create a thread pool for parallel restores
+ ExecutorService executorService = Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
+ new ThreadFactoryBuilder().setNameFormat(RESTORE_THREAD_NAME).build());
+
+ List<Future> taskRestoreFutures = new ArrayList<>(this.taskStorageManagers.entrySet().size());
+
+ // Submit restore callable for each taskInstance
+ this.taskStorageManagers.forEach((taskInstance, taskStorageManager) -> {
+ taskRestoreFutures.add(
+ executorService.submit(new TaskRestoreCallable(this.samzaContainerMetrics, taskInstance, taskStorageManager)));
+ });
+
+ // loop-over the future list to wait for each thread to finish, catch any exceptions during restore and throw
+ // as samza exceptions
+ for (Future future : taskRestoreFutures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ LOG.error("Exception when restoring ", e);
+ throw new SamzaException("Exception when restoring ", e);
+ }
+ }
+
+ executorService.shutdown();
+
+ // Stop consumers
+ this.systemConsumers.values().forEach(systemConsumer -> systemConsumer.stop());
+
+ LOG.info("Restore complete");
+ }
+
+ public void shutdown() {
+ this.taskStorageManagers.forEach((taskInstance, taskStorageManager) -> {
+ if (taskStorageManager != null) {
+ LOG.debug("Shutting down task storage manager for taskName: {} ", taskInstance);
+ taskStorageManager.stop();
+ } else {
+ LOG.debug("Skipping task storage manager shutdown for taskName: {}", taskInstance);
+ }
+ });
+
+ LOG.info("Shutdown complete");
+ }
+
+ /** Callable for performing the restoreStores on a taskStorage manager and emitting task-restoration metric.
+ *
+ */
+ private class TaskRestoreCallable implements Callable<Void> {
+
+ private TaskInstance taskInstance;
+ private TaskStorageManager taskStorageManager;
+ private SamzaContainerMetrics samzaContainerMetrics;
+
+ public TaskRestoreCallable(SamzaContainerMetrics samzaContainerMetrics, TaskInstance taskInstance,
+ TaskStorageManager taskStorageManager) {
+ this.samzaContainerMetrics = samzaContainerMetrics;
+ this.taskInstance = taskInstance;
+ this.taskStorageManager = taskStorageManager;
+ }
+
+ @Override
+ public Void call() {
+ long startTime = System.currentTimeMillis();
+ LOG.info("Starting stores in task instance {}", this.taskInstance.taskName().getTaskName());
+ taskStorageManager.restoreStores();
+ long timeToRestore = System.currentTimeMillis() - startTime;
+ Gauge taskGauge = this.samzaContainerMetrics.taskStoreRestorationMetrics()
+ .getOrDefault(this.taskInstance.taskName().getTaskName(), null);
+
+ if (taskGauge != null) {
+ taskGauge.set(timeToRestore);
+ }
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
index deb69e1..4bcf2d3 100644
--- a/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala
@@ -74,9 +74,7 @@ class TaskStorageManager(
cleanBaseDirs()
setupBaseDirs()
validateChangelogStreams()
- startConsumers()
- restoreStores()
- stopConsumers()
+ registerSSPs()
}
private def cleanBaseDirs() {
@@ -159,7 +157,7 @@ class TaskStorageManager(
info("Assigning oldest change log offsets for taskName %s: %s" format (taskName, changeLogOldestOffsets))
}
- private def startConsumers() {
+ private def registerSSPs() {
debug("Starting consumers for stores.")
for ((storeName, systemStream) <- changeLogSystemStreams) {
@@ -176,8 +174,6 @@ class TaskStorageManager(
taskStoresToRestore -= storeName
}
}
-
- storeConsumers.values.foreach(_.start)
}
/**
@@ -202,7 +198,7 @@ class TaskStorageManager(
StorageManagerUtil.getStartingOffset(systemStreamPartition, admin, fileOffset, oldestOffset)
}
- private def restoreStores() {
+ def restoreStores() {
debug("Restoring stores for task: %s." format taskName.getTaskName)
for ((storeName, store) <- taskStoresToRestore) {
@@ -216,12 +212,6 @@ class TaskStorageManager(
}
}
- private def stopConsumers() {
- debug("Stopping consumers for stores.")
-
- storeConsumers.values.foreach(_.stop)
- }
-
def flush() {
debug("Flushing stores.")
http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index a35366d..eca4673 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -157,12 +157,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
when(this.taskInstance.taskName).thenReturn(TASK_NAME)
val restoreGauge = mock[Gauge[Long]]
when(this.metrics.taskStoreRestorationMetrics).thenReturn(Map(TASK_NAME -> restoreGauge))
- when(this.taskInstance.startStores).thenAnswer(new Answer[Void] {
- override def answer(invocation: InvocationOnMock): Void = {
- Thread.sleep(1)
- null
- }
- })
+
this.samzaContainer.startStores
val restoreGaugeValueCaptor = ArgumentCaptor.forClass(classOf[Long])
verify(restoreGauge).set(restoreGaugeValueCaptor.capture())
@@ -283,7 +278,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
this.producerMultiplexer,
metrics,
containerContext = this.containerContext,
- applicationContainerContextOption = applicationContainerContext)
+ applicationContainerContextOption = applicationContainerContext, containerStorageManager = null)
this.samzaContainer.setContainerListener(this.samzaContainerListener)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index 59f8662..d7c71fa 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -67,7 +67,8 @@ object StreamProcessorTestUtils {
producerMultiplexer = producerMultiplexer,
metrics = new SamzaContainerMetrics,
containerContext = containerContext,
- applicationContainerContextOption = None)
+ applicationContainerContextOption = None,
+ containerStorageManager = null)
container
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/fb293bb8/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
new file mode 100644
index 0000000..dba8678
--- /dev/null
+++ b/samza-core/src/test/scala/org/apache/samza/storage/TestContainerStorageManager.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.storage;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import org.apache.samza.container.SamzaContainerMetrics;
+import org.apache.samza.container.TaskInstance;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.system.SystemConsumer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+public class TestContainerStorageManager {
+
+ private ContainerStorageManager containerStorageManager;
+ private Map<String, SystemConsumer> systemConsumers;
+ private Map<TaskInstance, TaskStorageManager> taskStorageManagers;
+ private SamzaContainerMetrics samzaContainerMetrics;
+
+ private CountDownLatch taskStorageManagersRestoreStoreCount;
+ private CountDownLatch taskStorageManagersInitCount;
+ private CountDownLatch taskStorageManagersRestoreStopCount;
+
+ private CountDownLatch systemConsumerStartCount;
+ private CountDownLatch systemConsumerStopCount;
+
+ /**
+ * Utility method for creating a mocked taskInstance and taskStorageManager and adding it to the map.
+ * @param taskname the desired taskname.
+ */
+ private void addMockedTask(String taskname) {
+ TaskInstance mockTaskInstance = Mockito.mock(TaskInstance.class);
+ Mockito.doAnswer(invocation -> {
+ return new TaskName(taskname);
+ }).when(mockTaskInstance).taskName();
+
+ TaskStorageManager mockTaskStorageManager = Mockito.mock(TaskStorageManager.class);
+ Mockito.doAnswer(invocation -> {
+ taskStorageManagersInitCount.countDown();
+ return null;
+ }).when(mockTaskStorageManager).init();
+
+ Mockito.doAnswer(invocation -> {
+ taskStorageManagersRestoreStopCount.countDown();
+ return null;
+ }).when(mockTaskStorageManager).stop();
+
+ Mockito.doAnswer(invocation -> {
+ taskStorageManagersRestoreStoreCount.countDown();
+ return null;
+ }).when(mockTaskStorageManager).restoreStores();
+
+ taskStorageManagers.put(mockTaskInstance, mockTaskStorageManager);
+ }
+
+ @Before
+ public void setUp() {
+ systemConsumers = new HashMap<>();
+ taskStorageManagers = new HashMap<>();
+
+ // add two mocked tasks
+ addMockedTask("task 1");
+ addMockedTask("task 2");
+
+ // define the expected number of invocations on taskStorageManagers' init, stop and restore count
+ // and the expected number of sysConsumer start and stop
+ this.taskStorageManagersInitCount = new CountDownLatch(2);
+ this.taskStorageManagersRestoreStoreCount = new CountDownLatch(2);
+ this.taskStorageManagersRestoreStopCount = new CountDownLatch(2);
+ this.systemConsumerStartCount = new CountDownLatch(1);
+ this.systemConsumerStopCount = new CountDownLatch(1);
+
+ // mock container metrics
+ samzaContainerMetrics = Mockito.mock(SamzaContainerMetrics.class);
+
+ // mock and setup sysconsumers
+ SystemConsumer mockSystemConsumer = Mockito.mock(SystemConsumer.class);
+ Mockito.doAnswer(invocation -> {
+ systemConsumerStartCount.countDown();
+ return null;
+ }).when(mockSystemConsumer).start();
+ Mockito.doAnswer(invocation -> {
+ systemConsumerStopCount.countDown();
+ return null;
+ }).when(mockSystemConsumer).stop();
+
+ systemConsumers.put("kafka", mockSystemConsumer);
+
+ this.containerStorageManager =
+ new ContainerStorageManager(taskStorageManagers, systemConsumers, samzaContainerMetrics);
+ }
+
+ @Test
+ public void testParallelism() {
+ this.containerStorageManager.start();
+ this.containerStorageManager.shutdown();
+ Assert.assertTrue("init count should be 0", this.taskStorageManagersInitCount.getCount() == 0);
+ Assert.assertTrue("Restore count should be 0", this.taskStorageManagersRestoreStoreCount.getCount() == 0);
+ Assert.assertTrue("stop count should be 0", this.taskStorageManagersRestoreStopCount.getCount() == 0);
+
+ Assert.assertTrue("systemConsumerStopCount count should be 0", this.systemConsumerStopCount.getCount() == 0);
+ Assert.assertTrue("systemConsumerStartCount count should be 0", this.systemConsumerStartCount.getCount() == 0);
+ }
+}