Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2019/10/04 19:12:50 UTC

[GitHub] [samza] xinyuiscool commented on a change in pull request #1164: [WIP] Transactional State [5/5]: Added implementations for transactional state checkpoints and restore

URL: https://github.com/apache/samza/pull/1164#discussion_r331580528
 
 

 ##########
 File path: samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java
 ##########
 @@ -293,11 +327,57 @@ public static boolean storeExists(File storeDir) {
    * @param taskMode the mode of the given task
    * @return the partition directory for the store
    */
-  public static File getStorePartitionDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) {
+  public File getTaskStoreDir(File storeBaseDir, String storeName, TaskName taskName, TaskMode taskMode) {
     TaskName taskNameForDirName = taskName;
     if (taskMode.equals(TaskMode.Standby)) {
       taskNameForDirName =  StandbyTaskUtil.getActiveTaskName(taskName);
     }
     return new File(storeBaseDir, (storeName + File.separator + taskNameForDirName.toString()).replace(' ', '_'));
   }
+
+  public List<File> getTaskStoreCheckpointDirs(File storeBaseDir, String storeName,
+      TaskName taskName, TaskMode taskMode) {
+    try {
+      File storeDir = new File(storeBaseDir, storeName);
+      String taskStoreName = getTaskStoreDir(storeBaseDir, storeName, taskName, taskMode).getName();
+
+      if (storeDir.exists()) { // new store or no local state
+        return Files.list(storeDir.toPath())
+            .map(Path::toFile)
+            .filter(file -> file.getName().contains(taskStoreName + "-"))
+            .collect(Collectors.toList());
+      } else {
+        return Collections.emptyList();
+      }
+    } catch (IOException e) {
+      throw new SamzaException(
+          String.format("Error finding checkpoint dirs for task: %s mode: %s store: %s in dir: %s",
+              taskName, taskMode, storeName, storeBaseDir), e);
+    }
+  }
+
+  public void moveCheckpointFiles(File checkpointDir, File storeDir) {
 
 Review comment:
   It would be better to name this restoreCheckpointFiles rather than moveCheckpointFiles, since we don't actually move any files here.
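
   To illustrate the naming point: the body of the method is not shown in this hunk, so the following is only a minimal sketch under the assumption that the checkpoint files are copied into the store directory while the checkpoint directory is left intact. The class name CheckpointRestoreSketch, the use of Path arguments instead of File, and the choice of Files.copy are all hypothetical and are not taken from the pull request.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch, not the code from the pull request.
public class CheckpointRestoreSketch {

  // Copies every regular file in checkpointDir into storeDir (assumed behavior).
  // The checkpoint directory is left untouched, which is why "restore"
  // describes the operation more accurately than "move".
  public static void restoreCheckpointFiles(Path checkpointDir, Path storeDir) {
    try {
      Files.createDirectories(storeDir);
      List<Path> files;
      // Files.list keeps a directory handle open, so close the stream promptly.
      try (Stream<Path> entries = Files.list(checkpointDir)) {
        files = entries.filter(Files::isRegularFile).collect(Collectors.toList());
      }
      for (Path file : files) {
        Files.copy(file, storeDir.resolve(file.getFileName()),
            StandardCopyOption.REPLACE_EXISTING);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(
          String.format("Error restoring checkpoint files from %s to %s",
              checkpointDir, storeDir), e);
    }
  }

  public static void main(String[] args) throws IOException {
    Path checkpointDir = Files.createTempDirectory("checkpoint");
    Path storeDir = Files.createTempDirectory("store");
    Files.write(checkpointDir.resolve("000001.sst"), "data".getBytes());

    restoreCheckpointFiles(checkpointDir, storeDir);

    // Report whether the file is present in each location after the restore.
    System.out.println(Files.exists(checkpointDir.resolve("000001.sst")));
    System.out.println(Files.exists(storeDir.resolve("000001.sst")));
  }
}
```

   With a name like moveCheckpointFiles, a caller could reasonably expect the checkpoint directory to be empty afterwards; under the assumption above it is not, so the restore name sets the right expectation.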
