You are viewing a plain-text version of this content; the canonical link to it is not preserved in this format.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/06/04 20:23:43 UTC

[GitHub] nlu90 closed pull request #2910: make LocalFSStorage clean checkpoints before store

nlu90 closed pull request #2910: make LocalFSStorage clean checkpoints before store
URL: https://github.com/apache/incubator-heron/pull/2910
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub does not otherwise display it:

diff --git a/heron/config/src/yaml/conf/kubernetes/stateful.yaml b/heron/config/src/yaml/conf/kubernetes/stateful.yaml
index 417b882ec2..a00d9f6557 100644
--- a/heron/config/src/yaml/conf/kubernetes/stateful.yaml
+++ b/heron/config/src/yaml/conf/kubernetes/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.localfs.Local
 heron.statefulstorage.config:
   heron.statefulstorage.classpath: ""
   heron.statefulstorage.localfs.root.path: "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/local/stateful.yaml b/heron/config/src/yaml/conf/local/stateful.yaml
index d72e496afa..95083dd207 100644
--- a/heron/config/src/yaml/conf/local/stateful.yaml
+++ b/heron/config/src/yaml/conf/local/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   ~/.herondata/checkpoints
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/localzk/stateful.yaml b/heron/config/src/yaml/conf/localzk/stateful.yaml
index d72e496afa..95083dd207 100644
--- a/heron/config/src/yaml/conf/localzk/stateful.yaml
+++ b/heron/config/src/yaml/conf/localzk/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   ~/.herondata/checkpoints
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/marathon/stateful.yaml b/heron/config/src/yaml/conf/marathon/stateful.yaml
index 4ea3bfbe62..ce8785b0f3 100644
--- a/heron/config/src/yaml/conf/marathon/stateful.yaml
+++ b/heron/config/src/yaml/conf/marathon/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/mesos/stateful.yaml b/heron/config/src/yaml/conf/mesos/stateful.yaml
index 4ea3bfbe62..ce8785b0f3 100644
--- a/heron/config/src/yaml/conf/mesos/stateful.yaml
+++ b/heron/config/src/yaml/conf/mesos/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/nomad/stateful.yaml b/heron/config/src/yaml/conf/nomad/stateful.yaml
index 417b882ec2..a00d9f6557 100644
--- a/heron/config/src/yaml/conf/nomad/stateful.yaml
+++ b/heron/config/src/yaml/conf/nomad/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.localfs.Local
 heron.statefulstorage.config:
   heron.statefulstorage.classpath: ""
   heron.statefulstorage.localfs.root.path: "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/sandbox/stateful.yaml b/heron/config/src/yaml/conf/sandbox/stateful.yaml
index d72e496afa..95083dd207 100644
--- a/heron/config/src/yaml/conf/sandbox/stateful.yaml
+++ b/heron/config/src/yaml/conf/sandbox/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   ~/.herondata/checkpoints
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/slurm/stateful.yaml b/heron/config/src/yaml/conf/slurm/stateful.yaml
index 4ea3bfbe62..ce8785b0f3 100644
--- a/heron/config/src/yaml/conf/slurm/stateful.yaml
+++ b/heron/config/src/yaml/conf/slurm/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/standalone/stateful.yaml b/heron/config/src/yaml/conf/standalone/stateful.yaml
index 417b882ec2..a00d9f6557 100644
--- a/heron/config/src/yaml/conf/standalone/stateful.yaml
+++ b/heron/config/src/yaml/conf/standalone/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.localfs.Local
 heron.statefulstorage.config:
   heron.statefulstorage.classpath: ""
   heron.statefulstorage.localfs.root.path: "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/yarn/stateful.yaml b/heron/config/src/yaml/conf/yarn/stateful.yaml
index 4ea3bfbe62..ce8785b0f3 100644
--- a/heron/config/src/yaml/conf/yarn/stateful.yaml
+++ b/heron/config/src/yaml/conf/yarn/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
index 6e40053d83..60fdc53d6d 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
@@ -20,6 +20,7 @@
 package org.apache.heron.statefulstorage.localfs;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.logging.Logger;
 
@@ -32,20 +33,28 @@
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 import org.apache.heron.spi.statefulstorage.StatefulStorageException;
 
-public class LocalFileSystemStorage implements IStatefulStorage {
+public class  LocalFileSystemStorage implements IStatefulStorage {
   private static final Logger LOG = Logger.getLogger(LocalFileSystemStorage.class.getName());
 
   private static final String ROOT_PATH_KEY = "heron.statefulstorage.localfs.root.path";
+  private static final String MAX_CHECKPOINTS_KEY = "heron.statefulstorage.localfs.max.checkpoints";
+
+  private static final int DEFAULT_MAX_CHECKPOINTS = 10;
 
   private String checkpointRootPath;
+  private int maxCheckpoints;
 
   @Override
   public void init(Map<String, Object> conf) throws StatefulStorageException {
     checkpointRootPath = (String) conf.get(ROOT_PATH_KEY);
+    maxCheckpoints = (int) conf.getOrDefault(MAX_CHECKPOINTS_KEY, DEFAULT_MAX_CHECKPOINTS);
+
     if (checkpointRootPath != null) {
-      checkpointRootPath = checkpointRootPath.replaceFirst("^~", System.getProperty("user.home"));
+      checkpointRootPath =
+          checkpointRootPath.replaceFirst("^~", System.getProperty("user.home"));
     }
-    LOG.info("Initialing... Checkpoint root path: " + checkpointRootPath);
+    LOG.info("Initialing LocalFileSystemStorage with Checkpoint root path: "
+        + checkpointRootPath + " Max checkpoints: " + maxCheckpoints);
   }
 
   @Override
@@ -55,6 +64,11 @@ public void close() {
 
   @Override
   public void store(Checkpoint checkpoint) throws StatefulStorageException {
+    // heron doesn't clean checkpoints stored on local disk automatically
+    // localFS cleans checkpoints before store and limits the number of checkpoints saved
+    String rootPath = getTopologyCheckpointRoot(checkpoint.getTopologyName());
+    cleanCheckpoints(new File(rootPath), maxCheckpoints);
+
     String path = getCheckpointPath(checkpoint.getTopologyName(), checkpoint.getCheckpointId(),
                                     checkpoint.getComponent(), checkpoint.getTaskId());
 
@@ -134,6 +148,24 @@ public void dispose(String topologyName, String oldestCheckpointPreserved,
     }
   }
 
+  protected void cleanCheckpoints(File rootFile, int remaining) throws StatefulStorageException {
+    if (FileUtils.isDirectoryExists(rootFile.getAbsolutePath())
+        && FileUtils.hasChildren(rootFile.getAbsolutePath())) {
+      String[] children = rootFile.list();
+      Arrays.sort(children);
+
+      // only keep the latest N remaining files, delete others
+      for (int i = 0; i < children.length - remaining; i++) {
+        File ckptFile = new File(rootFile.getAbsolutePath(), children[i]);
+        FileUtils.deleteDir(ckptFile, true);
+
+        if (FileUtils.isDirectoryExists(ckptFile.getAbsolutePath())) {
+          throw new StatefulStorageException("Failed to delete " + ckptFile.getAbsolutePath());
+        }
+      }
+    }
+  }
+
   private String getTopologyCheckpointRoot(String topologyName) {
     return String.format("%s/%s", checkpointRootPath, topologyName);
   }
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
index eb4c4af093..2875f64f83 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.heron.statefulstorage.localfs;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -79,6 +80,7 @@ public void testStore() throws Exception {
     PowerMockito.doReturn(true).when(FileUtils.class, "isDirectoryExists", anyString());
     PowerMockito.doReturn(true)
         .when(FileUtils.class, "writeToFile", anyString(), any(byte[].class), anyBoolean());
+    PowerMockito.doReturn(false).when(FileUtils.class, "hasChildren", anyString());
 
     Checkpoint mockCheckpoint = mock(Checkpoint.class);
     when(mockCheckpoint.getCheckpoint()).thenReturn(checkpoint);
@@ -89,6 +91,27 @@ public void testStore() throws Exception {
     FileUtils.writeToFile(anyString(), eq(checkpoint.toByteArray()), eq(true));
   }
 
+  @Test
+  public void testCleanCheckpoints() throws Exception {
+    String fakeRootPath = "/fake/root/path";
+
+    PowerMockito.spy(FileUtils.class);
+    PowerMockito.doReturn(true).when(FileUtils.class, "isDirectoryExists", fakeRootPath);
+    PowerMockito.doReturn(true).when(FileUtils.class, "hasChildren", anyString());
+    PowerMockito.doReturn(true).when(FileUtils.class, "deleteDir", any(File.class), anyBoolean());
+
+    File mockRootFile = mock(File.class);
+    when(mockRootFile.getAbsolutePath()).thenReturn(fakeRootPath);
+    String[] files = {"1", "2", "3"};
+    when(mockRootFile.list()).thenReturn(files);
+
+    localFileSystemStorage.cleanCheckpoints(mockRootFile, 1);
+
+    PowerMockito.verifyStatic(times(1));
+    FileUtils.deleteDir(new File(String.format("%s%s%s", fakeRootPath, File.separator, "1")), true);
+    FileUtils.deleteDir(new File(String.format("%s%s%s", fakeRootPath, File.separator, "2")), true);
+  }
+
   @Test
   public void testRestore() throws Exception {
     PowerMockito.spy(FileUtils.class);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services