Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/10/08 18:26:07 UTC

[GitHub] [samza] prateekm commented on a change in pull request #1164: [WIP] Transactional State [5/5]: Added implementations for transactional state checkpoints and restore

prateekm commented on a change in pull request #1164: [WIP] Transactional State [5/5]: Added implementations for transactional state checkpoints and restore
URL: https://github.com/apache/samza/pull/1164#discussion_r332664376
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
 ##########
 @@ -0,0 +1,525 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ListMultimap;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfig;
+import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.ChangelogSSPIterator;
+import org.apache.samza.system.SSPMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements task state restore based on local state snapshots (checkpoint directories) and the store changelogs.
+ */
+public class TransactionalStateTaskRestoreManager implements TaskRestoreManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TransactionalStateTaskRestoreManager.class);
+
+  private final TaskModel taskModel;
+  private final Map<String, StorageEngine> storeEngines; // store name to storage engines
+  private final Map<String, SystemStream> storeChangelogs; // store name to changelog system stream
+  private final SystemAdmins systemAdmins;
+  private final Map<String, SystemConsumer> storeConsumers;
+  private final SSPMetadataCache sspMetadataCache;
+  private final File loggedStoreBaseDirectory;
+  private final File nonLoggedStoreBaseDirectory;
+  private final Config config;
+  private final Clock clock;
+  private final StorageManagerUtil storageManagerUtil;
+  private final FileUtil fileUtil;
+
+  private StoreActions storeActions; // available after init
+
+  public TransactionalStateTaskRestoreManager(
+      TaskModel taskModel,
+      Map<String, StorageEngine> storeEngines,
+      Map<String, SystemStream> storeChangelogs,
+      SystemAdmins systemAdmins,
+      Map<String, SystemConsumer> storeConsumers,
+      SSPMetadataCache sspMetadataCache,
+      File loggedStoreBaseDirectory,
+      File nonLoggedStoreBaseDirectory,
+      Config config,
+      Clock clock) {
+    this.taskModel = taskModel;
+    this.storeEngines = storeEngines;
+    this.storeChangelogs = storeChangelogs;
+    this.systemAdmins = systemAdmins;
+    this.storeConsumers = storeConsumers;
+    // OK to use the SSPMetadataCache here since, unlike during commit, the newest changelog SSP offsets
+    // will not change between cache init and restore completion
+    this.sspMetadataCache = sspMetadataCache;
+    this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
+    this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory;
+    this.config = config;
+    this.clock = clock;
+    this.storageManagerUtil = new StorageManagerUtil();
+    this.fileUtil = new FileUtil();
+  }
+
+  @Override
+  public void init(Map<SystemStreamPartition, String> checkpointedChangelogOffsets) {
+    Map<SystemStreamPartition, SystemStreamPartitionMetadata> currentChangelogOffsets =
+        getCurrentChangelogOffsets(taskModel, storeChangelogs, sspMetadataCache);
+
+    this.storeActions = getStoreActions(taskModel, storeEngines, storeChangelogs,
+        checkpointedChangelogOffsets, currentChangelogOffsets, systemAdmins, storageManagerUtil,
+        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory, config, clock);
+
+    setupStoreDirs(taskModel, storeEngines, storeActions, storageManagerUtil, fileUtil,
+        loggedStoreBaseDirectory, nonLoggedStoreBaseDirectory);
+    registerStartingOffsets(taskModel, storeActions, storeChangelogs, systemAdmins, storeConsumers, currentChangelogOffsets);
+  }
+
+  @Override
+  public void restore() {
+    Map<String, RestoreOffsets> storesToRestore = storeActions.storesToRestore;
+
+    for (Map.Entry<String, RestoreOffsets> entry : storesToRestore.entrySet()) {
+      String storeName = entry.getKey();
+      String endOffset = entry.getValue().endingOffset;
+      SystemStream systemStream = storeChangelogs.get(storeName);
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem());
+      SystemConsumer systemConsumer = storeConsumers.get(storeName);
+      SystemStreamPartition changelogSSP = new SystemStreamPartition(systemStream, taskModel.getChangelogPartition());
+
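+      // replay the changelog from the registered starting offset up to the checkpointed ending offset for this store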
+      ChangelogSSPIterator changelogSSPIterator =
+          new ChangelogSSPIterator(systemConsumer, changelogSSP, endOffset, systemAdmin, true);
+      StorageEngine taskStore = storeEngines.get(storeName);
+
+      LOG.info("Restoring store: {} for task: {}", storeName, taskModel.getTaskName());
+      taskStore.restore(changelogSSPIterator);
+    }
+  }
+
+  /**
+   * Stops only the persistent stores. For certain stores and store modes (such as RocksDB), stopping
+   * can trigger compaction. Persistent stores are recreated in read-write mode in {@link ContainerStorageManager}.
+   */
+  public void stopPersistentStores() {
+    TaskName taskName = taskModel.getTaskName();
+    storeEngines.forEach((storeName, storeEngine) -> {
+        if (storeEngine.getStoreProperties().isPersistedToDisk()) {
+          storeEngine.stop();
+          LOG.info("Stopped persistent store: {} in task: {}", storeName, taskName);
+        }
+      });
+  }
+
+  /**
+   * Get offset metadata for each changelog SSP for this task. A task may have multiple changelog streams
+   * (e.g., for different stores), but will have the same partition for all of them.
+   */
+  @VisibleForTesting
+  static Map<SystemStreamPartition, SystemStreamPartitionMetadata> getCurrentChangelogOffsets(
+      TaskModel taskModel, Map<String, SystemStream> storeChangelogs, SSPMetadataCache sspMetadataCache) {
+    Map<SystemStreamPartition, SystemStreamPartitionMetadata> changelogOffsets = new HashMap<>();
+
+    Partition changelogPartition = taskModel.getChangelogPartition();
+    for (Map.Entry<String, SystemStream> storeChangelog : storeChangelogs.entrySet()) {
+      SystemStream changelog = storeChangelog.getValue();
+      SystemStreamPartition changelogSSP = new SystemStreamPartition(
+          changelog.getSystem(), changelog.getStream(), changelogPartition);
+      SystemStreamPartitionMetadata metadata = sspMetadataCache.getMetadata(changelogSSP);
+      changelogOffsets.put(changelogSSP, metadata);
+    }
+
+    LOG.info("Got current changelog offsets for taskName: {} as: {}", taskModel.getTaskName(), changelogOffsets);
+    return changelogOffsets;
+  }
+
+  /**
+   * Marks each persistent but non-logged store for deletion.
+   *
+   * For each logged store, based on the current, checkpointed and local changelog offsets,
+   * 1. decides which directories (current and checkpoints) to delete for persistent stores.
+   * 2. decides which directories (checkpoints) to retain for persistent stores.
+   * 3. decides which stores (persistent or not) need to be restored, and the beginning and end offsets for the restore.
+   *
+   * When this method returns, in StoreActions,
+   * 1. all persistent store current directories will be present in storeDirsToDelete
+   * 2. each persistent store checkpoint directory will be present in either storeDirToRetain or storeDirsToDelete.
+   * 3. there will be at most one storeDirToRetain per persistent store, which will be a checkpoint directory.
+   * 4. any stores (persistent or not) that need to be restored from changelogs will be present in
+   *    storesToRestore with appropriate offsets.
+   */
+  @VisibleForTesting
+  static StoreActions getStoreActions(
+      TaskModel taskModel,
+      Map<String, StorageEngine> storeEngines,
+      Map<String, SystemStream> storeChangelogs,
+      Map<SystemStreamPartition, String> checkpointedChangelogOffsets,
+      Map<SystemStreamPartition, SystemStreamPartitionMetadata> currentChangelogOffsets,
+      SystemAdmins systemAdmins,
+      StorageManagerUtil storageManagerUtil,
+      File loggedStoreBaseDirectory,
+      File nonLoggedStoreBaseDirectory,
+      Config config,
+      Clock clock) {
+    TaskName taskName = taskModel.getTaskName();
+    TaskMode taskMode = taskModel.getTaskMode();
+
+    Map<String, File> storeDirToRetain = new HashMap<>();
+    ListMultimap<String, File> storeDirsToDelete = ArrayListMultimap.create();
+    Map<String, RestoreOffsets> storesToRestore = new HashMap<>();
+
+    storeEngines.forEach((storeName, storageEngine) -> {
+        // do nothing if the store is neither persistent nor logged (e.g., an in-memory cache only)
+        if (!storageEngine.getStoreProperties().isPersistedToDisk() &&
+          !storageEngine.getStoreProperties().isLoggedStore()) {
+          return;
+        }
+
+        // persistent but non-logged stores are always deleted
+        if (storageEngine.getStoreProperties().isPersistedToDisk() &&
+            !storageEngine.getStoreProperties().isLoggedStore()) {
+          File currentDir = storageManagerUtil.getTaskStoreDir(
+              nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
+          storeDirsToDelete.put(storeName, currentDir);
+          // persistent but non-logged stores should not have checkpoint dirs
+          return;
+        }
+
+        // get the oldest and newest current changelog SSP offsets as well as the checkpointed changelog SSP offset
+        SystemStream changelog = storeChangelogs.get(storeName);
+        SystemStreamPartition changelogSSP = new SystemStreamPartition(changelog, taskModel.getChangelogPartition());
+        SystemAdmin admin = systemAdmins.getSystemAdmin(changelogSSP.getSystem());
+        SystemStreamPartitionMetadata changelogSSPMetadata = currentChangelogOffsets.get(changelogSSP);
+        String oldestOffset = changelogSSPMetadata.getOldestOffset();
+        String newestOffset = changelogSSPMetadata.getNewestOffset();
+        String checkpointedOffset = checkpointedChangelogOffsets.get(changelogSSP);
+
+        Optional<File> currentDirOptional;
+        Optional<List<File>> checkpointDirsOptional;
+
+        if (!storageEngine.getStoreProperties().isPersistedToDisk()) {
+          currentDirOptional = Optional.empty();
+          checkpointDirsOptional = Optional.empty();
+        } else {
+          currentDirOptional = Optional.of(storageManagerUtil.getTaskStoreDir(
+              loggedStoreBaseDirectory, storeName, taskName, taskMode));
+          checkpointDirsOptional = Optional.of(storageManagerUtil.getTaskStoreCheckpointDirs(
+              loggedStoreBaseDirectory, storeName, taskName, taskMode));
+        }
+
+        LOG.info("For store: {} in task: {} got current dir: {}, checkpoint dirs: {}, checkpointed changelog offset: {}",
+            storeName, taskName, currentDirOptional, checkpointDirsOptional, checkpointedOffset);
+
+        // TODO BLOCKER pmaheshw: will do full restore from changelog even if retain existing state == true
+        // always delete current logged store dir for persistent stores.
+        currentDirOptional.ifPresent(currentDir -> storeDirsToDelete.put(storeName, currentDir));
+
+        // first check if checkpointed offset is invalid (i.e., out of range of current offsets, or null)
+        if (checkpointedOffset == null && oldestOffset != null) {
 
 Review comment:
   Just FYI, the current KafkaSystemAdmin does not follow the SSPMetadata interface contract, and returns oldest offset == 0 for an empty topic.
   
   To your point, if you change the changelog topic for a store to an empty topic, this is the behavior:
   1. Checkpointed offset will be null, since the checkpoint will not have an offset for the new changelog SSP.
   2. The oldest offset may be non-zero, signifying that the topic contains data, or null / 0, signifying that the topic is empty.
   3. If the oldest offset is non-zero, the behavior depends on the retain.existing.changelog.state config. I renamed the config to clarify that, if true, it will overwrite local state with the changelog topic state. This is also the case when you turn on the config for the first time, without writing changelog offsets in checkpoints first.
   4. If the oldest offset is null or 0, it will fall through to the 'happy path': delete all local state and do a full restore, which in this case does nothing, i.e. the desired behavior (see the sketch below).
   
   I'll add an explicit unit test to cover the null oldest offset scenario.
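   
   Roughly, that fall-through looks like the sketch below. This is illustrative only: the retainExistingChangelogState flag and the two-argument RestoreOffsets constructor are stand-in names for the example, not necessarily the exact code in this PR; oldestOffset, newestOffset, checkpointedOffset and storesToRestore are the variables from the diff above.
   
       // illustrative sketch of the cases above, not the PR's exact code
       if (checkpointedOffset == null && oldestOffset != null) {
         // case 3: the (new) changelog topic already contains data; only overwrite local state
         // with it if retaining existing changelog state is enabled (hypothetical flag name)
         if (retainExistingChangelogState) {
           storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, newestOffset));
         }
       } else if (checkpointedOffset == null) {
         // case 4: oldest offset is null (empty changelog); fall through to the happy path
         // and do a full restore, which replays nothing for an empty topic
         storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, newestOffset));
       }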

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services