You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/02/20 00:55:04 UTC
[samza] branch master updated: SAMZA-2419: removing duplicate cleaning of stores (#1236)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 66f3b8e SAMZA-2419: removing duplicate cleaning of stores (#1236)
66f3b8e is described below
commit 66f3b8ec9dfcf88e30577d7724ab8da5976943f1
Author: bkonold <bk...@users.noreply.github.com>
AuthorDate: Wed Feb 19 16:54:55 2020 -0800
SAMZA-2419: removing duplicate cleaning of stores (#1236)
---
.../TransactionalStateTaskRestoreManager.java | 27 ++++++++++------------
.../TestTransactionalStateTaskRestoreManager.java | 15 ++++++++++--
2 files changed, 25 insertions(+), 17 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
index 8133fac..c578d9a 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/TransactionalStateTaskRestoreManager.java
@@ -243,30 +243,27 @@ public class TransactionalStateTaskRestoreManager implements TaskRestoreManager
timeSinceLastCheckpointInMs = System.currentTimeMillis() -
checkpointedChangelogOffset.getCheckpointId().getMillis();
}
-
- // if the clean.store.start config is set, delete the currentDir, restore from oldest offset to checkpointed
- if (storageEngine.getStoreProperties().isPersistedToDisk() && new StorageConfig(
- config).getCleanLoggedStoreDirsOnStart(storeName)) {
- File currentDir = storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
- LOG.info("Marking current directory: {} for store: {} in task: {}.", currentDir, storeName, taskName);
- storeDirsToDelete.put(storeName, currentDir);
- LOG.info("Marking restore offsets for store: {} in task: {} to {}, {} ", storeName, taskName, oldestOffset, checkpointedOffset);
- storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
- return;
- }
- // if the clean.store.start config is set, delete the currentDir, restore from oldest offset to checkpointed
+ // if the clean.store.start config is set, delete current and checkpoint dirs, restore from oldest offset to checkpointed
if (storageEngine.getStoreProperties().isPersistedToDisk() && new StorageConfig(
config).getCleanLoggedStoreDirsOnStart(storeName)) {
- File currentDir = storageManagerUtil.getTaskStoreDir(nonLoggedStoreBaseDirectory, storeName, taskName, taskMode);
- LOG.info("Marking current directory: {} for store: {} in task: {}.", currentDir, storeName, taskName);
+ File currentDir = storageManagerUtil.getTaskStoreDir(loggedStoreBaseDirectory, storeName, taskName, taskMode);
+ LOG.info("Marking current directory: {} for store: {} in task: {} for deletion due to clean.on.container.start config.",
+ currentDir, storeName, taskName);
storeDirsToDelete.put(storeName, currentDir);
+
+ storageManagerUtil.getTaskStoreCheckpointDirs(loggedStoreBaseDirectory, storeName, taskName, taskMode)
+ .forEach(checkpointDir -> {
+ LOG.info("Marking checkpoint directory: {} for store: {} in task: {} for deletion due to clean.on.container.start config.",
+ checkpointDir, storeName, taskName);
+ storeDirsToDelete.put(storeName, checkpointDir);
+ });
+
LOG.info("Marking restore offsets for store: {} in task: {} to {}, {} ", storeName, taskName, oldestOffset, checkpointedOffset);
storesToRestore.put(storeName, new RestoreOffsets(oldestOffset, checkpointedOffset));
return;
}
-
Optional<File> currentDirOptional;
Optional<List<File>> checkpointDirsOptional;
diff --git a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
index 0bb88d5..c37cca3 100644
--- a/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
+++ b/samza-core/src/test/java/org/apache/samza/storage/TestTransactionalStateTaskRestoreManager.java
@@ -38,6 +38,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.TaskName;
+import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.storage.TransactionalStateTaskRestoreManager.RestoreOffsets;
import org.apache.samza.storage.TransactionalStateTaskRestoreManager.StoreActions;
@@ -166,6 +167,7 @@ public class TestTransactionalStateTaskRestoreManager {
when(mockTaskModel.getTaskName()).thenReturn(taskName);
Partition taskChangelogPartition = new Partition(0);
when(mockTaskModel.getChangelogPartition()).thenReturn(taskChangelogPartition);
+ when(mockTaskModel.getTaskMode()).thenReturn(TaskMode.Active);
String store1Name = "store1";
StorageEngine store1Engine = mock(StorageEngine.class);
@@ -208,13 +210,22 @@ public class TestTransactionalStateTaskRestoreManager {
return Long.valueOf(offset1).compareTo(Long.valueOf(offset2));
});
+ File dummyCurrentDir = new File("currentDir");
+ File dummyCheckpointDir = new File("checkpointDir1");
+ when(mockStorageManagerUtil.getTaskStoreDir(mockLoggedStoreBaseDir, store1Name, taskName, TaskMode.Active))
+ .thenReturn(dummyCurrentDir);
+ when(mockStorageManagerUtil.getTaskStoreCheckpointDirs(mockLoggedStoreBaseDir, store1Name, taskName, TaskMode.Active))
+ .thenReturn(ImmutableList.of(dummyCheckpointDir));
+
StoreActions storeActions = TransactionalStateTaskRestoreManager.getStoreActions(
mockTaskModel, mockStoreEngines, mockStoreChangelogs, mockCheckpointedChangelogOffset,
mockCurrentChangelogOffsets, mockSystemAdmins, mockStorageManagerUtil,
mockLoggedStoreBaseDir, mockNonLoggedStoreBaseDir, mockConfig, mockClock);
- // ensure that there is one directory to delete
- assertEquals(1, storeActions.storeDirsToDelete.size());
+ // ensure that current and checkpoint directories are marked for deletion
+ assertEquals(2, storeActions.storeDirsToDelete.size());
+ assertTrue(storeActions.storeDirsToDelete.containsValue(dummyCheckpointDir));
+ assertTrue(storeActions.storeDirsToDelete.containsValue(dummyCurrentDir));
// ensure that we restore from the oldest changelog offset to checkpointed changelog offset
assertEquals("0", storeActions.storesToRestore.get(store1Name).startingOffset);
assertEquals(changelog1CheckpointedOffset, storeActions.storesToRestore.get(store1Name).endingOffset);