You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/06/04 20:23:45 UTC

[incubator-heron] branch master updated: make LocalFSStorage clean checkpoints before store (#2910)

This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new c13198b  make LocalFSStorage clean checkpoints before store (#2910)
c13198b is described below

commit c13198b79037246d5baecd8cc35f5454b5db27d5
Author: Neng Lu <fr...@gmail.com>
AuthorDate: Mon Jun 4 13:23:41 2018 -0700

    make LocalFSStorage clean checkpoints before store (#2910)
    
    * make LocalFSStorage clean checkpoints before store
    
    * update and add unit tests
---
 .../config/src/yaml/conf/kubernetes/stateful.yaml  |  1 +
 heron/config/src/yaml/conf/local/stateful.yaml     |  1 +
 heron/config/src/yaml/conf/localzk/stateful.yaml   |  1 +
 heron/config/src/yaml/conf/marathon/stateful.yaml  |  1 +
 heron/config/src/yaml/conf/mesos/stateful.yaml     |  1 +
 heron/config/src/yaml/conf/nomad/stateful.yaml     |  1 +
 heron/config/src/yaml/conf/sandbox/stateful.yaml   |  1 +
 heron/config/src/yaml/conf/slurm/stateful.yaml     |  1 +
 .../config/src/yaml/conf/standalone/stateful.yaml  |  1 +
 heron/config/src/yaml/conf/yarn/stateful.yaml      |  1 +
 .../localfs/LocalFileSystemStorage.java            | 38 ++++++++++++++++++++--
 .../localfs/LocalFileSystemStorageTest.java        | 23 +++++++++++++
 12 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/heron/config/src/yaml/conf/kubernetes/stateful.yaml b/heron/config/src/yaml/conf/kubernetes/stateful.yaml
index 417b882..a00d9f6 100644
--- a/heron/config/src/yaml/conf/kubernetes/stateful.yaml
+++ b/heron/config/src/yaml/conf/kubernetes/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.localfs.Local
 heron.statefulstorage.config:
   heron.statefulstorage.classpath: ""
   heron.statefulstorage.localfs.root.path: "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/local/stateful.yaml b/heron/config/src/yaml/conf/local/stateful.yaml
index d72e496..95083dd 100644
--- a/heron/config/src/yaml/conf/local/stateful.yaml
+++ b/heron/config/src/yaml/conf/local/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   ~/.herondata/checkpoints
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/localzk/stateful.yaml b/heron/config/src/yaml/conf/localzk/stateful.yaml
index d72e496..95083dd 100644
--- a/heron/config/src/yaml/conf/localzk/stateful.yaml
+++ b/heron/config/src/yaml/conf/localzk/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   ~/.herondata/checkpoints
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/marathon/stateful.yaml b/heron/config/src/yaml/conf/marathon/stateful.yaml
index 4ea3bfb..ce8785b 100644
--- a/heron/config/src/yaml/conf/marathon/stateful.yaml
+++ b/heron/config/src/yaml/conf/marathon/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/mesos/stateful.yaml b/heron/config/src/yaml/conf/mesos/stateful.yaml
index 4ea3bfb..ce8785b 100644
--- a/heron/config/src/yaml/conf/mesos/stateful.yaml
+++ b/heron/config/src/yaml/conf/mesos/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/nomad/stateful.yaml b/heron/config/src/yaml/conf/nomad/stateful.yaml
index 417b882..a00d9f6 100644
--- a/heron/config/src/yaml/conf/nomad/stateful.yaml
+++ b/heron/config/src/yaml/conf/nomad/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.localfs.Local
 heron.statefulstorage.config:
   heron.statefulstorage.classpath: ""
   heron.statefulstorage.localfs.root.path: "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/sandbox/stateful.yaml b/heron/config/src/yaml/conf/sandbox/stateful.yaml
index d72e496..95083dd 100644
--- a/heron/config/src/yaml/conf/sandbox/stateful.yaml
+++ b/heron/config/src/yaml/conf/sandbox/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   ~/.herondata/checkpoints
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/slurm/stateful.yaml b/heron/config/src/yaml/conf/slurm/stateful.yaml
index 4ea3bfb..ce8785b 100644
--- a/heron/config/src/yaml/conf/slurm/stateful.yaml
+++ b/heron/config/src/yaml/conf/slurm/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/standalone/stateful.yaml b/heron/config/src/yaml/conf/standalone/stateful.yaml
index 417b882..a00d9f6 100644
--- a/heron/config/src/yaml/conf/standalone/stateful.yaml
+++ b/heron/config/src/yaml/conf/standalone/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.localfs.Local
 heron.statefulstorage.config:
   heron.statefulstorage.classpath: ""
   heron.statefulstorage.localfs.root.path: "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/yarn/stateful.yaml b/heron/config/src/yaml/conf/yarn/stateful.yaml
index 4ea3bfb..ce8785b 100644
--- a/heron/config/src/yaml/conf/yarn/stateful.yaml
+++ b/heron/config/src/yaml/conf/yarn/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname:            "org.apache.heron.statefulstorage.lo
 heron.statefulstorage.config:
   heron.statefulstorage.classpath:           ""
   heron.statefulstorage.localfs.root.path:   "./checkpoints"
+  heron.statefulstorage.localfs.max.checkpoints: 10
 
 # Following are configs for socket between ckptmgr and stateful storage
 heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
index 6e40053..60fdc53 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
@@ -20,6 +20,7 @@
 package org.apache.heron.statefulstorage.localfs;
 
 import java.io.File;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.logging.Logger;
 
@@ -32,20 +33,60 @@ import org.apache.heron.spi.statefulstorage.Checkpoint;
 import org.apache.heron.spi.statefulstorage.IStatefulStorage;
 import org.apache.heron.spi.statefulstorage.StatefulStorageException;

 public class LocalFileSystemStorage implements IStatefulStorage {
   private static final Logger LOG = Logger.getLogger(LocalFileSystemStorage.class.getName());

   private static final String ROOT_PATH_KEY = "heron.statefulstorage.localfs.root.path";
+  private static final String MAX_CHECKPOINTS_KEY = "heron.statefulstorage.localfs.max.checkpoints";
+
+  private static final int DEFAULT_MAX_CHECKPOINTS = 10;

   private String checkpointRootPath;
+  private int maxCheckpoints;

   @Override
   public void init(Map<String, Object> conf) throws StatefulStorageException {
     checkpointRootPath = (String) conf.get(ROOT_PATH_KEY);
+    maxCheckpoints = parseMaxCheckpoints(
+        conf.getOrDefault(MAX_CHECKPOINTS_KEY, DEFAULT_MAX_CHECKPOINTS));
+
     if (checkpointRootPath != null) {
-      checkpointRootPath = checkpointRootPath.replaceFirst("^~", System.getProperty("user.home"));
+      checkpointRootPath =
+          checkpointRootPath.replaceFirst("^~", System.getProperty("user.home"));
     }
-    LOG.info("Initialing... Checkpoint root path: " + checkpointRootPath);
+    LOG.info("Initializing LocalFileSystemStorage with Checkpoint root path: "
+        + checkpointRootPath + " Max checkpoints: " + maxCheckpoints);
   }
+
+  /**
+   * Converts the raw max-checkpoints config value into an int of at least 1.
+   *
+   * <p>Any {@link Number} or numeric string is accepted, so e.g. a Long or a quoted value no
+   * longer fails with a ClassCastException. Values below 1 are rejected: with 0,
+   * {@code cleanCheckpoints} would delete every existing entry under the topology checkpoint
+   * root on each {@code store} call.
+   *
+   * @param value the configured value, or the default when the key is absent
+   * @return the number of checkpoint entries to keep
+   * @throws StatefulStorageException if the value is not an integer or is less than 1
+   */
+  private static int parseMaxCheckpoints(Object value) throws StatefulStorageException {
+    int parsed;
+    if (value instanceof Number) {
+      parsed = ((Number) value).intValue();
+    } else {
+      try {
+        parsed = Integer.parseInt(String.valueOf(value).trim());
+      } catch (NumberFormatException nfe) {
+        throw new StatefulStorageException(
+            "Invalid value for " + MAX_CHECKPOINTS_KEY + ": " + value);
+      }
+    }
+    if (parsed < 1) {
+      throw new StatefulStorageException(
+          MAX_CHECKPOINTS_KEY + " must be at least 1, but was " + parsed);
+    }
+    return parsed;
+  }

   @Override
@@ -55,6 +96,11 @@ public class LocalFileSystemStorage implements IStatefulStorage {

   @Override
   public void store(Checkpoint checkpoint) throws StatefulStorageException {
+    // Heron does not clean up checkpoints on local disk by itself, so prune the topology's
+    // checkpoint root here, keeping only the newest maxCheckpoints existing entries.
+    String rootPath = getTopologyCheckpointRoot(checkpoint.getTopologyName());
+    cleanCheckpoints(new File(rootPath), maxCheckpoints);
+
     String path = getCheckpointPath(checkpoint.getTopologyName(), checkpoint.getCheckpointId(),
                                     checkpoint.getComponent(), checkpoint.getTaskId());

@@ -134,6 +180,44 @@ public class LocalFileSystemStorage implements IStatefulStorage {
     }
   }

+  /**
+   * Deletes all but the newest {@code remaining} entries directly under {@code rootFile}.
+   *
+   * <p>Entries are ordered by name via {@link Arrays#sort(Object[])}, so "newest" assumes
+   * checkpoint ids sort lexicographically in creation order.
+   * NOTE(review): confirm the checkpoint id format guarantees this (e.g. fixed-width ids).
+   *
+   * @param rootFile directory whose children are pruned; {@code store} passes the topology's
+   *     checkpoint root
+   * @param remaining number of newest entries to keep; negative values are treated as zero
+   * @throws StatefulStorageException if the directory cannot be listed or an entry is still
+   *     present after deletion
+   */
+  protected void cleanCheckpoints(File rootFile, int remaining) throws StatefulStorageException {
+    String rootPath = rootFile.getAbsolutePath();
+    if (!FileUtils.isDirectoryExists(rootPath) || !FileUtils.hasChildren(rootPath)) {
+      return;
+    }
+
+    String[] children = rootFile.list();
+    if (children == null) {
+      // File.list() returns null on an I/O error (e.g. the directory was just removed)
+      throw new StatefulStorageException("Failed to list " + rootPath);
+    }
+    Arrays.sort(children);
+
+    // only keep the latest N remaining entries, delete the older ones
+    int toDelete = children.length - Math.max(0, remaining);
+    for (int i = 0; i < toDelete; i++) {
+      File ckptFile = new File(rootPath, children[i]);
+      FileUtils.deleteDir(ckptFile, true);
+
+      if (FileUtils.isDirectoryExists(ckptFile.getAbsolutePath())) {
+        throw new StatefulStorageException("Failed to delete " + ckptFile.getAbsolutePath());
+      }
+    }
+  }
+
   private String getTopologyCheckpointRoot(String topologyName) {
     return String.format("%s/%s", checkpointRootPath, topologyName);
   }
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
index eb4c4af..2875f64 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.heron.statefulstorage.localfs;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -79,6 +80,7 @@ public class LocalFileSystemStorageTest {
     PowerMockito.doReturn(true).when(FileUtils.class, "isDirectoryExists", anyString());
     PowerMockito.doReturn(true)
         .when(FileUtils.class, "writeToFile", anyString(), any(byte[].class), anyBoolean());
+    PowerMockito.doReturn(false).when(FileUtils.class, "hasChildren", anyString());
 
     Checkpoint mockCheckpoint = mock(Checkpoint.class);
     when(mockCheckpoint.getCheckpoint()).thenReturn(checkpoint);
@@ -90,6 +92,33 @@ public class LocalFileSystemStorageTest {
   }

   @Test
+  public void testCleanCheckpoints() throws Exception {
+    String fakeRootPath = "/fake/root/path";
+
+    PowerMockito.spy(FileUtils.class);
+    PowerMockito.doReturn(true).when(FileUtils.class, "isDirectoryExists", fakeRootPath);
+    PowerMockito.doReturn(true).when(FileUtils.class, "hasChildren", anyString());
+    PowerMockito.doReturn(true).when(FileUtils.class, "deleteDir", any(File.class), anyBoolean());
+
+    File mockRootFile = mock(File.class);
+    when(mockRootFile.getAbsolutePath()).thenReturn(fakeRootPath);
+    String[] files = {"1", "2", "3"};
+    when(mockRootFile.list()).thenReturn(files);
+
+    // keep only the newest entry: "1" and "2" are deleted, "3" survives
+    localFileSystemStorage.cleanCheckpoints(mockRootFile, 1);
+
+    // verifyStatic applies to the single static call that follows it, so each
+    // expected (or forbidden) deleteDir call needs its own verifyStatic
+    PowerMockito.verifyStatic(times(1));
+    FileUtils.deleteDir(new File(String.format("%s%s%s", fakeRootPath, File.separator, "1")), true);
+    PowerMockito.verifyStatic(times(1));
+    FileUtils.deleteDir(new File(String.format("%s%s%s", fakeRootPath, File.separator, "2")), true);
+    PowerMockito.verifyStatic(times(0));
+    FileUtils.deleteDir(new File(String.format("%s%s%s", fakeRootPath, File.separator, "3")), true);
+  }
+
+  @Test
   public void testRestore() throws Exception {
     PowerMockito.spy(FileUtils.class);
     PowerMockito.doReturn(checkpoint.toByteArray())

-- 
To stop receiving notification emails like this one, please contact
nlu90@apache.org.