You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/23 03:24:23 UTC
incubator-kylin git commit: KYLIN-1154 cleanup old job from metadata store
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging 6c4148ec7 -> 4378ca569
KYLIN-1154 cleanup old job from metadata store
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/4378ca56
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/4378ca56
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/4378ca56
Branch: refs/heads/2.x-staging
Commit: 4378ca5690eca4c647e7afdab3835187e7ae6db7
Parents: 6c4148e
Author: shaofengshi <sh...@apache.org>
Authored: Mon Nov 23 10:22:15 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Mon Nov 23 10:22:15 2015 +0800
----------------------------------------------------------------------
build/bin/find-hive-dependency.sh | 30 +++++++++++++++++---
build/bin/metastore.sh | 1 -
.../kylin/common/persistence/ResourceStore.java | 6 ++--
.../org/apache/kylin/job/dao/ExecutableDao.java | 12 ++++----
.../engine/mr/steps/MetadataCleanupJob.java | 24 +++++++++++++++-
.../kylin/source/kafka/KafkaConfigManager.java | 6 ++--
.../kylin/source/kafka/config/KafkaConfig.java | 2 +-
.../kylin/storage/hbase/HBaseResourceStore.java | 6 ++--
8 files changed, 63 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/build/bin/find-hive-dependency.sh
----------------------------------------------------------------------
diff --git a/build/bin/find-hive-dependency.sh b/build/bin/find-hive-dependency.sh
index 4982c93..ee36fc3 100644
--- a/build/bin/find-hive-dependency.sh
+++ b/build/bin/find-hive-dependency.sh
@@ -1,5 +1,22 @@
#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
hive_env=`hive -e set | grep 'env:CLASSPATH'`
hive_classpath=`echo $hive_env | grep 'env:CLASSPATH' | awk -F '=' '{print $2}'`
@@ -20,12 +37,17 @@ do
fi
done
-# in some versions of hdp hcatalog is not in hive's classpath, find it separately
+# in some versions of hive hcatalog is not in hive's classpath, find it separately
if [ -z "$HCAT_HOME" ]
then
- echo "HCAT_HOME not found, try to find hcatalog path from hdp home"
- hdp_home=`echo $hive_exec_path | awk -F '/hive.*/lib/' '{print $1}'`
- hcatalog_home=${hdp_home}/hive-hcatalog
+ echo "HCAT_HOME not found, try to find hcatalog path from hive home"
+ hive_home=`echo $hive_exec_path | awk -F '/hive.*/lib/' '{print $1}'`
+ if [ -d "${hive_home}/hive-hcatalog" ]; then
+ hcatalog_home=${hive_home}/hive-hcatalog
+ else
+ echo "Couldn't locate hcatalog installation, please make sure it is installed and set HCAT_HOME to the path."
+ exit 1
+ fi
else
echo "HCAT_HOME is set to: $HCAT_HOME, use it to find hcatalog path:"
hcatalog_home=${HCAT_HOME}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/build/bin/metastore.sh
----------------------------------------------------------------------
diff --git a/build/bin/metastore.sh b/build/bin/metastore.sh
index 39593d4..ed6e8d3 100755
--- a/build/bin/metastore.sh
+++ b/build/bin/metastore.sh
@@ -24,7 +24,6 @@
# take a look at SandboxMetastoreCLI
-
dir=$(dirname ${0})
source ${dir}/check-env.sh
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 7a2178c..848d412 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -46,15 +46,15 @@ abstract public class ResourceStore {
public static final String II_DESC_RESOURCE_ROOT = "/invertedindex_desc";
public static final String DATA_MODEL_DESC_RESOURCE_ROOT = "/model_desc";
public static final String DICT_RESOURCE_ROOT = "/dict";
- public static final String JOB_PATH_ROOT = "/job";
- public static final String JOB_OUTPUT_PATH_ROOT = "/job_output";
public static final String PROJECT_RESOURCE_ROOT = "/project";
public static final String SNAPSHOT_RESOURCE_ROOT = "/table_snapshot";
public static final String TABLE_EXD_RESOURCE_ROOT = "/table_exd";
public static final String TABLE_RESOURCE_ROOT = "/table";
public static final String HYBRID_RESOURCE_ROOT = "/hybrid";
+ public static final String EXECUTE_RESOURCE_ROOT = "/execute";
+ public static final String EXECUTE_OUTPUT_RESOURCE_ROOT = "/execute_output";
public static final String STREAMING_RESOURCE_ROOT = "/streaming";
- public static final String KAfKA_RESOURCE_ROOT = "/kafka";
+ public static final String KAFKA_RESOURCE_ROOT = "/kafka";
public static final String STREAMING_OUTPUT_RESOURCE_ROOT = "/streaming_output";
public static final String CUBE_STATISTICS_ROOT = "/cube_statistics";
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 2f43037..18e36b4 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -43,8 +43,6 @@ public class ExecutableDao {
private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
- public static final String JOB_PATH_ROOT = "/execute";
- public static final String JOB_OUTPUT_ROOT = "/execute_output";
private ResourceStore store;
@@ -71,11 +69,11 @@ public class ExecutableDao {
}
private String pathOfJob(String uuid) {
- return JOB_PATH_ROOT + "/" + uuid;
+ return ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + uuid;
}
private String pathOfJobOutput(String uuid) {
- return JOB_OUTPUT_ROOT + "/" + uuid;
+ return ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + uuid;
}
private ExecutablePO readJobResource(String path) throws IOException {
@@ -96,7 +94,7 @@ public class ExecutableDao {
public List<ExecutableOutputPO> getJobOutputs() throws PersistentException {
try {
- ArrayList<String> resources = store.listResources(JOB_OUTPUT_ROOT);
+ ArrayList<String> resources = store.listResources(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT);
if (resources == null || resources.isEmpty()) {
return Collections.emptyList();
}
@@ -112,7 +110,7 @@ public class ExecutableDao {
public List<ExecutablePO> getJobs() throws PersistentException {
try {
- final List<String> jobIds = store.listResources(JOB_PATH_ROOT);
+ final List<String> jobIds = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
if (jobIds == null || jobIds.isEmpty()) {
return Collections.emptyList();
}
@@ -128,7 +126,7 @@ public class ExecutableDao {
public List<String> getJobIds() throws PersistentException {
try {
- ArrayList<String> resources = store.listResources(JOB_PATH_ROOT);
+ ArrayList<String> resources = store.listResources(ResourceStore.EXECUTE_RESOURCE_ROOT);
if (resources == null) {
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
index 3fbeb35..99f8a9f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MetadataCleanupJob.java
@@ -31,6 +31,10 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
+import org.apache.kylin.job.dao.ExecutablePO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +55,7 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
private KylinConfig config = null;
public static final long TIME_THREADSHOLD = 2 * 24 * 3600 * 1000l; // 2 days
+ public static final long TIME_THREADSHOLD_FOR_JOB = 30 * 24 * 3600 * 1000l; // 30 days
/*
* (non-Javadoc)
@@ -141,9 +146,26 @@ public class MetadataCleanupJob extends AbstractHadoopJob {
}
}
}
+
+ // delete old and completed jobs
+ ExecutableDao executableDao = ExecutableDao.getInstance(KylinConfig.getInstanceFromEnv());
+ List<ExecutablePO> allExecutable = executableDao.getJobs();
+ for (ExecutablePO executable : allExecutable) {
+ long lastModified = executable.getLastModified();
+ ExecutableOutputPO output = executableDao.getJobOutput(executable.getUuid());
+ if (System.currentTimeMillis() - lastModified > TIME_THREADSHOLD_FOR_JOB && (output.getStatus().equals(JobStatusEnum.FINISHED.toString()) || output.getStatus().equals(JobStatusEnum.DISCARDED.toString()))) {
+ toDeleteResource.add(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + executable.getUuid());
+ toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + executable.getUuid());
+
+ for (ExecutablePO task : executable.getTasks()) {
+ toDeleteResource.add(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + task.getUuid());
+ toDeleteResource.add(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid());
+ }
+ }
+ }
if (toDeleteResource.size() > 0) {
- logger.info("The following resources have no reference, will be cleaned from metadata store: \n");
+ logger.info("The following resources have no reference or is too old, will be cleaned from metadata store: \n");
for (String s : toDeleteResource) {
logger.info(s);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index 8cf51b6..ac20fc3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -140,7 +140,7 @@ public class KafkaConfigManager {
}
private String formatStreamingConfigPath(String name) {
- return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + name + ".json";
+ return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + name + ".json";
}
public boolean createKafkaConfig(String name, KafkaConfig config) {
@@ -217,11 +217,11 @@ public class KafkaConfigManager {
private void reloadAllKafkaConfig() throws IOException {
ResourceStore store = getStore();
- logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAfKA_RESOURCE_ROOT));
+ logger.info("Reloading Kafka Metadata from folder " + store.getReadableResourcePath(ResourceStore.KAFKA_RESOURCE_ROOT));
kafkaMap.clear();
- List<String> paths = store.collectResourceRecursively(ResourceStore.KAfKA_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
+ List<String> paths = store.collectResourceRecursively(ResourceStore.KAFKA_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
for (String path : paths) {
KafkaConfig kafkaConfig;
try {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 2d8951f..100ca2d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -89,7 +89,7 @@ public class KafkaConfig extends RootPersistentEntity {
}
public static String getKafkaResourcePath(String streamingName) {
- return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX;
+ return ResourceStore.KAFKA_RESOURCE_ROOT + "/" + streamingName + MetadataConstants.FILE_SURFIX;
}
public List<KafkaClusterConfig> getKafkaClusterConfigs() {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4378ca56/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index eecbfb5..13c657e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -69,8 +69,6 @@ public class HBaseResourceStore extends ResourceStore {
TABLE_SUFFIX_MAP.put(CUBE_RESOURCE_ROOT + "/", "_cube");
TABLE_SUFFIX_MAP.put(DICT_RESOURCE_ROOT + "/", "_dict");
TABLE_SUFFIX_MAP.put("/invertedindex/", "_invertedindex");
- TABLE_SUFFIX_MAP.put(JOB_PATH_ROOT + "/", "_job");
- TABLE_SUFFIX_MAP.put(JOB_OUTPUT_PATH_ROOT + "/", "_job_output");
TABLE_SUFFIX_MAP.put(PROJECT_RESOURCE_ROOT + "/", "_proj");
TABLE_SUFFIX_MAP.put(SNAPSHOT_RESOURCE_ROOT + "/", "_table_snapshot");
TABLE_SUFFIX_MAP.put("", ""); // DEFAULT CASE
@@ -184,7 +182,7 @@ public class HBaseResourceStore extends ResourceStore {
byte[] value = r.getValue(B_FAMILY, B_COLUMN);
if (value.length == 0) {
Path redirectPath = bigCellHDFSPath(resPath);
- Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+ Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
return fileSystem.open(redirectPath);
@@ -308,7 +306,7 @@ public class HBaseResourceStore extends ResourceStore {
private Path writeLargeCellToHdfs(String resPath, byte[] largeColumn, HTableInterface table) throws IOException {
Path redirectPath = bigCellHDFSPath(resPath);
- Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+ Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
FileSystem fileSystem = FileSystem.get(hconf);
if (fileSystem.exists(redirectPath)) {