Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/04/16 16:18:43 UTC

[GitHub] [samza] prateekm commented on a change in pull request #1429: SAMZA-2591: API updates for TaskStorageBackupManager

prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r614965895



##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -256,6 +254,10 @@ public ContainerStorageManager(
         containerChangelogSystems, systemFactories, config, this.samzaContainerMetrics.registry());
     this.storeConsumers = createStoreIndexedMap(this.changelogSystemStreams, storeSystemConsumers);
 
+    // TODO HIGH dchen tune based on observed concurrency
+    this.restoreExecutor = Executors.newFixedThreadPool(containerModel.getTasks().size(),

Review comment:
       @dxichen I think the pool size still needs to be 2x the number of tasks until you make RestoreManager init() and restore() nonblocking.
   
   By the way, where is the restore manager's init() (not store init) called? Does it also run on this executor? Since we're deleting old stores during restore init, ideally that should be parallel and nonblocking as well.
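
To illustrate the sizing concern, here is a minimal standalone sketch. It rests on an assumption the comment does not spell out: that a blocking call running on the pool waits for further work submitted to the same pool. It is not Samza code, and `RestorePoolSizingSketch` and `completes` are hypothetical names. Each "outer" task stands in for a blocking call that submits one "inner" task and waits for it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch, not Samza code: models blocking calls that wait on
// nested work submitted to the same fixed-size pool.
final class RestorePoolSizingSketch {

  // Returns true if every outer task finishes within timeoutMs.
  static boolean completes(int numTasks, int poolSize, long timeoutMs)
      throws InterruptedException {
    // Daemon threads so stuck workers never keep the JVM alive.
    ExecutorService pool = Executors.newFixedThreadPool(poolSize, r -> {
      Thread t = new Thread(r, "restore-sketch");
      t.setDaemon(true);
      return t;
    });
    // Ensures all outer tasks hold a thread before any inner task is submitted.
    CountDownLatch allStarted = new CountDownLatch(numTasks);
    List<Future<?>> outer = new ArrayList<>();
    try {
      for (int i = 0; i < numTasks; i++) {
        outer.add(pool.submit(() -> {
          allStarted.countDown();
          allStarted.await();
          // Blocking wait on nested work in the same pool (the assumed pattern).
          pool.submit(() -> { }).get();
          return null;
        }));
      }
      long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
      for (Future<?> f : outer) {
        long remaining = Math.max(deadline - System.nanoTime(), 0L);
        f.get(remaining, TimeUnit.NANOSECONDS);
      }
      return true;
    } catch (TimeoutException | ExecutionException e) {
      // Timed out (pool starved) or a task failed.
      return false;
    } finally {
      // Interrupt any workers still blocked.
      pool.shutdownNow();
    }
  }
}
```

Under that assumption, `completes(4, 4, 300)` times out because all four threads are held by outer tasks and the inner tasks stay queued, while `completes(4, 8, 2000)` finishes. Making the outer calls nonblocking would remove the need for the extra threads.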

##########
File path: samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java
##########
@@ -643,15 +652,11 @@ private void restoreStores() throws InterruptedException {
     // Start each store consumer once
     this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
 
-    // Create a thread pool for parallel restores (and stopping of persistent stores)
-    ExecutorService executorService = Executors.newFixedThreadPool(this.parallelRestoreThreadPoolSize,
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat(RESTORE_THREAD_NAME).build());
-
     List<Future> taskRestoreFutures = new ArrayList<>(this.taskRestoreManagers.entrySet().size());
 
     // Submit restore callable for each taskInstance
     this.taskRestoreManagers.forEach((taskInstance, taskRestoreManager) -> {
-      taskRestoreFutures.add(executorService.submit(
+      taskRestoreFutures.add(restoreExecutor.submit(

Review comment:
       What's the relationship between parallelRestoreThreadPoolSize and the new executor's pool size? Are they the same?



