You are viewing a plain text version of this content; the canonical (HTML) link was attached to the original message but is not reproduced in this plain-text export.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/27 08:10:37 UTC

[kylin] branch master updated: KYLIN-3430 Global Dictionary Cleanup

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new a20f04f  KYLIN-3430 Global Dictionary Cleanup
a20f04f is described below

commit a20f04fb17ccb58162471868e5adf3594fb41c77
Author: Temple Zhou <db...@gmail.com>
AuthorDate: Thu Dec 27 15:04:58 2018 +0800

    KYLIN-3430 Global Dictionary Cleanup
---
 .../apache/kylin/rest/job/MetadataCleanupJob.java  | 49 +++++++++++++++++++++-
 1 file changed, 47 insertions(+), 2 deletions(-)

diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
index 5ee5c7a..e11fe74 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.rest.job;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -25,11 +26,18 @@ import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -68,6 +76,7 @@ public class MetadataCleanupJob {
         CubeManager cubeManager = CubeManager.getInstance(config);
         ResourceStore store = ResourceStore.getStore(config);
         long newResourceTimeCut = System.currentTimeMillis() - NEW_RESOURCE_THREADSHOLD_MS;
+        FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
 
         List<String> toDeleteCandidates = Lists.newArrayList();
 
@@ -82,6 +91,23 @@ public class MetadataCleanupJob {
             }
         }
 
+        // find all of the global dictionaries in HDFS
+        try {
+            FileStatus[] fStatus = new FileStatus[0];
+            fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/dict")));
+            fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/SegmentDict/dict")));
+            for (FileStatus status : fStatus) {
+                String path = status.getPath().toString();
+                FileStatus[] globalDicts = fs.listStatus(new Path(path));
+                for (FileStatus globalDict : globalDicts) {
+                    String globalDictPath = globalDict.getPath().toString();
+                    toDeleteCandidates.add(globalDictPath);
+                }
+            }
+        } catch (FileNotFoundException e) {
+            logger.info("Working Directory does not exist on HDFS. ");
+        }
+
         // three level resources, only dictionaries
         for (String resourceRoot : new String[] { ResourceStore.DICT_RESOURCE_ROOT }) {
             for (String dir : noNull(store.listResources(resourceRoot))) {
@@ -102,6 +128,20 @@ public class MetadataCleanupJob {
                 activeResources.addAll(segment.getSnapshotPaths());
                 activeResources.addAll(segment.getDictionaryPaths());
                 activeResources.add(segment.getStatisticsResourcePath());
+                for (String dictPath : segment.getDictionaryPaths()) {
+                    DictionaryInfo dictInfo = store.getResource(dictPath, DictionaryInfoSerializer.FULL_SERIALIZER);
+                    if ("org.apache.kylin.dict.AppendTrieDictionary".equals(dictInfo != null ? dictInfo.getDictionaryClass() : null)){
+                        String dictObj = dictInfo.getDictionaryObject().toString();
+                        String basedir = dictObj.substring(dictObj.indexOf("(") + 1, dictObj.indexOf(")") - 1);
+                        if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict")) {
+                            activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+                                    + "resources/GlobalDict" + dictInfo.getResourceDir());
+                        } else if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/SegmentDict")) {
+                            activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+                                    + "resources/SegmentDict" + dictInfo.getResourceDir());
+                        }
+                    }
+                }
             }
         }
         toDeleteCandidates.removeAll(activeResources);
@@ -129,7 +169,7 @@ public class MetadataCleanupJob {
         return garbageResources;
     }
 
-    private List<String> cleanupConclude(boolean delete, List<String> toDeleteResources) {
+    private List<String> cleanupConclude(boolean delete, List<String> toDeleteResources) throws IOException {
         if (toDeleteResources.isEmpty()) {
             logger.info("No metadata resource to clean up");
             return toDeleteResources;
@@ -139,10 +179,15 @@ public class MetadataCleanupJob {
 
         if (delete) {
             ResourceStore store = ResourceStore.getStore(config);
+            FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
             for (String res : toDeleteResources) {
                 logger.info("Deleting metadata " + res);
                 try {
-                    store.deleteResource(res);
+                    if (res.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory())) {
+                        fs.delete(new Path(res), true);
+                    } else {
+                        store.deleteResource(res);
+                    }
                 } catch (IOException e) {
                     logger.error("Failed to delete resource " + res, e);
                 }