Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2021/05/17 23:33:57 UTC

[GitHub] [samza] shekhars-li commented on a change in pull request #1501: SAMZA-2657: Blob Store as backend for Samza State backup and restore

shekhars-li commented on a change in pull request #1501:
URL: https://github.com/apache/samza/pull/1501#discussion_r633931258



##########
File path: samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java
##########
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage.blobstore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.SamzaException;
+import org.apache.samza.checkpoint.Checkpoint;
+import org.apache.samza.checkpoint.CheckpointId;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.storage.StorageManagerUtil;
+import org.apache.samza.storage.TaskRestoreManager;
+import org.apache.samza.storage.blobstore.index.DirIndex;
+import org.apache.samza.storage.blobstore.index.SnapshotIndex;
+import org.apache.samza.storage.blobstore.metrics.BlobStoreRestoreManagerMetrics;
+import org.apache.samza.storage.blobstore.util.BlobStoreStateBackendUtil;
+import org.apache.samza.storage.blobstore.util.BlobStoreUtil;
+import org.apache.samza.util.FileUtil;
+import org.apache.samza.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BlobStoreRestoreManager implements TaskRestoreManager {
+  private static final Logger LOG = LoggerFactory.getLogger(BlobStoreRestoreManager.class);
+  // when checking if checkpoint dir is the same as remote snapshot, exclude the "OFFSET" family of files
+  // that are written to the checkpoint dir after the remote upload is complete as part of
+  // TaskStorageCommitManager#writeCheckpointToStoreDirectories.
+  private static final Set<String> FILES_TO_IGNORE = ImmutableSet.of(
+      StorageManagerUtil.OFFSET_FILE_NAME_LEGACY,
+      StorageManagerUtil.OFFSET_FILE_NAME_NEW,
+      StorageManagerUtil.SIDE_INPUT_OFFSET_FILE_NAME_LEGACY,
+      StorageManagerUtil.CHECKPOINT_FILE_NAME);
+
+  private final TaskModel taskModel;
+  private final String jobName;
+  private final String jobId;
+  private final ExecutorService executor;
+  private final Config config;
+  private final StorageConfig storageConfig;
+  private final StorageManagerUtil storageManagerUtil;
+  private final BlobStoreUtil blobStoreUtil;
+  private final File loggedBaseDir;
+  private final File nonLoggedBaseDir;
+  private final String taskName;
+  private final List<String> storesToRestore;
+
+  private final BlobStoreRestoreManagerMetrics metrics;
+
+  /**
+   * Map of store name to a Pair of the SnapshotIndex blob id and the corresponding SnapshotIndex from the last
+   * snapshot creation.
+   */
+  private Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes;
+
+  public BlobStoreRestoreManager(TaskModel taskModel, ExecutorService restoreExecutor,
+      BlobStoreRestoreManagerMetrics metrics, Config config, StorageManagerUtil storageManagerUtil,
+      BlobStoreUtil blobStoreUtil, File loggedBaseDir, File nonLoggedBaseDir) {
+    this.taskModel = taskModel;
+    this.jobName = new JobConfig(config).getName().get();
+    this.jobId = new JobConfig(config).getJobId();
+    this.executor = restoreExecutor; // TODO BLOCKER dchen1 don't block on restore executor
+    this.config = config;
+    this.storageConfig = new StorageConfig(config);
+    this.storageManagerUtil = storageManagerUtil;
+    this.blobStoreUtil = blobStoreUtil;
+    this.prevStoreSnapshotIndexes = new HashMap<>();
+    this.loggedBaseDir = loggedBaseDir;
+    this.nonLoggedBaseDir = nonLoggedBaseDir;
+    this.taskName = taskModel.getTaskName().getTaskName();
+    this.storesToRestore =
+        storageConfig.getStoresWithStateBackendRestoreFactory(BlobStoreStateBackendFactory.class.getName());
+    this.metrics = metrics;
+  }
+
+  @Override
+  public void init(Checkpoint checkpoint) {
+    long startTime = System.nanoTime();
+    LOG.debug("Initializing blob store restore manager for task: {}", taskName);
+    // get previous SCMs from checkpoint
+    prevStoreSnapshotIndexes =
+        BlobStoreStateBackendUtil.getStoreSnapshotIndexes(jobName, jobId, taskName, checkpoint, blobStoreUtil);
+    metrics.getSnapshotIndexNs.set(System.nanoTime() - startTime);
+    LOG.trace("Found previous snapshot index during blob store restore manager init for task: {} to be: {}",
+        taskName, prevStoreSnapshotIndexes);
+
+    metrics.initStoreMetrics(storesToRestore);
+
+    // Note: blocks the caller (main) thread.
+    deleteUnusedStoresFromBlobStore(jobName, jobId, taskName, storageConfig, prevStoreSnapshotIndexes, blobStoreUtil, executor);
+    metrics.initNs.set(System.nanoTime() - startTime);
+  }
+
+  /**
+   * Restore state from checkpoints, state snapshots and changelog.
+   */
+  @Override
+  public void restore() {
+    restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir,
+        storageConfig, metrics, storageManagerUtil, blobStoreUtil, executor);
+  }
+
+  @Override
+  public void close() {
+  }
+
+  /**
+   * Deletes blob store contents for stores that were present in the last checkpoint but are either no longer
+   * present in the job configs (removed by the user since the last deployment) or are no longer configured to be
+   * backed up using blob stores.
+   *
+   * This method blocks until all the necessary store contents and snapshot index blobs have been marked for deletion.
+   */
+  @VisibleForTesting
+  static void deleteUnusedStoresFromBlobStore(String jobName, String jobId, String taskName, StorageConfig storageConfig,
+      Map<String, Pair<String, SnapshotIndex>> initialStoreSnapshotIndexes,
+      BlobStoreUtil blobStoreUtil, ExecutorService executor) {
+
+    List<String> storesToBackup =
+        storageConfig.getStoresWithStateBackendBackupFactory(BlobStoreStateBackendFactory.class.getName());
+    List<String> storesToRestore =
+        storageConfig.getStoresWithStateBackendRestoreFactory(BlobStoreStateBackendFactory.class.getName());
+
+    List<CompletionStage<Void>> storeDeletionFutures = new ArrayList<>();
+    initialStoreSnapshotIndexes.forEach((storeName, scmAndSnapshotIndex) -> {
+      if (!storesToBackup.contains(storeName) && !storesToRestore.contains(storeName)) {
+        LOG.debug("Removing task: {} store: {} from blob store. It is either no longer used, " +
+            "or is no longer configured to be backed up or restored with blob store.", taskName, storeName);
+        DirIndex dirIndex = scmAndSnapshotIndex.getRight().getDirIndex();
+        Metadata requestMetadata =
+            new Metadata(Metadata.PAYLOAD_PATH_SNAPSHOT_INDEX, Optional.empty(), jobName, jobId, taskName, storeName);
+        CompletionStage<Void> storeDeletionFuture =
+            blobStoreUtil.cleanUpDir(dirIndex, requestMetadata) // delete files and sub-dirs previously marked for removal
+                .thenComposeAsync(v ->
+                    blobStoreUtil.deleteDir(dirIndex, requestMetadata), executor) // delete files and dirs still present
+                .thenComposeAsync(v -> blobStoreUtil.deleteSnapshotIndexBlob(
+                    scmAndSnapshotIndex.getLeft(), requestMetadata),
+                    executor); // delete the snapshot index blob
+        storeDeletionFutures.add(storeDeletionFuture);
+      }
+    });
+
+    FutureUtil.allOf(storeDeletionFutures).join();
+  }
+
+  /**
+   * Restores all eligible stores in the task.
+   */
+  @VisibleForTesting
+  static void restoreStores(String jobName, String jobId, TaskName taskName, List<String> storesToRestore,
+      Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
+      File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics,
+      StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil,
+      ExecutorService executor) {
+    long restoreStartTime = System.nanoTime();
+    List<CompletionStage<Void>> restoreFutures = new ArrayList<>();
+
+    LOG.debug("Starting restore for task: {} stores: {}", taskName, storesToRestore);
+    storesToRestore.forEach(storeName -> {
+      if (!prevStoreSnapshotIndexes.containsKey(storeName)) {
+        LOG.debug("No checkpointed snapshot index found for task: {} store: {}. Skipping restore.", taskName, storeName);
+        // TODO HIGH shesharm what should we do with the local state already present on disk, if any?
+        // E.g. this will be the case if user changes a store from changelog based backup and restore to
+        // blob store based backup and restore, both at the same time.
+        return;
+      }
+
+      Pair<String, SnapshotIndex> scmAndSnapshotIndex = prevStoreSnapshotIndexes.get(storeName);
+
+      long storeRestoreStartTime = System.nanoTime();
+      SnapshotIndex snapshotIndex = scmAndSnapshotIndex.getRight();
+      DirIndex dirIndex = snapshotIndex.getDirIndex();
+
+      // TODO MINOR shesharm: calculate recursively similar to DirDiff.Stats
+      long bytesToRestore = dirIndex.getFilesPresent().stream().mapToLong(fi -> fi.getFileMetadata().getSize()).sum();
+      metrics.filesToRestore.getValue().addAndGet(dirIndex.getFilesPresent().size());
+      metrics.bytesToRestore.getValue().addAndGet(bytesToRestore);
+      metrics.filesRemaining.getValue().addAndGet(dirIndex.getFilesPresent().size());
+      metrics.bytesRemaining.getValue().addAndGet(bytesToRestore);
+
+      CheckpointId checkpointId = snapshotIndex.getSnapshotMetadata().getCheckpointId();
+      File storeDir = storageManagerUtil.getTaskStoreDir(loggedBaseDir, storeName, taskName, TaskMode.Active);
+      Path storeCheckpointDir = Paths.get(storageManagerUtil.getStoreCheckpointDir(storeDir, checkpointId));
+      LOG.trace("Got task: {} store: {} local store directory: {} and local store checkpoint directory: {}",
+          taskName, storeName, storeDir, storeCheckpointDir);
+
+      // we always delete the store dir to preserve transactional state guarantees.
+      try {
+        LOG.debug("Deleting local store directory: {}. Will be restored from local store checkpoint directory " +
+            "or remote snapshot.", storeDir);
+        FileUtils.deleteDirectory(storeDir);
+      } catch (IOException e) {
+        throw new SamzaException(String.format("Error deleting store directory: %s", storeDir), e);
+      }
+
+      boolean shouldRestore = shouldRestore(taskName.getTaskName(), storeName, dirIndex,
+          storeCheckpointDir, storageConfig, blobStoreUtil);
+
+      if (shouldRestore) { // restore the store from the remote blob store
+        LOG.debug("Deleting local store checkpoint directory: {} before restore.", storeCheckpointDir);
+        // delete all store checkpoint directories. if we only delete the store directory and don't
+        // delete the checkpoint directories, the store size on disk will grow to 2x after restore
+        // until the first commit is completed and older checkpoint dirs are deleted. This is
+        // because the hard-linked checkpoint dir files will no longer be de-duped with the
+        // now-deleted main store directory contents and will take up additional space of their
+        // own during the restore.
+        deleteCheckpointDirs(taskName, storeName, loggedBaseDir, storageManagerUtil);
+
+        enqueueRestore(jobName, jobId, taskName.toString(), storeName, storeDir, dirIndex, storeRestoreStartTime,
+            restoreFutures, blobStoreUtil, metrics, executor);
+      } else {
+        LOG.debug("Renaming store checkpoint directory: {} to store directory: {} since its contents are identical " +
+            "to the remote snapshot.", storeCheckpointDir, storeDir);
+        // atomically rename the checkpoint dir to the store dir
+        new FileUtil().move(storeCheckpointDir.toFile(), storeDir);
+
+        // delete any other checkpoint dirs.
+        deleteCheckpointDirs(taskName, storeName, loggedBaseDir, storageManagerUtil);
+      }
+    });
+
+    // wait for all restores to finish
+    FutureUtil.allOf(restoreFutures).whenComplete((res, ex) -> {
+      LOG.info("Restore completed for task: {} stores", taskName);
+      metrics.restoreNs.set(System.nanoTime() - restoreStartTime);
+    }).join(); // TODO BLOCKER dchen1 make non-blocking.
+  }
+
+  /**
+   * Determines if the store needs to be restored from remote snapshot based on local and remote state.
+   */
+  @VisibleForTesting
+  static boolean shouldRestore(String taskName, String storeName, DirIndex dirIndex,
+      Path storeCheckpointDir, StorageConfig storageConfig, BlobStoreUtil blobStoreUtil) {
+    // if a store checkpoint directory exists for the last successful task checkpoint, try to use it.
+    boolean restoreStore;
+    if (Files.exists(storeCheckpointDir)) {
+      if (storageConfig.getCleanLoggedStoreDirsOnStart(storeName)) {
+        LOG.debug("Restoring task: {} store: {} from remote snapshot since the store is configured to be " +
+            "restored on each restart.", taskName, storeName);
+        restoreStore = true;
+      } else if (blobStoreUtil.areSameDir(FILES_TO_IGNORE, true).test(storeCheckpointDir.toFile(), dirIndex)) {
+        restoreStore = false; // no restore required for this store.
+      } else {
+        // we don't optimize for the case when the local host doesn't contain the most recent store checkpoint
+        // directory but contains an older checkpoint directory which could have partial overlap with the remote
+        // snapshot. we also don't try to optimize for any edge cases where the most recent checkpoint directory
+        // contents could be partially different than the remote store (afaik, there is no known valid scenario
+        // where this could happen right now, except for the offset file handling above).
+        // it's simpler and fast enough for now to restore the entire store instead.
+
+        LOG.error("Local store checkpoint directory: {} contents are not the same as the remote snapshot. " +
+            "Queuing for restore from remote snapshot.", storeCheckpointDir);
+        // old checkpoint directory will be deleted later during commits
+        restoreStore = true;
+      }
+    } else { // did not find last checkpoint dir, restore the store from the remote blob store
+      LOG.debug("No local store checkpoint directory found at: {}. " +
+          "Queuing for restore from remote snapshot.", storeCheckpointDir);
+      restoreStore = true;
+    }
+
+    return restoreStore;
+  }
+
+  /**
+   * Starts the restore for the store, enqueuing all restore-completion futures into {@code restoreFutures}.
+   */
+  @VisibleForTesting
+  static void enqueueRestore(String jobName, String jobId, String taskName, String storeName, File storeDir, DirIndex dirIndex,
+      long storeRestoreStartTime, List<CompletionStage<Void>> restoreFutures,
+      BlobStoreUtil blobStoreUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor) {
+    metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime);
+
+    Metadata requestMetadata = new Metadata(storeDir.getAbsolutePath(), Optional.empty(), jobName, jobId, taskName, storeName);

Review comment:
       Metadata holds the metadata for the current request. So a restoreDir request has the storeDir in its payload. When restoreDir is called, it creates a new Metadata object with an individual file as the payload, and that is what is eventually sent to the blob store API (GET). In other words, the actual requests to the blob store carry file names, because the right request Metadata object is built at every step. That way we track the request at each step and can use it for logging if necessary.
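
       For illustration, here is a minimal sketch of that pattern, assuming a simplified stand-in RequestMetadata class and a hypothetical toFileLevelRequests helper (the real class is org.apache.samza.storage.blobstore.Metadata; this is not the actual BlobStoreUtil API): the directory-level request metadata is rebuilt into per-file metadata objects before each GET, so every call to the blob store carries the exact file it operates on.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/**
 * Minimal sketch of the per-request metadata pattern described above. RequestMetadata and
 * toFileLevelRequests are hypothetical stand-ins for illustration, not the actual Samza
 * Metadata / BlobStoreUtil API.
 */
public class MetadataPerFileSketch {

  /** Simplified stand-in for the request metadata carried with each blob store call. */
  static class RequestMetadata {
    final String payloadPath;          // store dir for a restoreDir request, file path for a file GET
    final Optional<Long> payloadSize;  // known size for file-level requests, empty otherwise
    final String jobName;
    final String jobId;
    final String taskName;
    final String storeName;

    RequestMetadata(String payloadPath, Optional<Long> payloadSize,
        String jobName, String jobId, String taskName, String storeName) {
      this.payloadPath = payloadPath;
      this.payloadSize = payloadSize;
      this.jobName = jobName;
      this.jobId = jobId;
      this.taskName = taskName;
      this.storeName = storeName;
    }
  }

  /**
   * restoreDir-style flow: the incoming request metadata describes the store directory, but each
   * actual blob store GET is issued with a new, file-level metadata object derived from it.
   */
  static List<RequestMetadata> toFileLevelRequests(RequestMetadata dirRequest, List<File> filesToRestore) {
    List<RequestMetadata> fileRequests = new ArrayList<>();
    for (File file : filesToRestore) {
      // rebuild the request metadata with the individual file as the payload before the GET
      fileRequests.add(new RequestMetadata(file.getAbsolutePath(), Optional.of(file.length()),
          dirRequest.jobName, dirRequest.jobId, dirRequest.taskName, dirRequest.storeName));
    }
    return fileRequests;
  }
}
```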




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org