Posted to commits@kylin.apache.org by xx...@apache.org on 2020/12/04 11:37:08 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4817 Refine CubeMigrationCLI for kylin4

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new cd449ea  KYLIN-4817 Refine CubeMigrationCLI for kylin4
cd449ea is described below

commit cd449eab1c6f49a27bc97ecfff8e1b29af92aead
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Mon Nov 30 15:36:43 2020 +0800

    KYLIN-4817 Refine CubeMigrationCLI for kylin4
---
 .../apache/kylin/common/restclient/RestClient.java |   4 +-
 .../engine/spark/metadata/cube/PathManager.java    |   6 +
 .../org/apache/kylin/tool/CubeMigrationCLI.java    | 439 ++++++++-------------
 3 files changed, 168 insertions(+), 281 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 955b0ff..2e99809 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -202,7 +202,7 @@ public class RestClient {
 
     public String getKylinProperties() throws IOException {
         String url = baseUrl + "/admin/config";
-        HttpGet request = new HttpGet(url);
+        HttpGet request = newGet(url);
         HttpResponse response = null;
         try {
             response = client.execute(request);
@@ -380,7 +380,7 @@ public class RestClient {
     }
 
     private HttpGet newGet(String url) {
-        HttpGet get = new HttpGet();
+        HttpGet get = new HttpGet(url);
         addHttpHeaders(get);
         return get;
     }
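
Editorial sketch (not part of the patch): the RestClient change above fixes two halves of the same defect. getKylinProperties() built its own HttpGet instead of going through the shared helper, and newGet() constructed an HttpGet without a target URI, so requests built through the helper had no destination. A minimal sketch of the corrected flow, assuming a hypothetical addHttpHeaders(...) stand-in for RestClient's real header helper:

    import org.apache.http.client.methods.HttpGet;

    class RestClientSketch {
        // Hypothetical stand-in for RestClient's shared header helper.
        private void addHttpHeaders(HttpGet get) {
            get.addHeader("Accept", "application/json, text/plain, */*");
        }

        // After the patch: the URL reaches the HttpGet constructor and the
        // shared headers are applied in one place.
        private HttpGet newGet(String url) {
            HttpGet get = new HttpGet(url);
            addHttpHeaders(get);
            return get;
        }

        // getKylinProperties() now builds its request via newGet(baseUrl + "/admin/config")
        // instead of constructing a bare HttpGet itself.
    }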
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
index 0484bfc..6444715 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
@@ -49,6 +49,12 @@ public final class PathManager {
         return hdfsWorkDir + "parquet" + File.separator + cube.getName() + File.separator + segName + "_" + identifier;
     }
 
+    public static String getSegmentParquetStoragePath(String hdfsWorkDir, String cubeName, CubeSegment segment) {
+        String segmentName = segment.getName();
+        String identifier = segment.getStorageLocationIdentifier();
+        return hdfsWorkDir + "parquet" + File.separator + cubeName + File.separator + segmentName + "_" + identifier;
+    }
+
     /**
      * Delete segment path
      */
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 4612cef..550da0c 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -18,22 +18,20 @@
 
 package org.apache.kylin.tool;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -42,30 +40,23 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.cube.model.DictionaryDesc;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,14 +77,17 @@ public class CubeMigrationCLI extends AbstractApplication {
     protected KylinConfig dstConfig;
     protected ResourceStore srcStore;
     protected ResourceStore dstStore;
-    protected FileSystem hdfsFS;
-    private HBaseAdmin hbaseAdmin;
+    protected FileSystem hdfsFs;
+    protected Configuration conf;
     protected boolean doAclCopy = false;
     protected boolean doOverwrite = false;
     protected boolean doMigrateSegment = true;
     protected String dstProject;
+    protected String srcHdfsWorkDir;
+    protected String dstHdfsWorkDir;
 
     private static final String ACL_PREFIX = "/acl/";
+    private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/";
 
     public static void main(String[] args) throws IOException, InterruptedException {
 
@@ -125,7 +119,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
-            String purgeAndDisable, String overwriteIfExists, String realExecute)
+                         String purgeAndDisable, String overwriteIfExists, String realExecute)
             throws IOException, InterruptedException {
 
         moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
@@ -133,7 +127,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl,
-            String purgeAndDisable, String overwriteIfExists, String realExecute)
+                         String purgeAndDisable, String overwriteIfExists, String realExecute)
             throws IOException, InterruptedException {
 
         moveCube(srcCfg, dstCfg, cubeName, projectName, Boolean.parseBoolean(copyAcl),
@@ -142,7 +136,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
-            String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment)
+                         String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment)
             throws IOException, InterruptedException {
 
         moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
@@ -152,7 +146,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, boolean copyAcl,
-            boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment)
+                         boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment)
             throws IOException, InterruptedException {
         doAclCopy = copyAcl;
         doOverwrite = overwriteIfExists;
@@ -162,26 +156,24 @@ public class CubeMigrationCLI extends AbstractApplication {
         dstConfig = dstCfg;
         dstStore = ResourceStore.getStore(dstConfig);
         dstProject = projectName;
+        conf = HadoopUtil.getCurrentConfiguration();
 
         CubeManager cubeManager = CubeManager.getInstance(srcConfig);
         CubeInstance cube = cubeManager.getCube(cubeName);
+        srcHdfsWorkDir = srcConfig.getHdfsWorkingDirectory(cube.getProject());
+        dstHdfsWorkDir = dstConfig.getHdfsWorkingDirectory(dstProject);
         logger.info("cube to be moved is : " + cubeName);
 
         if (migrateSegment) {
             checkCubeState(cube);
         }
 
-        checkAndGetHbaseUrl();
+        checkAndGetMetadataUrl();
 
-        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-        hbaseAdmin = new HBaseAdmin(conf);
-        hdfsFS = HadoopUtil.getWorkingFileSystem();
+        hdfsFs = HadoopUtil.getWorkingFileSystem();
         operations = new ArrayList<Opt>();
         copyFilesInMetaStore(cube);
-        if (migrateSegment) {
-            renameFoldersInHdfs(cube);
-            changeHtableHost(cube);
-        } else {
+        if (!migrateSegment) {
             clearSegments(cubeName); // this should be after copyFilesInMetaStore
         }
         addCubeAndModelIntoProject(cube, cubeName);
@@ -192,20 +184,12 @@ public class CubeMigrationCLI extends AbstractApplication {
 
         if (realExecute) {
             doOpts();
-            if (migrateSegment) {
-                checkMigrationSuccess(dstConfig, cubeName, true);
-            }
             updateMeta(dstConfig, projectName, cubeName, cube.getModel());
         } else {
             showOpts();
         }
     }
 
-    public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException {
-        CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix);
-        checkCLI.execute(cubeName);
-    }
-
     protected void checkCubeState(CubeInstance cube) {
         if (cube.getStatus() != RealizationStatusEnum.READY)
             throw new IllegalStateException("Cannot migrate cube that is not in READY state.");
@@ -217,40 +201,16 @@ public class CubeMigrationCLI extends AbstractApplication {
         }
     }
 
-    protected void checkAndGetHbaseUrl() {
+    protected void checkAndGetMetadataUrl() {
         StorageURL srcMetadataUrl = srcConfig.getMetadataUrl();
         StorageURL dstMetadataUrl = dstConfig.getMetadataUrl();
 
         logger.info("src metadata url is " + srcMetadataUrl);
         logger.info("dst metadata url is " + dstMetadataUrl);
-
-        if (!"hbase".equals(srcMetadataUrl.getScheme()) || !"hbase".equals(dstMetadataUrl.getScheme()))
-            throw new IllegalStateException("Both metadata urls should be hbase metadata url");
-    }
-
-    protected void renameFoldersInHdfs(CubeInstance cube) throws IOException {
-        for (CubeSegment segment : cube.getSegments()) {
-
-            String jobUuid = segment.getLastBuildJobID();
-            String src = JobBuilderSupport.getJobWorkingDir(srcConfig.getHdfsWorkingDirectory(), jobUuid);
-            String tgt = JobBuilderSupport.getJobWorkingDir(dstConfig.getHdfsWorkingDirectory(), jobUuid);
-
-            operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt }));
-        }
-
-    }
-
-    protected void changeHtableHost(CubeInstance cube) {
-        if (cube.getDescriptor().getStorageType() != IStorageAware.ID_SHARDED_HBASE)
-            return;
-        for (CubeSegment segment : cube.getSegments()) {
-            operations
-                    .add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() }));
-        }
     }
 
     protected void clearSegments(String cubeName) throws IOException {
-        operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[] { cubeName }));
+        operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[]{cubeName}));
     }
 
     protected void copyFilesInMetaStore(CubeInstance cube) throws IOException {
@@ -260,16 +220,22 @@ public class CubeMigrationCLI extends AbstractApplication {
                     + " already exists on target metadata store. Use overwriteIfExists to overwrite it");
 
         List<String> metaItems = new ArrayList<String>();
+        List<String> srcParquetFiles = new ArrayList<String>();
+        List<String> dstParquetFiles = new ArrayList<String>();
         Set<String> dictAndSnapshot = new HashSet<String>();
-        listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
+        listCubeRelatedResources(cube, metaItems, dictAndSnapshot, srcParquetFiles, dstParquetFiles);
 
         for (String item : metaItems) {
-            operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item }));
+            operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[]{item}));
         }
 
         if (doMigrateSegment) {
             for (String item : dictAndSnapshot) {
-                operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
+                operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[]{item, cube.getName()}));
+            }
+
+            for (int i = 0; i < srcParquetFiles.size(); i++) {
+                operations.add(new Opt(OptType.COPY_PARQUET_FILE, new Object[]{srcParquetFiles.get(i), dstParquetFiles.get(i)}));
             }
         }
     }
@@ -279,11 +245,11 @@ public class CubeMigrationCLI extends AbstractApplication {
         if (!dstStore.exists(projectResPath))
             throw new IllegalStateException("The target project " + dstProject + " does not exist");
 
-        operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { srcCube, cubeName, dstProject }));
+        operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[]{srcCube, cubeName, dstProject}));
     }
 
     private void purgeAndDisable(String cubeName) throws IOException {
-        operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName }));
+        operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[]{cubeName}));
     }
 
     private List<String> getCompatibleTablePath(Set<TableRef> tableRefs, String project, String rootPath)
@@ -311,7 +277,7 @@ public class CubeMigrationCLI extends AbstractApplication {
         return toResource;
     }
 
-    protected void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot)
+    protected void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot, List<String> srcParquetFiles, List<String> dstParquetFiles)
             throws IOException {
 
         CubeDesc cubeDesc = cube.getDescriptor();
@@ -326,10 +292,25 @@ public class CubeMigrationCLI extends AbstractApplication {
         metaResource.addAll(getCompatibleTablePath(tblRefs, prj, ResourceStore.TABLE_EXD_RESOURCE_ROOT));
 
         if (doMigrateSegment) {
+            for (DictionaryDesc dictionaryDesc : cubeDesc.getDictionaries()) {
+                String[] columnInfo = dictionaryDesc.getColumnRef().getColumnWithTable().split("\\.");
+                String globalDictPath;
+                if (columnInfo.length == 3) {
+                    globalDictPath = cube.getProject() + GLOBAL_DICT_PREFIX + columnInfo[1] + File.separator + columnInfo[2];
+                } else {
+                    globalDictPath = cube.getProject() + GLOBAL_DICT_PREFIX + columnInfo[0] + File.separator + columnInfo[1];
+                }
+                if (globalDictPath != null) {
+                    logger.info("Add " + globalDictPath + " to migrate dict list");
+                    dictAndSnapshot.add(globalDictPath);
+                }
+            }
             for (CubeSegment segment : cube.getSegments()) {
                 metaResource.add(segment.getStatisticsResourcePath());
                 dictAndSnapshot.addAll(segment.getSnapshotPaths());
-                dictAndSnapshot.addAll(segment.getDictionaryPaths());
+                srcParquetFiles.add(PathManager.getSegmentParquetStoragePath(srcHdfsWorkDir, cube.getName(), segment));
+                dstParquetFiles.add(PathManager.getSegmentParquetStoragePath(dstHdfsWorkDir, cube.getName(), segment));
+                logger.info("Add " + PathManager.getSegmentParquetStoragePath(cube, segment.getName(), segment.getStorageLocationIdentifier()) + " to migrate parquet file list");
             }
         }
 
@@ -337,11 +318,6 @@ public class CubeMigrationCLI extends AbstractApplication {
             metaResource.add(ACL_PREFIX + cube.getUuid());
             metaResource.add(ACL_PREFIX + cube.getModel().getUuid());
         }
-
-//        if (cubeDesc.isStreamingCube()) {
-//            // add streaming source config info for streaming cube
-//            metaResource.add(StreamingSourceConfig.concatResourcePath(cubeDesc.getModel().getRootFactTableName()));
-//        }
     }
 
     @Override
@@ -355,7 +331,7 @@ public class CubeMigrationCLI extends AbstractApplication {
     }
 
     protected enum OptType {
-        COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, PURGE_AND_DISABLE, CLEAR_SEGMENTS
+        COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, COPY_PARQUET_FILE, ADD_INTO_PROJECT, PURGE_AND_DISABLE, CLEAR_SEGMENTS
     }
 
     protected void addOpt(OptType type, Object[] params) {
@@ -420,161 +396,94 @@ public class CubeMigrationCLI extends AbstractApplication {
         logger.info("Executing operation: " + opt.toString());
 
         switch (opt.type) {
-        case CHANGE_HTABLE_HOST: {
-            String tableName = (String) opt.params[0];
-            System.out.println("CHANGE_HTABLE_HOST, table name: " + tableName);
-            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            hbaseAdmin.disableTable(tableName);
-            desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
-            logger.info("CHANGE_HTABLE_HOST is completed");
-            break;
-        }
-        case COPY_FILE_IN_META: {
-            String item = (String) opt.params[0];
-            RawResource res = srcStore.getResource(item);
-            if (res == null) {
-                logger.info("Item: {} doesn't exist, ignore it.", item);
+            case COPY_FILE_IN_META: {
+                String item = (String) opt.params[0];
+                RawResource res = srcStore.getResource(item);
+                if (res == null) {
+                    logger.info("Item: {} doesn't exist, ignore it.", item);
+                    break;
+                }
+                dstStore.putResource(renameTableWithinProject(item), res.content(), res.lastModified());
+                res.content().close();
+                logger.info("Item " + item + " is copied");
                 break;
             }
-            dstStore.putResource(renameTableWithinProject(item), res.content(), res.lastModified());
-            res.content().close();
-            logger.info("Item " + item + " is copied");
-            break;
-        }
-        case COPY_DICT_OR_SNAPSHOT: {
-            String item = (String) opt.params[0];
-
-            if (item.toLowerCase(Locale.ROOT).endsWith(".dict")) {
-                DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig);
-                DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig);
-                DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item);
-
-                long ts = dictSrc.getLastModified();
-                dictSrc.setLastModified(0);//to avoid resource store write conflict
-                Dictionary dictObj = dictSrc.getDictionaryObject().copyToAnotherMeta(srcConfig, dstConfig);
-                DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictObj, dictSrc);
-                dictSrc.setLastModified(ts);
-
-                if (dictSaved == dictSrc) {
-                    //no dup found, already saved to dest
-                    logger.info("Item " + item + " is copied");
+            case COPY_DICT_OR_SNAPSHOT: {
+                String item = (String) opt.params[0];
+                String itemPath = item.substring(item.substring(0, item.indexOf("/")).length()+1);
+                Path srcPath = new Path(srcHdfsWorkDir + itemPath);
+                Path dstPath = new Path(dstHdfsWorkDir + itemPath);
+                if (hdfsFs.exists(srcPath)) {
+                    FileUtil.copy(hdfsFs, srcPath, hdfsFs, dstPath, false, true, conf);
+                    logger.info("Copy " + srcPath + " to " + dstPath);
                 } else {
-                    //dictSrc is rejected because of duplication
-                    //modify cube's dictionary path
-                    String cubeName = (String) opt.params[1];
-                    String cubeResPath = CubeInstance.concatResourcePath(cubeName);
-                    Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
-                    CubeInstance cube = dstStore.getResource(cubeResPath, cubeSerializer);
-                    for (CubeSegment segment : cube.getSegments()) {
-                        for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) {
-                            if (entry.getValue().equalsIgnoreCase(item)) {
-                                entry.setValue(dictSaved.getResourcePath());
-                            }
-                        }
-                    }
-                    dstStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
-                    logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused");
+                    logger.info("Dict or snapshot " + srcPath + " is not exists, ignore it");
                 }
-
-            } else if (item.toLowerCase(Locale.ROOT).endsWith(".snapshot")) {
-                SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig);
-                SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig);
-                SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item);
-
-                long ts = snapSrc.getLastModified();
-                snapSrc.setLastModified(0);
-                SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc);
-                snapSrc.setLastModified(ts);
-
-                if (snapSaved == snapSrc) {
-                    //no dup found, already saved to dest
-                    logger.info("Item " + item + " is copied");
-
+                break;
+            }
+            case COPY_PARQUET_FILE: {
+                Path srcPath = new Path((String) opt.params[0]);
+                Path dstPath = new Path((String) opt.params[1]);
+                if (hdfsFs.exists(srcPath)) {
+                    FileUtil.copy(hdfsFs, srcPath, hdfsFs, dstPath, false, true, conf);
+                    logger.info("Copy " + srcPath + " to " + dstPath);
                 } else {
-                    String cubeName = (String) opt.params[1];
-                    String cubeResPath = CubeInstance.concatResourcePath(cubeName);
-                    Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
-                    CubeInstance cube = dstStore.getResource(cubeResPath, cubeSerializer);
-                    for (CubeSegment segment : cube.getSegments()) {
-                        for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) {
-                            if (entry.getValue().equalsIgnoreCase(item)) {
-                                entry.setValue(snapSaved.getResourcePath());
-                            }
-                        }
-                    }
-                    dstStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
-                    logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused");
-
+                    logger.info("Parquet file " + srcPath + " is not exists, ignore it");
                 }
-
-            } else {
-                logger.error("unknown item found: " + item);
-                logger.info("ignore it");
+                break;
             }
+            case ADD_INTO_PROJECT: {
+                CubeInstance srcCube = (CubeInstance) opt.params[0];
+                String cubeName = (String) opt.params[1];
+                String projectName = (String) opt.params[2];
+                String modelName = srcCube.getDescriptor().getModelName();
+
+                String projectResPath = ProjectInstance.concatResourcePath(projectName);
+                Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
+                ProjectInstance project = dstStore.getResource(projectResPath, projectSerializer);
+
+                for (TableRef tableRef : srcCube.getModel().getAllTables()) {
+                    project.addTable(tableRef.getTableIdentity());
+                }
 
-            break;
-        }
-        case RENAME_FOLDER_IN_HDFS: {
-            String srcPath = (String) opt.params[0];
-            String dstPath = (String) opt.params[1];
-            renameHDFSPath(srcPath, dstPath);
-            logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
-            break;
-        }
-        case ADD_INTO_PROJECT: {
-            CubeInstance srcCube = (CubeInstance) opt.params[0];
-            String cubeName = (String) opt.params[1];
-            String projectName = (String) opt.params[2];
-            String modelName = srcCube.getDescriptor().getModelName();
-
-            String projectResPath = ProjectInstance.concatResourcePath(projectName);
-            Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
-            ProjectInstance project = dstStore.getResource(projectResPath, projectSerializer);
-
-            for (TableRef tableRef : srcCube.getModel().getAllTables()) {
-                project.addTable(tableRef.getTableIdentity());
-            }
+                if (!project.getModels().contains(modelName))
+                    project.addModel(modelName);
+                project.removeRealization(RealizationType.CUBE, cubeName);
+                project.addRealizationEntry(RealizationType.CUBE, cubeName);
 
-            if (!project.getModels().contains(modelName))
-                project.addModel(modelName);
-            project.removeRealization(RealizationType.CUBE, cubeName);
-            project.addRealizationEntry(RealizationType.CUBE, cubeName);
+                dstStore.checkAndPutResource(projectResPath, project, projectSerializer);
+                logger.info("Project instance for " + projectName + " is corrected");
+                break;
+            }
+            case CLEAR_SEGMENTS: {
+                String cubeName = (String) opt.params[0];
+                String cubeInstancePath = CubeInstance.concatResourcePath(cubeName);
+                Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+                CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, cubeInstanceSerializer);
+                cubeInstance.getSegments().clear();
+                cubeInstance.clearCuboids();
+                cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
+                cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
+                dstStore.checkAndPutResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer);
+                logger.info("Cleared segments for " + cubeName + ", since segments has not been copied");
+                break;
+            }
+            case PURGE_AND_DISABLE: {
+                String cubeName = (String) opt.params[0];
+                String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+                Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+                CubeInstance cube = srcStore.getResource(cubeResPath, cubeSerializer);
+                cube.getSegments().clear();
+                cube.setStatus(RealizationStatusEnum.DISABLED);
+                srcStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
+                logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl());
 
-            dstStore.checkAndPutResource(projectResPath, project, projectSerializer);
-            logger.info("Project instance for " + projectName + " is corrected");
-            break;
-        }
-        case CLEAR_SEGMENTS: {
-            String cubeName = (String) opt.params[0];
-            String cubeInstancePath = CubeInstance.concatResourcePath(cubeName);
-            Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
-            CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, cubeInstanceSerializer);
-            cubeInstance.getSegments().clear();
-            cubeInstance.clearCuboids();
-            cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
-            cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
-            dstStore.checkAndPutResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer);
-            logger.info("Cleared segments for " + cubeName + ", since segments has not been copied");
-            break;
-        }
-        case PURGE_AND_DISABLE: {
-            String cubeName = (String) opt.params[0];
-            String cubeResPath = CubeInstance.concatResourcePath(cubeName);
-            Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
-            CubeInstance cube = srcStore.getResource(cubeResPath, cubeSerializer);
-            cube.getSegments().clear();
-            cube.setStatus(RealizationStatusEnum.DISABLED);
-            srcStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
-            logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl());
-
-            break;
-        }
-        default: {
-            //do nothing
-            break;
-        }
+                break;
+            }
+            default: {
+                //do nothing
+                break;
+            }
         }
     }
 
@@ -582,53 +491,38 @@ public class CubeMigrationCLI extends AbstractApplication {
         logger.info("Undo operation: " + opt.toString());
 
         switch (opt.type) {
-        case CHANGE_HTABLE_HOST: {
-            String tableName = (String) opt.params[0];
-            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            hbaseAdmin.disableTable(tableName);
-            desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
-            hbaseAdmin.modifyTable(tableName, desc);
-            hbaseAdmin.enableTable(tableName);
-            break;
-        }
-        case COPY_FILE_IN_META: {
-            // no harm
-            logger.info("Undo for COPY_FILE_IN_META is ignored");
-            String item = (String) opt.params[0];
-
-            if (item.startsWith(ACL_PREFIX) && doAclCopy) {
-                logger.info("Remove acl record");
-                dstStore.deleteResource(item);
+            case COPY_FILE_IN_META: {
+                // no harm
+                logger.info("Undo for COPY_FILE_IN_META is ignored");
+                String item = (String) opt.params[0];
+
+                if (item.startsWith(ACL_PREFIX) && doAclCopy) {
+                    logger.info("Remove acl record");
+                    dstStore.deleteResource(item);
+                }
+                break;
             }
-            break;
-        }
-        case COPY_DICT_OR_SNAPSHOT: {
-            // no harm
-            logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
-            break;
-        }
-        case RENAME_FOLDER_IN_HDFS: {
-            String srcPath = (String) opt.params[1];
-            String dstPath = (String) opt.params[0];
-
-            if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new Path(dstPath))) {
-                renameHDFSPath(srcPath, dstPath);
-                logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
+            case COPY_DICT_OR_SNAPSHOT: {
+                // no harm
+                logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
+                break;
+            }
+            case ADD_INTO_PROJECT: {
+                logger.info("Undo for ADD_INTO_PROJECT is ignored");
+                break;
+            }
+            case PURGE_AND_DISABLE: {
+                logger.info("Undo for PURGE_AND_DISABLE is not supported");
+                break;
+            }
+            case COPY_PARQUET_FILE: {
+                logger.info("Undo for COPY_PARQUET_FILE is ignored");
+                break;
+            }
+            default: {
+                //do nothing
+                break;
             }
-            break;
-        }
-        case ADD_INTO_PROJECT: {
-            logger.info("Undo for ADD_INTO_PROJECT is ignored");
-            break;
-        }
-        case PURGE_AND_DISABLE: {
-            logger.info("Undo for PURGE_AND_DISABLE is not supported");
-            break;
-        }
-        default: {
-            //do nothing
-            break;
-        }
         }
     }
 
@@ -660,17 +554,4 @@ public class CubeMigrationCLI extends AbstractApplication {
             }
         }
     }
-
-    private void renameHDFSPath(String srcPath, String dstPath) throws IOException, InterruptedException {
-        int nRetry = 0;
-        int sleepTime = 5000;
-        while (!hdfsFS.rename(new Path(srcPath), new Path(dstPath))) {
-            ++nRetry;
-            if (nRetry > 3) {
-                throw new InterruptedException("Cannot rename folder " + srcPath + " to folder " + dstPath);
-            } else {
-                Thread.sleep((long) sleepTime * nRetry * nRetry);
-            }
-        }
-    }
-}
+}
\ No newline at end of file
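
Editorial usage sketch: for context, the refined CLI can be driven programmatically through the moveCube overload visible in the diff. The config URIs (as accepted by KylinConfig.createInstanceFromUri), cube and project names below are placeholders, and the string flags mirror the Boolean.parseBoolean conversion done inside moveCube:

    import org.apache.kylin.tool.CubeMigrationCLI;

    public class MigrateSketch {
        public static void main(String[] args) throws Exception {
            CubeMigrationCLI cli = new CubeMigrationCLI();
            // Placeholder arguments; in practice these are the source/destination
            // Kylin config URIs and the cube/project to migrate.
            cli.moveCube(
                    "srcKylinConfigUri",   // srcCfgUri
                    "dstKylinConfigUri",   // dstCfgUri
                    "kylin_sales_cube",    // cubeName
                    "target_project",      // projectName (dstProject)
                    "true",                // copyAcl
                    "false",               // purgeAndDisable
                    "false",               // overwriteIfExists
                    "false",               // realExecute: dry run, only lists the planned operations
                    "true");               // migrateSegment
        }
    }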