You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/23 03:24:23 UTC

incubator-kylin git commit: KYLIN-1154 cleanup old job from metadata store

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 6c4148ec7 -> 4378ca569


KYLIN-1154 cleanup old job from metadata store

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4378ca56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4378ca56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4378ca56

Branch: refs/heads/2.x-staging
Commit: 4378ca5690eca4c647e7afdab3835187e7ae6db7
Parents: 6c4148e
Author: shaofengshi <sh...@apache.org>
Authored: Mon Nov 23 10:22:15 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Nov 23 10:22:15 2015 +0800

----------------------------------------------------------------------
 build/bin/find-hive-dependency.sh               | 30 +++++++++++++++++---
 build/bin/metastore.sh                          |  1 -
 .../kylin/common/persistence/ResourceStore.java |  6 ++--
 .../org/apache/kylin/job/dao/ExecutableDao.java | 12 ++++----
 .../engine/mr/steps/MetadataCleanupJob.java     | 24 +++++++++++++++-
 .../kylin/source/kafka/KafkaConfigManager.java  |  6 ++--
 .../kylin/source/kafka/config/KafkaConfig.java  |  2 +-
 .../kylin/storage/hbase/HBaseResourceStore.java |  6 ++--
 8 files changed, 63 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/build/bin/find-hive-dependency.sh
----------------------------------------------------------------------
diff --git a/build/bin/find-hive-dependency.sh b/build/bin/find-hive-dependency.sh
index 4982c93..ee36fc3 100644
--- a/build/bin/find-hive-dependency.sh
+++ b/build/bin/find-hive-dependency.sh
@@ -1,5 +1,22 @@
 #!/bin/bash
 
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 hive_env=`hive -e set | grep 'env:CLASSPATH'`
 
 hive_classpath=`echo $hive_env | grep 'env:CLASSPATH' | awk -F '=' '{print $2}'`
@@ -20,12 +37,17 @@ do
     fi
 done
 
-# in some versions of hdp hcatalog is not in hive's classpath, find it separately
+# in some versions of hive hcatalog is not in hive's classpath, find it separately
 if [ -z "$HCAT_HOME" ]
 then
-    echo "HCAT_HOME not found, try to find hcatalog path from hdp home"
-    hdp_home=`echo $hive_exec_path | awk -F '/hive.*/lib/' '{print $1}'`
-    hcatalog_home=${hdp_home}/hive-hcatalog
+    echo "HCAT_HOME not found, try to find hcatalog path from hive home"
+    hive_home=`echo $hive_exec_path | awk -F '/hive.*/lib/' '{print $1}'`
+    if [ -d "${hive_home}/hive-hcatalog" ]; then
+      hcatalog_home=${hive_home}/hive-hcatalog
+    else 
+      echo "Couldn't locate hcatalog installation, please make sure it is installed and set HCAT_HOME to the path."
+      exit 1
+    fi
 else
     echo "HCAT_HOME is set to: $HCAT_HOME, use it to find hcatalog path:"
     hcatalog_home=${HCAT_HOME}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/build/bin/metastore.sh
----------------------------------------------------------------------
diff --git a/build/bin/metastore.sh b/build/bin/metastore.sh
index 39593d4..ed6e8d3 100755
--- a/build/bin/metastore.sh
+++ b/build/bin/metastore.sh
@@ -24,7 +24,6 @@
 # take a look at SandboxMetastoreCLI
 
 
-
 dir=$(dirname ${0})
 source ${dir}/check-env.sh
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 7a2178c..848d412 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -46,15 +46,15 @@ abstract public class ResourceStore {
     public static final String II_DESC_RESOURCE_ROOT = "/invertedindex_desc";
     public static final String DATA_MODEL_DESC_RESOURCE_ROOT = "/model_desc";
     public static final String DICT_RESOURCE_ROOT = "/dict";
-    public static final String JOB_PATH_ROOT = "/job";
-    public static final String JOB_OUTPUT_PATH_ROOT = "/job_output";
     public static final String PROJECT_RESOURCE_ROOT = "/project";
     public static final String SNAPSHOT_RESOURCE_ROOT = "/table_snapshot";
     public static final String TABLE_EXD_RESOURCE_ROOT = "/table_exd";
     public static final String TABLE_RESOURCE_ROOT = "/table";
     public static final String HYBRID_RESOURCE_ROOT = "/hybrid";
+    public static final String EXECUTE_RESOURCE_ROOT = "/execute";
+    public static final String EXECUTE_OUTPUT_RESOURCE_ROOT = "/execute_output";
     public static final String STREAMING_RESOURCE_ROOT = "/streaming";
-    public static final String KAfKA_RESOURCE_ROOT = "/kafka";
+    public static final String KAFKA_RESOURCE_ROOT = "/kafka";
     public static final String STREAMING_OUTPUT_RESOURCE_ROOT = "/streaming_output";
     public static final String CUBE_STATISTICS_ROOT = "/cube_statistics";
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 2f43037..18e36b4 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -43,8 +43,6 @@ public class ExecutableDao {
     private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
     private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
     private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
-    public static final String JOB_PATH_ROOT = "/execute";
-    public static final String JOB_OUTPUT_ROOT = "/execute_output";
 
     private ResourceStore store;
 
@@ -71,11 +69,11 @@ public class ExecutableDao {
     }
 
     private String pathOfJob(String uuid) {
-        return JOB_PATH_ROOT + "/" + uuid;
+        return ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + uuid;
     }
 
     private String pathOfJobOutput(String uuid) {
-        return JOB_OUTPUT_ROOT + "/" + uuid;
+        return ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + uuid;
     }
 
     private ExecutablePO readJobResource(String path) throws IOException {
@@ -96,7 +94,7 @@ public class ExecutableDao {
 
     public List<ExecutableOutputPO> getJobOutputs() throws PersistentException {
         try {
-            ArrayList<String> resources = store.listResources(JOB_OUTPUT_ROOT);
+            ArrayList<String> resources = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
             if (resources == null || resources.isEmpty()) {
                 return Collections.emptyList();
             }
@@ -112,7 +110,7 @@ public class ExecutableDao {
 
     public List<ExecutablePO> getJobs() throws PersistentException {
         try {
-            final List<String> jobIds = store.listResources(JOB_PATH_ROOT);
+            final List<String> jobIds = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
             if (jobIds == null || jobIds.isEmpty()) {
                 return Collections.emptyList();
             }
@@ -128,7 +126,7 @@ public class ExecutableDao {
 
     public List<String> getJobIds() throws PersistentException {
         try {
-            ArrayList<String> resources = store.listResources(JOB_PATH_ROOT);
+            ArrayList<String> resources = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
             if (resources == null) {
                 return Collections.emptyList();
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
index 3fbeb35..99f8a9f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
@@ -31,6 +31,10 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
+import org.apache.kylin.job.dao.ExecutablePO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +55,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
     private KylinConfig config = null;
 
     public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000l; // 2 days
+    public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000l; // 30 days
 
     /*
      * (non-Javadoc)
@@ -141,9 +146,26 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
                             }
                 }
         }
+        
+        // delete old and completed jobs
+        ExecutableDao executableDao = ExecutableDao.getInstance(KylinConfig.getInstanceFromEnv());
+        List<ExecutablePO> allExecutable = executableDao.getJobs();
+        for (ExecutablePO executable : allExecutable) {
+            long lastModified = executable.getLastModified();
+            ExecutableOutputPO output = executableDao.getJobOutput(executable.getUuid());
+            if (System.currentTimeMillis() - lastModified > TIME_THREADSHOLD_FOR_JOB && (output.getStatus().equals(JobStatusEnum.FINISHED.toString()) || output.getStatus().equals(JobStatusEnum.DISCARDED.toString()))) {
+                toDeleteResource.add(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + executable.getUuid());
+                toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + executable.getUuid());
+
+                for (ExecutablePO task : executable.getTasks()) {
+                    toDeleteResource.add(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + task.getUuid());
+                    toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid());
+                }
+            }
+        }
 
         if (toDeleteResource.size() > 0) {
-            logger.info("The following resources have no reference, will be cleaned from metadata store: \n");
+            logger.info("The following resources have no reference or are too old, and will be cleaned from metadata store: \n");
 
             for (String s : toDeleteResource) {
                 logger.info(s);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 8cf51b6..ac20fc3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -140,7 +140,7 @@ public class KafkaConfigManager {
     }
 
     private String formatStreamingConfigPath(String name) {
-        return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + name + ".json";
+        return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + name + ".json";
     }
 
     public boolean createKafkaConfig(String name, KafkaConfig config) {
@@ -217,11 +217,11 @@ public class KafkaConfigManager {
 
     private void reloadAllKafkaConfig() throws IOException {
         ResourceStore store = getStore();
-        logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAfKA_RESOURCE_ROOT));
+        logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
 
         kafkaMap.clear();
 
-        List<String> paths = store.collectResourceRecursively(ResourceStore.KAfKA_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
+        List<String> paths = store.collectResourceRecursively(ResourceStore.KAFKA_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
         for (String path : paths) {
             KafkaConfig kafkaConfig;
             try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 2d8951f..100ca2d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -89,7 +89,7 @@ public class KafkaConfig extends RootPersistentEntity {
     }
 
     public static String getKafkaResourcePath(String streamingName) {
-        return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX;
+        return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX;
     }
 
     public List<KafkaClusterConfig> getKafkaClusterConfigs() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index eecbfb5..13c657e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -69,8 +69,6 @@ public class HBaseResourceStore extends ResourceStore {
         TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube");
         TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict");
         TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex");
-        TABLE_SUFFIX_MAP.put(JOB_PATH_ROOT + "/", "_job");
-        TABLE_SUFFIX_MAP.put(JOB_OUTPUT_PATH_ROOT + "/", "_job_output");
         TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj");
         TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot");
         TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE
@@ -184,7 +182,7 @@ public class HBaseResourceStore extends ResourceStore {
         byte[] value = r.getValue(B_FAMILY, B_COLUMN);
         if (value.length == 0) {
             Path redirectPath = bigCellHDFSPath(resPath);
-            Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+            Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
             FileSystem fileSystem = FileSystem.get(hconf);
 
             return fileSystem.open(redirectPath);
@@ -308,7 +306,7 @@ public class HBaseResourceStore extends ResourceStore {
 
     private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
         Path redirectPath = bigCellHDFSPath(resPath);
-        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
 
         if (fileSystem.exists(redirectPath)) {