You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/08/18 08:25:51 UTC

kylin git commit: minor, remove legacy code from tool module

Repository: kylin
Updated Branches:
  refs/heads/master 2b929c209 -> f77e032d0


minor, remove legacy code from tool module


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f77e032d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f77e032d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f77e032d

Branch: refs/heads/master
Commit: f77e032d0a1f8d3e0249dfa625b07c1aa1feafc9
Parents: 2b929c2
Author: lidongsjtu <li...@apache.org>
Authored: Thu Aug 18 16:25:35 2016 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Aug 18 16:25:40 2016 +0800

----------------------------------------------------------------------
 tool/pom.xml                                    |  8 ++
 .../apache/kylin/tool/CubeMetaExtractor.java    | 62 ++++++---------
 .../apache/kylin/tool/JobDiagnosisInfoCLI.java  | 13 ++--
 .../kylin/tool/util/ResourceStoreUtil.java      | 82 --------------------
 4 files changed, 40 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f77e032d/tool/pom.xml
----------------------------------------------------------------------
diff --git a/tool/pom.xml b/tool/pom.xml
index 7655f53..7561569 100644
--- a/tool/pom.xml
+++ b/tool/pom.xml
@@ -38,6 +38,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-source-kafka</artifactId>
+        </dependency>
 
         <!--Env-->
         <dependency>
@@ -45,6 +49,10 @@
             <artifactId>hbase-client</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-source-kafka</artifactId>
+        </dependency>
     </dependencies>
 
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/f77e032d/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
index baeb78f..e744549 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMetaExtractor.java
@@ -28,12 +28,15 @@ import org.apache.commons.cli.OptionGroup;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceTool;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.exception.PersistentException;
@@ -48,12 +51,13 @@ import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationRegistry;
 import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.apache.kylin.storage.hybrid.HybridManager;
-import org.apache.kylin.tool.util.ResourceStoreUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 /**
@@ -88,7 +92,7 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
     private ProjectManager projectManager;
     private HybridManager hybridManager;
     private CubeManager cubeManager;
-    //    private StreamingManager streamingManager;
+    private StreamingManager streamingManager;
     private CubeDescManager cubeDescManager;
     private ExecutableDao executableDao;
     private RealizationRegistry realizationRegistry;
@@ -198,17 +202,17 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
             KylinConfig srcConfig = KylinConfig.getInstanceFromEnv();
             KylinConfig dstConfig = KylinConfig.createInstanceFromUri(dest);
 
-            ResourceStoreUtil.copy(srcConfig, dstConfig, requiredResources);
+            ResourceTool.copy(srcConfig, dstConfig, Lists.newArrayList(requiredResources));
 
             try {
-                ResourceStoreUtil.copy(srcConfig, dstConfig, optionalResources);
+                ResourceTool.copy(srcConfig, dstConfig, Lists.newArrayList(optionalResources));
             } catch (Exception e) {
                 logger.warn("Exception when copying optional resource {}. May be caused by resource missing. Ignore it.");
             }
 
             ResourceStore dstStore = ResourceStore.getStore(dstConfig);
             for (CubeInstance cube : cubesToTrimAndSave) {
-                CubeInstance trimmedCube = copyCubeInstance(cube);
+                CubeInstance trimmedCube = CubeInstance.getCopyOf(cube);
                 trimmedCube.getSegments().clear();
                 trimmedCube.setUuid(cube.getUuid());
                 dstStore.putResource(trimmedCube.getResourcePath(), trimmedCube, CubeManager.CUBE_SERIALIZER);
@@ -223,30 +227,16 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
         return realizationRegistry.getRealization(realizationEntry.getType(), realizationEntry.getRealization());
     }
 
-    // do not call CubeInstance.getCopyOf() to keep backward compatible with 1.3.x
-    private static CubeInstance copyCubeInstance(CubeInstance cubeInstance) {
-        CubeInstance newCube = new CubeInstance();
-        newCube.setName(cubeInstance.getName());
-        newCube.setSegments(cubeInstance.getSegments());
-        newCube.setDescName(cubeInstance.getDescName());
-        newCube.setStatus(cubeInstance.getStatus());
-        newCube.setOwner(cubeInstance.getOwner());
-        newCube.setCost(cubeInstance.getCost());
-        newCube.setCreateTimeUTC(System.currentTimeMillis());
-        newCube.updateRandomUuid();
-        return newCube;
+    private void dealWithStreaming(CubeInstance cube) {
+        streamingManager = StreamingManager.getInstance(kylinConfig);
+        for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) {
+            if (streamingConfig.getName() != null && streamingConfig.getName().equalsIgnoreCase(cube.getFactTable())) {
+                addRequired(StreamingConfig.concatResourcePath(streamingConfig.getName()));
+                addRequired(KafkaConfig.concatResourcePath(streamingConfig.getName()));
+            }
+        }
     }
 
-    //    private void dealWithStreaming(CubeInstance cube) {
-    //        streamingManager = StreamingManager.getInstance(kylinConfig);
-    //        for (StreamingConfig streamingConfig : streamingManager.listAllStreaming()) {
-    //            if (streamingConfig.getName() != null && streamingConfig.getName().equalsIgnoreCase(cube.getFactTable())) {
-    //                addRequired(StreamingConfig.concatResourcePath(streamingConfig.getName()));
-    //                addRequired(KafkaConfig.concatResourcePath(streamingConfig.getName()));
-    //            }
-    //        }
-    //    }
-
     private void retrieveResourcePath(IRealization realization) {
 
         logger.info("Deal with realization {} of type {}", realization.getName(), realization.getType());
@@ -258,7 +248,7 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
             String modelName = cubeDesc.getModelName();
             DataModelDesc modelDesc = metadataManager.getDataModelDesc(modelName);
 
-            //            dealWithStreaming(cube);
+            dealWithStreaming(cube);
 
             for (String tableName : modelDesc.getAllTables()) {
                 addRequired(TableDesc.concatResourcePath(tableName));
@@ -266,14 +256,12 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
             }
 
             addRequired(DataModelDesc.concatResourcePath(modelDesc.getName()));
-
-            // backward compatible with 1.3
-            addRequired(ResourceStoreUtil.concatCubeDescResourcePath(cubeDesc.getName()));
+            addRequired(CubeDesc.concatResourcePath(cubeDesc.getName()));
 
             if (includeSegments) {
                 addRequired(CubeInstance.concatResourcePath(cube.getName()));
                 for (CubeSegment segment : cube.getSegments(SegmentStatusEnum.READY)) {
-                    addRequired(ResourceStoreUtil.concatCubeSegmentStatisticsResourcePath(cube.getName(), segment.getUuid()));
+                    addRequired(CubeSegment.getStatisticsResourcePath(cube.getName(), segment.getUuid()));
                     if (includeSegmentDetails) {
                         for (String dictPat : segment.getDictionaryPaths()) {
                             addRequired(dictPat);
@@ -291,14 +279,14 @@ public class CubeMetaExtractor extends AbstractInfoExtractor {
                             try {
                                 if (onlyJobOutput) {
                                     ExecutablePO executablePO = executableDao.getJob(lastJobId);
-                                    addRequired(ResourceStoreUtil.concatJobOutputPath(lastJobId));
+                                    addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + lastJobId);
                                 } else {
                                     ExecutablePO executablePO = executableDao.getJob(lastJobId);
-                                    addRequired(ResourceStoreUtil.concatJobPath(lastJobId));
-                                    addRequired(ResourceStoreUtil.concatJobOutputPath(lastJobId));
+                                    addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + lastJobId);
+                                    addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + lastJobId);
                                     for (ExecutablePO task : executablePO.getTasks()) {
-                                        addRequired(ResourceStoreUtil.concatJobPath(task.getUuid()));
-                                        addRequired(ResourceStoreUtil.concatJobOutputPath(task.getUuid()));
+                                        addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + task.getUuid());
+                                        addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid());
                                     }
                                 }
                             } catch (PersistentException e) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f77e032d/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
index e8ef024..635a2c3 100644
--- a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
@@ -29,11 +29,12 @@ import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceTool;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.tool.util.ResourceStoreUtil;
 import org.apache.kylin.tool.util.ToolUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -95,11 +96,11 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
         // dump job output
         logger.info("Start to dump job output");
         ExecutablePO executablePO = executableDao.getJob(jobId);
-        addRequired(ResourceStoreUtil.concatJobPath(jobId));
-        addRequired(ResourceStoreUtil.concatJobOutputPath(jobId));
+        addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + jobId);
+        addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + jobId);
         for (ExecutablePO task : executablePO.getTasks()) {
-            addRequired(ResourceStoreUtil.concatJobPath(task.getUuid()));
-            addRequired(ResourceStoreUtil.concatJobOutputPath(task.getUuid()));
+            addRequired(ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + task.getUuid());
+            addRequired(ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + task.getUuid());
             if (includeYarnLogs) {
                 yarnLogsResources.add(task.getUuid());
             }
@@ -164,7 +165,7 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
         try {
             KylinConfig srcConfig = KylinConfig.getInstanceFromEnv();
             KylinConfig dstConfig = KylinConfig.createInstanceFromUri(destDir.getAbsolutePath());
-            ResourceStoreUtil.copy(srcConfig, dstConfig, requiredResources);
+            ResourceTool.copy(srcConfig, dstConfig, requiredResources);
         } catch (Exception e) {
             throw new RuntimeException("Failed to extract job resources. ", e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/f77e032d/tool/src/main/java/org/apache/kylin/tool/util/ResourceStoreUtil.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/util/ResourceStoreUtil.java b/tool/src/main/java/org/apache/kylin/tool/util/ResourceStoreUtil.java
deleted file mode 100644
index f7e2617..0000000
--- a/tool/src/main/java/org/apache/kylin/tool/util/ResourceStoreUtil.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.kylin.tool.util;
-
-import java.lang.reflect.Method;
-import java.util.Collection;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.metadata.MetadataConstants;
-
-/**
- * Created by dongli on 5/5/16.
- */
-public class ResourceStoreUtil {
-    public static void copy(KylinConfig srcConfig, KylinConfig dstConfig, Collection<String> paths) throws Exception {
-        ResourceStore src = ResourceStore.getStore(srcConfig);
-        ResourceStore dst = ResourceStore.getStore(dstConfig);
-        for (String path : paths) {
-            rCopy(src, dst, path);
-        }
-    }
-
-    public static void rCopy(ResourceStore src, ResourceStore dst, String path) throws Exception {
-        Method listResourceMethod = ResourceStore.class.getMethod("listResources", String.class);
-        Iterable<String> children = (Iterable<String>) listResourceMethod.invoke(src, path);
-
-        if (children == null) {
-            // case of resource (not a folder)
-            try {
-                RawResource res = src.getResource(path);
-                if (res != null) {
-                    dst.putResource(path, res.inputStream, res.timestamp);
-                    res.inputStream.close();
-                } else {
-                    System.out.println("Resource not exist for " + path);
-                }
-            } catch (Exception ex) {
-                System.err.println("Failed to open " + path);
-                ex.printStackTrace();
-            }
-        } else {
-            // case of folder
-            for (String child : children)
-                rCopy(src, dst, child);
-        }
-    }
-
-    public static String concatCubeDescResourcePath(String descName) {
-        return ResourceStore.CUBE_DESC_RESOURCE_ROOT + "/" + descName + MetadataConstants.FILE_SURFIX;
-    }
-
-    public static String concatCubeSegmentStatisticsResourcePath(String cubeName, String cubeSegmentId) {
-        return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq";
-    }
-
-    public static String concatJobPath(String uuid) {
-        return ResourceStore.EXECUTE_RESOURCE_ROOT + "/" + uuid;
-    }
-
-    public static String concatJobOutputPath(String uuid) {
-        return ResourceStore.EXECUTE_OUTPUT_RESOURCE_ROOT + "/" + uuid;
-    }
-}