You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/08/18 08:25:51 UTC
kylin git commit: minor, remove legacy code from tool module
Repository: kylin
Updated Branches:
refs/heads/master 2b929c209 -> f77e032d0
minor, remove legacy code from tool module
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f77e032d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f77e032d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f77e032d
Branch: refs/heads/master
Commit: f77e032d0a1f8d3e0249dfa625b07c1aa1feafc9
Parents: 2b929c2
Author: lidongsjtu <li...@apache.org>
Authored: Thu Aug 18 16:25:35 2016 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Aug 18 16:25:40 2016 +0800
----------------------------------------------------------------------
tool/pom.xml | 8 ++
.../apache/kylin/tool/CubeMetaExtractor.java | 62 ++++++---------
.../apache/kylin/tool/JobDiagnosisInfoCLI.java | 13 ++--
.../kylin/tool/util/ResourceStoreUtil.java | 82 --------------------
4 files changed, 40 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f77e032d/tool/pom.xml
----------------------------------------------------------------------
diff --git a/tool/pom.xml b/tool/pom.xml
index 7655f53..7561569 100644
--- a/tool/pom.xml
+++ b/tool/pom.xml
@@ -38,6 +38,10 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-engine-mr</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-source-kafka</artifactId>
+ </dependency>
<!--Env-->
<dependency>
@@ -45,6 +49,10 @@
<artifactId>hbase-client</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-source-kafka</artifactId>
+ </dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/kylin/blob/f77e032d/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
index baeb78f..e744549 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
@@ -28,12 +28,15 @@ import org.apache.commons.cli.OptionGroup;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceTool;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
import org.apache.kylin.job.dao.ExecutableDao;
import org.apache.kylin.job.dao.ExecutablePO;
import org.apache.kylin.job.exception.PersistentException;
@@ -48,12 +51,13 @@ import org.apache.kylin.metadata.project.RealizationEntry;
import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.RealizationRegistry;
import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.storage.hybrid.HybridInstance;
import org.apache.kylin.storage.hybrid.HybridManager;
-import org.apache.kylin.tool.util.ResourceStoreUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
@@ -88,7 +92,7 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
private ProjectManager projectManager;
private HybridManager hybridManager;
private CubeManager cubeManager;
- // private StreamingManager streamingManager;
+ private StreamingManager streamingManager;
private CubeDescManager cubeDescManager;
private ExecutableDao executableDao;
private RealizationRegistry realizationRegistry;
@@ -198,17 +202,17 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
KylinConfig srcConfig = KylinConfig.getInstanceFromEnv();
KylinConfig dstConfig = KylinConfig.createInstanceFromUri(dest);
- ResourceStoreUtil.copy(srcConfig, dstConfig, requiredResources);
+ ResourceTool.copy(srcConfig, dstConfig, Lists.newArrayList(requiredResources));
try {
- ResourceStoreUtil.copy(srcConfig, dstConfig, optionalResources);
+ ResourceTool.copy(srcConfig, dstConfig, Lists.newArrayList(optionalResources));
} catch (Exception e) {
logger.warn("Exception when copying optional resource {}. May be caused by resource missing. Ignore it.");
}
ResourceStore dstStore = ResourceStore.getStore(dstConfig);
for (CubeInstance cube : cubesToTrimAndSave) {
- CubeInstance trimmedCube = copyCubeInstance(cube);
+ CubeInstance trimmedCube = CubeInstance.getCopyOf(cube);
trimmedCube.getSegments().clear();
trimmedCube.setUuid(cube.getUuid());
dstStore.putResource(trimmedCube.getResourcePath(), trimmedCube, CubeManager.CUBE_SERIALIZER);
@@ -223,30 +227,16 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
return realizationRegistry.getRealization(realizationEntry.getType(), realizationEntry.getRealization());
}
- // do not call CubeInstance.getCopyOf() to keep backward compatible with 1.3.x
- private static CubeInstance copyCubeInstance(CubeInstance cubeInstance) {
- CubeInstance newCube = new CubeInstance();
- newCube.setName(cubeInstance.getName());
- newCube.setSegments(cubeInstance.getSegments());
- newCube.setDescName(cubeInstance.getDescName());
- newCube.setStatus(cubeInstance.getStatus());
- newCube.setOwner(cubeInstance.getOwner());
- newCube.setCost(cubeInstance.getCost());
- newCube.setCreateTimeUTC(System.currentTimeMillis());
- newCube.updateRandomUuid();
- return newCube;
+ private void dealWithStreaming(CubeInstance cube) {
+ streamingManager = StreamingManager.getInstance(kylinConfig);
+ for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) {
+ if (streamingConfig.getName() != null && streamingConfig.getName().equalsIgnoreCase(cube.getFactTable())) {
+ addRequired(StreamingConfig.concatResourcePath(streamingConfig.getName()));
+ addRequired(KafkaConfig.concatResourcePath(streamingConfig.getName()));
+ }
+ }
}
- // private void dealWithStreaming(CubeInstance cube) {
- // streamingManager = StreamingManager.getInstance(kylinConfig);
- // for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) {
- // if (streamingConfig.getName() != null && streamingConfig.getName().equalsIgnoreCase(cube.getFactTable())) {
- // addRequired(StreamingConfig.concatResourcePath(streamingConfig.getName()));
- // addRequired(KafkaConfig.concatResourcePath(streamingConfig.getName()));
- // }
- // }
- // }
-
private void retrieveResourcePath(IRealization realization) {
logger.info("Deal with realization {} of type {}", realization.getName(), realization.getType());
@@ -258,7 +248,7 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
String modelName = cubeDesc.getModelName();
DataModelDesc modelDesc = metadataManager.getDataModelDesc(modelName);
- // dealWithStreaming(cube);
+ dealWithStreaming(cube);
for (String tableName : modelDesc.getAllTables()) {
addRequired(TableDesc.concatResourcePath(tableName));
@@ -266,14 +256,12 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
}
addRequired(DataModelDesc.concatResourcePath(modelDesc.getName()));
-
- // backward compatible with 1.3
- addRequired(ResourceStoreUtil.concatCubeDescResourcePath(cubeDesc.getName()));
+ addRequired(CubeDesc.concatResourcePath(cubeDesc.getName()));
if (includeSegments) {
addRequired(CubeInstance.concatResourcePath(cube.getName()));
for (CubeSegment segment : cube.getSegments(SegmentStatusEnum.READY)) {
- addRequired(ResourceStoreUtil.concatCubeSegmentStatisticsResourcePath(cube.getName(), segment.getUuid()));
+ addRequired(CubeSegment.getStatisticsResourcePath(cube.getName(), segment.getUuid()));
if (includeSegmentDetails) {
for (String dictPat : segment.getDictionaryPaths()) {
addRequired(dictPat);
@@ -291,14 +279,14 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
try {
if (onlyJobOutput) {
ExecutablePO executablePO = executableDao.getJob(lastJobId);
- addRequired(ResourceStoreUtil.concatJobOutputPath(lastJobId));
+ addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + lastJobId);
} else {
ExecutablePO executablePO = executableDao.getJob(lastJobId);
- addRequired(ResourceStoreUtil.concatJobPath(lastJobId));
- addRequired(ResourceStoreUtil.concatJobOutputPath(lastJobId));
+ addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + lastJobId);
+ addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + lastJobId);
for (ExecutablePO task : executablePO.getTasks()) {
- addRequired(ResourceStoreUtil.concatJobPath(task.getUuid()));
- addRequired(ResourceStoreUtil.concatJobOutputPath(task.getUuid()));
+ addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + task.getUuid());
+ addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid());
}
}
} catch (PersistentException e) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/f77e032d/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
index e8ef024..635a2c3 100644
--- a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
@@ -29,11 +29,12 @@ import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceTool;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.dao.ExecutableDao;
import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.tool.util.ResourceStoreUtil;
import org.apache.kylin.tool.util.ToolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,11 +96,11 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
// dump job output
logger.info("Start to dump job output");
ExecutablePO executablePO = executableDao.getJob(jobId);
- addRequired(ResourceStoreUtil.concatJobPath(jobId));
- addRequired(ResourceStoreUtil.concatJobOutputPath(jobId));
+ addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + jobId);
+ addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + jobId);
for (ExecutablePO task : executablePO.getTasks()) {
- addRequired(ResourceStoreUtil.concatJobPath(task.getUuid()));
- addRequired(ResourceStoreUtil.concatJobOutputPath(task.getUuid()));
+ addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + task.getUuid());
+ addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid());
if (includeYarnLogs) {
yarnLogsResources.add(task.getUuid());
}
@@ -164,7 +165,7 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
try {
KylinConfig srcConfig = KylinConfig.getInstanceFromEnv();
KylinConfig dstConfig = KylinConfig.createInstanceFromUri(destDir.getAbsolutePath());
- ResourceStoreUtil.copy(srcConfig, dstConfig, requiredResources);
+ ResourceTool.copy(srcConfig, dstConfig, requiredResources);
} catch (Exception e) {
throw new RuntimeException("Failed to extract job resources. ", e);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f77e032d/tool/src/main/java/org/apache/kylin/tool/util/ResourceStoreUtil.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/util/ResourceStoreUtil.java b/tool/src/main/java/org/apache/kylin/tool/util/ResourceStoreUtil.java
deleted file mode 100644
index f7e2617..0000000
--- a/tool/src/main/java/org/apache/kylin/tool/util/ResourceStoreUtil.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.kylin.tool.util;
-
-import java.lang.reflect.Method;
-import java.util.Collection;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.metadata.MetadataConstants;
-
-/**
- * Created by dongli on 5/5/16.
- */
-public class ResourceStoreUtil {
- public static void copy(KylinConfig srcConfig, KylinConfig dstConfig, Collection<String> paths) throws Exception {
- ResourceStore src = ResourceStore.getStore(srcConfig);
- ResourceStore dst = ResourceStore.getStore(dstConfig);
- for (String path : paths) {
- rCopy(src, dst, path);
- }
- }
-
- public static void rCopy(ResourceStore src, ResourceStore dst, String path) throws Exception {
- Method listResourceMethod = ResourceStore.class.getMethod("listResources", String.class);
- Iterable<String> children = (Iterable<String>) listResourceMethod.invoke(src, path);
-
- if (children == null) {
- // case of resource (not a folder)
- try {
- RawResource res = src.getResource(path);
- if (res != null) {
- dst.putResource(path, res.inputStream, res.timestamp);
- res.inputStream.close();
- } else {
- System.out.println("Resource not exist for " + path);
- }
- } catch (Exception ex) {
- System.err.println("Failed to open " + path);
- ex.printStackTrace();
- }
- } else {
- // case of folder
- for (String child : children)
- rCopy(src, dst, child);
- }
- }
-
- public static String concatCubeDescResourcePath(String descName) {
- return ResourceStore.CUBE_DESC_RESOURCE_ROOT + "/" + descName + MetadataConstants.FILE_SURFIX;
- }
-
- public static String concatCubeSegmentStatisticsResourcePath(String cubeName, String cubeSegmentId) {
- return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq";
- }
-
- public static String concatJobPath(String uuid) {
- return ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + uuid;
- }
-
- public static String concatJobOutputPath(String uuid) {
- return ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + uuid;
- }
-}