Posted to commits@samza.apache.org by "mynameborat (via GitHub)" <gi...@apache.org> on 2023/03/21 17:18:48 UTC

[GitHub] [samza] mynameborat commented on a diff in pull request #1657: Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores

mynameborat commented on code in PR #1657:
URL: https://github.com/apache/samza/pull/1657#discussion_r1142837174


##########
samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java:
##########
@@ -44,16 +45,19 @@ public class SideInputTask implements RunLoopTask {
   private final Set<SystemStreamPartition> taskSSPs;
   private final TaskSideInputHandler taskSideInputHandler;
   private final TaskInstanceMetrics metrics;
+  private final long commitMs;
 
   public SideInputTask(
       TaskName taskName,
       Set<SystemStreamPartition> taskSSPs,
       TaskSideInputHandler taskSideInputHandler,
-      TaskInstanceMetrics metrics) {
+      TaskInstanceMetrics metrics,
+      long commitMs) {
     this.taskName = taskName;
     this.taskSSPs = taskSSPs;
     this.taskSideInputHandler = taskSideInputHandler;
     this.metrics = metrics;
+    this.commitMs = commitMs;

Review Comment:
   Reminder to add a warning log for the case where `commitMs <= 0`.
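
   A minimal sketch of what such a warning could look like. The class `CommitIntervalCheck`, the method `warnIfNonPositive`, and the message text are hypothetical and not taken from the PR. The sketch uses `java.util.logging` only to stay self-contained, whereas the real class would presumably use the logger already present in the codebase. It also assumes nothing about what a non-positive `commitMs` means for the side input commit; it only flags the value.

```java
import java.util.logging.Logger;

// Hypothetical helper, not part of Samza: illustrates the suggested warning only.
final class CommitIntervalCheck {
  private static final Logger LOG = Logger.getLogger(CommitIntervalCheck.class.getName());

  private CommitIntervalCheck() { }

  // Returns true when commitMs is a positive interval; logs a warning and returns false otherwise.
  static boolean warnIfNonPositive(String taskName, long commitMs) {
    if (commitMs <= 0) {
      LOG.warning(String.format(
          "commitMs is %d for side input task %s; expected a positive interval.",
          commitMs, taskName));
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    warnIfNonPositive("Partition 0", 0L);      // logs a warning
    warnIfNonPositive("Partition 1", 60_000L); // logs nothing
  }
}
```

   In the constructor shown in the diff, a check like this would sit right after the `this.commitMs = commitMs;` assignment.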



##########
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java:
##########
@@ -69,6 +72,12 @@ public class StorageManagerUtil {
   private static final String SST_FILE_SUFFIX = ".sst";
   private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
 
+  // Unlike checkpoint or offset files, side input offset file can have multiple writers / readers,
+  // since they are written during SideInputTask commit and copied to store checkpoint directory
+  // by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+  // We use a (process-wide) lock to ensure that such write and copy operations are thread-safe.

Review Comment:
   Why do we need a process-wide lock? We don't have container-level stores, and this util is scoped to the task level, right?



##########
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java:
##########
@@ -69,6 +72,12 @@ public class StorageManagerUtil {
   private static final String SST_FILE_SUFFIX = ".sst";
   private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
 
+  // Unlike checkpoint or offset files, side input offset file can have multiple writers / readers,
+  // since they are written during SideInputTask commit and copied to store checkpoint directory
+  // by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+  // We use a (process-wide) lock to ensure that such write and copy operations are thread-safe.

Review Comment:
   Never mind. I realized this lock is shared across the two run loops in the code path that uploads to the blob store. Is that right?
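
   For illustration, one possible shape of the pattern described in the code comment above: a single static lock that serializes the write done by one run loop and the copy done by the other. This is a sketch under assumptions, not the PR's implementation. The class `SideInputOffsetFileSketch`, the methods `writeOffsets` and `copyOffsets`, and the choice of a plain monitor lock are all hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch, not the actual StorageManagerUtil code.
final class SideInputOffsetFileSketch {
  // Static, so there is one lock per JVM: threads from different run loops
  // contend on the same monitor regardless of which task they belong to.
  private static final Object OFFSET_FILE_LOCK = new Object();

  private SideInputOffsetFileSketch() { }

  // Would be called from the side input run loop during its commit.
  static void writeOffsets(Path offsetFile, String contents) throws IOException {
    synchronized (OFFSET_FILE_LOCK) {
      Files.write(offsetFile, contents.getBytes(StandardCharsets.UTF_8));
    }
  }

  // Would be called from the regular task run loop when copying into the checkpoint directory.
  static void copyOffsets(Path offsetFile, Path checkpointDir) throws IOException {
    synchronized (OFFSET_FILE_LOCK) {
      Files.copy(offsetFile, checkpointDir.resolve(offsetFile.getFileName()),
          StandardCopyOption.REPLACE_EXISTING);
    }
  }
}
```

   With this shape, a copy can never observe a partially written offset file, at the cost of serializing these operations across all tasks in the process. A per-file or per-task lock would reduce contention if that ever mattered.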



##########
samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java:
##########
@@ -339,7 +339,7 @@ private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, Jo
     // Put non persisted stores
     nonPersistedStores.forEach(storageEngines::put);
     // Create persisted stores
-    storeNames.forEach(storeName -> {
+    storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {

Review Comment:
   Is this part of a bug fix, or is it more about enforcing this flow for persisted stores only?
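
   To make the behavioral difference concrete, here is a small standalone sketch of the new filter. The class `PersistedStoreFilterSketch` and the method `namesToCreate` are hypothetical. It assumes that `storeNames` may contain names that are also keys of `nonPersistedStores`, which the diff does not confirm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch, not the actual NonTransactionalStateTaskRestoreManager code.
final class PersistedStoreFilterSketch {
  private PersistedStoreFilterSketch() { }

  // Returns the store names that would reach the "create persisted stores" step
  // once names already present in nonPersistedStores are filtered out.
  static List<String> namesToCreate(Set<String> storeNames, Map<String, ?> nonPersistedStores) {
    List<String> result = new ArrayList<>();
    storeNames.stream()
        .filter(s -> !nonPersistedStores.containsKey(s))
        .forEach(result::add);
    return result;
  }
}
```

   Without the filter, any name present in both collections would go through the creation step as well, after its entry had already been added from `nonPersistedStores`.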



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
