You are viewing a plain text version of this content. The canonical link for it is here.
Posted by GitBox on 2019/10/02 02:10:40 UTC

[GitHub] [samza] prateekm commented on a change in pull request #1163: Transactional State [4/5]: Added new interfaces for TaskRestoreManager and TaskStorageManager

prateekm commented on a change in pull request #1163: Transactional State [4/5]: Added new interfaces for TaskRestoreManager and TaskStorageManager

 File path: samza-core/src/main/java/org/apache/samza/storage/NonTransactionalStateTaskRestoreManager.java
 @@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.samza.Partition;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StorageConfig;
+import org.apache.samza.job.model.TaskModel;
+import org.apache.samza.system.ChangelogSSPIterator;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemAdmins;
+import org.apache.samza.system.SystemConsumer;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.Clock;
+import org.apache.samza.util.FileUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+/**
+ * This is the legacy state restoration, based on changelog and offset files.
+ *
+ * Restore logic for all stores of a task including directory cleanup, setup, changelogSSP validation, registering
+ * with the respective consumer, restoring stores, and stopping stores.
+ */
+class NonTransactionalStateTaskRestoreManager implements TaskRestoreManager {
+  private static final Logger LOG = LoggerFactory.getLogger(NonTransactionalStateTaskRestoreManager.class);
+  private final Map<String, StorageEngine> taskStores; // Map of all StorageEngines for this task indexed by store name
+  private final Set<String> taskStoresToRestore;
+  // Set of store names which need to be restored by consuming using system-consumers (see registerStartingOffsets)
+  private final TaskModel taskModel; // Supplies the task name, task mode and changelog partition used throughout this class
+  private final Clock clock; // Clock value used to validate base-directories for staleness. See isLoggedStoreValid.
+  private Map<SystemStream, String> changeLogOldestOffsets; // Map of changelog oldest known offsets indexed by changelog SystemStream; assigned in getOldestChangeLogOffsets
+  private final Map<SystemStreamPartition, String> fileOffsets; // Map of offsets read from offset file indexed by changelog SSP
+  private final Map<String, SystemStream> changelogSystemStreams; // Map of change log system-streams indexed by store name
+  private final SystemAdmins systemAdmins; // Used to look up the SystemAdmin of each changelog system
+  private final File loggedStoreBaseDirectory; // Base directory from which logged-store task directories are resolved
+  private final File nonLoggedStoreBaseDirectory; // Base directory from which non-logged-store task directories are resolved
+  private final StreamMetadataCache streamMetadataCache; // Queried for changelog stream metadata. See getOldestChangeLogOffsets.
+  private final Map<String, SystemConsumer> storeConsumers; // Map of changelog SystemConsumers indexed by store name
+  private final int maxChangeLogStreamPartitions; // Passed to StreamSpec.createChangeLogStreamSpec. See validateChangelogStreams.
+  private final Config config; // Wrapped in a StorageConfig to read the changelog delete retention. See isLoggedStoreValid.
+  private final StorageManagerUtil storageManagerUtil; // Helper for task store directories, offset files and starting offsets
+  /**
+   * Creates a restore manager for the stores of a single task.
+   *
+   * @param taskModel                    model of the task whose stores are managed.
+   * @param changelogSystemStreams       changelog system-streams indexed by store name.
+   * @param taskStores                   all storage engines of the task indexed by store name.
+   * @param systemAdmins                 used to look up the {@link SystemAdmin} of each changelog system.
+   * @param streamMetadataCache          cache queried for changelog stream metadata.
+   * @param storeConsumers               changelog consumers indexed by store name.
+   * @param maxChangeLogStreamPartitions value passed to the changelog stream spec during validation.
+   * @param loggedStoreBaseDirectory     base directory from which logged-store directories are resolved.
+   * @param nonLoggedStoreBaseDirectory  base directory from which non-logged-store directories are resolved.
+   * @param config                       config from which the changelog delete retention is read.
+   * @param clock                        clock used for the store staleness check.
+   */
+  NonTransactionalStateTaskRestoreManager(
+      TaskModel taskModel,
+      Map<String, SystemStream> changelogSystemStreams,
+      Map<String, StorageEngine> taskStores,
+      SystemAdmins systemAdmins,
+      StreamMetadataCache streamMetadataCache,
+      Map<String, SystemConsumer> storeConsumers,
+      int maxChangeLogStreamPartitions,
+      File loggedStoreBaseDirectory,
+      File nonLoggedStoreBaseDirectory,
+      Config config,
+      Clock clock) {
+    this.taskStores = taskStores;
+    this.taskModel = taskModel;
+    this.clock = clock;
+    this.changelogSystemStreams = changelogSystemStreams;
+    this.systemAdmins = systemAdmins;
+    this.fileOffsets = new HashMap<>();
+    // registerStartingOffsets() removes entries from this set, so it has to be mutable. Collectors.toSet()
+    // makes no guarantee about the mutability of the set it returns, hence the explicit HashSet.
+    this.taskStoresToRestore = this.taskStores.entrySet().stream()
+        .filter(x -> x.getValue().getStoreProperties().isLoggedStore())
+        .map(Map.Entry::getKey)
+        .collect(Collectors.toCollection(HashSet::new));
+    this.loggedStoreBaseDirectory = loggedStoreBaseDirectory;
+    this.nonLoggedStoreBaseDirectory = nonLoggedStoreBaseDirectory;
+    this.streamMetadataCache = streamMetadataCache;
+    this.storeConsumers = storeConsumers;
+    this.maxChangeLogStreamPartitions = maxChangeLogStreamPartitions;
+    this.config = config;
+    this.storageManagerUtil = new StorageManagerUtil();
+  }
+  /**
+   * Cleans up and sets up store directories, validates changeLog SSPs for all stores of this task,
+   * and registers SSPs with the respective consumers.
+   *
+   * The steps run in a fixed order: the offset files are read while the base directories are cleaned, and the
+   * oldest changelog offsets are fetched before the starting offsets are registered, because
+   * registerStartingOffsets reads both.
+   *
+   * @param checkpointedChangelogSSPOffsets not referenced by this implementation; the starting offsets are derived
+   *                                        from the offset files and the oldest changelog offsets.
+   */
+  @Override
+  public void init(Map<SystemStreamPartition, String> checkpointedChangelogSSPOffsets) {
+    cleanBaseDirsAndReadOffsetFiles();
+    setupBaseDirs();
+    validateChangelogStreams();
+    getOldestChangeLogOffsets();
+    registerStartingOffsets();
+  }
+  /**
+   * For each store of this task,
+   * a. Deletes the corresponding non-logged-store base dir if it exists.
+   * b. Deletes the logged-store-base-dir if it is not valid. See {@link #isLoggedStoreValid} for validation semantics.
+   * c. If the logged-store-base-dir is valid, this method reads the offset file and, when the file contains an
+   *    offset for the store's changelog SSP, records it in fileOffsets.
+   */
+  private void cleanBaseDirsAndReadOffsetFiles() {
+    LOG.debug("Cleaning base directories for stores.");
+    FileUtil fileUtil = new FileUtil();
+    taskStores.forEach((storeName, storageEngine) -> {
+        if (!storageEngine.getStoreProperties().isLoggedStore()) {
+          File nonLoggedStorePartitionDir =
+              storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+          LOG.info("Got non logged storage partition directory as {}", nonLoggedStorePartitionDir.toPath().toString());
+          // Non-logged stores are removed whenever their directory is present.
+          if (nonLoggedStorePartitionDir.exists()) {
+            LOG.info("Deleting non logged storage partition directory {}", nonLoggedStorePartitionDir.toPath().toString());
+            fileUtil.rm(nonLoggedStorePartitionDir);
+          }
+        } else {
+          File loggedStorePartitionDir =
+              storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+          LOG.info("Got logged storage partition directory as {}", loggedStorePartitionDir.toPath().toString());
+          // Delete the logged store if it is not valid.
+          if (!isLoggedStoreValid(storeName, loggedStorePartitionDir)) {
+            LOG.info("Deleting logged storage partition directory {}", loggedStorePartitionDir.toPath().toString());
+            fileUtil.rm(loggedStorePartitionDir);
+          } else {
+            SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
+            Map<SystemStreamPartition, String> offset =
+                storageManagerUtil.readOffsetFile(loggedStorePartitionDir, Collections.singleton(changelogSSP), false);
+            LOG.info("Read offset {} for the store {} from logged storage partition directory {}", offset, storeName, loggedStorePartitionDir);
+            // Only remember the offset when the offset file actually had an entry for this changelog SSP.
+            if (offset.containsKey(changelogSSP)) {
+              fileOffsets.put(changelogSSP, offset.get(changelogSSP));
+            }
+          }
+        }
+      });
+  }
+  /**
+   * Decides whether the directory loggedStoreDir of the logged store storeName can be kept.
+   *
+   * The directory is valid only if the store has a changelog system-stream and all of the following hold,
+   * checked in this order:
+   * a) The store has to be persisted to disk.
+   * b) There is a valid offset file for the store's changelog SSP in the directory.
+   * c) The store has not gone stale with respect to its changelog delete retention.
+   *
+   * @param storeName      name of the logged store.
+   * @param loggedStoreDir directory of the logged store.
+   * @return true if the logged store is valid, false otherwise.
+   */
+  private boolean isLoggedStoreValid(String storeName, File loggedStoreDir) {
+    long retentionMs = new StorageConfig(config).getChangeLogDeleteRetentionInMs(storeName);
+    // A store without a changelog system-stream can never be valid.
+    if (!changelogSystemStreams.containsKey(storeName)) {
+      return false;
+    }
+    SystemStreamPartition storeChangelogSSP =
+        new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
+    if (!this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk()) {
+      return false;
+    }
+    if (!storageManagerUtil.isOffsetFileValid(loggedStoreDir, Collections.singleton(storeChangelogSSP), false)) {
+      return false;
+    }
+    return !storageManagerUtil.isStaleStore(loggedStoreDir, retentionMs, clock.currentTimeMillis(), false);
+  }
+  /**
+   * Creates the task store directories of all stores, logged and non-logged, if they don't exist.
+   */
+  private void setupBaseDirs() {
+    LOG.debug("Setting up base directories for stores.");
+    taskStores.forEach((storeName, storageEngine) -> {
+        if (storageEngine.getStoreProperties().isLoggedStore()) {
+          File loggedStorePartitionDir =
+              storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+          LOG.info("Using logged storage partition directory: {} for store: {}",
+              loggedStorePartitionDir.toPath().toString(), storeName);
+          if (!loggedStorePartitionDir.exists()) {
+            loggedStorePartitionDir.mkdirs();
+          }
+        } else {
+          File nonLoggedStorePartitionDir =
+              storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskModel.getTaskName(), taskModel.getTaskMode());
+          LOG.info("Using non logged storage partition directory: {} for store: {}",
+              nonLoggedStorePartitionDir.toPath().toString(), storeName);
+          nonLoggedStorePartitionDir.mkdirs();
+        }
+      });
+  }
+  /**
+   * Validates each changelog system-stream with its respective SystemAdmin.
+   *
+   * For every changelog stream a changelog {@link StreamSpec} is built from the stream name, the system name and
+   * maxChangeLogStreamPartitions, and handed to the {@link SystemAdmin} of that system for validation.
+   */
+  private void validateChangelogStreams() {
+    LOG.info("Validating change log streams: {}", changelogSystemStreams);
+    for (SystemStream changelogSystemStream : changelogSystemStreams.values()) {
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(changelogSystemStream.getSystem());
+      StreamSpec changelogSpec =
+          StreamSpec.createChangeLogStreamSpec(changelogSystemStream.getStream(), changelogSystemStream.getSystem(),
+              maxChangeLogStreamPartitions);
+      systemAdmin.validateStream(changelogSpec);
+    }
+  }
+  /**
+   * Get the oldest offset for each changelog SSP based on the stream's metadata (obtained from streamMetadataCache).
+   *
+   * The result, restricted to this task's changelog partition, is stored in changeLogOldestOffsets.
+   */
+  private void getOldestChangeLogOffsets() {
+    // streamMetadataCache exposes a Scala API: convert the stream set to Scala and the returned map back to Java.
+    Map<SystemStream, SystemStreamMetadata> changeLogMetadata = JavaConverters.mapAsJavaMapConverter(
+        streamMetadataCache.getStreamMetadata(
+            JavaConverters.asScalaSetConverter(new HashSet<>(changelogSystemStreams.values())).asScala().toSet(),
+            false)).asJava();
+    LOG.info("Got change log stream metadata: {}", changeLogMetadata);
+    changeLogOldestOffsets =
+        getChangeLogOldestOffsetsForPartition(taskModel.getChangelogPartition(), changeLogMetadata);
+    LOG.info("Assigning oldest change log offsets for taskName {} : {}", taskModel.getTaskName(),
+        changeLogOldestOffsets);
+  }
+  /**
+   * Builds a map from changelog SystemStream to the oldest offset of the given partition of that stream.
+   *
+   * Streams whose metadata has no entry for the partition are left out of the result. The oldest offset stored
+   * for a stream may be null.
+   *
+   * @param partition           the partition whose oldest offsets are collected.
+   * @param inputStreamMetadata stream metadata indexed by SystemStream.
+   * @return a new map of oldest offsets indexed by SystemStream.
+   */
+  private Map<SystemStream, String> getChangeLogOldestOffsetsForPartition(Partition partition,
+      Map<SystemStream, SystemStreamMetadata> inputStreamMetadata) {
+    Map<SystemStream, String> oldestOffsets = new HashMap<>();
+    // NOTE: the map is filled by hand rather than with Collectors.toMap, which throws a NullPointerException
+    // on null values.
+    for (Map.Entry<SystemStream, SystemStreamMetadata> entry : inputStreamMetadata.entrySet()) {
+      if (entry.getValue().getSystemStreamPartitionMetadata().get(partition) == null) {
+        continue;
+      }
+      oldestOffsets.put(entry.getKey(),
+          entry.getValue().getSystemStreamPartitionMetadata().get(partition).getOldestOffset());
+    }
+    return oldestOffsets;
+  }
+  /**
+   * Determines the starting offset for each store SSP (based on {@link #getStartingOffset(SystemStreamPartition, SystemAdmin)}) and
+   * registers it with the respective SystemConsumer for starting consumption.
+   *
+   * A store whose starting offset is null is not registered with its consumer and is removed from
+   * taskStoresToRestore, so that restore() skips it.
+   */
+  private void registerStartingOffsets() {
+    for (Map.Entry<String, SystemStream> changelogSystemStreamEntry : changelogSystemStreams.entrySet()) {
+      SystemStreamPartition systemStreamPartition =
+          new SystemStreamPartition(changelogSystemStreamEntry.getValue(), taskModel.getChangelogPartition());
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(changelogSystemStreamEntry.getValue().getSystem());
+      SystemConsumer systemConsumer = storeConsumers.get(changelogSystemStreamEntry.getKey());
+      String offset = getStartingOffset(systemStreamPartition, systemAdmin);
+      if (offset != null) {
+        // The previous message ended in a stray '%' (" for %"); it is dropped here.
+        LOG.info("Registering change log consumer with offset {} for {}", offset, systemStreamPartition);
+        systemConsumer.register(systemStreamPartition, offset);
+      } else {
+        LOG.info("Skipping change log restoration for {} because stream appears to be empty (offset was null).",
+            systemStreamPartition);
+        taskStoresToRestore.remove(changelogSystemStreamEntry.getKey());
+      }
+    }
+  }
+  /**
+   * Returns the offset with which the changelog consumer should be initialized for the given SystemStreamPartition.
+   *
+   * If a file offset exists, it represents the last changelog offset which is also reflected in the on-disk state.
+   * In that case, the next offset after the file offset is used, as long as it is newer than the oldest offset
+   * currently available in the stream.
+   *
+   * If there isn't a file offset or it's older than the oldest available offset, the oldest offset is used.
+   *
+   * The choice between the two is delegated to storageManagerUtil.getStartingOffset.
+   *
+   * @param systemStreamPartition  the changelog partition for which the offset is needed.
+   * @param systemAdmin            the {@link SystemAdmin} for the changelog.
+   * @return the offset from which the changelog consumer should be initialized.
+   * @throws SamzaException if changeLogOldestOffsets has no entry for the partition's system-stream.
+   */
+  private String getStartingOffset(SystemStreamPartition systemStreamPartition, SystemAdmin systemAdmin) {
+    SystemStream changelogStream = systemStreamPartition.getSystemStream();
+    // NOTE: a null oldest offset is a legitimate entry (it signifies an empty stream), so a missing entry
+    // has to be detected with containsKey rather than with a null check.
+    if (!changeLogOldestOffsets.containsKey(changelogStream)) {
+      throw new SamzaException("Missing a change log offset for " + systemStreamPartition);
+    }
+    return storageManagerUtil.getStartingOffset(
+        systemStreamPartition,
+        systemAdmin,
+        fileOffsets.get(systemStreamPartition),
+        changeLogOldestOffsets.get(changelogStream));
+  }
+  /**
+   * Restore each store in taskStoresToRestore sequentially.
+   *
+   * For every store a {@link ChangelogSSPIterator} over the store's changelog SSP is created from the store's
+   * consumer and the {@link SystemAdmin} of the changelog system, and handed to the store's restore method.
+   */
+  @Override
+  public void restore() {
+    for (String storeName : taskStoresToRestore) {
+      LOG.info("Restoring store: {} for task: {}", storeName, taskModel.getTaskName());
+      SystemConsumer systemConsumer = storeConsumers.get(storeName);
+      SystemStream systemStream = changelogSystemStreams.get(storeName);
+      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(systemStream.getSystem());
+      ChangelogSSPIterator changelogSSPIterator = new ChangelogSSPIterator(systemConsumer,
+          new SystemStreamPartition(systemStream, taskModel.getChangelogPartition()), null, systemAdmin, false);
+      taskStores.get(storeName).restore(changelogSSPIterator);
+    }
+  }
+  /**
+   * Stop only persistent stores. In case of certain stores and store mode (such as RocksDB), this
+   * can invoke compaction.
+   */
+  public void stopPersistentStores() {
 Review comment:
   This should be the same as the existing usage in ContainerStorageManager. What breakage do you have in mind?

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:

With regards,
Apache Git Services