Posted to commits@kylin.apache.org by xx...@apache.org on 2020/06/15 02:51:29 UTC
[kylin] 13/15: KYLIN-4489 Create a tool for migration cross clusters
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 2e2a4dff9ca9c08351084f53d293e189083db140
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Tue May 19 11:14:42 2020 +0800
KYLIN-4489 Create a tool for migration cross clusters
---
.../java/org/apache/kylin/common/KylinConfig.java | 20 +
.../apache/kylin/common/restclient/RestClient.java | 40 +-
.../kylin/metadata/model/DataModelManager.java | 3 +
.../kylin/metadata/project/ProjectManager.java | 6 +-
.../kylin/rest/controller/MigrationController.java | 2 +-
pom.xml | 5 +
.../kylin/rest/controller/AdminController.java | 22 +
.../apache/kylin/rest/service/AdminService.java | 14 +
.../apache/kylin/rest/service/HBaseInfoUtil.java | 17 +-
.../apache/kylin/rest/service/ModelService.java | 8 +-
.../kylin/storage/hbase/HBaseResourceStore.java | 7 +-
.../storage/hbase/util/DeployCoprocessorCLI.java | 21 +-
tool/pom.xml | 11 +
.../apache/kylin/tool/migration/ClusterUtil.java | 165 +++++
.../migration/CubeMigrationCrossClusterCLI.java | 757 +++++++++++++++++++++
.../kylin/tool/migration/DstClusterUtil.java | 371 ++++++++++
.../kylin/tool/migration/SrcClusterUtil.java | 148 ++++
17 files changed, 1595 insertions(+), 22 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 7b0888b..c9001f0 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -20,6 +20,7 @@ package org.apache.kylin.common;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.threadlocal.InternalThreadLocal;
import org.apache.kylin.common.util.ClassUtil;
@@ -571,6 +572,25 @@ public class KylinConfig extends KylinConfigBase {
reloadKylinConfig(buildSiteProperties());
}
+ public static String getConfigAsString(Configuration conf) {
+ final StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, String> entry : conf) {
+ sb.append(entry.getKey()).append('=').append(entry.getValue()).append('\n');
+ }
+ return sb.toString();
+ }
+
+ public static Configuration getConfigFromString(String configInStr) throws IOException {
+ Properties props = new Properties();
+ props.load(new StringReader(configInStr));
+
+ Configuration config = new Configuration();
+ for (Map.Entry<Object, Object> entry : props.entrySet()) {
+ config.set((String) entry.getKey(), (String) entry.getValue());
+ }
+ return config;
+ }
+
public KylinConfig base() {
return this;
}
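The pair of helpers added above turns a Hadoop Configuration into a properties-style string and back, which is how one cluster's config can be shipped to another over REST. A minimal round-trip sketch (the key and value are illustrative, and IOException handling is elided):

    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://example-ns"); // illustrative entry
    String asString = KylinConfig.getConfigAsString(conf);
    Configuration restored = KylinConfig.getConfigFromString(asString);
    // restored.get("fs.defaultFS") yields "hdfs://example-ns" again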
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index d908f58..21b08f8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -30,7 +30,6 @@ import java.util.regex.Pattern;
import javax.xml.bind.DatatypeConverter;
-import com.google.common.base.Strings;
import org.apache.http.HttpResponse;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
@@ -55,6 +54,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
/**
*/
@@ -175,6 +175,25 @@ public class RestClient {
}
}
+ public void announceWipeCache(String entity, String event, String cacheKey) throws IOException {
+ String url = baseUrl + "/cache/announce/" + entity + "/" + cacheKey + "/" + event;
+ HttpPut request = new HttpPut(url);
+
+ try {
+ HttpResponse response = client.execute(request);
+
+ if (response.getStatusLine().getStatusCode() != 200) {
+ String msg = EntityUtils.toString(response.getEntity());
+ throw new IOException("Invalid response " + response.getStatusLine().getStatusCode()
+ + " with announce cache wipe url " + url + "\n" + msg);
+ }
+ } catch (Exception ex) {
+ throw new IOException(ex);
+ } finally {
+ request.releaseConnection();
+ }
+ }
+
public void wipeCache(String entity, String event, String cacheKey) throws IOException {
HttpPut request;
String url;
@@ -202,8 +221,19 @@ public class RestClient {
}
public String getKylinProperties() throws IOException {
- String url = baseUrl + "/admin/config";
- HttpGet request = new HttpGet(url);
+ return getConfiguration(baseUrl + "/admin/config", false);
+ }
+
+ public String getHDFSConfiguration() throws IOException {
+ return getConfiguration(baseUrl + "/admin/config/hdfs", true);
+ }
+
+ public String getHBaseConfiguration() throws IOException {
+ return getConfiguration(baseUrl + "/admin/config/hbase", true);
+ }
+
+ private String getConfiguration(String url, boolean ifAuth) throws IOException {
+ HttpGet request = ifAuth ? newGet(url) : new HttpGet(url);
HttpResponse response = null;
try {
response = client.execute(request);
@@ -372,7 +402,7 @@ public class RestClient {
String msg = getContent(response);
Map<String, String> kvMap = JsonUtil.readValueAsMap(msg);
String exception = kvMap.containsKey("exception") ? kvMap.get("exception") : "unknown";
- throw new IOException(exception);
+ throw new IOException("Error code: " + response.getStatusLine().getStatusCode() + "\n" + exception);
}
} finally {
post.releaseConnection();
@@ -411,7 +441,7 @@ public class RestClient {
}
private HttpGet newGet(String url) {
- HttpGet get = new HttpGet();
+ HttpGet get = new HttpGet(url);
addHttpHeaders(get);
return get;
}
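Combined with the KylinConfig helpers above, these RestClient additions let the migration tool rebuild a remote cluster's Hadoop and HBase configurations locally. A sketch, assuming a reachable Kylin instance (the URI is a placeholder in the user:pwd@host:port format the tool expects):

    RestClient client = new RestClient("ADMIN:KYLIN@remote-host:7070"); // placeholder URI
    Configuration remoteHdfsConf = KylinConfig.getConfigFromString(client.getHDFSConfiguration());
    Configuration remoteHbaseConf = KylinConfig.getConfigFromString(client.getHBaseConfiguration());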
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java
index 47e2c3d..f483a2c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelManager.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.persistence.WriteConflictException;
@@ -48,6 +49,8 @@ import org.apache.kylin.shaded.com.google.common.collect.Lists;
public class DataModelManager {
private static final Logger logger = LoggerFactory.getLogger(DataModelManager.class);
+ public static final Serializer<DataModelDesc> MODELDESC_SERIALIZER = new JsonSerializer<DataModelDesc>(
+ DataModelDesc.class);
public static DataModelManager getInstance(KylinConfig config) {
return config.getManager(DataModelManager.class);
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 15b6a2d..ebcd45c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -30,7 +30,9 @@ import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.util.AutoReadWriteLock;
import org.apache.kylin.common.util.AutoReadWriteLock.AutoLock;
import org.apache.kylin.metadata.TableMetadataManager;
@@ -54,7 +56,9 @@ import org.apache.kylin.shaded.com.google.common.collect.Sets;
public class ProjectManager {
private static final Logger logger = LoggerFactory.getLogger(ProjectManager.class);
-
+ public static final Serializer<ProjectInstance> PROJECT_SERIALIZER = new JsonSerializer<ProjectInstance>(
+ ProjectInstance.class);
+
public static ProjectManager getInstance(KylinConfig config) {
return config.getManager(ProjectManager.class);
}
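Exposing these serializers publicly lets the migration tool read and write project and model metadata straight through a ResourceStore, without going through the managers' caches. A sketch of loading a project that way (the project name is illustrative):

    ProjectInstance prj = store.getResource(
            ProjectInstance.concatResourcePath("my_project"), // illustrative name
            ProjectManager.PROJECT_SERIALIZER);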
diff --git a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
index efef5cf..45f83ea 100644
--- a/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
+++ b/cube-migration/src/main/java/org/apache/kylin/rest/controller/MigrationController.java
@@ -155,7 +155,7 @@ public class MigrationController extends BasicController {
}
DataModelDesc dataModelDesc = JsonUtil.readValue(request.getModelDescData(), DataModelDesc.class);
logger.info("Schema compatibility check for model {}", dataModelDesc.getName());
- modelService.checkModelCompatibility(request.getProjectName(), dataModelDesc, tableDescList);
+ modelService.checkModelCompatibility(dataModelDesc, tableDescList);
logger.info("Pass schema compatibility check for model {}", dataModelDesc.getName());
} catch (Exception e) {
logger.error(e.getMessage(), e);
diff --git a/pom.xml b/pom.xml
index c73bf28..f60b7c8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -542,6 +542,11 @@
<optional>true</optional>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <version>${hadoop2.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
index 4d90db8..780e069 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/AdminController.java
@@ -112,6 +112,28 @@ public class AdminController extends BasicController {
return configRes;
}
+ @RequestMapping(value = "/config/hdfs", method = { RequestMethod.GET }, produces = { "application/json" })
+ @ResponseBody
+ public GeneralResponse getHDFSConfig() throws IOException {
+ String config = adminService.getHadoopConfigAsString();
+
+ GeneralResponse configRes = new GeneralResponse();
+ configRes.put("config", config);
+
+ return configRes;
+ }
+
+ @RequestMapping(value = "/config/hbase", method = { RequestMethod.GET }, produces = { "application/json" })
+ @ResponseBody
+ public GeneralResponse getHBaseConfig() throws IOException {
+ String config = adminService.getHBaseConfigAsString();
+
+ GeneralResponse configRes = new GeneralResponse();
+ configRes.put("config", config);
+
+ return configRes;
+ }
+
@RequestMapping(value = "/metrics/cubes", method = { RequestMethod.GET }, produces = { "application/json" })
@ResponseBody
public MetricsResponse cubeMetrics(MetricsRequest request) {
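Both new endpoints follow the existing /admin/config convention and wrap the serialized configuration in a GeneralResponse, so a client sees roughly the following (shape illustrative, typically rooted at /kylin/api):

    GET /kylin/api/admin/config/hdfs
    {"config": "fs.defaultFS=hdfs://...\nyarn.resourcemanager.address=...\n..."}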
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java
index c133a28..4e9cd03 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/AdminService.java
@@ -31,10 +31,12 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OrderedProperties;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.job.StorageCleanupJob;
+import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.security.access.prepost.PreAuthorize;
@@ -117,4 +119,16 @@ public class AdminService extends BasicService {
return KylinConfig.getInstanceFromEnv().exportToString(propertyKeys);
}
+
+ public String getHadoopConfigAsString() throws IOException {
+ logger.debug("Get Kylin Hadoop Config");
+
+ return KylinConfig.getConfigAsString(HadoopUtil.getCurrentConfiguration());
+ }
+
+ public String getHBaseConfigAsString() throws IOException {
+ logger.debug("Get Kylin HBase Config");
+
+ return KylinConfig.getConfigAsString(HBaseConnection.getCurrentHBaseConfiguration());
+ }
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java b/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java
index 8fc1de9..012f14a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/HBaseInfoUtil.java
@@ -20,6 +20,7 @@ package org.apache.kylin.rest.service;
import java.io.IOException;
import java.util.Map;
+import java.util.Objects;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.kylin.common.KylinConfig;
@@ -28,13 +29,17 @@ import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.apache.kylin.storage.hbase.util.HBaseUnionUtil;
public class HBaseInfoUtil {
-
+
@SuppressWarnings("unused") // used by reflection
public static HBaseResponse getHBaseInfo(String tableName, KylinConfig config) throws IOException {
if (!config.getStorageUrl().getScheme().equals("hbase"))
return null;
-
+
Connection conn = HBaseUnionUtil.getConnection(config, tableName);
+ return getHBaseInfo(tableName, conn);
+ }
+
+ public static HBaseResponse getHBaseInfo(String tableName, Connection conn) throws IOException {
HBaseResponse hr = null;
long tableSize = 0;
int regionCount = 0;
@@ -54,4 +59,12 @@ public class HBaseInfoUtil {
hr.setRegionCount(regionCount);
return hr;
}
+
+ public static boolean checkEquals(HBaseResponse hbaseR1, HBaseResponse hbaseR2) {
+ if (hbaseR1 == hbaseR2)
+ return true;
+ if (hbaseR1 == null || hbaseR2 == null) // guard against NPE when only one side is null
+ return false;
+ return Objects.equals(hbaseR1.getTableName(), hbaseR2.getTableName())
+ && hbaseR1.getTableSize() == hbaseR2.getTableSize()
+ && hbaseR1.getRegionCount() == hbaseR2.getRegionCount();
+ }
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
index 5a3a1ca..7ff3919 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/ModelService.java
@@ -168,11 +168,7 @@ public class ModelService extends BasicService {
result.raiseExceptionWhenInvalid();
}
- public void checkModelCompatibility(String project, DataModelDesc dataModalDesc, List<TableDesc> tableDescList) {
- ProjectInstance prjInstance = getProjectManager().getProject(project);
- if (prjInstance == null) {
- throw new BadRequestException("Project " + project + " does not exist");
- }
+ public void checkModelCompatibility(DataModelDesc dataModalDesc, List<TableDesc> tableDescList) {
ModelSchemaUpdateChecker checker = new ModelSchemaUpdateChecker(getTableManager(), getCubeManager(),
getDataModelManager());
@@ -181,7 +177,7 @@ public class ModelService extends BasicService {
tableDescMap.put(tableDesc.getIdentity(), tableDesc);
}
dataModalDesc.init(getConfig(), tableDescMap);
- ModelSchemaUpdateChecker.CheckResult result = checker.allowEdit(dataModalDesc, project, false);
+ ModelSchemaUpdateChecker.CheckResult result = checker.allowEdit(dataModalDesc, null, false);
result.raiseExceptionWhenInvalid();
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
index dd63b52..e26cd74 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseResourceStore.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HTableDescriptor;
@@ -91,10 +92,14 @@ public class HBaseResourceStore extends PushdownResourceStore {
.parseInt(getConnection().getConfiguration().get("hbase.client.keyvalue.maxsize", "10485760"));
}
- Connection getConnection() throws IOException {
+ protected Connection getConnection() throws IOException {
return HBaseConnection.get(metadataUrl);
}
+ protected Configuration getCurrentHBaseConfiguration() {
+ return HBaseConnection.getCurrentHBaseConfiguration();
+ }
+
private StorageURL buildMetadataUrl(KylinConfig kylinConfig) throws IOException {
StorageURL url = kylinConfig.getMetadataUrl();
if (!url.getScheme().equals("hbase"))
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index 845a4e5..52d1c14 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -420,7 +420,7 @@ public class DeployCoprocessorCLI {
}
public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, config);
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, getHDFSWorkingDirectory(config));
FileStatus newestJar = null;
for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
if (fileStatus.getPath().toString().endsWith(".jar")) {
@@ -440,8 +440,14 @@ public class DeployCoprocessorCLI {
return path;
}
- public static synchronized Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem,
- Set<String> oldJarPaths) throws IOException {
+ public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem,
+ Set<String> oldJarPaths) throws IOException {
+ String hdfsWorkingDirectory = getHDFSWorkingDirectory(KylinConfig.getInstanceFromEnv());
+ return uploadCoprocessorJar(localCoprocessorJar, fileSystem, hdfsWorkingDirectory, oldJarPaths);
+ }
+
+ public synchronized static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem,
+ String hdfsWorkingDirectory, Set<String> oldJarPaths) throws IOException {
Path uploadPath = null;
File localCoprocessorFile = new File(localCoprocessorJar);
@@ -449,7 +455,7 @@ public class DeployCoprocessorCLI {
if (oldJarPaths == null) {
oldJarPaths = new HashSet<String>();
}
- Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
+ Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, hdfsWorkingDirectory);
for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
if (isSame(localCoprocessorFile, fileStatus)) {
uploadPath = fileStatus.getPath();
@@ -511,9 +517,12 @@ public class DeployCoprocessorCLI {
return baseName;
}
- private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
+ private static String getHDFSWorkingDirectory(KylinConfig config) {
String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
- hdfsWorkingDirectory = HBaseConnection.makeQualifiedPathInHBaseCluster(hdfsWorkingDirectory);
+ return HBaseConnection.makeQualifiedPathInHBaseCluster(hdfsWorkingDirectory);
+ }
+
+ private static Path getCoprocessorHDFSDir(FileSystem fileSystem, String hdfsWorkingDirectory) throws IOException {
Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
fileSystem.mkdirs(coprocessorDir);
return coprocessorDir;
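The refactoring threads an explicit HDFS working directory through these helpers, so a caller can deploy the coprocessor jar under another cluster's working directory instead of the one in the local KylinConfig. A hedged sketch (the filesystem, jar path, and working directory are placeholders):

    FileSystem dstFs = FileSystem.get(dstHBaseConf); // dstHBaseConf: destination cluster conf, assumed
    Path uploaded = DeployCoprocessorCLI.uploadCoprocessorJar(
            "/tmp/kylin-coprocessor.jar", dstFs, "hdfs://dst-ns/kylin", null); // placeholders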
diff --git a/tool/pom.xml b/tool/pom.xml
index 140ff93..8d4c2b4 100644
--- a/tool/pom.xml
+++ b/tool/pom.xml
@@ -76,6 +76,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-server</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
@@ -86,6 +91,12 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!--Spring-->
<dependency>
<groupId>org.springframework</groupId>
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/ClusterUtil.java b/tool/src/main/java/org/apache/kylin/tool/migration/ClusterUtil.java
new file mode 100644
index 0000000..14f8e90
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/ClusterUtil.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.rest.security.ManagedUser;
+import org.apache.kylin.rest.service.KylinUserService;
+import org.apache.kylin.storage.hbase.HBaseResourceStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Sets;
+
+public abstract class ClusterUtil {
+ private static final Logger logger = LoggerFactory.getLogger(ClusterUtil.class);
+
+ protected final KylinConfig kylinConfig;
+ protected final RestClient restClient;
+ protected final String hdfsWorkingDirectory;
+
+ protected final Configuration hbaseConf;
+ protected final Connection hbaseConn;
+ protected final ResourceStore resourceStore;
+ protected final Admin hbaseAdmin;
+
+ final Configuration jobConf;
+ final FileSystem jobFS;
+ final String jobHdfsWorkingDirectoryQualified;
+ final FileSystem hbaseFS;
+ final String hbaseHdfsWorkingDirectoryQualified;
+
+ public ClusterUtil(String configURI, boolean ifJobFSHAEnabled, boolean ifHBaseFSHAEnabled) throws IOException {
+ this.kylinConfig = KylinConfig.createInstanceFromUri(configURI);
+ this.restClient = new RestClient(configURI);
+ Path hdfsWorkingPath = Path.getPathWithoutSchemeAndAuthority(new Path(kylinConfig.getHdfsWorkingDirectory()));
+ String tmpHdfsWorkingDirectory = hdfsWorkingPath.toString();
+ this.hdfsWorkingDirectory = tmpHdfsWorkingDirectory.endsWith("/") ? tmpHdfsWorkingDirectory
+ : tmpHdfsWorkingDirectory + "/";
+
+ this.jobConf = KylinConfig.getConfigFromString(restClient.getHDFSConfiguration());
+ this.jobFS = FileSystem.get(jobConf);
+ this.jobHdfsWorkingDirectoryQualified = getQualifiedPath(jobConf, hdfsWorkingDirectory, ifJobFSHAEnabled);
+
+ this.hbaseConf = KylinConfig.getConfigFromString(restClient.getHBaseConfiguration());
+ this.hbaseFS = FileSystem.get(hbaseConf);
+ this.hbaseHdfsWorkingDirectoryQualified = getQualifiedPath(hbaseConf, hdfsWorkingDirectory, ifHBaseFSHAEnabled);
+
+ this.hbaseConn = ConnectionFactory.createConnection(hbaseConf);
+ this.resourceStore = new HBaseResourceStore(kylinConfig) {
+ @Override
+ protected Connection getConnection() {
+ return hbaseConn;
+ }
+
+ @Override
+ protected Configuration getCurrentHBaseConfiguration() {
+ return hbaseConf;
+ }
+ };
+ this.hbaseAdmin = hbaseConn.getAdmin();
+ }
+
+ public abstract ProjectInstance getProject(String projName) throws IOException;
+
+ public abstract DictionaryInfo getDictionaryInfo(String dictPath) throws IOException;
+
+ public abstract SnapshotTable getSnapshotTable(String snapshotPath) throws IOException;
+
+ public abstract String getRootDirQualifiedOfHTable(String tableName);
+
+ public ManagedUser getUserDetails(String userKey) throws IOException {
+ return resourceStore.getResource(userKey, KylinUserService.SERIALIZER);
+ }
+
+ public final RawResource getResource(String resPath) throws IOException {
+ return resourceStore.getResource(resPath);
+ }
+
+ public String getJobWorkingDirQualified(String jobId) {
+ return JobBuilderSupport.getJobWorkingDir(jobHdfsWorkingDirectoryQualified, jobId);
+ }
+
+ private static String getQualifiedPath(Configuration conf, String path, boolean ifHAEnabled) throws IOException {
+ String hdfsSchema = getReplacedDefaultFS(conf, !ifHAEnabled);
+ return hdfsSchema + path;
+ }
+
+ private static String getReplacedDefaultFS(Configuration conf, boolean ifNeedReplace) throws IOException {
+ String defaultFS = conf.get(FileSystem.FS_DEFAULT_NAME_KEY);
+ if (!ifNeedReplace) {
+ return defaultFS;
+ }
+
+ String nameServices = conf.get("dfs.nameservices");
+ if (Strings.isNullOrEmpty(nameServices)) {
+ return defaultFS;
+ }
+
+ // check whether name service is defined for the default fs
+ Set<String> nameServiceSet = Sets.newHashSet(nameServices.split(","));
+ String defaultNameService = URI.create(defaultFS).getHost();
+ if (!nameServiceSet.contains(defaultNameService)) {
+ logger.info("name service {} is not defined among {}", defaultNameService, nameServices);
+ return defaultFS;
+ }
+
+ // select one usable node as the default fs
+ String haHostNames = conf.get("dfs.ha.namenodes." + defaultNameService);
+ if (!Strings.isNullOrEmpty(haHostNames)) {
+ conf = new Configuration(conf);
+ for (String oneNodeAlias : haHostNames.split(",")) {
+ String rpcNode = conf.get("dfs.namenode.rpc-address." + defaultNameService + "." + oneNodeAlias);
+ String replaced = "hdfs://" + rpcNode;
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, replaced);
+
+ Path rootPath = new Path(replaced + "/");
+ FileSystem fs = FileSystem.get(conf);
+ try {
+ fs.getStatus(rootPath);
+ } catch (Exception e) {
+ logger.warn("cannot use {} as default fs due to ", replaced, e);
+ continue;
+ }
+ logger.info("replaced the default fs {} by {}", defaultFS, replaced);
+ return replaced;
+ }
+ throw new IllegalArgumentException("fail to replace the default fs " + defaultFS);
+ }
+ throw new IllegalArgumentException("dfs.ha.namenodes." + defaultNameService + " is not set");
+ }
+}
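The default-FS replacement above only kicks in when HA handling is disabled for that filesystem (see the codeOfFSHAEnabled bit mask in the CLI below): a logical nameservice cannot be resolved from the other cluster, so it is swapped for the first namenode whose status probe succeeds. The keys consulted look like this (values illustrative):

    fs.defaultFS                     = hdfs://ns1
    dfs.nameservices                 = ns1
    dfs.ha.namenodes.ns1             = nn1,nn2
    dfs.namenode.rpc-address.ns1.nn1 = nn1-host:8020
    dfs.namenode.rpc-address.ns1.nn2 = nn2-host:8020

Here hdfs://ns1 would be replaced by hdfs://nn1-host:8020 if that namenode answers.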
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java b/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java
new file mode 100644
index 0000000..95efab0
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/CubeMigrationCrossClusterCLI.java
@@ -0,0 +1,757 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
+import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.OptionsParser;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.DictionaryDesc;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.GlobalDictionaryBuilder;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.rest.response.HBaseResponse;
+import org.apache.kylin.rest.security.ManagedUser;
+import org.apache.kylin.rest.service.HBaseInfoUtil;
+import org.apache.kylin.rest.service.KylinUserService;
+import org.apache.kylin.rest.service.update.TableSchemaUpdateMapping;
+import org.apache.kylin.rest.service.update.TableSchemaUpdater;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class CubeMigrationCrossClusterCLI extends AbstractApplication {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeMigrationCrossClusterCLI.class);
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_KYLIN_URI_SRC = OptionBuilder.withArgName("kylinUriSrc").hasArg().isRequired(true)
+ .withDescription("Specify the source kylin uri with format user:pwd@host:port").create("kylinUriSrc");
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_KYLIN_URI_DST = OptionBuilder.withArgName("kylinUriDst").hasArg().isRequired(true)
+ .withDescription("Specify the destination kylin uri with format user:pwd@host:port").create("kylinUriDst");
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_UPDATE_MAPPING = OptionBuilder.withArgName("updateMappingPath").hasArg()
+ .isRequired(false).withDescription("Specify the path for the update mapping file")
+ .create("updateMappingPath");
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(false)
+ .withDescription("Specify which cube to extract").create("cube");
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_HYBRID = OptionBuilder.withArgName("hybrid").hasArg().isRequired(false)
+ .withDescription("Specify which hybrid to extract").create("hybrid");
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_PROJECT = OptionBuilder.withArgName("project").hasArg().isRequired(false)
+ .withDescription("Specify realizations in which project to extract").create("project");
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_All = OptionBuilder.withArgName("all").hasArg(false).isRequired(false)
+ .withDescription("Specify realizations in all projects to extract").create("all");
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_DST_HIVE_CHECK = OptionBuilder.withArgName("dstHiveCheck").hasArg()
+ .isRequired(false).withDescription("Specify whether to check destination hive tables")
+ .create("dstHiveCheck");
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_OVERWRITE = OptionBuilder.withArgName("overwrite").hasArg().isRequired(false)
+ .withDescription("Specify whether to overwrite existing cubes").create("overwrite");
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_SCHEMA_ONLY = OptionBuilder.withArgName("schemaOnly").hasArg().isRequired(false)
+ .withDescription("Specify whether only migrate cube related schema").create("schemaOnly");
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_EXECUTE = OptionBuilder.withArgName("execute").hasArg().isRequired(false)
+ .withDescription("Specify whether it's to execute the migration").create("execute");
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_COPROCESSOR_PATH = OptionBuilder.withArgName("coprocessorPath").hasArg()
+ .isRequired(false).withDescription("Specify the path of coprocessor to be deployed")
+ .create("coprocessorPath");
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_FS_HA_ENABLED_CODE = OptionBuilder.withArgName("codeOfFSHAEnabled").hasArg()
+ .isRequired(false).withDescription("Specify whether to enable the namenode ha of clusters")
+ .create("codeOfFSHAEnabled");
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_DISTCP_JOB_QUEUE = OptionBuilder.withArgName("distCpJobQueue").hasArg()
+ .isRequired(false).withDescription("Specify the mapreduce.job.queuename for DistCp job ")
+ .create("distCpJobQueue");
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_DISTCP_JOB_MEMORY = OptionBuilder.withArgName("distCpJobMemory").hasArg()
+ .isRequired(false).withDescription("Specify the mapreduce.map.memory.mb for DistCp job ")
+ .create("distCpJobMemory");
+
+ @SuppressWarnings("static-access")
+ public static final Option OPTION_THREAD_NUM = OptionBuilder.withArgName("nThread").hasArg().isRequired(false)
+ .withDescription("Specify the number of threads for migrating cube data in parallel ").create("nThread");
+
+ protected final Options options;
+
+ private Configuration distCpConf;
+
+ protected SrcClusterUtil srcCluster;
+ protected DstClusterUtil dstCluster;
+
+ private int codeOfFSHAEnabled = 3;
+ protected int nThread;
+
+ private boolean ifDstHiveCheck = true;
+ private boolean ifSchemaOnly = true;
+ private boolean ifExecute = false;
+ private boolean ifOverwrite = false;
+
+ private String coprocessorJarPath;
+
+ private Set<CubeInstance> cubes = Sets.newHashSet();
+ private Set<HybridInstance> hybrids = Sets.newHashSet();
+ private Set<ProjectInstance> projects = Sets.newHashSet();
+
+ private Map<String, TableSchemaUpdateMapping> mappings = Maps.newHashMap();
+
+ private Map<String, ProjectInstance> dstProjects = Maps.newHashMap();
+
+ public CubeMigrationCrossClusterCLI() {
+ OptionGroup realizationOrProject = new OptionGroup();
+ realizationOrProject.addOption(OPTION_CUBE);
+ realizationOrProject.addOption(OPTION_HYBRID);
+ realizationOrProject.addOption(OPTION_PROJECT);
+ realizationOrProject.addOption(OPTION_All);
+ realizationOrProject.setRequired(true);
+
+ options = new Options();
+ options.addOption(OPTION_KYLIN_URI_SRC);
+ options.addOption(OPTION_KYLIN_URI_DST);
+ options.addOption(OPTION_FS_HA_ENABLED_CODE);
+ options.addOption(OPTION_UPDATE_MAPPING);
+ options.addOptionGroup(realizationOrProject);
+ options.addOption(OPTION_DST_HIVE_CHECK);
+ options.addOption(OPTION_SCHEMA_ONLY);
+ options.addOption(OPTION_OVERWRITE);
+ options.addOption(OPTION_EXECUTE);
+ options.addOption(OPTION_COPROCESSOR_PATH);
+ options.addOption(OPTION_DISTCP_JOB_QUEUE);
+ options.addOption(OPTION_THREAD_NUM);
+ options.addOption(OPTION_DISTCP_JOB_MEMORY);
+ }
+
+ protected Options getOptions() {
+ return options;
+ }
+
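+ // The codeOfFSHAEnabled bit mask is decoded by the method below; init() assigns
+ // bit 0 = source job FS, bit 1 = source HBase FS,
+ // bit 2 = destination job FS, bit 3 = destination HBase FS.
+ // E.g. the default code 3 (0b0011) enables HA handling for the source cluster only.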
+ public static boolean ifFSHAEnabled(int code, int pos) {
+ int which = 1 << pos;
+ return (code & which) == which;
+ }
+
+ protected void init(OptionsHelper optionsHelper) throws Exception {
+ if (optionsHelper.hasOption(OPTION_UPDATE_MAPPING)) {
+ File mappingFile = new File(optionsHelper.getOptionValue(OPTION_UPDATE_MAPPING));
+ String content = new String(Files.readAllBytes(mappingFile.toPath()), Charset.defaultCharset());
+ Map<String, TableSchemaUpdateMapping> tmpMappings = JsonUtil.readValue(content,
+ new TypeReference<Map<String, TableSchemaUpdateMapping>>() {
+ });
+ mappings = Maps.newHashMapWithExpectedSize(tmpMappings.size());
+ for (Map.Entry<String, TableSchemaUpdateMapping> entry : tmpMappings.entrySet()) {
+ mappings.put(entry.getKey().toUpperCase(Locale.ROOT), entry.getValue());
+ }
+ }
+
+ ifDstHiveCheck = optionsHelper.hasOption(OPTION_DST_HIVE_CHECK)
+ ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_DST_HIVE_CHECK))
+ : true;
+ ifSchemaOnly = optionsHelper.hasOption(OPTION_SCHEMA_ONLY)
+ ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_SCHEMA_ONLY))
+ : true;
+ ifOverwrite = optionsHelper.hasOption(OPTION_OVERWRITE)
+ ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_OVERWRITE))
+ : false;
+ ifExecute = optionsHelper.hasOption(OPTION_EXECUTE)
+ ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_EXECUTE))
+ : false;
+
+ codeOfFSHAEnabled = optionsHelper.hasOption(OPTION_FS_HA_ENABLED_CODE)
+ ? Integer.valueOf(optionsHelper.getOptionValue(OPTION_FS_HA_ENABLED_CODE))
+ : 3;
+
+ String srcConfigURI = optionsHelper.getOptionValue(OPTION_KYLIN_URI_SRC);
+ srcCluster = new SrcClusterUtil(srcConfigURI, ifFSHAEnabled(codeOfFSHAEnabled, 0),
+ ifFSHAEnabled(codeOfFSHAEnabled, 1));
+ String dstConfigURI = optionsHelper.getOptionValue(OPTION_KYLIN_URI_DST);
+ dstCluster = new DstClusterUtil(dstConfigURI, ifFSHAEnabled(codeOfFSHAEnabled, 2),
+ ifFSHAEnabled(codeOfFSHAEnabled, 3), ifExecute);
+
+ distCpConf = new Configuration(srcCluster.jobConf);
+ if (optionsHelper.hasOption(OPTION_DISTCP_JOB_QUEUE)) {
+ distCpConf.set("mapreduce.job.queuename", optionsHelper.getOptionValue(OPTION_DISTCP_JOB_QUEUE));
+ }
+ int distCpMemory = optionsHelper.hasOption(OPTION_DISTCP_JOB_MEMORY)
+ ? Integer.valueOf(optionsHelper.getOptionValue(OPTION_DISTCP_JOB_MEMORY))
+ : 1500;
+ int distCpJVMMemory = distCpMemory * 4 / 5;
+ distCpConf.set("mapreduce.map.memory.mb", "" + distCpMemory);
+ distCpConf.set("mapreduce.map.java.opts",
+ "-server -Xmx" + distCpJVMMemory + "m -Djava.net.preferIPv4Stack=true");
+
+ nThread = optionsHelper.hasOption(OPTION_THREAD_NUM)
+ ? Integer.valueOf(optionsHelper.getOptionValue(OPTION_THREAD_NUM))
+ : 8;
+
+ coprocessorJarPath = optionsHelper.hasOption(OPTION_COPROCESSOR_PATH)
+ ? optionsHelper.getOptionValue(OPTION_COPROCESSOR_PATH)
+ : srcCluster.getDefaultCoprocessorJarPath();
+ }
+
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ init(optionsHelper);
+
+ if (optionsHelper.hasOption(OPTION_All)) {
+ projects.addAll(srcCluster.listAllProjects());
+ } else if (optionsHelper.hasOption(OPTION_PROJECT)) {
+ Set<String> projectNames = Sets.newHashSet(optionsHelper.getOptionValue(OPTION_PROJECT).split(","));
+ for (String projectName : projectNames) {
+ ProjectInstance project = srcCluster.getProject(projectName);
+ if (project == null) {
+ throw new IllegalArgumentException("No project found with name of " + projectName);
+ }
+ projects.add(project);
+ }
+ } else if (optionsHelper.hasOption(OPTION_CUBE)) {
+ String cubeNames = optionsHelper.getOptionValue(OPTION_CUBE);
+ for (String cubeName : cubeNames.split(",")) {
+ CubeInstance cube = srcCluster.getCube(cubeName);
+ if (cube == null) {
+ throw new IllegalArgumentException("No cube found with name of " + cubeName);
+ } else {
+ cubes.add(cube);
+ }
+ }
+ } else if (optionsHelper.hasOption(OPTION_HYBRID)) {
+ String hybridNames = optionsHelper.getOptionValue(OPTION_HYBRID);
+ for (String hybridName : hybridNames.split(",")) {
+ HybridInstance hybridInstance = srcCluster.getHybrid(hybridName);
+ if (hybridInstance != null) {
+ hybrids.add(hybridInstance);
+ } else {
+ throw new IllegalArgumentException("No hybrid found with name of" + hybridName);
+ }
+ }
+ }
+
+ if (!projects.isEmpty()) {
+ for (ProjectInstance project : projects) {
+ for (RealizationEntry entry : project.getRealizationEntries()) {
+ IRealization realization = srcCluster.getRealization(entry);
+ addRealization(realization);
+ }
+ }
+ }
+ if (!hybrids.isEmpty()) {
+ for (HybridInstance hybrid : hybrids) {
+ addHybrid(hybrid);
+ }
+ }
+
+ Map<CubeInstance, Exception> failedCubes = Maps.newHashMap();
+
+ for (CubeInstance cube : cubes) {
+ logger.info("start to migrate cube {}", cube);
+ try {
+ migrateCube(cube);
+ logger.info("finish migrating cube {}", cube);
+ } catch (Exception e) {
+ logger.error("fail to migrate cube {} due to ", cube, e);
+ failedCubes.put(cube, e);
+ }
+ }
+
+ for (HybridInstance hybrid : hybrids) {
+ dstCluster.saveHybrid(hybrid);
+
+ // update project
+ ProjectInstance srcProject = srcCluster.getProjectByRealization(RealizationType.HYBRID, hybrid.getName());
+ ProjectInstance dstProject = getDstProject(srcProject);
+
+ // update hybrids
+ Set<RealizationEntry> projReals = Sets.newHashSet(dstProject.getRealizationEntries());
+ projReals.add(RealizationEntry.create(RealizationType.HYBRID, hybrid.getName()));
+ dstProject.setRealizationEntries(Lists.newArrayList(projReals));
+
+ dstProjects.put(dstProject.getName(), dstProject);
+ }
+
+ for (String projName : dstProjects.keySet()) {
+ dstCluster.saveProject(dstProjects.get(projName));
+ }
+
+ dstCluster.updateMeta();
+
+ if (failedCubes.isEmpty()) {
+ logger.info("Migration for cubes {}, hyrbids {} all succeed", cubes, hybrids);
+ } else {
+ logger.warn("Failed to migrate cubes {} and need to check the detailed reason and retry again!!!",
+ failedCubes.keySet());
+ }
+ }
+
+ private void migrateCube(CubeInstance cube) throws IOException {
+ if (!ifOverwrite && dstCluster.exists(CubeInstance.concatResourcePath(cube.getName()))) {
+ throw new RuntimeException(("The cube named " + cube.getName()
+ + " already exists on target metadata store. Please delete it firstly and try again"));
+ }
+
+ ProjectInstance srcProject = srcCluster.getProjectByRealization(RealizationType.CUBE, cube.getName());
+
+ String descName = cube.getDescName();
+ CubeDesc cubeDesc = srcCluster.getCubeDesc(descName);
+
+ String modelName = cubeDesc.getModelName();
+ DataModelDesc modelDesc = srcCluster.getDataModelDesc(modelName);
+
+ Set<TableDesc> tableSet = Sets.newHashSet();
+ for (TableRef tableRef : modelDesc.getAllTables()) {
+ TableDesc tableDescOld = srcCluster.getTableDesc(tableRef.getTableIdentity(), srcProject.getName());
+ TableDesc tableDescUpdated = TableSchemaUpdater.dealWithMappingForTable(tableDescOld, mappings);
+ tableSet.add(tableDescUpdated);
+ }
+
+ modelDesc = TableSchemaUpdater.dealWithMappingForModel(modelDesc, mappings);
+
+ cubeDesc = TableSchemaUpdater.dealWithMappingForCubeDesc(cubeDesc, mappings);
+
+ { // compatibility check before migrating to the destination cluster
+ dstCluster.checkCompatibility(srcProject.getName(), tableSet, modelDesc, ifDstHiveCheck);
+ }
+
+ {
+ for (TableDesc table : tableSet) {
+ dstCluster.saveTableDesc(table);
+ }
+
+ dstCluster.saveModelDesc(modelDesc);
+
+ dstCluster.saveCubeDesc(cubeDesc);
+
+ if (ifSchemaOnly) {
+ cube = CubeInstance.getCopyOf(cube);
+ cube.getSegments().clear();
+ cube.resetSnapshots();
+ cube.setStatus(RealizationStatusEnum.DISABLED);
+ cube.clearCuboids();
+ } else {
+ // cube with global dictionary cannot be migrated with data
+ checkGlobalDict(cubeDesc);
+
+ // delete those NEW segments and only keep the READY segments
+ cube.setSegments(cube.getSegments(SegmentStatusEnum.READY));
+
+ cube = TableSchemaUpdater.dealWithMappingForCube(cube, mappings);
+
+ ExecutorService executor = Executors.newFixedThreadPool(nThread, new ThreadFactoryBuilder()
+ .setNameFormat("Cube-" + cube.getName() + "-data-migration-pool-%d").build());
+ try {
+ List<Future<?>> futureList = migrateCubeData(cube, cubeDesc, executor);
+ executor.shutdown();
+ for (Future<?> future : futureList) {
+ try {
+ future.get();
+ } catch (InterruptedException e) {
+ logger.warn(e.getMessage());
+ } catch (ExecutionException e) {
+ executor.shutdownNow();
+ logger.error(e.getMessage());
+ throw new RuntimeException(e);
+ }
+ }
+ } finally {
+ // in case exceptions are thrown by migrateCubeData()
+ if (!executor.isShutdown()) {
+ logger.warn("shut down executor for cube {}", cube);
+ executor.shutdownNow();
+ }
+ }
+ }
+
+ dstCluster.saveCubeInstance(cube);
+ }
+
+ {
+ ProjectInstance dstProject = getDstProject(srcProject);
+
+ // update tables in project
+ Set<String> projTables = Sets.newHashSet(dstProject.getTables());
+ projTables.addAll(tableSet.stream().map(TableDesc::getIdentity).collect(Collectors.toSet()));
+ dstProject.setTables(projTables);
+
+ // update models in project
+ Set<String> projModels = Sets.newHashSet(dstProject.getModels());
+ projModels.add(modelName);
+ dstProject.setModels(Lists.newArrayList(projModels));
+
+ // update cubes in project
+ Set<RealizationEntry> projReals = Sets.newHashSet(dstProject.getRealizationEntries());
+ projReals.add(RealizationEntry.create(RealizationType.CUBE, cube.getName()));
+ dstProject.setRealizationEntries(Lists.newArrayList(projReals));
+
+ dstProjects.put(dstProject.getName(), dstProject);
+ }
+ }
+
+ private void checkGlobalDict(CubeDesc cubeDesc) {
+ if (cubeDesc.getDictionaries() != null && !cubeDesc.getDictionaries().isEmpty()) {
+ for (DictionaryDesc dictDesc : cubeDesc.getDictionaries()) {
+ if (GlobalDictionaryBuilder.class.getName().equalsIgnoreCase(dictDesc.getBuilderClass())) {
+ throw new RuntimeException("it's not supported to migrate global dictionaries " + dictDesc
+ + " for cube " + cubeDesc.getName());
+ }
+ }
+ }
+ }
+
+ private List<Future<?>> migrateCubeData(CubeInstance cube, CubeDesc cubeDesc, ExecutorService executor)
+ throws IOException {
+ List<Future<?>> futureList = Lists.newLinkedList();
+
+ for (final CubeSegment segment : cube.getSegments(SegmentStatusEnum.READY)) {
+ logger.info("start to migrate segment: {} {}", cube, segment.getName());
+ copyMetaResource(segment.getStatisticsResourcePath());
+ for (String dict : segment.getDictionaryPaths()) {
+ copyDictionary(cube, dict);
+ }
+ for (String snapshot : segment.getSnapshotPaths()) {
+ copySnapshot(cube, snapshot);
+ }
+ Future<?> future;
+ future = executor.submit(new MyRunnable() {
+ @Override
+ public void doRun() throws Exception {
+ copyHDFSJobInfo(segment.getLastBuildJobID());
+ }
+ });
+ futureList.add(future);
+
+ future = executor.submit(new MyRunnable() {
+ @Override
+ public void doRun() throws Exception {
+ copyHTable(segment);
+ }
+ });
+ futureList.add(future);
+
+ logger.info("add segment {} to migration list", segment);
+ }
+ if (cubeDesc.getSnapshotTableDescList() != null) {
+ for (SnapshotTableDesc snapshotTable : cubeDesc.getSnapshotTableDescList()) {
+ if (snapshotTable.isGlobal()) {
+ String snapshotResPath = cube.getSnapshotResPath(snapshotTable.getTableName());
+ if (snapshotTable.isExtSnapshotTable()) {
+ final ExtTableSnapshotInfo extSnapshot = srcCluster.getExtTableSnapshotInfo(snapshotResPath);
+ dstCluster.saveExtSnapshotTableInfo(extSnapshot);
+ if (ExtTableSnapshotInfo.STORAGE_TYPE_HBASE.equals(extSnapshot.getStorageType())) {
+ Future<?> future = executor.submit(new MyRunnable() {
+ @Override
+ public void doRun() throws Exception {
+ copyHTable(extSnapshot);
+ }
+ });
+ futureList.add(future);
+ }
+ } else {
+ copySnapshot(cube, snapshotResPath);
+ }
+ logger.info("add cube-level snapshot table {} for cube {} to migration list", snapshotResPath,
+ cube);
+ }
+ }
+ }
+
+ return futureList;
+ }
+
+ private ProjectInstance getDstProject(ProjectInstance srcProject) throws IOException {
+ ProjectInstance dstProject = dstProjects.get(srcProject.getName());
+ if (dstProject == null) {
+ dstProject = dstCluster.getProject(srcProject.getName());
+ }
+ if (dstProject == null) {
+ dstProject = ProjectInstance.create(srcProject.getName(), srcProject.getOwner(),
+ srcProject.getDescription(), srcProject.getOverrideKylinProps(), null, null);
+ dstProject.setUuid(srcProject.getUuid());
+ }
+ return dstProject;
+ }
+
+ private void putUserInfo(String userName) throws IOException {
+ String userKey = KylinUserService.getId(userName);
+ ManagedUser user = srcCluster.getUserDetails(userKey);
+ if (user == null) {
+ logger.warn("Cannot find user {}", userName);
+ return;
+ }
+ dstCluster.saveUserInfo(userKey, user);
+ }
+
+ private void copyMetaResource(String item) throws IOException {
+ RawResource res = srcCluster.getResource(item);
+ dstCluster.putResource(item, res);
+ res.content().close();
+ }
+
+ private void copyDictionary(CubeInstance cube, String dictPath) throws IOException {
+ if (dstCluster.exists(dictPath)) {
+ logger.info("Item {} has already existed in destination cluster", dictPath);
+ return;
+ }
+ DictionaryInfo dictInfo = srcCluster.getDictionaryInfo(dictPath);
+ String dupDict = dstCluster.saveDictionary(dictInfo);
+ if (dupDict != null) {
+ for (CubeSegment segment : cube.getSegments()) {
+ for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) {
+ if (entry.getValue().equalsIgnoreCase(dictPath)) {
+ entry.setValue(dupDict);
+ }
+ }
+ }
+ logger.info("Item {} is dup, instead {} is reused", dictPath, dupDict);
+ }
+ }
+
+ private void copySnapshot(CubeInstance cube, String snapshotTablePath) throws IOException {
+ if (dstCluster.exists(snapshotTablePath)) {
+ logger.info("Item {} has already existed in destination cluster", snapshotTablePath);
+ return;
+ }
+ SnapshotTable snapshotTable = srcCluster.getSnapshotTable(snapshotTablePath);
+ dstCluster.saveSnapshotTable(snapshotTable);
+ }
+
+ private void copyHDFSJobInfo(String jobId) throws Exception {
+ String srcDirQualified = srcCluster.getJobWorkingDirQualified(jobId);
+ String dstDirQualified = dstCluster.getJobWorkingDirQualified(jobId);
+ if (ifExecute) {
+ dstCluster.copyInitOnJobCluster(new Path(dstDirQualified));
+ copyHDFSPath(srcDirQualified, srcCluster.jobConf, dstDirQualified, dstCluster.jobConf);
+ } else {
+ logger.info("copied hdfs directory from {} to {}", srcDirQualified, dstDirQualified);
+ }
+ }
+
+ private void copyHTable(CubeSegment segment) throws IOException {
+ String tableName = segment.getStorageLocationIdentifier();
+ if (ifExecute) {
+ if (checkHTableExist(segment)) {
+ logger.info("htable {} has already existed in dst, will skip the migration", tableName);
+ } else {
+ copyHTable(tableName, true);
+ if (!checkHTableEquals(tableName)) {
+ logger.error("htable {} is copied to dst with different size!!!", tableName);
+ }
+ }
+ }
+ logger.info("migrated htable {} for segment {}", tableName, segment);
+ }
+
+ private boolean checkHTableExist(CubeSegment segment) throws IOException {
+ String tableName = segment.getStorageLocationIdentifier();
+ TableName htableName = TableName.valueOf(tableName);
+ if (!dstCluster.checkExist(htableName, segment)) {
+ return false;
+ }
+
+ if (!checkHTableEquals(tableName)) {
+ logger.warn("although htable {} exists in destination, the details data are different", tableName);
+ dstCluster.deleteHTable(tableName);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean checkHTableEquals(String tableName) throws IOException {
+ HBaseResponse respSrc = HBaseInfoUtil.getHBaseInfo(tableName, srcCluster.hbaseConn);
+ HBaseResponse respDst = HBaseInfoUtil.getHBaseInfo(tableName, dstCluster.hbaseConn);
+ return HBaseInfoUtil.checkEquals(respSrc, respDst);
+ }
+
+ private void copyHTable(ExtTableSnapshotInfo extTableSnapshotInfo) throws IOException {
+ String tableName = extTableSnapshotInfo.getStorageLocationIdentifier();
+ if (ifExecute) {
+ TableName htableName = TableName.valueOf(tableName);
+ if (dstCluster.htableExists(htableName)) {
+ logger.warn("htable {} already exists in the dst cluster and will skip the htable migration");
+ } else {
+ copyHTable(tableName, false);
+ }
+ }
+ logger.info("migrated htable {} for ext table snapshot {}", tableName, extTableSnapshotInfo.getTableName());
+ }
+
+ private void copyHTable(String tableName, boolean ifDeployCoprocessor) {
+ if (ifExecute) {
+ TableName htableName = TableName.valueOf(tableName);
+ try {
+ //migrate data first
+ copyHFileByDistCp(tableName);
+
+ //create htable metadata, especially the split keys for predefining the regions
+ Table table = srcCluster.hbaseConn.getTable(htableName);
+ byte[][] endKeys = srcCluster.hbaseConn.getRegionLocator(htableName).getEndKeys();
+ byte[][] splitKeys = Arrays.copyOfRange(endKeys, 0, endKeys.length - 1);
+
+ HTableDescriptor tableDesc = new HTableDescriptor(table.getTableDescriptor());
+ //change the table host
+ dstCluster.resetTableHost(tableDesc);
+ if (ifDeployCoprocessor) {
+ dstCluster.deployCoprocessor(tableDesc, coprocessorJarPath);
+ }
+ dstCluster.createTable(tableDesc, splitKeys);
+
+ //do bulk load to sync up htable data and metadata
+ dstCluster.bulkLoadTable(tableName);
+ } catch (Exception e) {
+ logger.error("fail to migrate htable {} due to {} ", tableName, e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ protected void copyHFileByDistCp(String tableName) throws Exception {
+ String srcDirQualified = srcCluster.getRootDirQualifiedOfHTable(tableName);
+ String dstDirQualified = dstCluster.getRootDirQualifiedOfHTable(tableName);
+ dstCluster.copyInitOnHBaseCluster(new Path(dstDirQualified));
+ copyHDFSPath(srcDirQualified, srcCluster.hbaseConf, dstDirQualified, dstCluster.hbaseConf);
+ }
+
+ protected void copyHDFSPath(String srcDir, Configuration srcConf, String dstDir, Configuration dstConf)
+ throws Exception {
+ logger.info("start to copy hdfs directory from {} to {}", srcDir, dstDir);
+ DistCpOptions distCpOptions = OptionsParser.parse(new String[] { srcDir, dstDir });
+ distCpOptions.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
+ distCpOptions.setBlocking(true);
+ setTargetPathExists(distCpOptions);
+ DistCp distCp = new DistCp(getConfOfDistCp(), distCpOptions);
+ distCp.execute();
+ logger.info("copied hdfs directory from {} to {}", srcDir, dstDir);
+ }
+
+ protected Configuration getConfOfDistCp() {
+ return distCpConf;
+ }
+
+ /**
+ * Set targetPathExists in both inputOptions and job config,
+ * for the benefit of CopyCommitter
+ */
+ public void setTargetPathExists(DistCpOptions inputOptions) throws IOException {
+ Path target = inputOptions.getTargetPath();
+ FileSystem targetFS = target.getFileSystem(dstCluster.jobConf);
+ boolean targetExists = targetFS.exists(target);
+ inputOptions.setTargetPathExists(targetExists);
+ dstCluster.jobConf.setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, targetExists);
+ }
+
+ private void addHybrid(HybridInstance hybrid) {
+ hybrids.add(hybrid);
+ for (IRealization realization : hybrid.getRealizations()) {
+ addRealization(realization);
+ }
+ }
+
+ private void addRealization(IRealization realization) {
+ if (realization instanceof HybridInstance) {
+ addHybrid((HybridInstance) realization);
+ } else if (realization instanceof CubeInstance) {
+ cubes.add((CubeInstance) realization);
+ } else {
+ logger.warn("Realization {} is neither hybrid nor cube", realization);
+ }
+ }
+
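+    /**
+     * Runnable that rethrows any checked exception as a RuntimeException, so
+     * subclasses can be submitted wherever a plain Runnable is expected.
+     */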
+    private abstract static class MyRunnable implements Runnable {
+ @Override
+ public void run() {
+ try {
+ doRun();
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public abstract void doRun() throws Exception;
+ }
+
+ public static void main(String[] args) {
+ CubeMigrationCrossClusterCLI cli = new CubeMigrationCrossClusterCLI();
+ cli.execute(args);
+ }
+}
\ No newline at end of file
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java b/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java
new file mode 100644
index 0000000..e578935
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/DstClusterUtil.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+import static org.apache.kylin.metadata.realization.IRealizationConstants.HTableSegmentTag;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.persistence.RawResource;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.dict.lookup.SnapshotTableSerializer;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.realization.IRealizationConstants;
+import org.apache.kylin.rest.security.ManagedUser;
+import org.apache.kylin.rest.service.KylinUserService;
+import org.apache.kylin.storage.hbase.util.DeployCoprocessorCLI;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class DstClusterUtil extends ClusterUtil {
+ private static final Logger logger = LoggerFactory.getLogger(DstClusterUtil.class);
+
+ public static final String hbaseSubDir = "migration/hbase/data/default/";
+
+ private final String hbaseDataDirQualified;
+ private final String hbaseDataDir;
+
+ private final boolean ifExecute;
+
+ public DstClusterUtil(String configURI, boolean ifExecute) throws IOException {
+ this(configURI, true, true, ifExecute);
+ }
+
+ public DstClusterUtil(String configURI, boolean ifJobFSHAEnabled, boolean ifHBaseFSHAEnabled, boolean ifExecute)
+ throws IOException {
+ super(configURI, ifJobFSHAEnabled, ifHBaseFSHAEnabled);
+ this.hbaseDataDirQualified = hbaseHdfsWorkingDirectoryQualified + hbaseSubDir;
+ this.hbaseDataDir = hdfsWorkingDirectory + hbaseSubDir;
+ this.ifExecute = ifExecute;
+ }
+
+ @Override
+ public ProjectInstance getProject(String projName) throws IOException {
+ return resourceStore.getResource(ProjectInstance.concatResourcePath(projName),
+ ProjectManager.PROJECT_SERIALIZER);
+ }
+
+ @Override
+ public DictionaryInfo getDictionaryInfo(String dictPath) throws IOException {
+ return resourceStore.getResource(dictPath, DictionaryInfoSerializer.FULL_SERIALIZER);
+ }
+
+ @Override
+ public SnapshotTable getSnapshotTable(String snapshotPath) throws IOException {
+ return resourceStore.getResource(snapshotPath, SnapshotTableSerializer.FULL_SERIALIZER);
+ }
+
+ @Override
+ public String getRootDirQualifiedOfHTable(String tableName) {
+ return hbaseDataDirQualified + tableName;
+ }
+
+ private String getRootDirOfHTable(String tableName) {
+ return hbaseDataDir + tableName;
+ }
+
+ public boolean exists(String resPath) throws IOException {
+ return resourceStore.exists(resPath);
+ }
+
+ public void checkCompatibility(String projectName, Set<TableDesc> tableSet, DataModelDesc modelDesc,
+ boolean ifHiveCheck) throws IOException {
+ List<String> tableDataList = Lists.newArrayList();
+ for (TableDesc table : tableSet) {
+ tableDataList.add(JsonUtil.writeValueAsIndentString(table));
+ }
+
+ String modelDescData = JsonUtil.writeValueAsIndentString(modelDesc);
+
+ CompatibilityCheckRequest request = new CompatibilityCheckRequest();
+ request.setProjectName(projectName);
+ request.setTableDescDataList(tableDataList);
+ request.setModelDescData(modelDescData);
+
+ String jsonRequest = JsonUtil.writeValueAsIndentString(request);
+ restClient.checkCompatibility(jsonRequest, ifHiveCheck);
+ }
+
+ public void saveProject(ProjectInstance projInstance) throws IOException {
+ if (ifExecute) {
+ putMetaResource(ProjectInstance.concatResourcePath(projInstance.getName()), projInstance,
+ ProjectManager.PROJECT_SERIALIZER);
+ }
+ logger.info("saved project {}", projInstance);
+ }
+
+ public void saveHybrid(HybridInstance hybridInstance) throws IOException {
+ if (ifExecute) {
+ putMetaResource(HybridInstance.concatResourcePath(hybridInstance.getName()), hybridInstance,
+ HybridManager.HYBRID_SERIALIZER);
+ }
+ logger.info("saved hybrid {}", hybridInstance);
+ }
+
+ public void saveTableDesc(TableDesc table) throws IOException {
+ if (ifExecute) {
+ putMetaResource(TableDesc.concatResourcePath(table.getIdentity(), table.getProject()), table,
+ TableMetadataManager.TABLE_SERIALIZER);
+ }
+ logger.info("saved table {}", table);
+ }
+
+ public void saveModelDesc(DataModelDesc modelDesc) throws IOException {
+ if (ifExecute) {
+ putMetaResource(DataModelDesc.concatResourcePath(modelDesc.getName()), modelDesc,
+ DataModelManager.MODELDESC_SERIALIZER);
+ }
+ logger.info("saved model {}", modelDesc);
+ }
+
+ public void saveCubeDesc(CubeDesc cubeDesc) throws IOException {
+ if (ifExecute) {
+ putMetaResource(CubeDesc.concatResourcePath(cubeDesc.getName()), cubeDesc,
+ CubeDescManager.CUBE_DESC_SERIALIZER);
+ }
+ logger.info("saved cube desc {}", cubeDesc);
+ }
+
+ public void saveCubeInstance(CubeInstance cube) throws IOException {
+ if (ifExecute) {
+ putMetaResource(CubeInstance.concatResourcePath(cube.getName()), cube, CubeManager.CUBE_SERIALIZER);
+ }
+ logger.info("saved cube instance {}", cube);
+ }
+
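+    /**
+     * Saves the dictionary unless an identical one already exists in the dst
+     * store. Returns the resource path of the duplicate, or null if the given
+     * dictionary was actually saved.
+     */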
+ public String saveDictionary(DictionaryInfo dictInfo) throws IOException {
+ String dupDict = checkDupDict(dictInfo);
+ if (dupDict == null) {
+ putMetaResource(dictInfo.getResourcePath(), dictInfo, DictionaryInfoSerializer.FULL_SERIALIZER);
+ logger.info("saved dictionary {}", dictInfo.getResourcePath());
+ }
+ return dupDict;
+ }
+
+ private String checkDupDict(DictionaryInfo dictInfo) throws IOException {
+ NavigableSet<String> existings = resourceStore.listResources(dictInfo.getResourceDir());
+ if (existings == null)
+ return null;
+
+ logger.info("{} existing dictionaries of the same column", existings.size());
+ if (existings.size() > 100) {
+ logger.warn("Too many dictionaries under {}, dict count: {}", dictInfo.getResourceDir(), existings.size());
+ }
+
+ for (String existing : existings) {
+ DictionaryInfo existingInfo = getDictionaryInfo(existing);
+ if (existingInfo != null && dictInfo.getDictionaryObject().equals(existingInfo.getDictionaryObject())) {
+ return existing;
+ }
+ }
+
+ return null;
+ }
+
+ public void saveSnapshotTable(SnapshotTable snapshotTable) throws IOException {
+ putMetaResource(snapshotTable.getResourcePath(), snapshotTable, SnapshotTableSerializer.FULL_SERIALIZER);
+ logger.info("saved snapshot table {}", snapshotTable.getResourcePath());
+ }
+
+ public void saveExtSnapshotTableInfo(ExtTableSnapshotInfo extTableSnapshotInfo) throws IOException {
+ putMetaResource(extTableSnapshotInfo.getResourcePath(), extTableSnapshotInfo,
+ ExtTableSnapshotInfoManager.SNAPSHOT_SERIALIZER);
+ logger.info("saved ext snapshot table info {}", extTableSnapshotInfo.getResourcePath());
+ }
+
+ public void saveUserInfo(String userKey, ManagedUser user) throws IOException {
+ if (ifExecute) {
+ putMetaResource(userKey, user, KylinUserService.SERIALIZER);
+ }
+ logger.info("saved user info {}", userKey);
+ }
+
+ private <T extends RootPersistentEntity> void putMetaResource(String resPath, T obj, Serializer<T> serializer)
+ throws IOException {
+ putMetaResource(resPath, obj, serializer, true);
+ }
+
+ public <T extends RootPersistentEntity> void putMetaResource(String resPath, T obj, Serializer<T> serializer,
+ boolean withoutCheck) throws IOException {
+ if (ifExecute) {
+ if (withoutCheck) {
+ resourceStore.putResource(resPath, obj, System.currentTimeMillis(), serializer);
+ } else {
+ resourceStore.checkAndPutResource(resPath, obj, System.currentTimeMillis(), serializer);
+ }
+ }
+ logger.info("saved resource {}", resPath);
+ }
+
+ public void putResource(String resPath, RawResource res) throws IOException {
+ if (ifExecute) {
+ resourceStore.putResource(resPath, res.content(), res.lastModified());
+ }
+ logger.info("saved resource {}", resPath);
+ }
+
+    // if the htable does not exist in dst, return false;
+    // if the htable exists and its segment tag matches the given segment:
+    //   - if the htable is enabled, return true;
+    //   - if it is disabled, delete it and return false;
+    // if the htable is tagged by a different segment, it is in use by others: throw a RuntimeException
+ public boolean checkExist(TableName htableName, CubeSegment segment) throws IOException {
+ if (!htableExists(htableName)) {
+ return false;
+ }
+        HTableDescriptor tableDesc;
+        try (Table table = hbaseConn.getTable(htableName)) {
+            tableDesc = table.getTableDescriptor();
+        }
+ if (segment.toString().equals(tableDesc.getValue(HTableSegmentTag))) {
+ if (hbaseAdmin.isTableEnabled(htableName)) {
+ return true;
+ } else {
+ hbaseAdmin.deleteTable(htableName);
+ logger.info("htable {} is deleted", htableName);
+ return false;
+ }
+ }
+ throw new RuntimeException(
+ "htable name " + htableName + " has been used by " + tableDesc.getValue(HTableSegmentTag));
+ }
+
+ public void deleteHTable(String tableName) throws IOException {
+ TableName htableName = TableName.valueOf(tableName);
+ if (hbaseAdmin.isTableEnabled(htableName)) {
+ hbaseAdmin.disableTable(htableName);
+ }
+ hbaseAdmin.deleteTable(htableName);
+ logger.info("htable {} is deleted", htableName);
+ }
+
+ public boolean htableExists(TableName htableName) throws IOException {
+ return hbaseAdmin.tableExists(htableName);
+ }
+
+ public void resetTableHost(HTableDescriptor tableDesc) {
+ tableDesc.setValue(IRealizationConstants.HTableTag, kylinConfig.getMetadataUrlPrefix());
+ }
+
+ public void deployCoprocessor(HTableDescriptor tableDesc, String localCoprocessorJar) throws IOException {
+ List<String> existingCoprocessors = tableDesc.getCoprocessors();
+ for (String existingCoprocessor : existingCoprocessors) {
+ tableDesc.removeCoprocessor(existingCoprocessor);
+ }
+
+ Path hdfsCoprocessorJar = DeployCoprocessorCLI.uploadCoprocessorJar(localCoprocessorJar, hbaseFS,
+ hdfsWorkingDirectory, null);
+
+ if (User.isHBaseSecurityEnabled(hbaseConf)) {
+ // add coprocessor for bulk load
+ tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+ }
+ DeployCoprocessorCLI.addCoprocessorOnHTable(tableDesc, hdfsCoprocessorJar);
+
+ logger.info("deployed hbase table {} with coprocessor.", tableDesc.getTableName());
+ }
+
+ public void createTable(HTableDescriptor tableDesc, byte[][] splitKeys) throws IOException {
+ hbaseAdmin.createTable(tableDesc, splitKeys);
+
+ logger.info("htable {} successfully created!", tableDesc.getTableName());
+ }
+
+ public void copyInitOnJobCluster(Path path) throws IOException {
+ copyInit(jobFS, path);
+ }
+
+ public void copyInitOnHBaseCluster(Path path) throws IOException {
+ copyInit(hbaseFS, path);
+ }
+
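+    /**
+     * Prepares a copy target: strips scheme and authority, makes sure the
+     * parent directory exists, and deletes a stale target left by a previous run.
+     */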
+ public static void copyInit(FileSystem fs, Path path) throws IOException {
+ path = Path.getPathWithoutSchemeAndAuthority(path);
+ Path pathP = path.getParent();
+ if (!fs.exists(pathP)) {
+ fs.mkdirs(pathP);
+ }
+ if (fs.exists(path)) {
+ logger.warn("path {} already existed and will be deleted", path);
+ HadoopUtil.deletePath(fs.getConf(), path);
+ }
+ }
+
+ public void bulkLoadTable(String tableName) throws Exception {
+ Path rootPathOfTable = new Path(getRootDirOfHTable(tableName));
+ FileStatus[] regionFiles = hbaseFS.listStatus(rootPathOfTable, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return !path.getName().startsWith(".");
+ }
+ });
+
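+        //each region directory produced by DistCp contains the column family
+        //directories with the HFiles; bulk load them region by region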
+ for (FileStatus regionFileStatus : regionFiles) {
+ ToolRunner.run(new LoadIncrementalHFiles(hbaseConf),
+ new String[] { regionFileStatus.getPath().toString(), tableName });
+ }
+
+ logger.info("succeed to migrate htable {}", tableName);
+ }
+
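+    /**
+     * Asks the dst Kylin server to wipe and reload its whole metadata cache so
+     * that the migrated entities become visible without a restart.
+     */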
+ public void updateMeta() {
+ if (ifExecute) {
+ try {
+ logger.info("update meta cache for {}", restClient);
+ restClient.announceWipeCache(Broadcaster.SYNC_ALL, Broadcaster.Event.UPDATE.getType(),
+ Broadcaster.SYNC_ALL);
+ } catch (IOException e) {
+                logger.error("failed to update meta cache of the dst cluster", e);
+ }
+ }
+ }
+}
diff --git a/tool/src/main/java/org/apache/kylin/tool/migration/SrcClusterUtil.java b/tool/src/main/java/org/apache/kylin/tool/migration/SrcClusterUtil.java
new file mode 100644
index 0000000..7f865d5
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/migration/SrcClusterUtil.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.tool.migration;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DataModelManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationRegistry;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+import org.apache.kylin.storage.hybrid.HybridManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SrcClusterUtil extends ClusterUtil {
+ private static final Logger logger = LoggerFactory.getLogger(SrcClusterUtil.class);
+
+ private static final String hbaseRootDirConfKey = "hbase.rootdir";
+ private final String hbaseDataDir;
+
+ private final TableMetadataManager metadataManager;
+ private final DataModelManager modelManager;
+ private final ProjectManager projectManager;
+ private final HybridManager hybridManager;
+ private final CubeManager cubeManager;
+ private final CubeDescManager cubeDescManager;
+ private final RealizationRegistry realizationRegistry;
+ private final DictionaryManager dictionaryManager;
+ private final SnapshotManager snapshotManager;
+ private final ExtTableSnapshotInfoManager extSnapshotInfoManager;
+
+ public SrcClusterUtil(String configURI, boolean ifJobFSHAEnabled, boolean ifHBaseFSHAEnabled) throws IOException {
+ super(configURI, ifJobFSHAEnabled, ifHBaseFSHAEnabled);
+
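+        //source HFiles are read directly from the HBase root dir of the src cluster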
+ this.hbaseDataDir = hbaseConf.get(hbaseRootDirConfKey) + "/data/default/";
+ metadataManager = TableMetadataManager.getInstance(kylinConfig);
+ modelManager = DataModelManager.getInstance(kylinConfig);
+ projectManager = ProjectManager.getInstance(kylinConfig);
+ hybridManager = HybridManager.getInstance(kylinConfig);
+ cubeManager = CubeManager.getInstance(kylinConfig);
+ cubeDescManager = CubeDescManager.getInstance(kylinConfig);
+ realizationRegistry = RealizationRegistry.getInstance(kylinConfig);
+ dictionaryManager = DictionaryManager.getInstance(kylinConfig);
+ snapshotManager = SnapshotManager.getInstance(kylinConfig);
+ extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(kylinConfig);
+ }
+
+ public String getDefaultCoprocessorJarPath() {
+ return kylinConfig.getCoprocessorLocalJar();
+ }
+
+ @Override
+ public ProjectInstance getProject(String projectName) throws IOException {
+ return projectManager.getProject(projectName);
+ }
+
+ public List<ProjectInstance> listAllProjects() throws IOException {
+ return projectManager.listAllProjects();
+ }
+
+ public ProjectInstance getProjectByRealization(RealizationType type, String realizationName) throws IOException {
+ List<ProjectInstance> ret = projectManager.findProjects(type, realizationName);
+ return ret.isEmpty() ? null : ret.get(0);
+ }
+
+ public CubeInstance getCube(String name) throws IOException {
+ return cubeManager.getCube(name);
+ }
+
+ public CubeDesc getCubeDesc(String name) throws IOException {
+ return cubeDescManager.getCubeDesc(name);
+ }
+
+ public HybridInstance getHybrid(String name) throws IOException {
+ return hybridManager.getHybridInstance(name);
+ }
+
+ public IRealization getRealization(RealizationEntry entry) throws IOException {
+ return realizationRegistry.getRealization(entry.getType(), entry.getRealization());
+ }
+
+ public DataModelDesc getDataModelDesc(String modelName) throws IOException {
+ return modelManager.getDataModelDesc(modelName);
+ }
+
+ public TableDesc getTableDesc(String tableIdentity, String projectName) throws IOException {
+ TableDesc ret = metadataManager.getStore().getResource(TableDesc.concatResourcePath(tableIdentity, projectName),
+ TableMetadataManager.TABLE_SERIALIZER);
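+        //fall back to the globally scoped (project-less) table desc when the project-scoped one is missing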
+ if (projectName != null && ret == null) {
+ ret = metadataManager.getStore().getResource(TableDesc.concatResourcePath(tableIdentity, null),
+ TableMetadataManager.TABLE_SERIALIZER);
+ }
+ return ret;
+ }
+
+ @Override
+ public DictionaryInfo getDictionaryInfo(String dictPath) throws IOException {
+ return dictionaryManager.getDictionaryInfo(dictPath);
+ }
+
+ @Override
+ public SnapshotTable getSnapshotTable(String snapshotPath) throws IOException {
+ return snapshotManager.getSnapshotTable(snapshotPath);
+ }
+
+ public ExtTableSnapshotInfo getExtTableSnapshotInfo(String snapshotPath) throws IOException {
+ return extSnapshotInfoManager.getSnapshot(snapshotPath);
+ }
+
+ @Override
+ public String getRootDirQualifiedOfHTable(String tableName) {
+ return hbaseDataDir + tableName;
+ }
+}
\ No newline at end of file