Posted to commits@samza.apache.org by "shekhars-li (via GitHub)" <gi...@apache.org> on 2023/07/26 21:55:00 UTC

[GitHub] [samza] shekhars-li commented on a diff in pull request #1676: [SAMZA-2787] GetDeleted API and Recover from DeletedException

shekhars-li commented on code in PR #1676:
URL: https://github.com/apache/samza/pull/1676#discussion_r1275518474


##########
samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java:
##########
@@ -301,64 +300,14 @@ private void restoreStores() throws InterruptedException {
               samzaContainerMetrics, taskInstanceMetrics, taskInstanceCollectors, serdes,
               loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
       taskRestoreManagers.put(taskName, taskStoreRestoreManagers);
+      taskBackendFactoryToStoreNames.put(taskName, backendFactoryToStoreNames);
     });
 
-    // Initialize each TaskStorageManager
-    taskRestoreManagers.forEach((taskName, restoreManagers) ->
-        restoreManagers.forEach((factoryName, taskRestoreManager) ->
-            taskRestoreManager.init(taskCheckpoints.get(taskName))
-        )
-    );
-
-    // Start each store consumer once.
-    // Note: These consumers are per system and only changelog system store consumers will be started.
-    // Some TaskRestoreManagers may not require the consumer to to be started, but due to the agnostic nature of
-    // ContainerStorageManager we always start the changelog consumer here in case it is required
-    this.storeConsumers.values().stream().distinct().forEach(SystemConsumer::start);
-
-    List<Future<Void>> taskRestoreFutures = new ArrayList<>();
-
-    // Submit restore callable for each taskInstance
-    taskRestoreManagers.forEach((taskInstance, restoreManagersMap) -> {
-      // Submit for each restore factory
-      restoreManagersMap.forEach((factoryName, taskRestoreManager) -> {
-        long startTime = System.currentTimeMillis();
-        String taskName = taskInstance.getTaskName();
-        LOG.info("Starting restore for state for task: {}", taskName);
-        CompletableFuture<Void> restoreFuture = taskRestoreManager.restore().handle((res, ex) -> {
-          // Stop all persistent stores after restoring. Certain persistent stores opened in BulkLoad mode are compacted
-          // on stop, so paralleling stop() also parallelizes their compaction (a time-intensive operation).
-          try {
-            taskRestoreManager.close();
-          } catch (Exception e) {
-            LOG.error("Error closing restore manager for task: {} after {} restore",
-                taskName, ex != null ? "unsuccessful" : "successful", e);
-            // ignore exception from close. container may still be be able to continue processing/backups
-            // if restore manager close fails.
-          }
-
-          long timeToRestore = System.currentTimeMillis() - startTime;
-          if (samzaContainerMetrics != null) {
-            Gauge taskGauge = samzaContainerMetrics.taskStoreRestorationMetrics().getOrDefault(taskInstance, null);
-
-            if (taskGauge != null) {
-              taskGauge.set(timeToRestore);
-            }
-          }
-
-          if (ex != null) {
-            // log and rethrow exception to communicate restore failure
-            String msg = String.format("Error restoring state for task: %s", taskName);
-            LOG.error(msg, ex);
-            throw new SamzaException(msg, ex); // wrap in unchecked exception to throw from lambda
-          } else {
-            return null;
-          }
-        });
-
-        taskRestoreFutures.add(restoreFuture);
-      });
-    });
+    // Init all taskRestores and if successful, create a future for restores for each task
+    List<Future<Void>> taskRestoreFutures =

Review Comment:
   @prateekm Newly created checkpoints are reflected in the TaskCheckpoints map. 


