You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/06/04 20:23:45 UTC
[incubator-heron] branch master updated: make LocalFSStorage clean
checkpoints before store (#2910)
This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new c13198b make LocalFSStorage clean checkpoints before store (#2910)
c13198b is described below
commit c13198b79037246d5baecd8cc35f5454b5db27d5
Author: Neng Lu <fr...@gmail.com>
AuthorDate: Mon Jun 4 13:23:41 2018 -0700
make LocalFSStorage clean checkpoints before store (#2910)
* make LocalFSStorage delete old checkpoints before storing a new one
* update existing unit tests and add a test for checkpoint cleanup
---
.../config/src/yaml/conf/kubernetes/stateful.yaml | 1 +
heron/config/src/yaml/conf/local/stateful.yaml | 1 +
heron/config/src/yaml/conf/localzk/stateful.yaml | 1 +
heron/config/src/yaml/conf/marathon/stateful.yaml | 1 +
heron/config/src/yaml/conf/mesos/stateful.yaml | 1 +
heron/config/src/yaml/conf/nomad/stateful.yaml | 1 +
heron/config/src/yaml/conf/sandbox/stateful.yaml | 1 +
heron/config/src/yaml/conf/slurm/stateful.yaml | 1 +
.../config/src/yaml/conf/standalone/stateful.yaml | 1 +
heron/config/src/yaml/conf/yarn/stateful.yaml | 1 +
.../localfs/LocalFileSystemStorage.java | 38 ++++++++++++++++++++--
.../localfs/LocalFileSystemStorageTest.java | 23 +++++++++++++
12 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/heron/config/src/yaml/conf/kubernetes/stateful.yaml b/heron/config/src/yaml/conf/kubernetes/stateful.yaml
index 417b882..a00d9f6 100644
--- a/heron/config/src/yaml/conf/kubernetes/stateful.yaml
+++ b/heron/config/src/yaml/conf/kubernetes/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.localfs.Local
heron.statefulstorage.config:
heron.statefulstorage.classpath: ""
heron.statefulstorage.localfs.root.path: "./checkpoints"
+ heron.statefulstorage.localfs.max.checkpoints: 10
# Following are configs for socket between ckptmgr and stateful storage
heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/local/stateful.yaml b/heron/config/src/yaml/conf/local/stateful.yaml
index d72e496..95083dd 100644
--- a/heron/config/src/yaml/conf/local/stateful.yaml
+++ b/heron/config/src/yaml/conf/local/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.lo
heron.statefulstorage.config:
heron.statefulstorage.classpath: ""
heron.statefulstorage.localfs.root.path: ~/.herondata/checkpoints
+ heron.statefulstorage.localfs.max.checkpoints: 10
# Following are configs for socket between ckptmgr and stateful storage
heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/localzk/stateful.yaml b/heron/config/src/yaml/conf/localzk/stateful.yaml
index d72e496..95083dd 100644
--- a/heron/config/src/yaml/conf/localzk/stateful.yaml
+++ b/heron/config/src/yaml/conf/localzk/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.lo
heron.statefulstorage.config:
heron.statefulstorage.classpath: ""
heron.statefulstorage.localfs.root.path: ~/.herondata/checkpoints
+ heron.statefulstorage.localfs.max.checkpoints: 10
# Following are configs for socket between ckptmgr and stateful storage
heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/marathon/stateful.yaml b/heron/config/src/yaml/conf/marathon/stateful.yaml
index 4ea3bfb..ce8785b 100644
--- a/heron/config/src/yaml/conf/marathon/stateful.yaml
+++ b/heron/config/src/yaml/conf/marathon/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.lo
heron.statefulstorage.config:
heron.statefulstorage.classpath: ""
heron.statefulstorage.localfs.root.path: "./checkpoints"
+ heron.statefulstorage.localfs.max.checkpoints: 10
# Following are configs for socket between ckptmgr and stateful storage
heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/mesos/stateful.yaml b/heron/config/src/yaml/conf/mesos/stateful.yaml
index 4ea3bfb..ce8785b 100644
--- a/heron/config/src/yaml/conf/mesos/stateful.yaml
+++ b/heron/config/src/yaml/conf/mesos/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.lo
heron.statefulstorage.config:
heron.statefulstorage.classpath: ""
heron.statefulstorage.localfs.root.path: "./checkpoints"
+ heron.statefulstorage.localfs.max.checkpoints: 10
# Following are configs for socket between ckptmgr and stateful storage
heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/nomad/stateful.yaml b/heron/config/src/yaml/conf/nomad/stateful.yaml
index 417b882..a00d9f6 100644
--- a/heron/config/src/yaml/conf/nomad/stateful.yaml
+++ b/heron/config/src/yaml/conf/nomad/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.localfs.Local
heron.statefulstorage.config:
heron.statefulstorage.classpath: ""
heron.statefulstorage.localfs.root.path: "./checkpoints"
+ heron.statefulstorage.localfs.max.checkpoints: 10
# Following are configs for socket between ckptmgr and stateful storage
heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/sandbox/stateful.yaml b/heron/config/src/yaml/conf/sandbox/stateful.yaml
index d72e496..95083dd 100644
--- a/heron/config/src/yaml/conf/sandbox/stateful.yaml
+++ b/heron/config/src/yaml/conf/sandbox/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.lo
heron.statefulstorage.config:
heron.statefulstorage.classpath: ""
heron.statefulstorage.localfs.root.path: ~/.herondata/checkpoints
+ heron.statefulstorage.localfs.max.checkpoints: 10
# Following are configs for socket between ckptmgr and stateful storage
heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/slurm/stateful.yaml b/heron/config/src/yaml/conf/slurm/stateful.yaml
index 4ea3bfb..ce8785b 100644
--- a/heron/config/src/yaml/conf/slurm/stateful.yaml
+++ b/heron/config/src/yaml/conf/slurm/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.lo
heron.statefulstorage.config:
heron.statefulstorage.classpath: ""
heron.statefulstorage.localfs.root.path: "./checkpoints"
+ heron.statefulstorage.localfs.max.checkpoints: 10
# Following are configs for socket between ckptmgr and stateful storage
heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/standalone/stateful.yaml b/heron/config/src/yaml/conf/standalone/stateful.yaml
index 417b882..a00d9f6 100644
--- a/heron/config/src/yaml/conf/standalone/stateful.yaml
+++ b/heron/config/src/yaml/conf/standalone/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.localfs.Local
heron.statefulstorage.config:
heron.statefulstorage.classpath: ""
heron.statefulstorage.localfs.root.path: "./checkpoints"
+ heron.statefulstorage.localfs.max.checkpoints: 10
# Following are configs for socket between ckptmgr and stateful storage
heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/config/src/yaml/conf/yarn/stateful.yaml b/heron/config/src/yaml/conf/yarn/stateful.yaml
index 4ea3bfb..ce8785b 100644
--- a/heron/config/src/yaml/conf/yarn/stateful.yaml
+++ b/heron/config/src/yaml/conf/yarn/stateful.yaml
@@ -21,6 +21,7 @@ heron.statefulstorage.classname: "org.apache.heron.statefulstorage.lo
heron.statefulstorage.config:
heron.statefulstorage.classpath: ""
heron.statefulstorage.localfs.root.path: "./checkpoints"
+ heron.statefulstorage.localfs.max.checkpoints: 10
# Following are configs for socket between ckptmgr and stateful storage
heron.ckptmgr.network.write.batch.size.bytes: 32768
diff --git a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
index 6e40053..60fdc53 100644
--- a/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
+++ b/heron/statefulstorages/src/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorage.java
@@ -20,6 +20,7 @@
package org.apache.heron.statefulstorage.localfs;
import java.io.File;
+import java.util.Arrays;
import java.util.Map;
import java.util.logging.Logger;
@@ -32,20 +33,28 @@ import org.apache.heron.spi.statefulstorage.Checkpoint;
import org.apache.heron.spi.statefulstorage.IStatefulStorage;
import org.apache.heron.spi.statefulstorage.StatefulStorageException;
-public class LocalFileSystemStorage implements IStatefulStorage {
+public class LocalFileSystemStorage implements IStatefulStorage {
private static final Logger LOG = Logger.getLogger(LocalFileSystemStorage.class.getName());
+ // Config key for the root directory under which one checkpoint directory per topology is created.
private static final String ROOT_PATH_KEY = "heron.statefulstorage.localfs.root.path";
+ // Config key for how many existing checkpoints store() keeps under a topology's root directory
+ // (see cleanCheckpoints); older ones are deleted before the new checkpoint is written.
+ private static final String MAX_CHECKPOINTS_KEY = "heron.statefulstorage.localfs.max.checkpoints";
+
+ // Used when MAX_CHECKPOINTS_KEY is not present in the config passed to init().
+ private static final int DEFAULT_MAX_CHECKPOINTS = 10;
+ // Root directory for all checkpoints; a leading "~" is expanded to user.home in init().
private String checkpointRootPath;
+ // Number of checkpoints to retain per topology, resolved from the config in init().
+ private int maxCheckpoints;
@Override
public void init(Map<String, Object> conf) throws StatefulStorageException {
checkpointRootPath = (String) conf.get(ROOT_PATH_KEY);
+ maxCheckpoints = (int) conf.getOrDefault(MAX_CHECKPOINTS_KEY, DEFAULT_MAX_CHECKPOINTS);
+
if (checkpointRootPath != null) {
- checkpointRootPath = checkpointRootPath.replaceFirst("^~", System.getProperty("user.home"));
+ checkpointRootPath =
+ checkpointRootPath.replaceFirst("^~", System.getProperty("user.home"));
}
- LOG.info("Initialing... Checkpoint root path: " + checkpointRootPath);
+ LOG.info("Initialing LocalFileSystemStorage with Checkpoint root path: "
+ + checkpointRootPath + " Max checkpoints: " + maxCheckpoints);
}
@Override
@@ -55,6 +64,11 @@ public class LocalFileSystemStorage implements IStatefulStorage {
@Override
public void store(Checkpoint checkpoint) throws StatefulStorageException {
+ // heron doesn't clean checkpoints stored on local disk automatically
+ // localFS cleans checkpoints before store and limits the number of checkpoints saved
+ String rootPath = getTopologyCheckpointRoot(checkpoint.getTopologyName());
+ cleanCheckpoints(new File(rootPath), maxCheckpoints);
+
String path = getCheckpointPath(checkpoint.getTopologyName(), checkpoint.getCheckpointId(),
checkpoint.getComponent(), checkpoint.getTaskId());
@@ -134,6 +148,24 @@ public class LocalFileSystemStorage implements IStatefulStorage {
}
}
+ /**
+  * Deletes all but the newest {@code remaining} checkpoint entries under {@code rootFile}.
+  *
+  * <p>Children are ordered with {@link Arrays#sort(Object[])} on their names (lexicographic
+  * order), and the last {@code remaining} entries are kept. NOTE(review): this assumes checkpoint
+  * ids sort chronologically as strings (e.g. fixed-width numeric ids); confirm against the
+  * checkpoint id generator.
+  *
+  * @param rootFile directory holding one child per checkpoint id; nothing happens if it does not
+  *     exist or has no children
+  * @param remaining number of newest children to keep; negative values are treated as 0
+  * @throws StatefulStorageException if the directory cannot be listed or a child is still
+  *     present after deletion
+  */
+ protected void cleanCheckpoints(File rootFile, int remaining) throws StatefulStorageException {
+   String rootPath = rootFile.getAbsolutePath();
+   if (!FileUtils.isDirectoryExists(rootPath) || !FileUtils.hasChildren(rootPath)) {
+     return;
+   }
+
+   String[] children = rootFile.list();
+   if (children == null) {
+     // File.list() returns null on an I/O error or if the path is no longer a directory.
+     throw new StatefulStorageException("Failed to list " + rootPath);
+   }
+   Arrays.sort(children);
+
+   // Only keep the latest N entries. Clamp the limit so a negative value cannot push the loop
+   // bound past the end of the array.
+   int toDelete = children.length - Math.max(remaining, 0);
+   for (int i = 0; i < toDelete; i++) {
+     File ckptFile = new File(rootPath, children[i]);
+     FileUtils.deleteDir(ckptFile, true);
+
+     if (FileUtils.isDirectoryExists(ckptFile.getAbsolutePath())) {
+       throw new StatefulStorageException("Failed to delete " + ckptFile.getAbsolutePath());
+     }
+   }
+ }
+
private String getTopologyCheckpointRoot(String topologyName) {
return String.format("%s/%s", checkpointRootPath, topologyName);
}
diff --git a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
index eb4c4af..2875f64 100644
--- a/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
+++ b/heron/statefulstorages/tests/java/org/apache/heron/statefulstorage/localfs/LocalFileSystemStorageTest.java
@@ -19,6 +19,7 @@
package org.apache.heron.statefulstorage.localfs;
+import java.io.File;
import java.util.HashMap;
import java.util.Map;
@@ -79,6 +80,7 @@ public class LocalFileSystemStorageTest {
PowerMockito.doReturn(true).when(FileUtils.class, "isDirectoryExists", anyString());
PowerMockito.doReturn(true)
.when(FileUtils.class, "writeToFile", anyString(), any(byte[].class), anyBoolean());
+ PowerMockito.doReturn(false).when(FileUtils.class, "hasChildren", anyString());
Checkpoint mockCheckpoint = mock(Checkpoint.class);
when(mockCheckpoint.getCheckpoint()).thenReturn(checkpoint);
@@ -90,6 +92,27 @@ public class LocalFileSystemStorageTest {
}
@Test
+ public void testCleanCheckpoints() throws Exception {
+   // With three checkpoints and remaining == 1, the two oldest (by name) must be deleted and the
+   // newest one kept.
+   String fakeRootPath = "/fake/root/path";
+
+   PowerMockito.spy(FileUtils.class);
+   PowerMockito.doReturn(true).when(FileUtils.class, "isDirectoryExists", fakeRootPath);
+   PowerMockito.doReturn(true).when(FileUtils.class, "hasChildren", anyString());
+   PowerMockito.doReturn(true).when(FileUtils.class, "deleteDir", any(File.class), anyBoolean());
+
+   File mockRootFile = mock(File.class);
+   when(mockRootFile.getAbsolutePath()).thenReturn(fakeRootPath);
+   // Deliberately unsorted so the test also exercises the ordering step.
+   String[] files = {"2", "3", "1"};
+   when(mockRootFile.list()).thenReturn(files);
+
+   localFileSystemStorage.cleanCheckpoints(mockRootFile, 1);
+
+   // verifyStatic() applies only to the single static call that follows it, so every
+   // expected (or forbidden) deletion needs its own verifyStatic().
+   PowerMockito.verifyStatic(times(1));
+   FileUtils.deleteDir(new File(String.format("%s%s%s", fakeRootPath, File.separator, "1")), true);
+   PowerMockito.verifyStatic(times(1));
+   FileUtils.deleteDir(new File(String.format("%s%s%s", fakeRootPath, File.separator, "2")), true);
+   PowerMockito.verifyStatic(times(0));
+   FileUtils.deleteDir(new File(String.format("%s%s%s", fakeRootPath, File.separator, "3")), true);
+ }
+
+ @Test
public void testRestore() throws Exception {
PowerMockito.spy(FileUtils.class);
PowerMockito.doReturn(checkpoint.toByteArray())
--
To stop receiving notification emails like this one, please contact
nlu90@apache.org.