You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/27 08:10:37 UTC
[kylin] branch master updated: KYLIN-3430 Global Dictionary Cleanup
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new a20f04f KYLIN-3430 Global Dictionary Cleanup
a20f04f is described below
commit a20f04fb17ccb58162471868e5adf3594fb41c77
Author: Temple Zhou <db...@gmail.com>
AuthorDate: Thu Dec 27 15:04:58 2018 +0800
KYLIN-3430 Global Dictionary Cleanup
---
.../apache/kylin/rest/job/MetadataCleanupJob.java | 49 +++++++++++++++++++++-
1 file changed, 47 insertions(+), 2 deletions(-)
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
index 5ee5c7a..e11fe74 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
@@ -18,6 +18,7 @@
package org.apache.kylin.rest.job;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@@ -25,11 +26,18 @@ import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
import org.apache.kylin.job.dao.ExecutableDao;
import org.apache.kylin.job.dao.ExecutablePO;
import org.apache.kylin.job.execution.ExecutableState;
@@ -68,6 +76,7 @@ public class MetadataCleanupJob {
CubeManager cubeManager = CubeManager.getInstance(config);
ResourceStore store = ResourceStore.getStore(config);
long newResourceTimeCut = System.currentTimeMillis() - NEW_RESOURCE_THREADSHOLD_MS;
+ FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
List<String> toDeleteCandidates = Lists.newArrayList();
@@ -82,6 +91,23 @@ public class MetadataCleanupJob {
}
}
+ // find all of the global and segment dictionaries in HDFS
+ try {
+ FileStatus[] fStatus = new FileStatus[0];
+ fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/dict")));
+ fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/SegmentDict/dict")));
+ for (FileStatus status : fStatus) {
+ String path = status.getPath().toString();
+ FileStatus[] globalDicts = fs.listStatus(new Path(path));
+ for (FileStatus globalDict : globalDicts) {
+ String globalDictPath = globalDict.getPath().toString();
+ toDeleteCandidates.add(globalDictPath);
+ }
+ }
+ } catch (FileNotFoundException e) {
+ logger.info("Working Directory does not exist on HDFS. ");
+ }
+
// three level resources, only dictionaries
for (String resourceRoot : new String[] { ResourceStore.DICT_RESOURCE_ROOT }) {
for (String dir : noNull(store.listResources(resourceRoot))) {
@@ -102,6 +128,20 @@ public class MetadataCleanupJob {
activeResources.addAll(segment.getSnapshotPaths());
activeResources.addAll(segment.getDictionaryPaths());
activeResources.add(segment.getStatisticsResourcePath());
+ for (String dictPath : segment.getDictionaryPaths()) {
+ DictionaryInfo dictInfo = store.getResource(dictPath, DictionaryInfoSerializer.FULL_SERIALIZER);
+ if ("org.apache.kylin.dict.AppendTrieDictionary".equals(dictInfo != null ? dictInfo.getDictionaryClass() : null)){
+ String dictObj = dictInfo.getDictionaryObject().toString();
+ String basedir = dictObj.substring(dictObj.indexOf("(") + 1, dictObj.indexOf(")") - 1);
+ if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict")) {
+ activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+ + "resources/GlobalDict" + dictInfo.getResourceDir());
+ } else if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/SegmentDict")) {
+ activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+ + "resources/SegmentDict" + dictInfo.getResourceDir());
+ }
+ }
+ }
}
}
toDeleteCandidates.removeAll(activeResources);
@@ -129,7 +169,7 @@ public class MetadataCleanupJob {
return garbageResources;
}
- private List<String> cleanupConclude(boolean delete, List<String> toDeleteResources) {
+ private List<String> cleanupConclude(boolean delete, List<String> toDeleteResources) throws IOException {
if (toDeleteResources.isEmpty()) {
logger.info("No metadata resource to clean up");
return toDeleteResources;
@@ -139,10 +179,15 @@ public class MetadataCleanupJob {
if (delete) {
ResourceStore store = ResourceStore.getStore(config);
+ FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
for (String res : toDeleteResources) {
logger.info("Deleting metadata " + res);
try {
- store.deleteResource(res);
+ if (res.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory())) {
+ fs.delete(new Path(res), true);
+ } else {
+ store.deleteResource(res);
+ }
} catch (IOException e) {
logger.error("Failed to delete resource " + res, e);
}