You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/18 07:47:24 UTC

incubator-kylin git commit: KYLIN-1154 cleanup old jobs and job outputs

Repository: incubator-kylin
Updated Branches:
  refs/heads/1.x-staging 363d765d3 -> 87cf6f32e


KYLIN-1154 cleanup old jobs and job outputs


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/87cf6f32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/87cf6f32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/87cf6f32

Branch: refs/heads/1.x-staging
Commit: 87cf6f32e8302cd2ddd18e9758f72277b8233fed
Parents: 363d765
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 18 14:46:54 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 18 14:47:22 2015 +0800

----------------------------------------------------------------------
 bin/metastore.sh                                |  1 -
 .../kylin/common/persistence/ResourceStore.java |  3 ++
 .../common/util/HBaseMetadataTestCase.java      |  7 +++-
 .../org/apache/kylin/job/dao/ExecutableDao.java | 12 +++----
 .../job/hadoop/cube/MetadataCleanupJob.java     | 37 +++++++++++++++-----
 5 files changed, 43 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87cf6f32/bin/metastore.sh
----------------------------------------------------------------------
diff --git a/bin/metastore.sh b/bin/metastore.sh
index 39593d4..6d2279f 100755
--- a/bin/metastore.sh
+++ b/bin/metastore.sh
@@ -26,7 +26,6 @@
 
 
 dir=$(dirname ${0})
-source ${dir}/check-env.sh
 
 if [ $1 == "backup" ]
 then

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87cf6f32/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index db70997..5375597 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -52,6 +52,9 @@ abstract public class ResourceStore {
     public static final String TABLE_EXD_RESOURCE_ROOT = "/table_exd";
     public static final String TABLE_RESOURCE_ROOT = "/table";
     public static final String HYBRID_RESOURCE_ROOT = "/hybrid";
+    public static final String EXECUTE_PATH_ROOT = "/execute";
+    public static final String EXECUTE_OUTPUT_ROOT = "/execute_output";
+
 
     private static ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig, ResourceStore>();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87cf6f32/common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java b/common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
index 0fab2db..d2e3238 100644
--- a/common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
+++ b/common/src/test/java/org/apache/kylin/common/util/HBaseMetadataTestCase.java
@@ -21,6 +21,7 @@ package org.apache.kylin.common.util;
 import org.apache.commons.lang.StringUtils;
 
 import java.io.File;
+import java.io.IOException;
 
 /**
  * @author ysong1
@@ -30,7 +31,11 @@ public class HBaseMetadataTestCase extends AbstractKylinTestCase {
     static {
         if (useSandbox()) {
             try {
-                ClassUtil.addClasspath(new File("../examples/test_case_data/sandbox/").getAbsolutePath());
+                File sandboxFolder = new File("../examples/test_case_data/sandbox/");
+                if (sandboxFolder.exists() == false) {
+                    throw new IOException("The sandbox folder doesn't exist: " + sandboxFolder.getAbsolutePath());
+                }
+                ClassUtil.addClasspath(sandboxFolder.getAbsolutePath());
             } catch (Exception e) {
                 e.printStackTrace();
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87cf6f32/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 9b79abf..482f7a0 100644
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -43,8 +43,6 @@ public class ExecutableDao {
     private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
     private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
     private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
-    public static final String JOB_PATH_ROOT = "/execute";
-    public static final String JOB_OUTPUT_ROOT = "/execute_output";
 
     private ResourceStore store;
 
@@ -71,11 +69,11 @@ public class ExecutableDao {
     }
 
     private String pathOfJob(String uuid) {
-        return JOB_PATH_ROOT + "/" + uuid;
+        return ResourceStore.EXECUTE_PATH_ROOT + "/" + uuid;
     }
 
     private String pathOfJobOutput(String uuid) {
-        return JOB_OUTPUT_ROOT + "/" + uuid;
+        return ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + uuid;
     }
 
     private ExecutablePO readJobResource(String path) throws IOException {
@@ -96,7 +94,7 @@ public class ExecutableDao {
 
     public List<ExecutableOutputPO> getJobOutputs() throws PersistentException {
         try {
-            ArrayList<String> resources = store.listResources(JOB_OUTPUT_ROOT);
+            ArrayList<String> resources = store.listResources(ResourceStore.EXECUTE_OUTPUT_ROOT);
             if (resources == null || resources.isEmpty()) {
                 return Collections.emptyList();
             }
@@ -112,7 +110,7 @@ public class ExecutableDao {
 
     public List<ExecutablePO> getJobs() throws PersistentException {
         try {
-            final List<String> jobIds = store.listResources(JOB_PATH_ROOT);
+            final List<String> jobIds = store.listResources(ResourceStore.EXECUTE_PATH_ROOT);
             if (jobIds == null || jobIds.isEmpty()) {
                 return Collections.emptyList();
             }
@@ -128,7 +126,7 @@ public class ExecutableDao {
 
     public List<String> getJobIds() throws PersistentException {
         try {
-            ArrayList<String> resources = store.listResources(JOB_PATH_ROOT);
+            ArrayList<String> resources = store.listResources(ResourceStore.EXECUTE_PATH_ROOT);
             if (resources == null) {
                 return Collections.emptyList();
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87cf6f32/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
index 6d06dcc..e46093c 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/MetadataCleanupJob.java
@@ -18,11 +18,8 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -30,12 +27,18 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
+import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
 
 public class MetadataCleanupJob extends AbstractHadoopJob {
 
@@ -49,6 +52,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
     private KylinConfig config = null;
 
     public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000l; // 2 days
+    public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000l; // 30 days
 
     /*
      * (non-Javadoc)
@@ -143,8 +147,25 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
             }
         }
 
+        // delete old and completed jobs
+        ExecutableDao executableDao = ExecutableDao.getInstance(KylinConfig.getInstanceFromEnv());
+        List<ExecutablePO> allExecutable = executableDao.getJobs();
+        for (ExecutablePO executable : allExecutable) {
+            long lastModified = executable.getLastModified();
+            ExecutableOutputPO output = executableDao.getJobOutput(executable.getUuid());
+            if (System.currentTimeMillis() - lastModified > TIME_THREADSHOLD_FOR_JOB && (output.getStatus().equals(JobStatusEnum.FINISHED.toString()) || output.getStatus().equals(JobStatusEnum.DISCARDED.toString()))) {
+                toDeleteResource.add(ResourceStore.EXECUTE_PATH_ROOT + "/" + executable.getUuid());
+                toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + executable.getUuid());
+
+                for (ExecutablePO task : executable.getTasks()) {
+                    toDeleteResource.add(ResourceStore.EXECUTE_PATH_ROOT + "/" + task.getUuid());
+                    toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_ROOT + "/" + task.getUuid());
+                }
+            }
+        }
+
         if (toDeleteResource.size() > 0) {
-            logger.info("The following resources have no reference, will be cleaned from metadata store: \n");
+            logger.info("The following resources have no reference or is too old, will be cleaned from metadata store: \n");
 
             for (String s : toDeleteResource) {
                 logger.info(s);