You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/05/18 06:24:28 UTC

[GitHub] [samza] dxichen commented on a change in pull request #1501: SAMZA-2657: Blob Store as backend for Samza State backup and restore

dxichen commented on a change in pull request #1501:
URL: https://github.com/apache/samza/pull/1501#discussion_r633953579



##########
File path: samza-api/src/main/java/org/apache/samza/storage/blobstore/BlobStoreManager.java
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.CompletionStage;
+import org.apache.samza.annotation.InterfaceStability;
+
+
+/**
+ * Provides interface for common blob store operations: GET, PUT and DELETE
+ */
+@InterfaceStability.Unstable
+public interface BlobStoreManager {
+  /**
+   * init method to initialize underlying blob store client, if necessary
+   */
+  void init();
+  /**
+   * Non-blocking PUT call to remote blob store with supplied metadata

Review comment:
       P2: add @param tags here as well.

##########
File path: samza-api/src/main/java/org/apache/samza/storage/BlobStoreAdminFactory.java
##########
@@ -0,0 +1,9 @@
+package org.apache.samza.storage;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.job.model.JobModel;
+
+

Review comment:
       P2: add Javadocs for all public interfaces.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+  private static final Logger LOG = LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
+  private final JobModel jobModel;
+  private final ExecutorService executor;
+  private final String jobName;
+  private final String jobId;
+  private final ContainerModel containerModel;
+  private final TaskModel taskModel;
+  private final String taskName;
+  private final Config config;
+  private final Clock clock;
+  private final StorageManagerUtil storageManagerUtil;
+  private final List<String> storesToBackup;
+  private final File loggedStoreBaseDir;
+  private final BlobStoreUtil blobStoreUtil;
+
+  private final BlobStoreBackupManagerMetrics metrics;
+
+  /**
+   * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} from
+   * last successful task checkpoint or {@link #upload}.
+   *
+   * After {@link #init}, the map reflects the contents of the last completed checkpoint for the task from the previous
+   * deployment, if any.
+   *
+   * During regular processing, this map is updated after each successful {@link #upload} with the blob id of
+   * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the upload.
+   *
+   * The contents of this map are used to:
+   * 1. Delete any unused stores from the previous deployment in the remote store during {@link #init}.
+   * 2. Calculate the diff for local state between the last and the current checkpoint during {@link #upload}.
+   *
+   * Since the task commit process guarantees that the async stage of the previous commit is complete before another
+   * commit can start, this future is guaranteed to be complete in the call to {@link #upload} during the next commit.
+   *
+   * This field is non-final, since the future itself is replaced in its entirety after init/upload.
+   * The internal map contents are never directly modified (e.g. using puts). It's volatile to ensure visibility
+   * across threads since the map assignment may happen on a different thread than the one reading the contents.
+   */
+  private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
+      prevStoreSnapshotIndexesFuture;
+
+  public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      ExecutorService backupExecutor, BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config,
+      Clock clock, File loggedStoreBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil) {
+    this.jobModel = jobModel;
+    this.jobName = new JobConfig(config).getName().get();
+    this.jobId = new JobConfig(config).getJobId();
+    this.containerModel = containerModel;
+    this.taskModel = taskModel;
+    this.taskName = taskModel.getTaskName().getTaskName();
+    this.executor = backupExecutor;
+    this.config = config;
+    this.clock = clock;
+    this.storageManagerUtil = storageManagerUtil;
+    StorageConfig storageConfig = new StorageConfig(config);
+    this.storesToBackup =
+        storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+    this.loggedStoreBaseDir = loggedStoreBaseDir;
+    this.blobStoreUtil = blobStoreUtil;
+    this.prevStoreSnapshotIndexesFuture = CompletableFuture.completedFuture(ImmutableMap.of());
+    this.metrics = blobStoreTaskBackupMetrics;
+    metrics.initStoreMetrics(storesToBackup);
+  }
+
+  @Override
+  public void init(Checkpoint checkpoint) {
+    long startTime = System.nanoTime();
+    LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+
+    // Note: blocks the caller (main) thread.
+    Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+        BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, blobStoreUtil);
+    this.prevStoreSnapshotIndexesFuture =
+        CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
+    metrics.initNs.set(System.nanoTime() - startTime);
+  }
+
+  @Override
+  public Map<String, String> snapshot(CheckpointId checkpointId) {
+    // No-op. Stores are flushed and checkpoints are created by commit manager
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId, Map<String, String> storeSCMs) {
+    long uploadStartTime = System.nanoTime();
+
+    // reset gauges for each upload
+    metrics.filesToUpload.getValue().set(0L);
+    metrics.bytesToUpload.getValue().set(0L);
+    metrics.filesUploaded.getValue().set(0L);
+    metrics.bytesUploaded.getValue().set(0L);
+    metrics.filesRemaining.getValue().set(0L);
+    metrics.bytesRemaining.getValue().set(0L);
+    metrics.filesToRetain.getValue().set(0L);
+    metrics.bytesToRetain.getValue().set(0L);
+
+    Map<String, CompletableFuture<Pair<String, SnapshotIndex>>>
+        storeToSCMAndSnapshotIndexPairFutures = new HashMap<>();
+    Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new HashMap<>();
+
+    storesToBackup.forEach((storeName) -> {
+      long storeUploadStartTime = System.nanoTime();
+      try {
+        // metadata for the current store snapshot to upload
+        SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId, jobName, jobId, taskName, storeName);
+
+        // Only durable/persistent stores are passed here from commit manager
+        // get the local store dir corresponding to the current checkpointId
+        File storeDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir, storeName,
+            taskModel.getTaskName(), taskModel.getTaskMode());
+        String checkpointDirPath = storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId);
+        File checkpointDir = new File(checkpointDirPath);
+
+        LOG.debug("Got task: {} store: {} storeDir: {} and checkpointDir: {}",
+            taskName, storeName, storeDir, checkpointDir);
+
+        // get the previous store directory contents
+        DirIndex prevDirIndex;

Review comment:
       Let's move this declaration right before the if statement that sets it, or use a ternary expression instead.

##########
File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreStateBackendFactory.java
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.context.ContainerContext;
+import org.apache.samza.context.JobContext;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.storage.KafkaChangelogRestoreParams;
+import org.apache.samza.storage.BlobStoreAdminFactory;
+import org.apache.samza.storage.StateBackendFactory;
+import org.apache.samza.storage.StateBackendAdmin;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.TaskRestoreManager;
+import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.ReflectionUtil;
+
+
+public class BlobStoreStateBackendFactory implements StateBackendFactory {
+  @Override
+  public TaskBackupManager getBackupManager(
+      JobModel jobModel,
+      ContainerModel containerModel,
+      TaskModel taskModel,
+      ExecutorService backupExecutor,
+      MetricsRegistry metricsRegistry,
+      Config config,
+      Clock clock,
+      File loggedStoreBaseDir,
+      File nonLoggedStoreBaseDir) {
+    StorageConfig storageConfig = new StorageConfig(config);
+    String blobStoreManagerFactory = storageConfig.getBlobStoreManagerFactory();
+    Preconditions.checkState(StringUtils.isNotBlank(blobStoreManagerFactory));
+    BlobStoreManagerFactory factory = ReflectionUtil.getObj(blobStoreManagerFactory, BlobStoreManagerFactory.class);
+    BlobStoreManager blobStoreManager = factory.getBackupBlobStoreManager(config, backupExecutor);
+    BlobStoreBackupManagerMetrics metrics = new BlobStoreBackupManagerMetrics(metricsRegistry);
+    BlobStoreUtil blobStoreUtil = new BlobStoreUtil(blobStoreManager, backupExecutor, metrics, null);

Review comment:
       It seems odd to create separate metrics classes for BlobStoreUtil, since it is agnostic to whether the caller is performing a backup or a restore. Could we merge these metrics classes?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/blobstore/exceptions/DeletedException.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore.exceptions;
+
+public class DeletedException extends RuntimeException {

Review comment:
       If this is used to abstract out the specific blob store client, we need to make sure the contract for blob store implementations requires them to wrap client-specific errors in this exception (and that this is documented in the interface class).
   
   Also, how do we know whether a blob was deleted vs. is missing/not present? Would the behaviour for a MissingException be different?

##########
File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+  private static final Logger LOG = LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
+  private final JobModel jobModel;
+  private final ExecutorService executor;
+  private final String jobName;
+  private final String jobId;
+  private final ContainerModel containerModel;
+  private final TaskModel taskModel;
+  private final String taskName;
+  private final Config config;
+  private final Clock clock;
+  private final StorageManagerUtil storageManagerUtil;
+  private final List<String> storesToBackup;
+  private final File loggedStoreBaseDir;
+  private final BlobStoreUtil blobStoreUtil;
+
+  private final BlobStoreBackupManagerMetrics metrics;
+
+  /**
+   * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} from
+   * last successful task checkpoint or {@link #upload}.
+   *
+   * After {@link #init}, the map reflects the contents of the last completed checkpoint for the task from the previous
+   * deployment, if any.
+   *
+   * During regular processing, this map is updated after each successful {@link #upload} with the blob id of
+   * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the upload.
+   *
+   * The contents of this map are used to:
+   * 1. Delete any unused stores from the previous deployment in the remote store during {@link #init}.
+   * 2. Calculate the diff for local state between the last and the current checkpoint during {@link #upload}.
+   *
+   * Since the task commit process guarantees that the async stage of the previous commit is complete before another
+   * commit can start, this future is guaranteed to be complete in the call to {@link #upload} during the next commit.
+   *
+   * This field is non-final, since the future itself is replaced in its entirety after init/upload.
+   * The internal map contents are never directly modified (e.g. using puts). It's volatile to ensure visibility
+   * across threads since the map assignment may happen on a different thread than the one reading the contents.
+   */
+  private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
+      prevStoreSnapshotIndexesFuture;
+
+  public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      ExecutorService backupExecutor, BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config,
+      Clock clock, File loggedStoreBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil) {
+    this.jobModel = jobModel;
+    this.jobName = new JobConfig(config).getName().get();
+    this.jobId = new JobConfig(config).getJobId();
+    this.containerModel = containerModel;
+    this.taskModel = taskModel;
+    this.taskName = taskModel.getTaskName().getTaskName();
+    this.executor = backupExecutor;
+    this.config = config;
+    this.clock = clock;
+    this.storageManagerUtil = storageManagerUtil;
+    StorageConfig storageConfig = new StorageConfig(config);
+    this.storesToBackup =
+        storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+    this.loggedStoreBaseDir = loggedStoreBaseDir;
+    this.blobStoreUtil = blobStoreUtil;
+    this.prevStoreSnapshotIndexesFuture = CompletableFuture.completedFuture(ImmutableMap.of());
+    this.metrics = blobStoreTaskBackupMetrics;
+    metrics.initStoreMetrics(storesToBackup);
+  }
+
+  @Override
+  public void init(Checkpoint checkpoint) {
+    long startTime = System.nanoTime();
+    LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+
+    // Note: blocks the caller (main) thread.
+    Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+        BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, blobStoreUtil);
+    this.prevStoreSnapshotIndexesFuture =
+        CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
+    metrics.initNs.set(System.nanoTime() - startTime);
+  }
+
+  @Override
+  public Map<String, String> snapshot(CheckpointId checkpointId) {
+    // No-op. Stores are flushed and checkpoints are created by commit manager
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId, Map<String, String> storeSCMs) {
+    long uploadStartTime = System.nanoTime();
+
+    // reset gauges for each upload
+    metrics.filesToUpload.getValue().set(0L);
+    metrics.bytesToUpload.getValue().set(0L);
+    metrics.filesUploaded.getValue().set(0L);
+    metrics.bytesUploaded.getValue().set(0L);
+    metrics.filesRemaining.getValue().set(0L);
+    metrics.bytesRemaining.getValue().set(0L);
+    metrics.filesToRetain.getValue().set(0L);
+    metrics.bytesToRetain.getValue().set(0L);
+
+    Map<String, CompletableFuture<Pair<String, SnapshotIndex>>>
+        storeToSCMAndSnapshotIndexPairFutures = new HashMap<>();
+    Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new HashMap<>();
+
+    storesToBackup.forEach((storeName) -> {
+      long storeUploadStartTime = System.nanoTime();
+      try {
+        // metadata for the current store snapshot to upload
+        SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId, jobName, jobId, taskName, storeName);
+
+        // Only durable/persistent stores are passed here from commit manager
+        // get the local store dir corresponding to the current checkpointId
+        File storeDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDir, storeName,
+            taskModel.getTaskName(), taskModel.getTaskMode());
+        String checkpointDirPath = storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId);
+        File checkpointDir = new File(checkpointDirPath);
+
+        LOG.debug("Got task: {} store: {} storeDir: {} and checkpointDir: {}",
+            taskName, storeName, storeDir, checkpointDir);
+
+        // get the previous store directory contents
+        DirIndex prevDirIndex;
+
+        // guaranteed to be available since a new task commit may not start until the previous one is complete
+        Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+            prevStoreSnapshotIndexesFuture.get(0, TimeUnit.MILLISECONDS);
+
+        if (prevStoreSnapshotIndexes.containsKey(storeName)) {
+          prevDirIndex = prevStoreSnapshotIndexes.get(storeName).getRight().getDirIndex();
+        } else {
+          // no previous SnapshotIndex means that this is the first commit for this store. Create an empty DirIndex.
+          prevDirIndex = new DirIndex(checkpointDir.getName(), Collections.emptyList(), Collections.emptyList(),
+              Collections.emptyList(), Collections.emptyList());
+        }
+
+        long dirDiffStartTime = System.nanoTime();
+        // get the diff between previous and current store directories
+        DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, BlobStoreUtil.areSameFile(false));
+        metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() - dirDiffStartTime);
+
+        DirDiff.Stats stats = DirDiff.getStats(dirDiff);
+        updateStoreDiffMetrics(storeName, stats);
+        metrics.filesToUpload.getValue().addAndGet(stats.filesAdded);

Review comment:
       Could these metric updates also be rolled into updateStoreDiffMetrics() to keep this class cleaner?

##########
File path: samza-core/src/main/java/org/apache/samza/standalone/PassthroughJobCoordinator.java
##########
@@ -87,6 +90,18 @@ public void start() {
       // TODO metrics registry has been null here for a while; is it safe?
       MetadataResourceUtil metadataResourceUtil = new MetadataResourceUtil(jobModel, null, config);
       metadataResourceUtil.createResources();
+
+      // create all the resources required for state backend factories
+      new StorageConfig(config).getStateBackendBackupFactories().forEach(stateStorageBackendBackupFactory -> {

Review comment:
       We discussed this offline, but to bring the thread here: is this the correct place to add this for dev deploys?
   IIRC we discussed putting the logic in ProcessJobFactory instead. Is this required for standalone jobs?

##########
File path: samza-api/src/main/java/org/apache/samza/storage/StateBackendFactory.java
##########
@@ -55,5 +56,5 @@ TaskRestoreManager getRestoreManager(JobContext jobContext,
       File nonLoggedStoreBaseDir,
       KafkaChangelogRestoreParams kafkaChangelogRestoreParams);
 
-  TaskStorageAdmin getAdmin();
+  StateBackendAdmin getStateBackendAdmin(JobModel jobModel, Config config);

Review comment:
       Minor: let's change these parameters to JobContext for consistency with getRestoreManager.

##########
File path: samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
##########
@@ -66,6 +66,9 @@
   public static final String CHANGELOG_MIN_COMPACTION_LAG_MS = STORE_PREFIX + "%s.changelog." + MIN_COMPACTION_LAG_MS;
   public static final long DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS = TimeUnit.HOURS.toMillis(4);
 
+  public static final String BLOB_STORE_BACKEND_ADMIN_FACTORY = "blob.store.backend.admin.factory";

Review comment:
       +1, let's create a BlobStoreConfig (similar to KafkaConfig).

##########
File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.collect.ImmutableMap;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.ContainerModel;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskBackupManager;
+import org.apache.samza.storage.blobstore.diff.DirDiff;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
+import org.apache.samza.storage.blobstore.metrics.BlobStoreBackupManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.storage.blobstore.util.DirDiffUtil;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreBackupManager implements TaskBackupManager {
+  private static final Logger LOG = LoggerFactory.getLogger(BlobStoreBackupManager.class);
+
+  private final JobModel jobModel;
+  private final ExecutorService executor;
+  private final String jobName;
+  private final String jobId;
+  private final ContainerModel containerModel;
+  private final TaskModel taskModel;
+  private final String taskName;
+  private final Config config;
+  private final Clock clock;
+  private final StorageManagerUtil storageManagerUtil;
+  private final List<String> storesToBackup;
+  private final File loggedStoreBaseDir;
+  private final BlobStoreUtil blobStoreUtil;
+
+  private final BlobStoreBackupManagerMetrics metrics;
+
+  /**
+   * Map of store name to a Pair of blob id of {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} from
+   * last successful task checkpoint or {@link #upload}.
+   *
+   * After {@link #init}, the map reflects the contents of the last completed checkpoint for the task from the previous
+   * deployment, if any.
+   *
+   * During regular processing, this map is updated after each successful {@link #upload} with the blob id of
+   * {@link SnapshotIndex} and the corresponding {@link SnapshotIndex} of the upload.
+   *
+   * The contents of this map are used to:
+   * 1. Delete any unused stores from the previous deployment in the remote store during {@link #init}.
+   * 2. Calculate the diff for local state between the last and the current checkpoint during {@link #upload}.
+   *
+   * Since the task commit process guarantees that the async stage of the previous commit is complete before another
+   * commit can start, this future is guaranteed to be complete in the call to {@link #upload} during the next commit.
+   *
+   * This field is non-final, since the future itself is replaced in its entirety after init/upload.
+   * The internal map contents are never directly modified (e.g. using puts). It's volatile to ensure visibility
+   * across threads since the map assignment may happen on a different thread than the one reading the contents.
+   */
+  private volatile CompletableFuture<Map<String, Pair<String, SnapshotIndex>>>
+      prevStoreSnapshotIndexesFuture;
+
+  public BlobStoreBackupManager(JobModel jobModel, ContainerModel containerModel, TaskModel taskModel,
+      ExecutorService backupExecutor, BlobStoreBackupManagerMetrics blobStoreTaskBackupMetrics, Config config,
+      Clock clock, File loggedStoreBaseDir, StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil) {
+    this.jobModel = jobModel;
+    this.jobName = new JobConfig(config).getName().get();
+    this.jobId = new JobConfig(config).getJobId();
+    this.containerModel = containerModel;
+    this.taskModel = taskModel;
+    this.taskName = taskModel.getTaskName().getTaskName();
+    this.executor = backupExecutor;
+    this.config = config;
+    this.clock = clock;
+    this.storageManagerUtil = storageManagerUtil;
+    StorageConfig storageConfig = new StorageConfig(config);
+    this.storesToBackup =
+        storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+    this.loggedStoreBaseDir = loggedStoreBaseDir;
+    this.blobStoreUtil = blobStoreUtil;
+    this.prevStoreSnapshotIndexesFuture = CompletableFuture.completedFuture(ImmutableMap.of());
+    this.metrics = blobStoreTaskBackupMetrics;
+    metrics.initStoreMetrics(storesToBackup);
+  }
+
+  @Override
+  public void init(Checkpoint checkpoint) {
+    long startTime = System.nanoTime();
+    LOG.debug("Initializing blob store backup manager for task: {}", taskName);
+
+    // Note: blocks the caller (main) thread.
+    Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes =
+        BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, blobStoreUtil);
+    this.prevStoreSnapshotIndexesFuture =
+        CompletableFuture.completedFuture(ImmutableMap.copyOf(prevStoreSnapshotIndexes));
+    metrics.initNs.set(System.nanoTime() - startTime);
+  }
+
+  @Override
+  public Map<String, String> snapshot(CheckpointId checkpointId) {
+    // No-op. Stores are flushed and checkpoints are created by commit manager
+    return Collections.emptyMap();
+  }
+
+  @Override
+  public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId, Map<String, String> storeSCMs) {
+    long uploadStartTime = System.nanoTime();
+
+    // reset gauges for each upload
+    metrics.filesToUpload.getValue().set(0L);
+    metrics.bytesToUpload.getValue().set(0L);
+    metrics.filesUploaded.getValue().set(0L);
+    metrics.bytesUploaded.getValue().set(0L);
+    metrics.filesRemaining.getValue().set(0L);
+    metrics.bytesRemaining.getValue().set(0L);
+    metrics.filesToRetain.getValue().set(0L);
+    metrics.bytesToRetain.getValue().set(0L);
+
+    Map<String, CompletableFuture<Pair<String, SnapshotIndex>>>
+        storeToSCMAndSnapshotIndexPairFutures = new HashMap<>();
+    Map<String, CompletableFuture<String>> storeToSerializedSCMFuture = new HashMap<>();
+
+    storesToBackup.forEach((storeName) -> {

Review comment:
       Is it possible to make this async in the future?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org