You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/02/20 00:55:04 UTC

[samza] branch master updated: SAMZA-2419: removing duplicate cleaning of stores (#1236)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 66f3b8e  SAMZA-2419: removing duplicate cleaning of stores (#1236)
66f3b8e is described below

commit 66f3b8ec9dfcf88e30577d7724ab8da5976943f1
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Wed Feb 19 16:54:55 2020 -0800

    SAMZA-2419: removing duplicate cleaning of stores (#1236)
---
 .../TransactionalStateTaskRestoreManager.java      | 27 ++++++++++------------
 .../TestTransactionalStateTaskRestoreManager.java  | 15 ++++++++++--
 2 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 8133fac..c578d9a 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -243,30 +243,27 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
           timeSinceLastCheckpointInMs = System.currentTimeMillis() -
               checkpointedChangelogOffset.getCheckpointId().getMillis();
         }
-      
-        // if the clean.store.start config is set, delete the currentDir, restore from oldest offset to checkpointed
-        if (storageEngine.getStoreProperties().isPersistedToDisk() && new StorageConfig(
-          config).getCleanLoggedStoreDirsOnStart(storeName)) {
-          File currentDir = storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
-          LOG.info("Marking current directory: {} for store: {} in task: {}.", currentDir, storeName, taskName);
-          storeDirsToDelete.put(storeName, currentDir);
-          LOG.info("Marking restore offsets for store: {} in task: {} to {}, {} ", storeName, taskName, oldestOffset, checkpointedOffset);
-          storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
-          return;
-        }
 
-        // if the clean.store.start config is set, delete the currentDir, restore from oldest offset to checkpointed
+        // if the clean.on.container.start config is set, delete current and checkpoint dirs, restore from oldest offset to checkpointed
         if (storageEngine.getStoreProperties().isPersistedToDisk() && new StorageConfig(
           config).getCleanLoggedStoreDirsOnStart(storeName)) {
-          File currentDir = storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
-          LOG.info("Marking current directory: {} for store: {} in task: {}.", currentDir, storeName, taskName);
+          File currentDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskName, taskMode);
+          LOG.info("Marking current directory: {} for store: {} in task: {} for deletion due to clean.on.container.start config.",
+              currentDir, storeName, taskName);
           storeDirsToDelete.put(storeName, currentDir);
+
+          storageManagerUtil.getTaskStoreCheckpointDirs(loggedStoreBaseDirectory, storeName, taskName, taskMode)
+              .forEach(checkpointDir -> {
+                  LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion due to clean.on.container.start config.",
+                      checkpointDir, storeName, taskName);
+                  storeDirsToDelete.put(storeName, checkpointDir);
+                });
+
           LOG.info("Marking restore offsets for store: {} in task: {} to {}, {} ", storeName, taskName, oldestOffset, checkpointedOffset);
           storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
           return;
         }
 
-
         Optional<File> currentDirOptional;
         Optional<List<File>> checkpointDirsOptional;
 
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
index 0bb88d5..c37cca3 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
@@ -38,6 +38,7 @@ import org.apache.samza.config.Config;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
 import org.apache.samza.job.model.TaskModel;
 import org.apache.samza.storage.TransactionalStateTaskRestoreManager.RestoreOffsets;
 import org.apache.samza.storage.TransactionalStateTaskRestoreManager.StoreActions;
@@ -166,6 +167,7 @@ public class TestTransactionalStateTaskRestoreManager {
     when(mockTaskModel.getTaskName()).thenReturn(taskName);
     Partition taskChangelogPartition = new Partition(0);
     when(mockTaskModel.getChangelogPartition()).thenReturn(taskChangelogPartition);
+    when(mockTaskModel.getTaskMode()).thenReturn(TaskMode.Active);
 
     String store1Name = "store1";
     StorageEngine store1Engine = mock(StorageEngine.class);
@@ -208,13 +210,22 @@ public class TestTransactionalStateTaskRestoreManager {
             return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
           });
 
+    File dummyCurrentDir = new File("currentDir");
+    File dummyCheckpointDir = new File("checkpointDir1");
+    when(mockStorageManagerUtil.getTaskStoreDir(mockLoggedStoreBaseDir, store1Name, taskName, TaskMode.Active))
+        .thenReturn(dummyCurrentDir);
+    when(mockStorageManagerUtil.getTaskStoreCheckpointDirs(mockLoggedStoreBaseDir, store1Name, taskName, TaskMode.Active))
+        .thenReturn(ImmutableList.of(dummyCheckpointDir));
+
     StoreActions storeActions = TransactionalStateTaskRestoreManager.getStoreActions(
         mockTaskModel, mockStoreEngines, mockStoreChangelogs, mockCheckpointedChangelogOffset,
         mockCurrentChangelogOffsets, mockSystemAdmins, mockStorageManagerUtil,
         mockLoggedStoreBaseDir, mockNonLoggedStoreBaseDir, mockConfig, mockClock);
 
-    // ensure that there is one directory to delete
-    assertEquals(1, storeActions.storeDirsToDelete.size());
+    // ensure that current and checkpoint directories are marked for deletion
+    assertEquals(2, storeActions.storeDirsToDelete.size());
+    assertTrue(storeActions.storeDirsToDelete.containsValue(dummyCheckpointDir));
+    assertTrue(storeActions.storeDirsToDelete.containsValue(dummyCurrentDir));
     // ensure that we restore from the oldest changelog offset to checkpointed changelog offset
     assertEquals("0", storeActions.storesToRestore.get(store1Name).startingOffset);
     assertEquals(changelog1CheckpointedOffset, storeActions.storesToRestore.get(store1Name).endingOffset);