You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/06/15 02:51:29 UTC

[kylin] 13/15: KYLIN-4489 Create a tool for migration cross clusters

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2e2a4dff9ca9c08351084f53d293e189083db140
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue May 19 11:14:42 2020 +0800

    KYLIN-4489 Create a tool for migration cross clusters
---
 .../java/org/apache/kylin/common/KylinConfig.java  |  20 +
 .../apache/kylin/common/restclient/RestClient.java |  40 +-
 .../kylin/metadata/model/DataModelManager.java     |   3 +
 .../kylin/metadata/project/ProjectManager.java     |   6 +-
 .../kylin/rest/controller/MigrationController.java |   2 +-
 pom.xml                                            |   5 +
 .../kylin/rest/controller/AdminController.java     |  22 +
 .../apache/kylin/rest/service/AdminService.java    |  14 +
 .../apache/kylin/rest/service/HBaseInfoUtil.java   |  17 +-
 .../apache/kylin/rest/service/ModelService.java    |   8 +-
 .../kylin/storage/hbase/HBaseResourceStore.java    |   7 +-
 .../storage/hbase/util/DeployCoprocessorCLI.java   |  21 +-
 tool/pom.xml                                       |  11 +
 .../apache/kylin/tool/migration/ClusterUtil.java   | 165 +++++
 .../migration/CubeMigrationCrossClusterCLI.java    | 757 +++++++++++++++++++++
 .../kylin/tool/migration/DstClusterUtil.java       | 371 ++++++++++
 .../kylin/tool/migration/SrcClusterUtil.java       | 148 ++++
 17 files changed, 1595 insertions(+), 22 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 7b0888b..c9001f0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -20,6 +20,7 @@ package org.apache.kylin.common;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.kylin.common.restclient.RestClient;
 import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.ClassUtil;
@@ -571,6 +572,52 @@ public class KylinConfig extends KylinConfigBase {
         reloadKylinConfig(buildSiteProperties());
     }
 
+    /**
+     * Serializes a Hadoop configuration as "key=value" lines, one entry per line.
+     *
+     * The text is meant to be parsed back by {@link #getConfigFromString(String)}, which relies on
+     * {@link Properties#load(java.io.Reader)}. Backslashes and line terminators inside a value are
+     * therefore escaped: written raw, a multi-line value would be split into bogus entries and a
+     * backslash would be consumed as an escape character when the text is loaded again.
+     *
+     * @param conf the configuration to serialize
+     * @return the serialized entries, each terminated by a line feed
+     */
+    public static String getConfigAsString(Configuration conf) {
+        final StringBuilder sb = new StringBuilder();
+        for (Map.Entry<String, String> entry : conf) {
+            sb.append(entry.getKey()).append('=').append(escapeConfigValue(entry.getValue())).append('\n');
+        }
+        return sb.toString();
+    }
+
+    // Escapes the characters that Properties.load would otherwise reinterpret inside a value.
+    private static String escapeConfigValue(String value) {
+        if (value == null) {
+            return "null"; // the text plain string concatenation would have produced
+        }
+        return value.replace("\\", "\\\\").replace("\r", "\\r").replace("\n", "\\n");
+    }
+
+    /**
+     * Rebuilds a Hadoop configuration from text in java.util.Properties format, such as the
+     * output of {@link #getConfigAsString(Configuration)}.
+     *
+     * @param configInStr the serialized configuration entries
+     * @return a new Configuration with every parsed entry set on it
+     * @throws IOException if the text cannot be read as properties
+     */
+    public static Configuration getConfigFromString(String configInStr) throws IOException {
+        Properties props = new Properties();
+        props.load(new StringReader(configInStr));
+
+        Configuration config = new Configuration();
+        for (Map.Entry<Object, Object> entry : props.entrySet()) {
+            config.set((String) entry.getKey(), (String) entry.getValue());
+        }
+        return config;
+    }
+
     public KylinConfig base() {
         return this;
     }
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index d908f58..21b08f8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -30,7 +30,6 @@ import java.util.regex.Pattern;
 
 import javax.xml.bind.DatatypeConverter;
 
-import com.google.common.base.Strings;
 import org.apache.http.HttpResponse;
 import org.apache.http.auth.AuthScope;
 import org.apache.http.auth.UsernamePasswordCredentials;
@@ -55,6 +54,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
 
 /**
  */
@@ -175,6 +175,37 @@ public class RestClient {
         }
     }
 
+    /**
+     * Sends a PUT request to the "/cache/announce/{entity}/{cacheKey}/{event}" endpoint.
+     *
+     * @param entity   entity segment of the request path
+     * @param event    event segment of the request path
+     * @param cacheKey cache key segment of the request path
+     * @throws IOException if the request fails or the response status is not 200
+     */
+    public void announceWipeCache(String entity, String event, String cacheKey) throws IOException {
+        String url = baseUrl + "/cache/announce/" + entity + "/" + cacheKey + "/" + event;
+        HttpPut request = new HttpPut(url);
+
+        try {
+            HttpResponse response = client.execute(request);
+
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode != 200) {
+                String msg = EntityUtils.toString(response.getEntity());
+                throw new IOException(
+                        "Invalid response " + statusCode + " with announce cache wipe url " + url + "\n" + msg);
+            }
+        } catch (IOException ex) {
+            // already the declared type: rethrow as is so the status/url message stays on top
+            throw ex;
+        } catch (RuntimeException ex) {
+            throw new IOException(ex);
+        } finally {
+            request.releaseConnection();
+        }
+    }
+
     public void wipeCache(String entity, String event, String cacheKey) throws IOException {
         HttpPut request;
         String url;
@@ -202,8 +221,19 @@ public class RestClient {
     }
 
     public String getKylinProperties() throws IOException {
-        String url = baseUrl + "/admin/config";
-        HttpGet request = new HttpGet(url);
+        return getConfiguration(baseUrl + "/admin/config", false);
+    }
+
+    public String getHDFSConfiguration() throws IOException {
+        return getConfiguration(baseUrl + "/admin/config/hdfs", true);
+    }
+
+    public String getHBaseConfiguration() throws IOException {
+        return getConfiguration(baseUrl + "/admin/config/hbase", true);
+    }
+
+    private String getConfiguration(String url, boolean ifAuth) throws IOException {
+        HttpGet request = ifAuth ? newGet(url) : new HttpGet(url);
         HttpResponse response = null;
         try {
             response = client.execute(request);
@@ -372,7 +402,7 @@ public class RestClient {
                 String msg = getContent(response);
                 Map<String, String> kvMap = JsonUtil.readValueAsMap(msg);
                 String exception = kvMap.containsKey("exception") ? kvMap.get("exception") : "unknown";
-                throw new IOException(exception);
+                throw new IOException("Error code: " + response.getStatusLine().getStatusCode() + "\n" + exception);
             }
         } finally {
             post.releaseConnection();
@@ -411,7 +441,7 @@ public class RestClient {
     }
 
     private HttpGet newGet(String url) {
-        HttpGet get = new HttpGet();
+        HttpGet get = new HttpGet(url);
         addHttpHeaders(get);
         return get;
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java
index 47e2c3d..f483a2c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.persistence.WriteConflictException;
@@ -48,6 +49,8 @@ import org.apache.kylin.shaded.com.google.common.collect.Lists;
 public class DataModelManager {
 
     private static final Logger logger = LoggerFactory.getLogger(DataModelManager.class);
+    public static final Serializer<DataModelDesc> MODELDESC_SERIALIZER = new JsonSerializer<DataModelDesc>(
+            DataModelDesc.class);
 
     public static DataModelManager getInstance(KylinConfig config) {
         return config.getManager(DataModelManager.class);
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 15b6a2d..ebcd45c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -30,7 +30,9 @@ import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.AutoReadWriteLock;
 import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
 import org.apache.kylin.metadata.TableMetadataManager;
@@ -54,7 +56,9 @@ import org.apache.kylin.shaded.com.google.common.collect.Sets;
 
 public class ProjectManager {
     private static final Logger logger = LoggerFactory.getLogger(ProjectManager.class);
-
+    public static final Serializer<ProjectInstance> PROJECT_SERIALIZER = new JsonSerializer<ProjectInstance>(
+            ProjectInstance.class);
+    
     public static ProjectManager getInstance(KylinConfig config) {
         return config.getManager(ProjectManager.class);
     }
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
index efef5cf..45f83ea 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
@@ -155,7 +155,7 @@ public class MigrationController extends BasicController {
             }
             DataModelDesc dataModelDesc = JsonUtil.readValue(request.getModelDescData(), DataModelDesc.class);
             logger.info("Schema compatibility check for model {}", dataModelDesc.getName());
-            modelService.checkModelCompatibility(request.getProjectName(), dataModelDesc, tableDescList);
+            modelService.checkModelCompatibility(dataModelDesc, tableDescList);
             logger.info("Pass schema compatibility check for model {}", dataModelDesc.getName());
         } catch (Exception e) {
             logger.error(e.getMessage(), e);
diff --git a/pom.xml b/pom.xml
index c73bf28..f60b7c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -542,6 +542,11 @@
         <optional>true</optional>
       </dependency>
       <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-distcp</artifactId>
+        <version>${hadoop2.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.zookeeper</groupId>
         <artifactId>zookeeper</artifactId>
         <version>${zookeeper.version}</version>
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
index 4d90db8..780e069 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
@@ -112,6 +112,30 @@ public class AdminController extends BasicController {
         return configRes;
     }
 
+    /**
+     * Returns the text from AdminService#getHadoopConfigAsString under the "config" key.
+     * NOTE(review): the text may contain sensitive settings - confirm who may call this endpoint.
+     */
+    @RequestMapping(value = "/config/hdfs", method = { RequestMethod.GET }, produces = { "application/json" })
+    @ResponseBody
+    public GeneralResponse getHDFSConfig() throws IOException {
+        final GeneralResponse hdfsConfigRes = new GeneralResponse();
+        hdfsConfigRes.put("config", adminService.getHadoopConfigAsString());
+        return hdfsConfigRes;
+    }
+
+    /**
+     * Returns the text from AdminService#getHBaseConfigAsString under the "config" key.
+     * NOTE(review): the text may contain sensitive settings - confirm who may call this endpoint.
+     */
+    @RequestMapping(value = "/config/hbase", method = { RequestMethod.GET }, produces = { "application/json" })
+    @ResponseBody
+    public GeneralResponse getHBaseConfig() throws IOException {
+        final GeneralResponse hbaseConfigRes = new GeneralResponse();
+        hbaseConfigRes.put("config", adminService.getHBaseConfigAsString());
+        return hbaseConfigRes;
+    }
+
     @RequestMapping(value = "/metrics/cubes", method = { RequestMethod.GET }, produces = { "application/json" })
     @ResponseBody
     public MetricsResponse cubeMetrics(MetricsRequest request) {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java
index c133a28..4e9cd03 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java
@@ -31,10 +31,12 @@ import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OrderedProperties;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.job.StorageCleanupJob;
+import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.security.access.prepost.PreAuthorize;
@@ -117,4 +119,16 @@ public class AdminService extends BasicService {
 
         return KylinConfig.getInstanceFromEnv().exportToString(propertyKeys);
     }
+
+    public String getHadoopConfigAsString() throws IOException {
+        logger.debug("Get Kylin Hadoop Config");
+
+        return KylinConfig.getConfigAsString(HadoopUtil.getCurrentConfiguration());
+    }
+
+    public String getHBaseConfigAsString() throws IOException {
+        logger.debug("Get Kylin HBase Config");
+
+        return KylinConfig.getConfigAsString(HBaseConnection.getCurrentHBaseConfiguration());
+    }
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java b/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java
index 8fc1de9..012f14a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java
@@ -20,6 +20,7 @@ package org.apache.kylin.rest.service;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.kylin.common.KylinConfig;
@@ -28,13 +29,17 @@ import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
 import org.apache.kylin.storage.hbase.util.HBaseUnionUtil;
 
 public class HBaseInfoUtil {
-    
+
     @SuppressWarnings("unused") // used by reflection
     public static HBaseResponse getHBaseInfo(String tableName, KylinConfig config) throws IOException {
         if (!config.getStorageUrl().getScheme().equals("hbase"))
             return null;
-        
+
         Connection conn = HBaseUnionUtil.getConnection(config, tableName);
+        return getHBaseInfo(tableName, conn);
+    }
+
+    public static HBaseResponse getHBaseInfo(String tableName, Connection conn) throws IOException {
         HBaseResponse hr = null;
         long tableSize = 0;
         int regionCount = 0;
@@ -54,4 +59,23 @@ public class HBaseInfoUtil {
         hr.setRegionCount(regionCount);
         return hr;
     }
+
+    /**
+     * Compares two HBase table summaries by table name, table size and region count.
+     *
+     * @param hbaseR1 first summary, may be null
+     * @param hbaseR2 second summary, may be null
+     * @return true if both arguments are the same reference (including both null) or if all
+     *         three compared properties match; false if exactly one argument is null
+     */
+    public static boolean checkEquals(HBaseResponse hbaseR1, HBaseResponse hbaseR2) {
+        if (hbaseR1 == hbaseR2)
+            return true;
+        // only one side is present: they cannot be equal, and the getters below must not be reached
+        if (hbaseR1 == null || hbaseR2 == null)
+            return false;
+        return Objects.equals(hbaseR1.getTableName(), hbaseR2.getTableName())
+                && hbaseR1.getTableSize() == hbaseR2.getTableSize()
+                && hbaseR1.getRegionCount() == hbaseR2.getRegionCount();
+    }
 }
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
index 5a3a1ca..7ff3919 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -168,11 +168,7 @@ public class ModelService extends BasicService {
         result.raiseExceptionWhenInvalid();
     }
 
-    public void checkModelCompatibility(String project, DataModelDesc dataModalDesc, List<TableDesc> tableDescList) {
-        ProjectInstance prjInstance = getProjectManager().getProject(project);
-        if (prjInstance == null) {
-            throw new BadRequestException("Project " + project + " does not exist");
-        }
+    public void checkModelCompatibility(DataModelDesc dataModalDesc, List<TableDesc> tableDescList) {
         ModelSchemaUpdateChecker checker = new ModelSchemaUpdateChecker(getTableManager(), getCubeManager(),
                 getDataModelManager());
 
@@ -181,7 +177,7 @@ public class ModelService extends BasicService {
             tableDescMap.put(tableDesc.getIdentity(), tableDesc);
         }
         dataModalDesc.init(getConfig(), tableDescMap);
-        ModelSchemaUpdateChecker.CheckResult result = checker.allowEdit(dataModalDesc, project, false);
+        ModelSchemaUpdateChecker.CheckResult result = checker.allowEdit(dataModalDesc, null, false);
         result.raiseExceptionWhenInvalid();
     }
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index dd63b52..e26cd74 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HTableDescriptor;
@@ -91,10 +92,14 @@ public class HBaseResourceStore extends PushdownResourceStore {
                 .parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760"));
     }
 
-    Connection getConnection() throws IOException {
+    protected Connection getConnection() throws IOException {
         return HBaseConnection.get(metadataUrl);
     }
 
+    protected Configuration getCurrentHBaseConfiguration() {
+        return HBaseConnection.getCurrentHBaseConfiguration();
+    }
+
     private StorageURL buildMetadataUrl(KylinConfig kylinConfig) throws IOException {
         StorageURL url = kylinConfig.getMetadataUrl();
         if (!url.getScheme().equals("hbase"))
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 845a4e5..52d1c14 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -420,7 +420,7 @@ public class DeployCoprocessorCLI {
     }
 
     public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
-        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
+        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, getHDFSWorkingDirectory(config));
         FileStatus newestJar = null;
         for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
             if (fileStatus.getPath().toString().endsWith(".jar")) {
@@ -440,8 +440,14 @@ public class DeployCoprocessorCLI {
         return path;
     }
 
-    public static synchronized Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem,
-            Set<String> oldJarPaths) throws IOException {
+    public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem,
+                                                         Set<String> oldJarPaths) throws IOException {
+        String hdfsWorkingDirectory = getHDFSWorkingDirectory(KylinConfig.getInstanceFromEnv());
+        return uploadCoprocessorJar(localCoprocessorJar, fileSystem, hdfsWorkingDirectory, oldJarPaths);
+    }
+
+    public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem,
+                                                         String hdfsWorkingDirectory, Set<String> oldJarPaths) throws IOException {
         Path uploadPath = null;
         File localCoprocessorFile = new File(localCoprocessorJar);
 
@@ -449,7 +455,7 @@ public class DeployCoprocessorCLI {
         if (oldJarPaths == null) {
             oldJarPaths = new HashSet<String>();
         }
-        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, hdfsWorkingDirectory);
         for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
             if (isSame(localCoprocessorFile, fileStatus)) {
                 uploadPath = fileStatus.getPath();
@@ -511,9 +517,12 @@ public class DeployCoprocessorCLI {
         return baseName;
     }
 
-    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+    private static String getHDFSWorkingDirectory(KylinConfig config) {
         String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
-        hdfsWorkingDirectory = HBaseConnection.makeQualifiedPathInHBaseCluster(hdfsWorkingDirectory);
+        return HBaseConnection.makeQualifiedPathInHBaseCluster(hdfsWorkingDirectory);
+    }
+
+    private static Path getCoprocessorHDFSDir(FileSystem fileSystem, String hdfsWorkingDirectory) throws IOException {
         Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
         fileSystem.mkdirs(coprocessorDir);
         return coprocessorDir;
diff --git a/tool/pom.xml b/tool/pom.xml
index 140ff93..8d4c2b4 100644
--- a/tool/pom.xml
+++ b/tool/pom.xml
@@ -76,6 +76,11 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-server</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-yarn-api</artifactId>
             <scope>provided</scope>
@@ -86,6 +91,12 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-distcp</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <!--Spring-->
         <dependency>
             <groupId>org.springframework</groupId>
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/ClusterUtil.java b/tool/src/main/java/org/apache/kylin/tool/migration/ClusterUtil.java
new file mode 100644
index 0000000..14f8e90
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/ClusterUtil.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.rest.security.ManagedUser;
+import org.apache.kylin.rest.service.KylinUserService;
+import org.apache.kylin.storage.hbase.HBaseResourceStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Sets;
+
+/**
+ * Base class of the per-cluster helpers used by the cross-cluster cube migration tool.
+ *
+ * An instance is built from the URI of a Kylin instance and holds the handles needed to reach
+ * that cluster: the Kylin config, a REST client, the Hadoop and HBase configurations fetched
+ * through the REST client, the matching file systems, an HBase connection with its admin, and
+ * a resource store bound to that connection.
+ */
+public abstract class ClusterUtil {
+    private static final Logger logger = LoggerFactory.getLogger(ClusterUtil.class);
+
+    protected final KylinConfig kylinConfig;
+    protected final RestClient restClient;
+    // HDFS working directory without scheme and authority, always ending with "/"
+    protected final String hdfsWorkingDirectory;
+
+    protected final Configuration hbaseConf;
+    protected final Connection hbaseConn;
+    protected final ResourceStore resourceStore;
+    protected final Admin hbaseAdmin;
+
+    final Configuration jobConf;
+    final FileSystem jobFS;
+    final String jobHdfsWorkingDirectoryQualified;
+    final FileSystem hbaseFS;
+    final String hbaseHdfsWorkingDirectoryQualified;
+
+    /**
+     * Connects to the cluster behind the given Kylin instance.
+     *
+     * @param configURI          URI used both to load the KylinConfig and to create the REST client
+     * @param ifJobFSHAEnabled   if false, the default fs of the job cluster is replaced by a concrete
+     *                           name node address when qualifying the working directory
+     * @param ifHBaseFSHAEnabled same as above, for the file system of the HBase cluster
+     * @throws IOException if a configuration cannot be fetched or a file system / HBase connection
+     *                     cannot be created
+     */
+    public ClusterUtil(String configURI, boolean ifJobFSHAEnabled, boolean ifHBaseFSHAEnabled) throws IOException {
+        this.kylinConfig = KylinConfig.createInstanceFromUri(configURI);
+        this.restClient = new RestClient(configURI);
+        Path hdfsWorkingPath = Path.getPathWithoutSchemeAndAuthority(new Path(kylinConfig.getHdfsWorkingDirectory()));
+        String tmpHdfsWorkingDirectory = hdfsWorkingPath.toString();
+        this.hdfsWorkingDirectory = tmpHdfsWorkingDirectory.endsWith("/") ? tmpHdfsWorkingDirectory
+                : tmpHdfsWorkingDirectory + "/";
+
+        this.jobConf = KylinConfig.getConfigFromString(restClient.getHDFSConfiguration());
+        this.jobFS = FileSystem.get(jobConf);
+        this.jobHdfsWorkingDirectoryQualified = getQualifiedPath(jobConf, hdfsWorkingDirectory, ifJobFSHAEnabled);
+
+        this.hbaseConf = KylinConfig.getConfigFromString(restClient.getHBaseConfiguration());
+        this.hbaseFS = FileSystem.get(hbaseConf);
+        this.hbaseHdfsWorkingDirectoryQualified = getQualifiedPath(hbaseConf, hdfsWorkingDirectory, ifHBaseFSHAEnabled);
+
+        this.hbaseConn = ConnectionFactory.createConnection(hbaseConf);
+        // route the resource store through the connection and configuration of this cluster
+        this.resourceStore = new HBaseResourceStore(kylinConfig) {
+            @Override
+            protected Connection getConnection() {
+                return hbaseConn;
+            }
+
+            @Override
+            protected Configuration getCurrentHBaseConfiguration() {
+                return hbaseConf;
+            }
+        };
+        this.hbaseAdmin = hbaseConn.getAdmin();
+    }
+
+    public abstract ProjectInstance getProject(String projName) throws IOException;
+
+    public abstract DictionaryInfo getDictionaryInfo(String dictPath) throws IOException;
+
+    public abstract SnapshotTable getSnapshotTable(String snapshotPath) throws IOException;
+
+    public abstract String getRootDirQualifiedOfHTable(String tableName);
+
+    // Reads the resource stored under userKey and deserializes it with KylinUserService.SERIALIZER.
+    public ManagedUser getUserDetails(String userKey) throws IOException {
+        return resourceStore.getResource(userKey, KylinUserService.SERIALIZER);
+    }
+
+    // Reads the resource stored under resPath from the resource store as a RawResource.
+    public final RawResource getResource(String resPath) throws IOException {
+        return resourceStore.getResource(resPath);
+    }
+
+    // Working directory of the given job, derived from the qualified job working directory.
+    public String getJobWorkingDirQualified(String jobId) {
+        return JobBuilderSupport.getJobWorkingDir(jobHdfsWorkingDirectoryQualified, jobId);
+    }
+
+    // Prefixes path with the default fs of conf; the default fs is replaced first unless HA is enabled.
+    private static String getQualifiedPath(Configuration conf, String path, boolean ifHAEnabled) throws IOException {
+        String hdfsSchema = getReplacedDefaultFS(conf, !ifHAEnabled);
+        return hdfsSchema + path;
+    }
+
+    /**
+     * Returns the default fs of conf, optionally replaced by the address of one reachable name node.
+     *
+     * The replacement only happens when ifNeedReplace is true and the host of the default fs is one
+     * of the name services listed in "dfs.nameservices"; otherwise the default fs is returned as is.
+     * The candidates are the name nodes listed in "dfs.ha.namenodes.{nameservice}", tried in order;
+     * the first one whose file system status can be fetched is used.
+     *
+     * @throws IllegalArgumentException if the name node list is not set or no name node is usable
+     */
+    private static String getReplacedDefaultFS(Configuration conf, boolean ifNeedReplace) throws IOException {
+        String defaultFS = conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
+        if (!ifNeedReplace) {
+            return defaultFS;
+        }
+
+        String nameServices = conf.get("dfs.nameservices");
+        if (Strings.isNullOrEmpty(nameServices)) {
+            return defaultFS;
+        }
+
+        // check whether name service is defined for the default fs
+        Set<String> nameServiceSet = Sets.newHashSet();
+        for (String nameService : nameServices.split(",")) {
+            nameServiceSet.add(nameService.trim()); // tolerate lists written as "ns1, ns2"
+        }
+        String defaultNameService = URI.create(defaultFS).getHost();
+        if (!nameServiceSet.contains(defaultNameService)) {
+            logger.info("name service {} is not defined among {}", defaultNameService, nameServices);
+            return defaultFS;
+        }
+
+        // select one usable node as the default fs
+        String haHostNames = conf.get("dfs.ha.namenodes." + defaultNameService);
+        if (Strings.isNullOrEmpty(haHostNames)) {
+            throw new IllegalArgumentException("dfs.ha.namenodes." + defaultNameService + " is not set");
+        }
+
+        // probe on a copy so that the configuration passed in is not modified
+        Configuration probeConf = new Configuration(conf);
+        for (String oneNodeAlias : haHostNames.split(",")) {
+            String rpcAddressKey = "dfs.namenode.rpc-address." + defaultNameService + "." + oneNodeAlias.trim();
+            String rpcNode = probeConf.get(rpcAddressKey);
+            if (Strings.isNullOrEmpty(rpcNode)) {
+                // without an address the candidate would become "hdfs://null"
+                logger.warn("{} is not set, skip this name node", rpcAddressKey);
+                continue;
+            }
+            String replaced = "hdfs://" + rpcNode;
+            probeConf.set(FileSystem.FS_DEFAULT_NAME_KEY, replaced);
+
+            try {
+                // creating the file system can fail as well, so it is probed inside the try block
+                // to let the remaining name nodes be tried
+                Path rootPath = new Path(replaced + "/");
+                FileSystem fs = FileSystem.get(probeConf);
+                fs.getStatus(rootPath);
+            } catch (Exception e) {
+                logger.warn("cannot use {} as default fs due to ", replaced, e);
+                continue;
+            }
+            logger.info("replaced the default fs {} by {}", defaultFS, replaced);
+            return replaced;
+        }
+        throw new IllegalArgumentException("fail to replace the default fs " + defaultFS);
+    }
+}
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java b/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java
new file mode 100644
index 0000000..95efab0
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java
@@ -0,0 +1,757 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.OptionsParser;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DictionaryDesc;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.rest.response.HBaseResponse;
+import org.apache.kylin.rest.security.ManagedUser;
+import org.apache.kylin.rest.service.HBaseInfoUtil;
+import org.apache.kylin.rest.service.KylinUserService;
+import org.apache.kylin.rest.service.update.TableSchemaUpdateMapping;
+import org.apache.kylin.rest.service.update.TableSchemaUpdater;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class CubeMigrationCrossClusterCLI extends AbstractApplication {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCrossClusterCLI.class);
+
+    // ---- connection info of the two Kylin instances (both are mandatory) ----
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_KYLIN_URI_SRC = OptionBuilder.withArgName("kylinUriSrc").hasArg().isRequired(true)
+            .withDescription("Specify the source kylin uri with format user:pwd@host:port").create("kylinUriSrc");
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_KYLIN_URI_DST = OptionBuilder.withArgName("kylinUriDst").hasArg().isRequired(true)
+            .withDescription("Specify the destination kylin uri with format user:pwd@host:port").create("kylinUriDst");
+
+    // optional file holding the table schema update mappings; init() parses it as a JSON map
+    // and upper-cases its keys
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_UPDATE_MAPPING = OptionBuilder.withArgName("updateMappingPath").hasArg()
+            .isRequired(false).withDescription("Specify the path for the update mapping file")
+            .create("updateMappingPath");
+
+    // ---- what to migrate: the constructor puts these four options into one required OptionGroup ----
+    // cube / hybrid / project values are split on ',' by execute(), so several names may be given
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false)
+            .withDescription("Specify which cube to extract").create("cube");
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false)
+            .withDescription("Specify which hybrid to extract").create("hybrid");
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false)
+            .withDescription("Specify realizations in which project to extract").create("project");
+    // flag without argument
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_All = OptionBuilder.withArgName("all").hasArg(false).isRequired(false)
+            .withDescription("Specify realizations in all projects to extract").create("all");
+
+    // boolean switch, init() defaults it to true when absent
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_DST_HIVE_CHECK = OptionBuilder.withArgName("dstHiveCheck").hasArg()
+            .isRequired(false).withDescription("Specify whether to check destination hive tables")
+            .create("dstHiveCheck");
+
+    // boolean switch, init() defaults it to false when absent
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_OVERWRITE = OptionBuilder.withArgName("overwrite").hasArg().isRequired(false)
+            .withDescription("Specify whether to overwrite existing cubes").create("overwrite");
+
+    // boolean switch, init() defaults it to true when absent (i.e. no cube data is copied by default)
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_SCHEMA_ONLY = OptionBuilder.withArgName("schemaOnly").hasArg().isRequired(false)
+            .withDescription("Specify whether only migrate cube related schema").create("schemaOnly");
+
+    // boolean switch, init() defaults it to false when absent (the HDFS / HTable copy steps only log)
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_EXECUTE = OptionBuilder.withArgName("execute").hasArg().isRequired(false)
+            .withDescription("Specify whether it's to execute the migration").create("execute");
+
+    // when absent, init() falls back to srcCluster.getDefaultCoprocessorJarPath()
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_COPROCESSOR_PATH = OptionBuilder.withArgName("coprocessorPath").hasArg()
+            .isRequired(false).withDescription("Specify the path of coprocessor to be deployed")
+            .create("coprocessorPath");
+
+    // integer bit mask, init() defaults it to 3; bits 0 and 1 are handed to SrcClusterUtil,
+    // bits 2 and 3 to DstClusterUtil (see ifFSHAEnabled)
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_FS_HA_ENABLED_CODE = OptionBuilder.withArgName("codeOfFSHAEnabled").hasArg()
+            .isRequired(false).withDescription("Specify whether to enable the namenode ha of clusters")
+            .create("codeOfFSHAEnabled");
+
+    // ---- tuning of the DistCp jobs and of the copy thread pool ----
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_DISTCP_JOB_QUEUE = OptionBuilder.withArgName("distCpJobQueue").hasArg()
+            .isRequired(false).withDescription("Specify the mapreduce.job.queuename for DistCp job ")
+            .create("distCpJobQueue");
+
+    // in MB, init() defaults it to 1500 and derives the mapper JVM heap as 4/5 of it
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_DISTCP_JOB_MEMORY = OptionBuilder.withArgName("distCpJobMemory").hasArg()
+            .isRequired(false).withDescription("Specify the mapreduce.map.memory.mb for DistCp job ")
+            .create("distCpJobMemory");
+
+    // init() defaults it to 8
+    @SuppressWarnings("static-access")
+    public static final Option OPTION_THREAD_NUM = OptionBuilder.withArgName("nThread").hasArg().isRequired(false)
+            .withDescription("Specify the number of threads for migrating cube data in parallel ").create("nThread");
+
+    // all command line options accepted by this tool, assembled in the constructor
+    protected final Options options;
+
+    // configuration handed to every DistCp job; built in init() from srcCluster.jobConf
+    private Configuration distCpConf;
+
+    // accessors for the source and the destination cluster, created in init()
+    protected SrcClusterUtil srcCluster;
+    protected DstClusterUtil dstCluster;
+
+    // bit mask read through ifFSHAEnabled(); bits 0-1 go to the source, bits 2-3 to the destination cluster
+    private int codeOfFSHAEnabled = 3;
+    // size of the per-cube thread pool used for the data copy tasks
+    protected int nThread;
+
+    // behaviour switches, overwritten in init() from the corresponding options
+    private boolean ifDstHiveCheck = true;
+    private boolean ifSchemaOnly = true;
+    private boolean ifExecute = false;
+    private boolean ifOverwrite = false;
+
+    // coprocessor jar deployed on the copied segment htables
+    private String coprocessorJarPath;
+
+    // the realizations / projects selected for migration, filled in execute()
+    private Set<CubeInstance> cubes = Sets.newHashSet();
+    private Set<HybridInstance> hybrids = Sets.newHashSet();
+    private Set<ProjectInstance> projects = Sets.newHashSet();
+
+    // table schema update mappings keyed by upper-cased name; stays empty without the mapping option
+    private Map<String, TableSchemaUpdateMapping> mappings = Maps.newHashMap();
+
+    // destination projects touched during this run, keyed by project name; saved at the end of execute()
+    private Map<String, ProjectInstance> dstProjects = Maps.newHashMap();
+
+    /**
+     * Registers all command line options of the tool. The cube / hybrid / project / all
+     * selectors are bundled into a single required option group.
+     */
+    public CubeMigrationCrossClusterCLI() {
+        // selector of the migration scope
+        OptionGroup scopeSelector = new OptionGroup();
+        scopeSelector.addOption(OPTION_CUBE);
+        scopeSelector.addOption(OPTION_HYBRID);
+        scopeSelector.addOption(OPTION_PROJECT);
+        scopeSelector.addOption(OPTION_All);
+        scopeSelector.setRequired(true);
+
+        // registration order is kept exactly as before
+        options = new Options() //
+                .addOption(OPTION_KYLIN_URI_SRC) //
+                .addOption(OPTION_KYLIN_URI_DST) //
+                .addOption(OPTION_FS_HA_ENABLED_CODE) //
+                .addOption(OPTION_UPDATE_MAPPING) //
+                .addOptionGroup(scopeSelector) //
+                .addOption(OPTION_DST_HIVE_CHECK) //
+                .addOption(OPTION_SCHEMA_ONLY) //
+                .addOption(OPTION_OVERWRITE) //
+                .addOption(OPTION_EXECUTE) //
+                .addOption(OPTION_COPROCESSOR_PATH) //
+                .addOption(OPTION_DISTCP_JOB_QUEUE) //
+                .addOption(OPTION_THREAD_NUM) //
+                .addOption(OPTION_DISTCP_JOB_MEMORY);
+    }
+
+    /** @return the command line options registered by the constructor */
+    protected Options getOptions() {
+        return this.options;
+    }
+
+    /**
+     * Tests a single bit of the HA code.
+     *
+     * @param code the bit mask
+     * @param pos zero-based position of the bit to test
+     * @return true if the bit at {@code pos} is set in {@code code}
+     */
+    public static boolean ifFSHAEnabled(int code, int pos) {
+        // shift the wanted bit down to position 0 and look at it alone
+        return ((code >>> pos) & 1) == 1;
+    }
+
+    /**
+     * Reads the command line options into the fields of this tool and creates the accessors
+     * for the source and the destination cluster.
+     *
+     * Numeric options are parsed and validated before any cluster accessor is created, so
+     * that an invalid value fails fast instead of failing every single cube later on.
+     *
+     * @param optionsHelper the parsed command line
+     * @throws IllegalArgumentException if nThread or distCpJobMemory is not positive
+     * @throws Exception if the mapping file cannot be read or a cluster accessor cannot be created
+     */
+    protected void init(OptionsHelper optionsHelper) throws Exception {
+        if (optionsHelper.hasOption(OPTION_UPDATE_MAPPING)) {
+            File mappingFile = new File(optionsHelper.getOptionValue(OPTION_UPDATE_MAPPING));
+            // the mapping file is JSON, so decode it as UTF-8 instead of the platform default charset
+            String content = new String(Files.readAllBytes(mappingFile.toPath()), Charset.forName("UTF-8"));
+            Map<String, TableSchemaUpdateMapping> tmpMappings = JsonUtil.readValue(content,
+                    new TypeReference<Map<String, TableSchemaUpdateMapping>>() {
+                    });
+            // keys are normalized to upper case
+            mappings = Maps.newHashMapWithExpectedSize(tmpMappings.size());
+            for (Map.Entry<String, TableSchemaUpdateMapping> entry : tmpMappings.entrySet()) {
+                mappings.put(entry.getKey().toUpperCase(Locale.ROOT), entry.getValue());
+            }
+        }
+
+        // switches defaulting to true
+        ifDstHiveCheck = !optionsHelper.hasOption(OPTION_DST_HIVE_CHECK)
+                || Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DST_HIVE_CHECK));
+        ifSchemaOnly = !optionsHelper.hasOption(OPTION_SCHEMA_ONLY)
+                || Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_SCHEMA_ONLY));
+        // switches defaulting to false
+        ifOverwrite = optionsHelper.hasOption(OPTION_OVERWRITE)
+                && Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_OVERWRITE));
+        ifExecute = optionsHelper.hasOption(OPTION_EXECUTE)
+                && Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_EXECUTE));
+
+        codeOfFSHAEnabled = optionsHelper.hasOption(OPTION_FS_HA_ENABLED_CODE)
+                ? Integer.parseInt(optionsHelper.getOptionValue(OPTION_FS_HA_ENABLED_CODE))
+                : 3;
+
+        int distCpMemory = optionsHelper.hasOption(OPTION_DISTCP_JOB_MEMORY)
+                ? Integer.parseInt(optionsHelper.getOptionValue(OPTION_DISTCP_JOB_MEMORY))
+                : 1500;
+        if (distCpMemory <= 0) {
+            throw new IllegalArgumentException("distCpJobMemory should be positive, but is " + distCpMemory);
+        }
+
+        nThread = optionsHelper.hasOption(OPTION_THREAD_NUM)
+                ? Integer.parseInt(optionsHelper.getOptionValue(OPTION_THREAD_NUM))
+                : 8;
+        if (nThread <= 0) {
+            // a fixed thread pool cannot be created with a non-positive size
+            throw new IllegalArgumentException("nThread should be positive, but is " + nThread);
+        }
+
+        // bits 0-1 of the HA code belong to the source cluster, bits 2-3 to the destination cluster
+        String srcConfigURI = optionsHelper.getOptionValue(OPTION_KYLIN_URI_SRC);
+        srcCluster = new SrcClusterUtil(srcConfigURI, ifFSHAEnabled(codeOfFSHAEnabled, 0),
+                ifFSHAEnabled(codeOfFSHAEnabled, 1));
+        String dstConfigURI = optionsHelper.getOptionValue(OPTION_KYLIN_URI_DST);
+        dstCluster = new DstClusterUtil(dstConfigURI, ifFSHAEnabled(codeOfFSHAEnabled, 2),
+                ifFSHAEnabled(codeOfFSHAEnabled, 3), ifExecute);
+
+        distCpConf = new Configuration(srcCluster.jobConf);
+        if (optionsHelper.hasOption(OPTION_DISTCP_JOB_QUEUE)) {
+            distCpConf.set("mapreduce.job.queuename", optionsHelper.getOptionValue(OPTION_DISTCP_JOB_QUEUE));
+        }
+        // leave 1/5 of the container memory as headroom for the mapper JVM
+        int distCpJVMMemory = distCpMemory * 4 / 5;
+        distCpConf.set("mapreduce.map.memory.mb", "" + distCpMemory);
+        distCpConf.set("mapreduce.map.java.opts",
+                "-server -Xmx" + distCpJVMMemory + "m -Djava.net.preferIPv4Stack=true");
+
+        coprocessorJarPath = optionsHelper.hasOption(OPTION_COPROCESSOR_PATH)
+                ? optionsHelper.getOptionValue(OPTION_COPROCESSOR_PATH)
+                : srcCluster.getDefaultCoprocessorJarPath();
+    }
+
+    /**
+     * Runs the migration: resolves the requested projects / cubes / hybrids on the source
+     * cluster, migrates every resulting cube, then saves the hybrids and the touched
+     * projects on the destination cluster and finally calls {@code dstCluster.updateMeta()}.
+     *
+     * A failure of a single cube is logged and collected; it does not stop the migration
+     * of the remaining cubes.
+     *
+     * @param optionsHelper the parsed command line
+     * @throws IllegalArgumentException if a requested project, cube or hybrid does not exist
+     */
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        init(optionsHelper);
+
+        if (optionsHelper.hasOption(OPTION_All)) {
+            projects.addAll(srcCluster.listAllProjects());
+        } else if (optionsHelper.hasOption(OPTION_PROJECT)) {
+            Set<String> projectNames = Sets.newHashSet(optionsHelper.getOptionValue(OPTION_PROJECT).split(","));
+            for (String projectName : projectNames) {
+                ProjectInstance project = srcCluster.getProject(projectName);
+                if (project == null) {
+                    throw new IllegalArgumentException("No project found with name of " + projectName);
+                }
+                projects.add(project);
+            }
+        } else if (optionsHelper.hasOption(OPTION_CUBE)) {
+            String cubeNames = optionsHelper.getOptionValue(OPTION_CUBE);
+            for (String cubeName : cubeNames.split(",")) {
+                CubeInstance cube = srcCluster.getCube(cubeName);
+                if (cube == null) {
+                    throw new IllegalArgumentException("No cube found with name of " + cubeName);
+                }
+                cubes.add(cube);
+            }
+        } else if (optionsHelper.hasOption(OPTION_HYBRID)) {
+            String hybridNames = optionsHelper.getOptionValue(OPTION_HYBRID);
+            for (String hybridName : hybridNames.split(",")) {
+                HybridInstance hybridInstance = srcCluster.getHybrid(hybridName);
+                if (hybridInstance == null) {
+                    throw new IllegalArgumentException("No hybrid found with name of " + hybridName);
+                }
+                hybrids.add(hybridInstance);
+            }
+        }
+
+        // expand projects into their realizations
+        for (ProjectInstance project : projects) {
+            for (RealizationEntry entry : project.getRealizationEntries()) {
+                IRealization realization = srcCluster.getRealization(entry);
+                addRealization(realization);
+            }
+        }
+        // expand hybrids into their cubes; iterate over a snapshot because addHybrid() may
+        // insert nested hybrids into the very same set
+        for (HybridInstance hybrid : Lists.newArrayList(hybrids)) {
+            addHybrid(hybrid);
+        }
+
+        Map<CubeInstance, Exception> failedCubes = Maps.newHashMap();
+
+        for (CubeInstance cube : cubes) {
+            logger.info("start to migrate cube {}", cube);
+            try {
+                migrateCube(cube);
+                logger.info("finish migrating cube {}", cube);
+            } catch (Exception e) {
+                logger.error("fail to migrate cube {} due to ", cube, e);
+                failedCubes.put(cube, e);
+            }
+        }
+
+        for (HybridInstance hybrid : hybrids) {
+            dstCluster.saveHybrid(hybrid);
+
+            // update project
+            ProjectInstance srcProject = srcCluster.getProjectByRealization(RealizationType.HYBRID, hybrid.getName());
+            ProjectInstance dstProject = getDstProject(srcProject);
+
+            // update hybrids
+            Set<RealizationEntry> projReals = Sets.newHashSet(dstProject.getRealizationEntries());
+            projReals.add(RealizationEntry.create(RealizationType.HYBRID, hybrid.getName()));
+            dstProject.setRealizationEntries(Lists.newArrayList(projReals));
+
+            dstProjects.put(dstProject.getName(), dstProject);
+        }
+
+        for (ProjectInstance dstProject : dstProjects.values()) {
+            dstCluster.saveProject(dstProject);
+        }
+
+        dstCluster.updateMeta();
+
+        if (failedCubes.isEmpty()) {
+            logger.info("Migration for cubes {}, hybrids {} all succeed", cubes, hybrids);
+        } else {
+            logger.warn("Failed to migrate cubes {} and need to check the detailed reason and retry again!!!",
+                    failedCubes.keySet());
+        }
+    }
+
+    private void migrateCube(CubeInstance cube) throws IOException {
+        if (!ifOverwrite && dstCluster.exists(CubeInstance.concatResourcePath(cube.getName()))) {
+            throw new RuntimeException(("The cube named " + cube.getName()
+                    + " already exists on target metadata store. Please delete it firstly and try again"));
+        }
+
+        ProjectInstance srcProject = srcCluster.getProjectByRealization(RealizationType.CUBE, cube.getName());
+
+        String descName = cube.getDescName();
+        CubeDesc cubeDesc = srcCluster.getCubeDesc(descName);
+
+        String modelName = cubeDesc.getModelName();
+        DataModelDesc modelDesc = srcCluster.getDataModelDesc(modelName);
+
+        Set<TableDesc> tableSet = Sets.newHashSet();
+        for (TableRef tableRef : modelDesc.getAllTables()) {
+            TableDesc tableDescOld = srcCluster.getTableDesc(tableRef.getTableIdentity(), srcProject.getName());
+            TableDesc tableDescUpdated = TableSchemaUpdater.dealWithMappingForTable(tableDescOld, mappings);
+            tableSet.add(tableDescUpdated);
+        }
+
+        modelDesc = TableSchemaUpdater.dealWithMappingForModel(modelDesc, mappings);
+
+        cubeDesc = TableSchemaUpdater.dealWithMappingForCubeDesc(cubeDesc, mappings);
+
+        { // compatibility check before migrating to the destination cluster
+            dstCluster.checkCompatibility(srcProject.getName(), tableSet, modelDesc, ifDstHiveCheck);
+        }
+
+        {
+            for (TableDesc table : tableSet) {
+                dstCluster.saveTableDesc(table);
+            }
+
+            dstCluster.saveModelDesc(modelDesc);
+
+            dstCluster.saveCubeDesc(cubeDesc);
+
+            if (ifSchemaOnly) {
+                cube = CubeInstance.getCopyOf(cube);
+                cube.getSegments().clear();
+                cube.resetSnapshots();
+                cube.setStatus(RealizationStatusEnum.DISABLED);
+                cube.clearCuboids();
+            } else {
+                // cube with global dictionary cannot be migrated with data
+                checkGlobalDict(cubeDesc);
+
+                // delete those NEW segments and only keep the READY segments
+                cube.setSegments(cube.getSegments(SegmentStatusEnum.READY));
+
+                cube = TableSchemaUpdater.dealWithMappingForCube(cube, mappings);
+
+                ExecutorService executor = Executors.newFixedThreadPool(nThread, new ThreadFactoryBuilder()
+                        .setNameFormat("Cube-" + cube.getName() + "-data-migration-pool-%d").build());
+                try {
+                    List<Future<?>> futureList = migrateCubeData(cube, cubeDesc, executor);
+                    executor.shutdown();
+                    for (Future<?> future : futureList) {
+                        try {
+                            future.get();
+                        } catch (InterruptedException e) {
+                            logger.warn(e.getMessage());
+                        } catch (ExecutionException e) {
+                            executor.shutdownNow();
+                            logger.error(e.getMessage());
+                            throw new RuntimeException(e);
+                        }
+                    }
+                } finally {
+                    // in case that exceptions are thrown when call migrateCubeData()
+                    if (!executor.isShutdown()) {
+                        logger.warn("shut down executor for cube {}", cube);
+                        executor.shutdownNow();
+                    }
+                }
+            }
+
+            dstCluster.saveCubeInstance(cube);
+        }
+
+        {
+            ProjectInstance dstProject = getDstProject(srcProject);
+
+            // update tables in project
+            Set<String> projTables = Sets.newHashSet(dstProject.getTables());
+            projTables.addAll(tableSet.stream().map(TableDesc::getIdentity).collect(Collectors.toSet()));
+            dstProject.setTables(projTables);
+
+            // update models in project
+            Set<String> projModels = Sets.newHashSet(dstProject.getModels());
+            projModels.add(modelName);
+            dstProject.setModels(Lists.newArrayList(projModels));
+
+            // update cubes in project
+            Set<RealizationEntry> projReals = Sets.newHashSet(dstProject.getRealizationEntries());
+            projReals.add(RealizationEntry.create(RealizationType.CUBE, cube.getName()));
+            dstProject.setRealizationEntries(Lists.newArrayList(projReals));
+
+            dstProjects.put(dstProject.getName(), dstProject);
+        }
+    }
+
+    private void checkGlobalDict(CubeDesc cubeDesc) {
+        if (cubeDesc.getDictionaries() != null && !cubeDesc.getDictionaries().isEmpty()) {
+            for (DictionaryDesc dictDesc : cubeDesc.getDictionaries()) {
+                if (GlobalDictionaryBuilder.class.getName().equalsIgnoreCase(dictDesc.getBuilderClass())) {
+                    throw new RuntimeException("it's not supported to migrate global dictionaries " + dictDesc
+                            + " for cube " + cubeDesc.getName());
+                }
+            }
+        }
+    }
+
+    /**
+     * Copies the data of all READY segments and of the cube-level (global) snapshot tables.
+     * Statistics, dictionaries and snapshots are copied in the calling thread; the HDFS job
+     * directories and the htables are copied by tasks submitted to {@code executor}.
+     *
+     * @return the futures of all submitted tasks, in submission order
+     */
+    private List<Future<?>> migrateCubeData(CubeInstance cube, CubeDesc cubeDesc, ExecutorService executor)
+            throws IOException {
+        List<Future<?>> submitted = Lists.newLinkedList();
+
+        for (final CubeSegment readySegment : cube.getSegments(SegmentStatusEnum.READY)) {
+            logger.info("start to migrate segment: {} {}", cube, readySegment.getName());
+
+            // metadata resources of the segment
+            copyMetaResource(readySegment.getStatisticsResourcePath());
+            for (String dictPath : readySegment.getDictionaryPaths()) {
+                copyDictionary(cube, dictPath);
+            }
+            for (String snapshotPath : readySegment.getSnapshotPaths()) {
+                copySnapshot(cube, snapshotPath);
+            }
+
+            // working directory of the job which built the segment
+            submitted.add(executor.submit(new MyRunnable() {
+                @Override
+                public void doRun() throws Exception {
+                    copyHDFSJobInfo(readySegment.getLastBuildJobID());
+                }
+            }));
+
+            // htable of the segment
+            submitted.add(executor.submit(new MyRunnable() {
+                @Override
+                public void doRun() throws Exception {
+                    copyHTable(readySegment);
+                }
+            }));
+
+            logger.info("add segment {} to migration list", readySegment);
+        }
+
+        if (cubeDesc.getSnapshotTableDescList() == null) {
+            return submitted;
+        }
+
+        for (SnapshotTableDesc snapshotDesc : cubeDesc.getSnapshotTableDescList()) {
+            if (!snapshotDesc.isGlobal()) {
+                continue;
+            }
+            String snapshotResPath = cube.getSnapshotResPath(snapshotDesc.getTableName());
+            if (!snapshotDesc.isExtSnapshotTable()) {
+                copySnapshot(cube, snapshotResPath);
+            } else {
+                final ExtTableSnapshotInfo extSnapshot = srcCluster.getExtTableSnapshotInfo(snapshotResPath);
+                dstCluster.saveExtSnapshotTableInfo(extSnapshot);
+                // only ext snapshots stored in hbase have an htable to copy
+                if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(extSnapshot.getStorageType())) {
+                    submitted.add(executor.submit(new MyRunnable() {
+                        @Override
+                        public void doRun() throws Exception {
+                            copyHTable(extSnapshot);
+                        }
+                    }));
+                }
+            }
+            logger.info("add cube-level snapshot table {} for cube {} to migration list", snapshotResPath,
+                    cube);
+        }
+
+        return submitted;
+    }
+
+    /**
+     * Finds the destination project matching {@code srcProject} by name: first among the
+     * projects already touched in this run, then on the destination cluster. If neither
+     * has it, a new project is created from the source project's attributes and uuid.
+     */
+    private ProjectInstance getDstProject(ProjectInstance srcProject) throws IOException {
+        ProjectInstance touched = dstProjects.get(srcProject.getName());
+        if (touched != null) {
+            return touched;
+        }
+
+        ProjectInstance existing = dstCluster.getProject(srcProject.getName());
+        if (existing != null) {
+            return existing;
+        }
+
+        ProjectInstance created = ProjectInstance.create(srcProject.getName(), srcProject.getOwner(),
+                srcProject.getDescription(), srcProject.getOverrideKylinProps(), null, null);
+        created.setUuid(srcProject.getUuid());
+        return created;
+    }
+
+    private void putUserInfo(String userName) throws IOException {
+        String userKey = KylinUserService.getId(userName);
+        ManagedUser user = srcCluster.getUserDetails(userKey);
+        if (user == null) {
+            logger.warn("Cannot find user {}", userName);
+            return;
+        }
+        dstCluster.saveUserInfo(userKey, user);
+    }
+
+    /**
+     * Copies one raw metadata resource from the source to the destination cluster.
+     * The content stream of the resource is closed even if writing it fails.
+     *
+     * @param item the resource path
+     * @throws IllegalStateException if the source cluster returns no resource for {@code item}
+     * @throws IOException if reading or writing the resource fails
+     */
+    private void copyMetaResource(String item) throws IOException {
+        RawResource res = srcCluster.getResource(item);
+        if (res == null) {
+            // fail with a meaningful message instead of a bare NullPointerException
+            throw new IllegalStateException("Resource " + item + " cannot be found in the source cluster");
+        }
+        try {
+            dstCluster.putResource(item, res);
+        } finally {
+            res.content().close();
+        }
+    }
+
+    /**
+     * Copies one dictionary unless its path already exists in the destination cluster.
+     * If saving reports a duplicate, every segment of {@code cube} referring to
+     * {@code dictPath} is re-pointed to the reported dictionary path.
+     */
+    private void copyDictionary(CubeInstance cube, String dictPath) throws IOException {
+        if (dstCluster.exists(dictPath)) {
+            logger.info("Item {} has already existed in destination cluster", dictPath);
+            return;
+        }
+
+        String reusedPath = dstCluster.saveDictionary(srcCluster.getDictionaryInfo(dictPath));
+        if (reusedPath == null) {
+            return;
+        }
+
+        for (CubeSegment seg : cube.getSegments()) {
+            for (Map.Entry<String, String> dictRef : seg.getDictionaries().entrySet()) {
+                if (dictRef.getValue().equalsIgnoreCase(dictPath)) {
+                    dictRef.setValue(reusedPath);
+                }
+            }
+        }
+        logger.info("Item {} is dup, instead {} is reused", dictPath, reusedPath);
+    }
+
+    /**
+     * Copies one snapshot table unless its path already exists in the destination cluster.
+     * The {@code cube} argument is not used by this method.
+     */
+    private void copySnapshot(CubeInstance cube, String snapshotTablePath) throws IOException {
+        if (!dstCluster.exists(snapshotTablePath)) {
+            dstCluster.saveSnapshotTable(srcCluster.getSnapshotTable(snapshotTablePath));
+        } else {
+            logger.info("Item {} has already existed in destination cluster", snapshotTablePath);
+        }
+    }
+
+    private void copyHDFSJobInfo(String jobId) throws Exception {
+        String srcDirQualified = srcCluster.getJobWorkingDirQualified(jobId);
+        String dstDirQualified = dstCluster.getJobWorkingDirQualified(jobId);
+        if (ifExecute) {
+            dstCluster.copyInitOnJobCluster(new Path(dstDirQualified));
+            copyHDFSPath(srcDirQualified, srcCluster.jobConf, dstDirQualified, dstCluster.jobConf);
+        } else {
+            logger.info("copied hdfs directory from {} to {}", srcDirQualified, dstDirQualified);
+        }
+    }
+
+    private void copyHTable(CubeSegment segment) throws IOException {
+        String tableName = segment.getStorageLocationIdentifier();
+        if (ifExecute) {
+            if (checkHTableExist(segment)) {
+                logger.info("htable {} has already existed in dst, will skip the migration", tableName);
+            } else {
+                copyHTable(tableName, true);
+                if (!checkHTableEquals(tableName)) {
+                    logger.error("htable {} is copied to dst with different size!!!", tableName);
+                }
+            }
+        }
+        logger.info("migrated htable {} for segment {}", tableName, segment);
+    }
+
+    private boolean checkHTableExist(CubeSegment segment) throws IOException {
+        String tableName = segment.getStorageLocationIdentifier();
+        TableName htableName = TableName.valueOf(tableName);
+        if (!dstCluster.checkExist(htableName, segment)) {
+            return false;
+        }
+
+        if (!checkHTableEquals(tableName)) {
+            logger.warn("although htable {} exists in destination, the details data are different", tableName);
+            dstCluster.deleteHTable(tableName);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Compares the HBase info of the given htable between the source and the destination
+     * cluster by means of {@code HBaseInfoUtil.checkEquals}.
+     */
+    private boolean checkHTableEquals(String tableName) throws IOException {
+        return HBaseInfoUtil.checkEquals( //
+                HBaseInfoUtil.getHBaseInfo(tableName, srcCluster.hbaseConn), //
+                HBaseInfoUtil.getHBaseInfo(tableName, dstCluster.hbaseConn));
+    }
+
+    /**
+     * Copies the htable backing an ext table snapshot, without coprocessor deployment,
+     * unless an htable of that name already exists in the destination cluster.
+     * Without the execute switch only the final log line is written.
+     *
+     * @param extTableSnapshotInfo the ext snapshot whose storage location names the htable
+     */
+    private void copyHTable(ExtTableSnapshotInfo extTableSnapshotInfo) throws IOException {
+        String tableName = extTableSnapshotInfo.getStorageLocationIdentifier();
+        if (ifExecute) {
+            TableName htableName = TableName.valueOf(tableName);
+            if (dstCluster.htableExists(htableName)) {
+                // the table name has to be passed for the placeholder
+                logger.warn("htable {} already exists in the dst cluster and will skip the htable migration",
+                        tableName);
+            } else {
+                copyHTable(tableName, false);
+            }
+        }
+        logger.info("migrated htable {} for ext table snapshot {}", tableName, extTableSnapshotInfo.getTableName());
+    }
+
+    /**
+     * Copies one htable to the destination cluster: the HFiles are copied by DistCp first,
+     * then the table is created in the destination with the region boundaries of the
+     * source table, and finally the copied HFiles are bulk loaded.
+     * Does nothing without the execute switch.
+     *
+     * @param tableName name of the htable
+     * @param ifDeployCoprocessor whether to deploy the coprocessor on the new table
+     * @throws RuntimeException wrapping any failure of the steps above
+     */
+    private void copyHTable(String tableName, boolean ifDeployCoprocessor) {
+        if (ifExecute) {
+            TableName htableName = TableName.valueOf(tableName);
+            try {
+                //migrate data first
+                copyHFileByDistCp(tableName);
+
+                //create htable metadata, especially the split keys for predefining the regions
+                HTableDescriptor tableDesc;
+                byte[][] splitKeys;
+                // the source handles are only needed for reading the metadata, close them right after
+                try (Table table = srcCluster.hbaseConn.getTable(htableName);
+                        org.apache.hadoop.hbase.client.RegionLocator regionLocator = srcCluster.hbaseConn
+                                .getRegionLocator(htableName)) {
+                    byte[][] endKeys = regionLocator.getEndKeys();
+                    // every end key except the one of the last region is a split point
+                    splitKeys = Arrays.copyOfRange(endKeys, 0, endKeys.length - 1);
+                    tableDesc = new HTableDescriptor(table.getTableDescriptor());
+                }
+
+                //change the table host
+                dstCluster.resetTableHost(tableDesc);
+                if (ifDeployCoprocessor) {
+                    dstCluster.deployCoprocessor(tableDesc, coprocessorJarPath);
+                }
+                dstCluster.createTable(tableDesc, splitKeys);
+
+                //do bulk load to sync up htable data and metadata
+                dstCluster.bulkLoadTable(tableName);
+            } catch (Exception e) {
+                // one placeholder only, the exception is passed as the trailing throwable
+                logger.error("fail to migrate htable {} due to ", tableName, e);
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    protected void copyHFileByDistCp(String tableName) throws Exception {
+        String srcDirQualified = srcCluster.getRootDirQualifiedOfHTable(tableName);
+        String dstDirQualified = dstCluster.getRootDirQualifiedOfHTable(tableName);
+        dstCluster.copyInitOnHBaseCluster(new Path(dstDirQualified));
+        copyHDFSPath(srcDirQualified, srcCluster.hbaseConf, dstDirQualified, dstCluster.hbaseConf);
+    }
+
+    /**
+     * Copies an HDFS directory by a blocking DistCp job which preserves the block size.
+     *
+     * The job runs on a private copy of {@link #getConfOfDistCp()} which carries the
+     * target-path-exists flag determined for this very copy; the shared configuration is
+     * not modified, as this method is called concurrently by the copy threads.
+     *
+     * @param srcDir qualified source directory
+     * @param srcConf configuration of the source cluster, not used by this implementation
+     * @param dstDir qualified target directory
+     * @param dstConf configuration of the destination cluster, not used by this implementation
+     * @throws Exception if the DistCp job cannot be set up or fails
+     */
+    protected void copyHDFSPath(String srcDir, Configuration srcConf, String dstDir, Configuration dstConf)
+            throws Exception {
+        logger.info("start to copy hdfs directory from {} to {}", srcDir, dstDir);
+        DistCpOptions distCpOptions = OptionsParser.parse(new String[] { srcDir, dstDir });
+        distCpOptions.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
+        distCpOptions.setBlocking(true);
+        setTargetPathExists(distCpOptions);
+
+        // the flag has to be part of the configuration the job is created from
+        Configuration jobConf = new Configuration(getConfOfDistCp());
+        jobConf.setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, distCpOptions.getTargetPathExists());
+
+        DistCp distCp = new DistCp(jobConf, distCpOptions);
+        distCp.execute();
+        logger.info("copied hdfs directory from {} to {}", srcDir, dstDir);
+    }
+
+    /** @return the configuration built in init() for the DistCp jobs */
+    protected Configuration getConfOfDistCp() {
+        return this.distCpConf;
+    }
+
+    /**
+     * Checks whether the target path of the given options exists and records the answer
+     * both in the options and, under {@code DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS},
+     * in the job configuration of the destination cluster. The file system of the target
+     * path is resolved with that same configuration.
+     */
+    public void setTargetPathExists(DistCpOptions inputOptions) throws IOException {
+        final Path targetPath = inputOptions.getTargetPath();
+        final boolean present = targetPath.getFileSystem(dstCluster.jobConf).exists(targetPath);
+        inputOptions.setTargetPathExists(present);
+        dstCluster.jobConf.setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, present);
+    }
+
+    /** Selects a hybrid for migration together with all realizations it contains. */
+    private void addHybrid(HybridInstance hybrid) {
+        hybrids.add(hybrid);
+        for (IRealization member : hybrid.getRealizations()) {
+            addRealization(member);
+        }
+    }
+
+    /**
+     * Selects a realization for migration: hybrids are expanded through addHybrid, cubes
+     * are collected directly, anything else is skipped with a warning.
+     */
+    private void addRealization(IRealization realization) {
+        if (realization instanceof HybridInstance) {
+            addHybrid((HybridInstance) realization);
+            return;
+        }
+        if (realization instanceof CubeInstance) {
+            cubes.add((CubeInstance) realization);
+            return;
+        }
+        logger.warn("Realization {} is neither hybrid nor cube", realization);
+    }
+
+    /**
+     * A {@link Runnable} whose work may throw checked exceptions. Runtime exceptions are
+     * passed through unchanged, every other throwable is wrapped into a RuntimeException.
+     */
+    private static abstract class MyRunnable implements Runnable {
+        @Override
+        public void run() {
+            try {
+                doRun();
+            } catch (Throwable failure) {
+                if (failure instanceof RuntimeException) {
+                    throw (RuntimeException) failure;
+                }
+                throw new RuntimeException(failure);
+            }
+        }
+
+        /** The actual work of this task. */
+        public abstract void doRun() throws Exception;
+    }
+
+    /** Command line entry point: hands the arguments to a new instance of this tool. */
+    public static void main(String[] args) {
+        new CubeMigrationCrossClusterCLI().execute(args);
+    }
+}
\ No newline at end of file
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java b/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java
new file mode 100644
index 0000000..e578935
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+import static org.apache.kylin.metadata.realization.IRealizationConstants.HTableSegmentTag;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.SnapshotTableSerializer;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.rest.security.ManagedUser;
+import org.apache.kylin.rest.service.KylinUserService;
+import org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class DstClusterUtil extends ClusterUtil {
+    private static final Logger logger = LoggerFactory.getLogger(DstClusterUtil.class);
+
+    public static final String hbaseSubDir = "migration/hbase/data/default/";
+
+    private final String hbaseDataDirQualified;
+    private final String hbaseDataDir;
+
+    private final boolean ifExecute;
+
+    public DstClusterUtil(String configURI, boolean ifExecute) throws IOException {
+        this(configURI, true, true, ifExecute);
+    }
+
+    public DstClusterUtil(String configURI, boolean ifJobFSHAEnabled, boolean ifHBaseFSHAEnabled, boolean ifExecute)
+            throws IOException {
+        super(configURI, ifJobFSHAEnabled, ifHBaseFSHAEnabled);
+        this.hbaseDataDirQualified = hbaseHdfsWorkingDirectoryQualified + hbaseSubDir;
+        this.hbaseDataDir = hdfsWorkingDirectory + hbaseSubDir;
+        this.ifExecute = ifExecute;
+    }
+
+    @Override
+    public ProjectInstance getProject(String projName) throws IOException {
+        return resourceStore.getResource(ProjectInstance.concatResourcePath(projName),
+                ProjectManager.PROJECT_SERIALIZER);
+    }
+
+    @Override
+    public DictionaryInfo getDictionaryInfo(String dictPath) throws IOException {
+        return resourceStore.getResource(dictPath, DictionaryInfoSerializer.FULL_SERIALIZER);
+    }
+
+    @Override
+    public SnapshotTable getSnapshotTable(String snapshotPath) throws IOException {
+        return resourceStore.getResource(snapshotPath, SnapshotTableSerializer.FULL_SERIALIZER);
+    }
+
+    @Override
+    public String getRootDirQualifiedOfHTable(String tableName) {
+        return hbaseDataDirQualified + tableName;
+    }
+
+    private String getRootDirOfHTable(String tableName) {
+        return hbaseDataDir + tableName;
+    }
+
+    public boolean exists(String resPath) throws IOException {
+        return resourceStore.exists(resPath);
+    }
+
+    public void checkCompatibility(String projectName, Set<TableDesc> tableSet, DataModelDesc modelDesc,
+            boolean ifHiveCheck) throws IOException {
+        List<String> tableDataList = Lists.newArrayList();
+        for (TableDesc table : tableSet) {
+            tableDataList.add(JsonUtil.writeValueAsIndentString(table));
+        }
+
+        String modelDescData = JsonUtil.writeValueAsIndentString(modelDesc);
+
+        CompatibilityCheckRequest request = new CompatibilityCheckRequest();
+        request.setProjectName(projectName);
+        request.setTableDescDataList(tableDataList);
+        request.setModelDescData(modelDescData);
+
+        String jsonRequest = JsonUtil.writeValueAsIndentString(request);
+        restClient.checkCompatibility(jsonRequest, ifHiveCheck);
+    }
+
+    public void saveProject(ProjectInstance projInstance) throws IOException {
+        if (ifExecute) {
+            putMetaResource(ProjectInstance.concatResourcePath(projInstance.getName()), projInstance,
+                    ProjectManager.PROJECT_SERIALIZER);
+        }
+        logger.info("saved project {}", projInstance);
+    }
+
+    public void saveHybrid(HybridInstance hybridInstance) throws IOException {
+        if (ifExecute) {
+            putMetaResource(HybridInstance.concatResourcePath(hybridInstance.getName()), hybridInstance,
+                    HybridManager.HYBRID_SERIALIZER);
+        }
+        logger.info("saved hybrid {}", hybridInstance);
+    }
+
+    public void saveTableDesc(TableDesc table) throws IOException {
+        if (ifExecute) {
+            putMetaResource(TableDesc.concatResourcePath(table.getIdentity(), table.getProject()), table,
+                    TableMetadataManager.TABLE_SERIALIZER);
+        }
+        logger.info("saved table {}", table);
+    }
+
+    public void saveModelDesc(DataModelDesc modelDesc) throws IOException {
+        if (ifExecute) {
+            putMetaResource(DataModelDesc.concatResourcePath(modelDesc.getName()), modelDesc,
+                    DataModelManager.MODELDESC_SERIALIZER);
+        }
+        logger.info("saved model {}", modelDesc);
+    }
+
+    public void saveCubeDesc(CubeDesc cubeDesc) throws IOException {
+        if (ifExecute) {
+            putMetaResource(CubeDesc.concatResourcePath(cubeDesc.getName()), cubeDesc,
+                    CubeDescManager.CUBE_DESC_SERIALIZER);
+        }
+        logger.info("saved cube desc {}", cubeDesc);
+    }
+
+    public void saveCubeInstance(CubeInstance cube) throws IOException {
+        if (ifExecute) {
+            putMetaResource(CubeInstance.concatResourcePath(cube.getName()), cube, CubeManager.CUBE_SERIALIZER);
+        }
+        logger.info("saved cube instance {}", cube);
+    }
+
+    public String saveDictionary(DictionaryInfo dictInfo) throws IOException {
+        String dupDict = checkDupDict(dictInfo);
+        if (dupDict == null) {
+            putMetaResource(dictInfo.getResourcePath(), dictInfo, DictionaryInfoSerializer.FULL_SERIALIZER);
+            logger.info("saved dictionary {}", dictInfo.getResourcePath());
+        }
+        return dupDict;
+    }
+
+    private String checkDupDict(DictionaryInfo dictInfo) throws IOException {
+        NavigableSet<String> existings = resourceStore.listResources(dictInfo.getResourceDir());
+        if (existings == null)
+            return null;
+
+        logger.info("{} existing dictionaries of the same column", existings.size());
+        if (existings.size() > 100) {
+            logger.warn("Too many dictionaries under {}, dict count: {}", dictInfo.getResourceDir(), existings.size());
+        }
+
+        for (String existing : existings) {
+            DictionaryInfo existingInfo = getDictionaryInfo(existing);
+            if (existingInfo != null && dictInfo.getDictionaryObject().equals(existingInfo.getDictionaryObject())) {
+                return existing;
+            }
+        }
+
+        return null;
+    }
+
+    public void saveSnapshotTable(SnapshotTable snapshotTable) throws IOException {
+        putMetaResource(snapshotTable.getResourcePath(), snapshotTable, SnapshotTableSerializer.FULL_SERIALIZER);
+        logger.info("saved snapshot table {}", snapshotTable.getResourcePath());
+    }
+
+    public void saveExtSnapshotTableInfo(ExtTableSnapshotInfo extTableSnapshotInfo) throws IOException {
+        putMetaResource(extTableSnapshotInfo.getResourcePath(), extTableSnapshotInfo,
+                ExtTableSnapshotInfoManager.SNAPSHOT_SERIALIZER);
+        logger.info("saved ext snapshot table info {}", extTableSnapshotInfo.getResourcePath());
+    }
+
+    public void saveUserInfo(String userKey, ManagedUser user) throws IOException {
+        if (ifExecute) {
+            putMetaResource(userKey, user, KylinUserService.SERIALIZER);
+        }
+        logger.info("saved user info {}", userKey);
+    }
+
+    private <T extends RootPersistentEntity> void putMetaResource(String resPath, T obj, Serializer<T> serializer)
+            throws IOException {
+        putMetaResource(resPath, obj, serializer, true);
+    }
+
+    public <T extends RootPersistentEntity> void putMetaResource(String resPath, T obj, Serializer<T> serializer,
+            boolean withoutCheck) throws IOException {
+        if (ifExecute) {
+            if (withoutCheck) {
+                resourceStore.putResource(resPath, obj, System.currentTimeMillis(), serializer);
+            } else {
+                resourceStore.checkAndPutResource(resPath, obj, System.currentTimeMillis(), serializer);
+            }
+        }
+        logger.info("saved resource {}", resPath);
+    }
+
+    public void putResource(String resPath, RawResource res) throws IOException {
+        if (ifExecute) {
+            resourceStore.putResource(resPath, res.content(), res.lastModified());
+        }
+        logger.info("saved resource {}", resPath);
+    }
+
+    // if htable does not exist in dst, return false;
+    // if htable exists in dst, and the segment tags are the same, if the htable is enabled, then return true;
+    //                                                             else delete the htable and return false;
+    //                          else the htable is used by others, should throw runtime exception
+    public boolean checkExist(TableName htableName, CubeSegment segment) throws IOException {
+        if (!htableExists(htableName)) {
+            return false;
+        }
+        Table table = hbaseConn.getTable(htableName);
+        HTableDescriptor tableDesc = table.getTableDescriptor();
+        if (segment.toString().equals(tableDesc.getValue(HTableSegmentTag))) {
+            if (hbaseAdmin.isTableEnabled(htableName)) {
+                return true;
+            } else {
+                hbaseAdmin.deleteTable(htableName);
+                logger.info("htable {} is deleted", htableName);
+                return false;
+            }
+        }
+        throw new RuntimeException(
+                "htable name " + htableName + " has been used by " + tableDesc.getValue(HTableSegmentTag));
+    }
+
+    public void deleteHTable(String tableName) throws IOException {
+        TableName htableName = TableName.valueOf(tableName);
+        if (hbaseAdmin.isTableEnabled(htableName)) {
+            hbaseAdmin.disableTable(htableName);
+        }
+        hbaseAdmin.deleteTable(htableName);
+        logger.info("htable {} is deleted", htableName);
+    }
+
+    public boolean htableExists(TableName htableName) throws IOException {
+        return hbaseAdmin.tableExists(htableName);
+    }
+
+    public void resetTableHost(HTableDescriptor tableDesc) {
+        tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
+    }
+
+    public void deployCoprocessor(HTableDescriptor tableDesc, String localCoprocessorJar) throws IOException {
+        List<String> existingCoprocessors = tableDesc.getCoprocessors();
+        for (String existingCoprocessor : existingCoprocessors) {
+            tableDesc.removeCoprocessor(existingCoprocessor);
+        }
+
+        Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, hbaseFS,
+                hdfsWorkingDirectory, null);
+
+        if (User.isHBaseSecurityEnabled(hbaseConf)) {
+            // add coprocessor for bulk load
+            tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+        }
+        DeployCoprocessorCLI.addCoprocessorOnHTable(tableDesc, hdfsCoprocessorJar);
+
+        logger.info("deployed hbase table {} with coprocessor.", tableDesc.getTableName());
+    }
+
+    public void createTable(HTableDescriptor tableDesc, byte[][] splitKeys) throws IOException {
+        hbaseAdmin.createTable(tableDesc, splitKeys);
+
+        logger.info("htable {} successfully created!", tableDesc.getTableName());
+    }
+
+    public void copyInitOnJobCluster(Path path) throws IOException {
+        copyInit(jobFS, path);
+    }
+
+    public void copyInitOnHBaseCluster(Path path) throws IOException {
+        copyInit(hbaseFS, path);
+    }
+
+    public static void copyInit(FileSystem fs, Path path) throws IOException {
+        path = Path.getPathWithoutSchemeAndAuthority(path);
+        Path pathP = path.getParent();
+        if (!fs.exists(pathP)) {
+            fs.mkdirs(pathP);
+        }
+        if (fs.exists(path)) {
+            logger.warn("path {} already existed and will be deleted", path);
+            HadoopUtil.deletePath(fs.getConf(), path);
+        }
+    }
+
+    public void bulkLoadTable(String tableName) throws Exception {
+        Path rootPathOfTable = new Path(getRootDirOfHTable(tableName));
+        FileStatus[] regionFiles = hbaseFS.listStatus(rootPathOfTable, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return !path.getName().startsWith(".");
+            }
+        });
+
+        for (FileStatus regionFileStatus : regionFiles) {
+            ToolRunner.run(new LoadIncrementalHFiles(hbaseConf),
+                    new String[] { regionFileStatus.getPath().toString(), tableName });
+        }
+
+        logger.info("succeed to migrate htable {}", tableName);
+    }
+
+    public void updateMeta() {
+        if (ifExecute) {
+            try {
+                logger.info("update meta cache for {}", restClient);
+                restClient.announceWipeCache(Broadcaster.SYNC_ALL, Broadcaster.Event.UPDATE.getType(),
+                        Broadcaster.SYNC_ALL);
+            } catch (IOException e) {
+                logger.error(e.getMessage());
+            }
+        }
+    }
+}
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/SrcClusterUtil.java b/tool/src/main/java/org/apache/kylin/tool/migration/SrcClusterUtil.java
new file mode 100644
index 0000000..7f865d5
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/SrcClusterUtil.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationRegistry;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SrcClusterUtil extends ClusterUtil {
+    private static final Logger logger = LoggerFactory.getLogger(SrcClusterUtil.class);
+
+    private static final String hbaseRootDirConfKey = "hbase.rootdir";
+    private final String hbaseDataDir;
+
+    private final TableMetadataManager metadataManager;
+    private final DataModelManager modelManager;
+    private final ProjectManager projectManager;
+    private final HybridManager hybridManager;
+    private final CubeManager cubeManager;
+    private final CubeDescManager cubeDescManager;
+    private final RealizationRegistry realizationRegistry;
+    private final DictionaryManager dictionaryManager;
+    private final SnapshotManager snapshotManager;
+    private final ExtTableSnapshotInfoManager extSnapshotInfoManager;
+
+    public SrcClusterUtil(String configURI, boolean ifJobFSHAEnabled, boolean ifHBaseFSHAEnabled) throws IOException {
+        super(configURI, ifJobFSHAEnabled, ifHBaseFSHAEnabled);
+
+        this.hbaseDataDir = hbaseConf.get(hbaseRootDirConfKey) + "/data/default/";
+        metadataManager = TableMetadataManager.getInstance(kylinConfig);
+        modelManager = DataModelManager.getInstance(kylinConfig);
+        projectManager = ProjectManager.getInstance(kylinConfig);
+        hybridManager = HybridManager.getInstance(kylinConfig);
+        cubeManager = CubeManager.getInstance(kylinConfig);
+        cubeDescManager = CubeDescManager.getInstance(kylinConfig);
+        realizationRegistry = RealizationRegistry.getInstance(kylinConfig);
+        dictionaryManager = DictionaryManager.getInstance(kylinConfig);
+        snapshotManager = SnapshotManager.getInstance(kylinConfig);
+        extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+    }
+
+    public String getDefaultCoprocessorJarPath() {
+        return kylinConfig.getCoprocessorLocalJar();
+    }
+
+    @Override
+    public ProjectInstance getProject(String projectName) throws IOException {
+        return projectManager.getProject(projectName);
+    }
+
+    public List<ProjectInstance> listAllProjects() throws IOException {
+        return projectManager.listAllProjects();
+    }
+
+    public ProjectInstance getProjectByRealization(RealizationType type, String realizationName) throws IOException {
+        List<ProjectInstance> ret = projectManager.findProjects(type, realizationName);
+        return ret.isEmpty() ? null : ret.get(0);
+    }
+
+    public CubeInstance getCube(String name) throws IOException {
+        return cubeManager.getCube(name);
+    }
+
+    public CubeDesc getCubeDesc(String name) throws IOException {
+        return cubeDescManager.getCubeDesc(name);
+    }
+
+    public HybridInstance getHybrid(String name) throws IOException {
+        return hybridManager.getHybridInstance(name);
+    }
+
+    public IRealization getRealization(RealizationEntry entry) throws IOException {
+        return realizationRegistry.getRealization(entry.getType(), entry.getRealization());
+    }
+
+    public DataModelDesc getDataModelDesc(String modelName) throws IOException {
+        return modelManager.getDataModelDesc(modelName);
+    }
+
+    public TableDesc getTableDesc(String tableIdentity, String projectName) throws IOException {
+        TableDesc ret = metadataManager.getStore().getResource(TableDesc.concatResourcePath(tableIdentity, projectName),
+                TableMetadataManager.TABLE_SERIALIZER);
+        if (projectName != null && ret == null) {
+            ret = metadataManager.getStore().getResource(TableDesc.concatResourcePath(tableIdentity, null),
+                    TableMetadataManager.TABLE_SERIALIZER);
+        }
+        return ret;
+    }
+
+    @Override
+    public DictionaryInfo getDictionaryInfo(String dictPath) throws IOException {
+        return dictionaryManager.getDictionaryInfo(dictPath);
+    }
+
+    @Override
+    public SnapshotTable getSnapshotTable(String snapshotPath) throws IOException {
+        return snapshotManager.getSnapshotTable(snapshotPath);
+    }
+
+    public ExtTableSnapshotInfo getExtTableSnapshotInfo(String snapshotPath) throws IOException {
+        return extSnapshotInfoManager.getSnapshot(snapshotPath);
+    }
+
+    @Override
+    public String getRootDirQualifiedOfHTable(String tableName) {
+        return hbaseDataDir + tableName;
+    }
+}
\ No newline at end of file