Posted to issues@kylin.apache.org by GitBox <gi...@apache.org> on 2018/12/27 08:10:35 UTC

[GitHub] shaofengshi closed pull request #422: KYLIN-3430 Global Dictionary Cleanup

URL: https://github.com/apache/kylin/pull/422
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
index 5ee5c7a370..e11fe74c87 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/MetadataCleanupJob.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.rest.job;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -25,11 +26,18 @@
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -68,6 +76,7 @@ public MetadataCleanupJob(KylinConfig config) {
         CubeManager cubeManager = CubeManager.getInstance(config);
         ResourceStore store = ResourceStore.getStore(config);
         long newResourceTimeCut = System.currentTimeMillis() - NEW_RESOURCE_THREADSHOLD_MS;
+        FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
 
         List<String> toDeleteCandidates = Lists.newArrayList();
 
@@ -82,6 +91,23 @@ public MetadataCleanupJob(KylinConfig config) {
             }
         }
 
+        // find all of the global dictionaries in HDFS
+        try {
+            FileStatus[] fStatus = new FileStatus[0];
+            fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/dict")));
+            fStatus = ArrayUtils.addAll(fStatus, fs.listStatus(new Path(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/SegmentDict/dict")));
+            for (FileStatus status : fStatus) {
+                String path = status.getPath().toString();
+                FileStatus[] globalDicts = fs.listStatus(new Path(path));
+                for (FileStatus globalDict : globalDicts) {
+                    String globalDictPath = globalDict.getPath().toString();
+                    toDeleteCandidates.add(globalDictPath);
+                }
+            }
+        } catch (FileNotFoundException e) {
+            logger.info("Working Directory does not exist on HDFS. ");
+        }
+
         // three level resources, only dictionaries
         for (String resourceRoot : new String[] { ResourceStore.DICT_RESOURCE_ROOT }) {
             for (String dir : noNull(store.listResources(resourceRoot))) {
@@ -102,6 +128,20 @@ public MetadataCleanupJob(KylinConfig config) {
                 activeResources.addAll(segment.getSnapshotPaths());
                 activeResources.addAll(segment.getDictionaryPaths());
                 activeResources.add(segment.getStatisticsResourcePath());
+                for (String dictPath : segment.getDictionaryPaths()) {
+                    DictionaryInfo dictInfo = store.getResource(dictPath, DictionaryInfoSerializer.FULL_SERIALIZER);
+                    if ("org.apache.kylin.dict.AppendTrieDictionary".equals(dictInfo != null ? dictInfo.getDictionaryClass() : null)){
+                        String dictObj = dictInfo.getDictionaryObject().toString();
+                        String basedir = dictObj.substring(dictObj.indexOf("(") + 1, dictObj.indexOf(")") - 1);
+                        if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict")) {
+                            activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+                                    + "resources/GlobalDict" + dictInfo.getResourceDir());
+                        } else if (basedir.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/SegmentDict")) {
+                            activeResources.add(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory()
+                                    + "resources/SegmentDict" + dictInfo.getResourceDir());
+                        }
+                    }
+                }
             }
         }
         toDeleteCandidates.removeAll(activeResources);
@@ -129,7 +169,7 @@ public MetadataCleanupJob(KylinConfig config) {
         return garbageResources;
     }
 
-    private List<String> cleanupConclude(boolean delete, List<String> toDeleteResources) {
+    private List<String> cleanupConclude(boolean delete, List<String> toDeleteResources) throws IOException {
         if (toDeleteResources.isEmpty()) {
             logger.info("No metadata resource to clean up");
             return toDeleteResources;
@@ -139,10 +179,15 @@ public MetadataCleanupJob(KylinConfig config) {
 
         if (delete) {
             ResourceStore store = ResourceStore.getStore(config);
+            FileSystem fs = HadoopUtil.getWorkingFileSystem(HadoopUtil.getCurrentConfiguration());
             for (String res : toDeleteResources) {
                 logger.info("Deleting metadata " + res);
                 try {
-                    store.deleteResource(res);
+                    if (res.startsWith(KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory())) {
+                        fs.delete(new Path(res), true);
+                    } else {
+                        store.deleteResource(res);
+                    }
                 } catch (IOException e) {
                     logger.error("Failed to delete resource " + res, e);
                 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services