Posted to commits@samza.apache.org by "prateekm (via GitHub)" <gi...@apache.org> on 2023/03/18 02:08:54 UTC

[GitHub] [samza] prateekm opened a new pull request, #1657: Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores

prateekm opened a new pull request, #1657:
URL: https://github.com/apache/samza/pull/1657

   Feature: Allow bulk restore from blob store for side input stores.
   If a blob store state backend is available, side input store restores can be sped up significantly by bulk-restoring the initial state from the blob store and using the Kafka side input topic only to catch up on the delta since the last checkpoint.
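The two-phase restore described above can be sketched as a simplified in-memory model. This is not Samza's implementation: `Snapshot`, `Message`, and `restore` are hypothetical names, and the sketch assumes the offset stored alongside the blob store snapshot is the last side input offset already applied to it, so the catch-up phase replays only strictly newer messages.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a two-phase side input restore: bulk-load a blob store
// snapshot, then replay only the side input messages newer than the snapshot.
public class TwoPhaseRestoreSketch {

  // A side input topic message (offsets modeled as longs for simplicity).
  record Message(long offset, String key, String value) {}

  // Snapshot contents plus the last side input offset assumed to be reflected in them.
  record Snapshot(Map<String, String> contents, long lastAppliedOffset) {}

  static Map<String, String> restore(Snapshot snapshot, List<Message> topic) {
    // Phase 1: bulk restore from the snapshot instead of replaying the whole topic.
    Map<String, String> store = new TreeMap<>(snapshot.contents());
    // Phase 2: incremental catch-up on the delta since the snapshot's offset.
    for (Message m : topic) {
      if (m.offset() > snapshot.lastAppliedOffset()) {
        store.put(m.key(), m.value());
      }
    }
    return store;
  }

  public static void main(String[] args) {
    Snapshot snap = new Snapshot(Map.of("a", "1", "b", "2"), 1L);
    List<Message> topic = List.of(
        new Message(0, "a", "1"),
        new Message(1, "b", "2"),
        new Message(2, "b", "3"),
        new Message(3, "c", "4"));
    System.out.println(restore(snap, topic)); // {a=1, b=3, c=4}
  }
}
```

Only messages at offsets 2 and 3 are replayed here; the savings in a real deployment come from skipping the (potentially very long) prefix of the topic that the snapshot already covers.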
   
   Additional Context for Reviewers:
   The side inputs RunLoop and the regular Task RunLoop are separate and commit independently. SideInputTask commit flushes the side input store and writes the SIDE-INPUT-OFFSETS file in the store directory, but does not create a store checkpoint or upload it to the state backend. This SIDE-INPUT-OFFSETS file is used during restore to determine the starting offset in the side input topic. Regular Task commit creates store checkpoints, uploads them to the state backends, and saves the resulting store StateCheckpointMarker in the task checkpoint, but currently does not copy the SIDE-INPUT-OFFSETS file to the checkpoint directory.
    
   Changes:
   This is a follow-up to #1654 and #1655.
   1. Copy the SIDE-INPUT-OFFSETS file (if it exists) to the side input store checkpoint directory created during regular Task commit, so that it can be backed up along with the side input store contents and used during restore for incremental catch-up. This copy must be done under process-wide synchronization to avoid data corruption, since the two RunLoops are currently completely independent of each other and do not coordinate or synchronize their commits, and there are no atomic file copy APIs.
   2. If a blob store state backend factory is configured for side input stores, use it to do an initial bulk restore in ContainerStorageManager before starting the incremental restore and consumption from the side input topics.
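Item 1 can be sketched with `java.nio.file` and a single JVM-wide monitor shared by both run loops. The class, method names, and offsets content are hypothetical; only the SIDE-INPUT-OFFSETS file name comes from the description above. Because the write (side input commit) and the copy (regular Task commit) hold the same lock, the copy should never observe a half-written file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: guard writes and copies of the side input offsets file
// with one lock shared by every thread in the JVM (i.e. both run loops).
public class SideInputOffsetsCopySketch {
  static final String OFFSETS_FILE = "SIDE-INPUT-OFFSETS";
  private static final Object LOCK = new Object();

  // Called from the side input run loop's commit (assumed call site).
  static void writeOffsets(Path storeDir, String offsetsContent) throws IOException {
    synchronized (LOCK) {
      Files.writeString(storeDir.resolve(OFFSETS_FILE), offsetsContent, StandardCharsets.UTF_8);
    }
  }

  // Called from the regular Task run loop's commit after the store checkpoint
  // directory is created (assumed call site). Returns false if there is nothing to copy.
  static boolean copyOffsetsIfExists(Path storeDir, Path checkpointDir) throws IOException {
    synchronized (LOCK) {
      Path src = storeDir.resolve(OFFSETS_FILE);
      if (!Files.exists(src)) {
        return false;
      }
      Files.copy(src, checkpointDir.resolve(OFFSETS_FILE), StandardCopyOption.REPLACE_EXISTING);
      return true;
    }
  }
}
```

A plain monitor is enough here because both run loops live in the same JVM; it would not protect against a second process touching the same store directory.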
    
   
Tests: Added new integration tests to verify BlobStoreStateBackend functionality in general, and the new BlobStoreStateBackend + Side Inputs functionality in particular.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] prateekm commented on a diff in pull request #1657: Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1657:
URL: https://github.com/apache/samza/pull/1657#discussion_r1143812365


##########
samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java:
##########
@@ -339,7 +339,7 @@ private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, Jo
     // Put non persisted stores
     nonPersistedStores.forEach(storageEngines::put);
     // Create persisted stores
-    storeNames.forEach(storeName -> {
+    storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {

Review Comment:
   Actually, the PR description is backwards; will update. The stores created initially are retained but not restored; instead, a new throw-away store is restored from the changelog.





[GitHub] [samza] mynameborat commented on a diff in pull request #1657: Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores

Posted by "mynameborat (via GitHub)" <gi...@apache.org>.
mynameborat commented on code in PR #1657:
URL: https://github.com/apache/samza/pull/1657#discussion_r1142837174


##########
samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java:
##########
@@ -44,16 +45,19 @@ public class SideInputTask implements RunLoopTask {
   private final Set<SystemStreamPartition> taskSSPs;
   private final TaskSideInputHandler taskSideInputHandler;
   private final TaskInstanceMetrics metrics;
+  private final long commitMs;
 
   public SideInputTask(
       TaskName taskName,
       Set<SystemStreamPartition> taskSSPs,
       TaskSideInputHandler taskSideInputHandler,
-      TaskInstanceMetrics metrics) {
+      TaskInstanceMetrics metrics,
+      long commitMs) {
     this.taskName = taskName;
     this.taskSSPs = taskSSPs;
     this.taskSideInputHandler = taskSideInputHandler;
     this.metrics = metrics;
+    this.commitMs = commitMs;

Review Comment:
   Reminder to add warn logging when `commitMs <= 0`.
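A minimal sketch of the requested warning, assuming it belongs in the constructor. `SideInputTaskSketch` is a hypothetical, trimmed-down stand-in for `SideInputTask` with the other constructor parameters omitted, and `java.util.logging` is used only to keep the example self-contained (Samza code typically logs through SLF4J).

```java
import java.util.logging.Logger;

// Hypothetical stand-in for SideInputTask, showing only the commitMs handling.
public class SideInputTaskSketch {
  private static final Logger LOG = Logger.getLogger(SideInputTaskSketch.class.getName());
  private final long commitMs;

  public SideInputTaskSketch(String taskName, long commitMs) {
    if (needsWarning(commitMs)) {
      // Surface a non-positive interval instead of accepting it silently.
      LOG.warning("Side input task " + taskName + " has a non-positive commitMs: " + commitMs);
    }
    this.commitMs = commitMs;
  }

  static boolean needsWarning(long commitMs) {
    return commitMs <= 0;
  }

  public long commitMs() {
    return commitMs;
  }
}
```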



##########
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java:
##########
@@ -69,6 +72,12 @@ public class StorageManagerUtil {
   private static final String SST_FILE_SUFFIX = ".sst";
   private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
 
+  // Unlike checkpoint or offset files, side input offset file can have multiple writers / readers,
+  // since they are written during SideInputTask commit and copied to store checkpoint directory
+  // by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+  // We use a (process-wide) lock to ensure that such write and copy operations are thread-safe.

Review Comment:
   Why do we need process-wide locks? We don't have container-level stores, and this util is scoped to the task level, right?



##########
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java:
##########
@@ -69,6 +72,12 @@ public class StorageManagerUtil {
   private static final String SST_FILE_SUFFIX = ".sst";
   private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
 
+  // Unlike checkpoint or offset files, side input offset file can have multiple writers / readers,
+  // since they are written during SideInputTask commit and copied to store checkpoint directory
+  // by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+  // We use a (process-wide) lock to ensure that such write and copy operations are thread-safe.

Review Comment:
   Never mind. I realized this lock is used across the two run loops in the blob store upload code path. Is that right?



##########
samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java:
##########
@@ -339,7 +339,7 @@ private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, Jo
     // Put non persisted stores
     nonPersistedStores.forEach(storageEngines::put);
     // Create persisted stores
-    storeNames.forEach(storeName -> {
+    storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {

Review Comment:
   Is this part of a bug fix, or is it meant to restrict this flow to persisted stores only?





[GitHub] [samza] prateekm commented on a diff in pull request #1657: Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1657:
URL: https://github.com/apache/samza/pull/1657#discussion_r1143770748


##########
samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java:
##########
@@ -339,7 +339,7 @@ private Map<String, StorageEngine> createStoreEngines(Set<String> storeNames, Jo
     // Put non persisted stores
     nonPersistedStores.forEach(storageEngines::put);
     // Create persisted stores
-    storeNames.forEach(storeName -> {
+    storeNames.stream().filter(s -> !nonPersistedStores.containsKey(s)).forEach(storeName -> {

Review Comment:
   Part of a bug fix: in-memory stores configured with a changelog were being restored from the changelog, but were being replaced with a new empty store after the restore. 
   This method puts the pre-created, passed-in in-memory stores into the storageEngines map in line 340, just above this. It then overwrites them with newly created in-memory stores in this section, because it does not exclude in-memory stores from storeNames. The new store is the one that gets restored from the changelog, but the caller (ContainerStorageManager) holds on to the passed-in in-memory store, expecting it to be the restored one.
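The overwrite described above, and the effect of the new filter, can be reproduced with plain maps. This is a hypothetical, simplified model of `createStoreEngines`: `Store` stands in for `StorageEngine`, and the `applyFix` flag exists only to compare the old and new behavior side by side.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CreateStoreEnginesSketch {
  // Stand-in for a StorageEngine; only object identity matters here.
  static final class Store {
    final String name;
    Store(String name) { this.name = name; }
  }

  static Map<String, Store> createStoreEngines(Set<String> storeNames,
      Map<String, Store> nonPersistedStores, boolean applyFix) {
    Map<String, Store> storageEngines = new HashMap<>();
    // Put the pre-created, passed-in in-memory stores.
    nonPersistedStores.forEach(storageEngines::put);
    // Create persisted stores. Without the filter, in-memory entries are overwritten.
    storeNames.stream()
        .filter(s -> !applyFix || !nonPersistedStores.containsKey(s))
        .forEach(storeName -> storageEngines.put(storeName, new Store(storeName)));
    return storageEngines;
  }

  public static void main(String[] args) {
    Store passedIn = new Store("inMemory");
    Map<String, Store> nonPersisted = Map.of("inMemory", passedIn);
    Set<String> names = new LinkedHashSet<>(List.of("inMemory", "persisted"));
    // Old behavior: the map holds a new store, not the one the caller kept a reference to.
    System.out.println(createStoreEngines(names, nonPersisted, false).get("inMemory") == passedIn); // false
    // With the filter: the caller's store is the one that will be restored.
    System.out.println(createStoreEngines(names, nonPersisted, true).get("inMemory") == passedIn); // true
  }
}
```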





[GitHub] [samza] prateekm commented on a diff in pull request #1657: Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on code in PR #1657:
URL: https://github.com/apache/samza/pull/1657#discussion_r1143790143


##########
samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java:
##########
@@ -69,6 +72,12 @@ public class StorageManagerUtil {
   private static final String SST_FILE_SUFFIX = ".sst";
   private static final CheckpointV2Serde CHECKPOINT_V2_SERDE = new CheckpointV2Serde();
 
+  // Unlike checkpoint or offset files, side input offset file can have multiple writers / readers,
+  // since they are written during SideInputTask commit and copied to store checkpoint directory
+  // by TaskStorageCommitManager during regular TaskInstance commit (these commits are on separate run loops).
+  // We use a (process-wide) lock to ensure that such write and copy operations are thread-safe.

Review Comment:
   That's right, needed because there's no other coordination/synchronization between the two run loops. It is used during commit whenever they read/write/copy side input offsets file.
   
   To avoid deadlocks, the lock is only held within a single method invocation, and these methods do not call each other. On second thought, I'll use a semaphore instead of a reentrant lock to document and enforce this.
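A sketch of why a single-permit `java.util.concurrent.Semaphore` documents the "no nested acquisition" rule better than a `ReentrantLock`: the lock lets its owning thread re-acquire it, so an accidental nested call would go unnoticed, whereas a semaphore has no owner, so a second acquisition on the same thread fails with `tryAcquire()` (or blocks forever with `acquire()`). The class and field names are hypothetical, not the PR's code.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

public class NonReentrantGuardSketch {
  // Hypothetical process-wide guard for side input offsets file operations.
  static final Semaphore OFFSETS_FILE_PERMIT = new Semaphore(1);

  // Whether the same thread can acquire a ReentrantLock a second time.
  static boolean reentrantLockAllowsNesting() {
    ReentrantLock lock = new ReentrantLock();
    lock.lock();
    try {
      boolean nested = lock.tryLock();
      if (nested) {
        lock.unlock(); // balance the nested hold
      }
      return nested;
    } finally {
      lock.unlock();
    }
  }

  // Whether the same thread can take another permit while holding the only one.
  static boolean semaphoreAllowsNesting() throws InterruptedException {
    OFFSETS_FILE_PERMIT.acquire();
    try {
      boolean nested = OFFSETS_FILE_PERMIT.tryAcquire();
      if (nested) {
        OFFSETS_FILE_PERMIT.release();
      }
      return nested;
    } finally {
      OFFSETS_FILE_PERMIT.release();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(reentrantLockAllowsNesting()); // true
    System.out.println(semaphoreAllowsNesting());     // false
  }
}
```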



##########
samza-core/src/main/java/org/apache/samza/storage/SideInputTask.java:
##########
@@ -44,16 +45,19 @@ public class SideInputTask implements RunLoopTask {
   private final Set<SystemStreamPartition> taskSSPs;
   private final TaskSideInputHandler taskSideInputHandler;
   private final TaskInstanceMetrics metrics;
+  private final long commitMs;
 
   public SideInputTask(
       TaskName taskName,
       Set<SystemStreamPartition> taskSSPs,
       TaskSideInputHandler taskSideInputHandler,
-      TaskInstanceMetrics metrics) {
+      TaskInstanceMetrics metrics,
+      long commitMs) {
     this.taskName = taskName;
     this.taskSSPs = taskSSPs;
     this.taskSideInputHandler = taskSideInputHandler;
     this.metrics = metrics;
+    this.commitMs = commitMs;

Review Comment:
   Will do, thanks.





[GitHub] [samza] prateekm merged pull request #1657: Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm merged PR #1657:
URL: https://github.com/apache/samza/pull/1657




[GitHub] [samza] prateekm commented on pull request #1657: Side Inputs + Blob Store Backups - Part 2 - Allow bulk restore from blob store for side input stores

Posted by "prateekm (via GitHub)" <gi...@apache.org>.
prateekm commented on PR #1657:
URL: https://github.com/apache/samza/pull/1657#issuecomment-1474625944

   cc @mynameborat @shekhars-li for review.

