You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/02/04 00:51:45 UTC

[42/50] [abbrv] kylin git commit: KYLIN-2908 Add one option for migration tool to indicate whether to migrate segment data

KYLIN-2908 Add one option for migration tool to indicate whether to migrate segment data

Signed-off-by: Zhong <nj...@apache.org>
Signed-off-by: lidongsjtu <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c704f7cd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c704f7cd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c704f7cd

Branch: refs/heads/sync
Commit: c704f7cda79921c5a22645492a52c94734847541
Parents: fee5730
Author: Wang Vic <jw...@ebay.com>
Authored: Wed Sep 27 11:18:22 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Tue Jan 30 19:10:51 2018 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/CubeInstance.java     |   6 +
 .../org/apache/kylin/tool/CubeMigrationCLI.java | 114 +++++++++++++------
 2 files changed, 87 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c704f7cd/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 27918bc..ea5006e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -383,6 +383,12 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
         this.createTimeUTC = createTimeUTC;
     }
 
+    public void clearCuboids() {
+        cuboidBytes = null;
+        cuboidBytesRecommend = null;
+        cuboidLastOptimized = 0L;
+    }
+
     public Set<Long> getCuboidsByMode(String cuboidModeName) {
         return getCuboidsByMode(cuboidModeName == null ? null : CuboidModeEnum.getByModeName(cuboidModeName));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c704f7cd/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index f95139e..a4a6ab9 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -90,6 +90,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     private HBaseAdmin hbaseAdmin;
     protected boolean doAclCopy = false;
     protected boolean doOverwrite = false;
+    protected boolean doMigrateSegment = true;
     protected String dstProject;
 
     private static final String ACL_PREFIX = "/acl/";
@@ -97,16 +98,20 @@ public class CubeMigrationCLI extends AbstractApplication {
     public static void main(String[] args) throws IOException, InterruptedException {
 
         CubeMigrationCLI cli = new CubeMigrationCLI();
-        if (args.length != 8) {
+        if (args.length != 8 && args.length != 9) {
             cli.usage();
             System.exit(1);
         }
-        cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]);
+        if (args.length == 8) {
+            cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]);
+        } else if (args.length == 9) {
+            cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8]);
+        }
     }
 
     protected void usage() {
         System.out.println(
-                "Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute");
+                "Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute migrateSegmentOrNot");
         System.out.println("srcKylinConfigUri: The KylinConfig of the cube’s source \n"
                 + "dstKylinConfigUri: The KylinConfig of the cube’s new home \n"
                 + "cubeName: the name of cube to be migrated. \n"
@@ -114,16 +119,36 @@ public class CubeMigrationCLI extends AbstractApplication {
                 + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n"
                 + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n"
                 + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n"
-                + "realExecute: if false, just print the operations to take, if true, do the real migration. \n");
+                + "realExecute: if false, just print the operations to take, if true, do the real migration. \n"
+                + "migrateSegmentOrNot:(optional) true or false: whether copy segment data to target environment. \n");
 
     }
 
-    public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl,
+    public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
             String purgeAndDisable, String overwriteIfExists, String realExecute)
             throws IOException, InterruptedException {
 
-        doAclCopy = Boolean.parseBoolean(copyAcl);
-        doOverwrite = Boolean.parseBoolean(overwriteIfExists);
+        moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
+                projectName, Boolean.parseBoolean(copyAcl), Boolean.parseBoolean(purgeAndDisable),
+                Boolean.parseBoolean(overwriteIfExists), Boolean.parseBoolean(realExecute), true);
+    }
+
+    public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
+            String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment)
+            throws IOException, InterruptedException {
+
+        moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
+                projectName, Boolean.parseBoolean(copyAcl), Boolean.parseBoolean(purgeAndDisable),
+                Boolean.parseBoolean(overwriteIfExists), Boolean.parseBoolean(realExecute),
+                Boolean.parseBoolean(migrateSegment));
+    }
+
+    public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, boolean copyAcl,
+            boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment)
+            throws IOException, InterruptedException {
+        doAclCopy = copyAcl;
+        doOverwrite = overwriteIfExists;
+        doMigrateSegment = migrateSegment;
         srcConfig = srcCfg;
         srcStore = ResourceStore.getStore(srcConfig);
         dstConfig = dstCfg;
@@ -134,7 +159,10 @@ public class CubeMigrationCLI extends AbstractApplication {
         CubeInstance cube = cubeManager.getCube(cubeName);
         logger.info("cube to be moved is : " + cubeName);
 
-        checkCubeState(cube);
+        if (migrateSegment) {
+            checkCubeState(cube);
+        }
+
         checkAndGetHbaseUrl();
 
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
@@ -142,31 +170,29 @@ public class CubeMigrationCLI extends AbstractApplication {
         hdfsFS = HadoopUtil.getWorkingFileSystem();
         operations = new ArrayList<Opt>();
         copyFilesInMetaStore(cube);
-        renameFoldersInHdfs(cube);
-        changeHtableHost(cube);
+        if (migrateSegment) {
+            renameFoldersInHdfs(cube);
+            changeHtableHost(cube);
+        } else {
+            clearSegments(cubeName); // this should be after copyFilesInMetaStore
+        }
         addCubeAndModelIntoProject(cube, cubeName);
 
-        if (Boolean.parseBoolean(purgeAndDisable) == true) {
+        if (migrateSegment && purgeAndDisable) {
             purgeAndDisable(cubeName); // this should be the last action
         }
 
-        if (Boolean.parseBoolean(realExecute) == true) {
+        if (realExecute) {
             doOpts();
-            checkMigrationSuccess(dstConfig, cubeName, true);
+            if (migrateSegment) {
+                checkMigrationSuccess(dstConfig, cubeName, true);
+            }
             updateMeta(dstConfig);
         } else {
             showOpts();
         }
     }
 
-    public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
-            String purgeAndDisable, String overwriteIfExists, String realExecute)
-            throws IOException, InterruptedException {
-
-        moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
-                projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute);
-    }
-
     public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException {
         CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix);
         checkCLI.execute(cubeName);
@@ -215,22 +241,28 @@ public class CubeMigrationCLI extends AbstractApplication {
         }
     }
 
-    protected void copyFilesInMetaStore(CubeInstance cube) throws IOException {
+    protected void clearSegments(String cubeName) throws IOException {
+        operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[] { cubeName }));
+    }
 
-        List<String> metaItems = new ArrayList<String>();
-        Set<String> dictAndSnapshot = new HashSet<String>();
-        listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
+    protected void copyFilesInMetaStore(CubeInstance cube) throws IOException {
 
         if (dstStore.exists(cube.getResourcePath()) && !doOverwrite)
             throw new IllegalStateException("The cube named " + cube.getName()
                     + " already exists on target metadata store. Use overwriteIfExists to overwrite it");
 
+        List<String> metaItems = new ArrayList<String>();
+        Set<String> dictAndSnapshot = new HashSet<String>();
+        listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
+
         for (String item : metaItems) {
             operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item }));
         }
 
-        for (String item : dictAndSnapshot) {
-            operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
+        if (doMigrateSegment) {
+            for (String item : dictAndSnapshot) {
+                operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
+            }
         }
     }
 
@@ -285,10 +317,12 @@ public class CubeMigrationCLI extends AbstractApplication {
         metaResource.addAll(getCompatibleTablePath(tblRefs, prj, ResourceStore.TABLE_RESOURCE_ROOT));
         metaResource.addAll(getCompatibleTablePath(tblRefs, prj, ResourceStore.TABLE_EXD_RESOURCE_ROOT));
 
-        for (CubeSegment segment : cube.getSegments()) {
-            metaResource.add(segment.getStatisticsResourcePath());
-            dictAndSnapshot.addAll(segment.getSnapshotPaths());
-            dictAndSnapshot.addAll(segment.getDictionaryPaths());
+        if (doMigrateSegment) {
+            for (CubeSegment segment : cube.getSegments()) {
+                metaResource.add(segment.getStatisticsResourcePath());
+                dictAndSnapshot.addAll(segment.getSnapshotPaths());
+                dictAndSnapshot.addAll(segment.getDictionaryPaths());
+            }
         }
 
         if (doAclCopy) {
@@ -308,7 +342,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     protected enum OptType {
-        COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, PURGE_AND_DISABLE
+        COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, PURGE_AND_DISABLE, CLEAR_SEGMENTS
     }
 
     protected void addOpt(OptType type, Object[] params) {
@@ -490,7 +524,7 @@ public class CubeMigrationCLI extends AbstractApplication {
                 project.addTable(tableRef.getTableIdentity());
             }
 
-            if (project.getModels().contains(modelName) == false)
+            if (!project.getModels().contains(modelName))
                 project.addModel(modelName);
             project.removeRealization(RealizationType.CUBE, cubeName);
             project.addRealizationEntry(RealizationType.CUBE, cubeName);
@@ -499,6 +533,20 @@ public class CubeMigrationCLI extends AbstractApplication {
             logger.info("Project instance for " + projectName + " is corrected");
             break;
         }
+        case CLEAR_SEGMENTS: {
+            String cubeName = (String) opt.params[0];
+            String cubeInstancePath = CubeInstance.concatResourcePath(cubeName);
+            Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+            CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, CubeInstance.class,
+                    cubeInstanceSerializer);
+            cubeInstance.getSegments().clear();
+            cubeInstance.clearCuboids();
+            cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
+            cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
+            dstStore.putResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer);
+            logger.info("Cleared segments for " + cubeName + ", since segments has not been copied");
+            break;
+        }
         case PURGE_AND_DISABLE: {
             String cubeName = (String) opt.params[0];
             String cubeResPath = CubeInstance.concatResourcePath(cubeName);