You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/12/04 11:37:08 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4817 Refine CubeMigrationCLI for kylin4
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new cd449ea KYLIN-4817 Refine CubeMigrationCLI for kylin4
cd449ea is described below
commit cd449eab1c6f49a27bc97ecfff8e1b29af92aead
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Mon Nov 30 15:36:43 2020 +0800
KYLIN-4817 Refine CubeMigrationCLI for kylin4
---
.../apache/kylin/common/restclient/RestClient.java | 4 +-
.../engine/spark/metadata/cube/PathManager.java | 6 +
.../org/apache/kylin/tool/CubeMigrationCLI.java | 439 ++++++++-------------
3 files changed, 168 insertions(+), 281 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index 955b0ff..2e99809 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -202,7 +202,7 @@ public class RestClient {
public String getKylinProperties() throws IOException {
String url = baseUrl + "/admin/config";
- HttpGet request = new HttpGet(url);
+ HttpGet request = newGet(url);
HttpResponse response = null;
try {
response = client.execute(request);
@@ -380,7 +380,7 @@ public class RestClient {
}
private HttpGet newGet(String url) {
- HttpGet get = new HttpGet();
+ HttpGet get = new HttpGet(url);
addHttpHeaders(get);
return get;
}
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
index 0484bfc..6444715 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/metadata/cube/PathManager.java
@@ -49,6 +49,12 @@ public final class PathManager {
return hdfsWorkDir + "parquet" + File.separator + cube.getName() + File.separator + segName + "_" + identifier;
}
+ public static String getSegmentParquetStoragePath(String hdfsWorkDir, String cubeName, CubeSegment segment) {
+ String segmentName = segment.getName();
+ String identifier = segment.getStorageLocationIdentifier();
+ return hdfsWorkDir + "parquet" + File.separator + cubeName + File.separator + segmentName + "_" + identifier;
+ }
+
/**
* Delete segment path
*/
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 4612cef..550da0c 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -18,22 +18,20 @@
package org.apache.kylin.tool;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.JsonSerializer;
@@ -42,30 +40,23 @@ import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.common.restclient.RestClient;
import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.lookup.SnapshotManager;
-import org.apache.kylin.dict.lookup.SnapshotTable;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.cube.model.DictionaryDesc;
+import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,14 +77,17 @@ public class CubeMigrationCLI extends AbstractApplication {
protected KylinConfig dstConfig;
protected ResourceStore srcStore;
protected ResourceStore dstStore;
- protected FileSystem hdfsFS;
- private HBaseAdmin hbaseAdmin;
+ protected FileSystem hdfsFs;
+ protected Configuration conf;
protected boolean doAclCopy = false;
protected boolean doOverwrite = false;
protected boolean doMigrateSegment = true;
protected String dstProject;
+ protected String srcHdfsWorkDir;
+ protected String dstHdfsWorkDir;
private static final String ACL_PREFIX = "/acl/";
+ private static final String GLOBAL_DICT_PREFIX = "/dict/global_dict/";
public static void main(String[] args) throws IOException, InterruptedException {
@@ -125,7 +119,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
- String purgeAndDisable, String overwriteIfExists, String realExecute)
+ String purgeAndDisable, String overwriteIfExists, String realExecute)
throws IOException, InterruptedException {
moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
@@ -133,7 +127,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl,
- String purgeAndDisable, String overwriteIfExists, String realExecute)
+ String purgeAndDisable, String overwriteIfExists, String realExecute)
throws IOException, InterruptedException {
moveCube(srcCfg, dstCfg, cubeName, projectName, Boolean.parseBoolean(copyAcl),
@@ -142,7 +136,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
- String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment)
+ String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment)
throws IOException, InterruptedException {
moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
@@ -152,7 +146,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, boolean copyAcl,
- boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment)
+ boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment)
throws IOException, InterruptedException {
doAclCopy = copyAcl;
doOverwrite = overwriteIfExists;
@@ -162,26 +156,24 @@ public class CubeMigrationCLI extends AbstractApplication {
dstConfig = dstCfg;
dstStore = ResourceStore.getStore(dstConfig);
dstProject = projectName;
+ conf = HadoopUtil.getCurrentConfiguration();
CubeManager cubeManager = CubeManager.getInstance(srcConfig);
CubeInstance cube = cubeManager.getCube(cubeName);
+ srcHdfsWorkDir = srcConfig.getHdfsWorkingDirectory(cube.getProject());
+ dstHdfsWorkDir = dstConfig.getHdfsWorkingDirectory(dstProject);
logger.info("cube to be moved is : " + cubeName);
if (migrateSegment) {
checkCubeState(cube);
}
- checkAndGetHbaseUrl();
+ checkAndGetMetadataUrl();
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
- hbaseAdmin = new HBaseAdmin(conf);
- hdfsFS = HadoopUtil.getWorkingFileSystem();
+ hdfsFs = HadoopUtil.getWorkingFileSystem();
operations = new ArrayList<Opt>();
copyFilesInMetaStore(cube);
- if (migrateSegment) {
- renameFoldersInHdfs(cube);
- changeHtableHost(cube);
- } else {
+ if (!migrateSegment) {
clearSegments(cubeName); // this should be after copyFilesInMetaStore
}
addCubeAndModelIntoProject(cube, cubeName);
@@ -192,20 +184,12 @@ public class CubeMigrationCLI extends AbstractApplication {
if (realExecute) {
doOpts();
- if (migrateSegment) {
- checkMigrationSuccess(dstConfig, cubeName, true);
- }
updateMeta(dstConfig, projectName, cubeName, cube.getModel());
} else {
showOpts();
}
}
- public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException {
- CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix);
- checkCLI.execute(cubeName);
- }
-
protected void checkCubeState(CubeInstance cube) {
if (cube.getStatus() != RealizationStatusEnum.READY)
throw new IllegalStateException("Cannot migrate cube that is not in READY state.");
@@ -217,40 +201,16 @@ public class CubeMigrationCLI extends AbstractApplication {
}
}
- protected void checkAndGetHbaseUrl() {
+ protected void checkAndGetMetadataUrl() {
StorageURL srcMetadataUrl = srcConfig.getMetadataUrl();
StorageURL dstMetadataUrl = dstConfig.getMetadataUrl();
logger.info("src metadata url is " + srcMetadataUrl);
logger.info("dst metadata url is " + dstMetadataUrl);
-
- if (!"hbase".equals(srcMetadataUrl.getScheme()) || !"hbase".equals(dstMetadataUrl.getScheme()))
- throw new IllegalStateException("Both metadata urls should be hbase metadata url");
- }
-
- protected void renameFoldersInHdfs(CubeInstance cube) throws IOException {
- for (CubeSegment segment : cube.getSegments()) {
-
- String jobUuid = segment.getLastBuildJobID();
- String src = JobBuilderSupport.getJobWorkingDir(srcConfig.getHdfsWorkingDirectory(), jobUuid);
- String tgt = JobBuilderSupport.getJobWorkingDir(dstConfig.getHdfsWorkingDirectory(), jobUuid);
-
- operations.add(new Opt(OptType.RENAME_FOLDER_IN_HDFS, new Object[] { src, tgt }));
- }
-
- }
-
- protected void changeHtableHost(CubeInstance cube) {
- if (cube.getDescriptor().getStorageType() != IStorageAware.ID_SHARDED_HBASE)
- return;
- for (CubeSegment segment : cube.getSegments()) {
- operations
- .add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() }));
- }
}
protected void clearSegments(String cubeName) throws IOException {
- operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[] { cubeName }));
+ operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[]{cubeName}));
}
protected void copyFilesInMetaStore(CubeInstance cube) throws IOException {
@@ -260,16 +220,22 @@ public class CubeMigrationCLI extends AbstractApplication {
+ " already exists on target metadata store. Use overwriteIfExists to overwrite it");
List<String> metaItems = new ArrayList<String>();
+ List<String> srcParquetFiles = new ArrayList<String>();
+ List<String> dstParquetFiles = new ArrayList<String>();
Set<String> dictAndSnapshot = new HashSet<String>();
- listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
+ listCubeRelatedResources(cube, metaItems, dictAndSnapshot, srcParquetFiles, dstParquetFiles);
for (String item : metaItems) {
- operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item }));
+ operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[]{item}));
}
if (doMigrateSegment) {
for (String item : dictAndSnapshot) {
- operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
+ operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[]{item, cube.getName()}));
+ }
+
+ for (int i = 0; i < srcParquetFiles.size(); i++) {
+ operations.add(new Opt(OptType.COPY_PARQUET_FILE, new Object[]{srcParquetFiles.get(i), dstParquetFiles.get(i)}));
}
}
}
@@ -279,11 +245,11 @@ public class CubeMigrationCLI extends AbstractApplication {
if (!dstStore.exists(projectResPath))
throw new IllegalStateException("The target project " + dstProject + " does not exist");
- operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[] { srcCube, cubeName, dstProject }));
+ operations.add(new Opt(OptType.ADD_INTO_PROJECT, new Object[]{srcCube, cubeName, dstProject}));
}
private void purgeAndDisable(String cubeName) throws IOException {
- operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName }));
+ operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[]{cubeName}));
}
private List<String> getCompatibleTablePath(Set<TableRef> tableRefs, String project, String rootPath)
@@ -311,7 +277,7 @@ public class CubeMigrationCLI extends AbstractApplication {
return toResource;
}
- protected void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot)
+ protected void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot, List<String> srcParquetFiles, List<String> dstParquetFiles)
throws IOException {
CubeDesc cubeDesc = cube.getDescriptor();
@@ -326,10 +292,25 @@ public class CubeMigrationCLI extends AbstractApplication {
metaResource.addAll(getCompatibleTablePath(tblRefs, prj, ResourceStore.TABLE_EXD_RESOURCE_ROOT));
if (doMigrateSegment) {
+ for (DictionaryDesc dictionaryDesc : cubeDesc.getDictionaries()) {
+ String[] columnInfo = dictionaryDesc.getColumnRef().getColumnWithTable().split("\\.");
+ String globalDictPath;
+ if (columnInfo.length == 3) {
+ globalDictPath = cube.getProject() + GLOBAL_DICT_PREFIX + columnInfo[1] + File.separator + columnInfo[2];
+ } else {
+ globalDictPath = cube.getProject() + GLOBAL_DICT_PREFIX + columnInfo[0] + File.separator + columnInfo[1];
+ }
+ if (globalDictPath != null) {
+ logger.info("Add " + globalDictPath + " to migrate dict list");
+ dictAndSnapshot.add(globalDictPath);
+ }
+ }
for (CubeSegment segment : cube.getSegments()) {
metaResource.add(segment.getStatisticsResourcePath());
dictAndSnapshot.addAll(segment.getSnapshotPaths());
- dictAndSnapshot.addAll(segment.getDictionaryPaths());
+ srcParquetFiles.add(PathManager.getSegmentParquetStoragePath(srcHdfsWorkDir, cube.getName(), segment));
+ dstParquetFiles.add(PathManager.getSegmentParquetStoragePath(dstHdfsWorkDir, cube.getName(), segment));
+ logger.info("Add " + PathManager.getSegmentParquetStoragePath(cube, segment.getName(), segment.getStorageLocationIdentifier()) + " to migrate parquet file list");
}
}
@@ -337,11 +318,6 @@ public class CubeMigrationCLI extends AbstractApplication {
metaResource.add(ACL_PREFIX + cube.getUuid());
metaResource.add(ACL_PREFIX + cube.getModel().getUuid());
}
-
-// if (cubeDesc.isStreamingCube()) {
-// // add streaming source config info for streaming cube
-// metaResource.add(StreamingSourceConfig.concatResourcePath(cubeDesc.getModel().getRootFactTableName()));
-// }
}
@Override
@@ -355,7 +331,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
protected enum OptType {
- COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, PURGE_AND_DISABLE, CLEAR_SEGMENTS
+ COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, COPY_PARQUET_FILE, ADD_INTO_PROJECT, PURGE_AND_DISABLE, CLEAR_SEGMENTS
}
protected void addOpt(OptType type, Object[] params) {
@@ -420,161 +396,94 @@ public class CubeMigrationCLI extends AbstractApplication {
logger.info("Executing operation: " + opt.toString());
switch (opt.type) {
- case CHANGE_HTABLE_HOST: {
- String tableName = (String) opt.params[0];
- System.out.println("CHANGE_HTABLE_HOST, table name: " + tableName);
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
- desc.setValue(IRealizationConstants.HTableTag, dstConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
- logger.info("CHANGE_HTABLE_HOST is completed");
- break;
- }
- case COPY_FILE_IN_META: {
- String item = (String) opt.params[0];
- RawResource res = srcStore.getResource(item);
- if (res == null) {
- logger.info("Item: {} doesn't exist, ignore it.", item);
+ case COPY_FILE_IN_META: {
+ String item = (String) opt.params[0];
+ RawResource res = srcStore.getResource(item);
+ if (res == null) {
+ logger.info("Item: {} doesn't exist, ignore it.", item);
+ break;
+ }
+ dstStore.putResource(renameTableWithinProject(item), res.content(), res.lastModified());
+ res.content().close();
+ logger.info("Item " + item + " is copied");
break;
}
- dstStore.putResource(renameTableWithinProject(item), res.content(), res.lastModified());
- res.content().close();
- logger.info("Item " + item + " is copied");
- break;
- }
- case COPY_DICT_OR_SNAPSHOT: {
- String item = (String) opt.params[0];
-
- if (item.toLowerCase(Locale.ROOT).endsWith(".dict")) {
- DictionaryManager dstDictMgr = DictionaryManager.getInstance(dstConfig);
- DictionaryManager srcDicMgr = DictionaryManager.getInstance(srcConfig);
- DictionaryInfo dictSrc = srcDicMgr.getDictionaryInfo(item);
-
- long ts = dictSrc.getLastModified();
- dictSrc.setLastModified(0);//to avoid resource store write conflict
- Dictionary dictObj = dictSrc.getDictionaryObject().copyToAnotherMeta(srcConfig, dstConfig);
- DictionaryInfo dictSaved = dstDictMgr.trySaveNewDict(dictObj, dictSrc);
- dictSrc.setLastModified(ts);
-
- if (dictSaved == dictSrc) {
- //no dup found, already saved to dest
- logger.info("Item " + item + " is copied");
+ case COPY_DICT_OR_SNAPSHOT: {
+ String item = (String) opt.params[0];
+ String itemPath = item.substring(item.substring(0, item.indexOf("/")).length()+1);
+ Path srcPath = new Path(srcHdfsWorkDir + itemPath);
+ Path dstPath = new Path(dstHdfsWorkDir + itemPath);
+ if (hdfsFs.exists(srcPath)) {
+ FileUtil.copy(hdfsFs, srcPath, hdfsFs, dstPath, false, true, conf);
+ logger.info("Copy " + srcPath + " to " + dstPath);
} else {
- //dictSrc is rejected because of duplication
- //modify cube's dictionary path
- String cubeName = (String) opt.params[1];
- String cubeResPath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cube = dstStore.getResource(cubeResPath, cubeSerializer);
- for (CubeSegment segment : cube.getSegments()) {
- for (Map.Entry<String, String> entry : segment.getDictionaries().entrySet()) {
- if (entry.getValue().equalsIgnoreCase(item)) {
- entry.setValue(dictSaved.getResourcePath());
- }
- }
- }
- dstStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
- logger.info("Item " + item + " is dup, instead " + dictSaved.getResourcePath() + " is reused");
+ logger.info("Dict or snapshot " + srcPath + " is not exists, ignore it");
}
-
- } else if (item.toLowerCase(Locale.ROOT).endsWith(".snapshot")) {
- SnapshotManager dstSnapMgr = SnapshotManager.getInstance(dstConfig);
- SnapshotManager srcSnapMgr = SnapshotManager.getInstance(srcConfig);
- SnapshotTable snapSrc = srcSnapMgr.getSnapshotTable(item);
-
- long ts = snapSrc.getLastModified();
- snapSrc.setLastModified(0);
- SnapshotTable snapSaved = dstSnapMgr.trySaveNewSnapshot(snapSrc);
- snapSrc.setLastModified(ts);
-
- if (snapSaved == snapSrc) {
- //no dup found, already saved to dest
- logger.info("Item " + item + " is copied");
-
+ break;
+ }
+ case COPY_PARQUET_FILE: {
+ Path srcPath = new Path((String) opt.params[0]);
+ Path dstPath = new Path((String) opt.params[1]);
+ if (hdfsFs.exists(srcPath)) {
+ FileUtil.copy(hdfsFs, srcPath, hdfsFs, dstPath, false, true, conf);
+ logger.info("Copy " + srcPath + " to " + dstPath);
} else {
- String cubeName = (String) opt.params[1];
- String cubeResPath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cube = dstStore.getResource(cubeResPath, cubeSerializer);
- for (CubeSegment segment : cube.getSegments()) {
- for (Map.Entry<String, String> entry : segment.getSnapshots().entrySet()) {
- if (entry.getValue().equalsIgnoreCase(item)) {
- entry.setValue(snapSaved.getResourcePath());
- }
- }
- }
- dstStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
- logger.info("Item " + item + " is dup, instead " + snapSaved.getResourcePath() + " is reused");
-
+ logger.info("Parquet file " + srcPath + " is not exists, ignore it");
}
-
- } else {
- logger.error("unknown item found: " + item);
- logger.info("ignore it");
+ break;
}
+ case ADD_INTO_PROJECT: {
+ CubeInstance srcCube = (CubeInstance) opt.params[0];
+ String cubeName = (String) opt.params[1];
+ String projectName = (String) opt.params[2];
+ String modelName = srcCube.getDescriptor().getModelName();
+
+ String projectResPath = ProjectInstance.concatResourcePath(projectName);
+ Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
+ ProjectInstance project = dstStore.getResource(projectResPath, projectSerializer);
+
+ for (TableRef tableRef : srcCube.getModel().getAllTables()) {
+ project.addTable(tableRef.getTableIdentity());
+ }
- break;
- }
- case RENAME_FOLDER_IN_HDFS: {
- String srcPath = (String) opt.params[0];
- String dstPath = (String) opt.params[1];
- renameHDFSPath(srcPath, dstPath);
- logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
- break;
- }
- case ADD_INTO_PROJECT: {
- CubeInstance srcCube = (CubeInstance) opt.params[0];
- String cubeName = (String) opt.params[1];
- String projectName = (String) opt.params[2];
- String modelName = srcCube.getDescriptor().getModelName();
-
- String projectResPath = ProjectInstance.concatResourcePath(projectName);
- Serializer<ProjectInstance> projectSerializer = new JsonSerializer<ProjectInstance>(ProjectInstance.class);
- ProjectInstance project = dstStore.getResource(projectResPath, projectSerializer);
-
- for (TableRef tableRef : srcCube.getModel().getAllTables()) {
- project.addTable(tableRef.getTableIdentity());
- }
+ if (!project.getModels().contains(modelName))
+ project.addModel(modelName);
+ project.removeRealization(RealizationType.CUBE, cubeName);
+ project.addRealizationEntry(RealizationType.CUBE, cubeName);
- if (!project.getModels().contains(modelName))
- project.addModel(modelName);
- project.removeRealization(RealizationType.CUBE, cubeName);
- project.addRealizationEntry(RealizationType.CUBE, cubeName);
+ dstStore.checkAndPutResource(projectResPath, project, projectSerializer);
+ logger.info("Project instance for " + projectName + " is corrected");
+ break;
+ }
+ case CLEAR_SEGMENTS: {
+ String cubeName = (String) opt.params[0];
+ String cubeInstancePath = CubeInstance.concatResourcePath(cubeName);
+ Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+ CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, cubeInstanceSerializer);
+ cubeInstance.getSegments().clear();
+ cubeInstance.clearCuboids();
+ cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
+ cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
+ dstStore.checkAndPutResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer);
+ logger.info("Cleared segments for " + cubeName + ", since segments has not been copied");
+ break;
+ }
+ case PURGE_AND_DISABLE: {
+ String cubeName = (String) opt.params[0];
+ String cubeResPath = CubeInstance.concatResourcePath(cubeName);
+ Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+ CubeInstance cube = srcStore.getResource(cubeResPath, cubeSerializer);
+ cube.getSegments().clear();
+ cube.setStatus(RealizationStatusEnum.DISABLED);
+ srcStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
+ logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl());
- dstStore.checkAndPutResource(projectResPath, project, projectSerializer);
- logger.info("Project instance for " + projectName + " is corrected");
- break;
- }
- case CLEAR_SEGMENTS: {
- String cubeName = (String) opt.params[0];
- String cubeInstancePath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, cubeInstanceSerializer);
- cubeInstance.getSegments().clear();
- cubeInstance.clearCuboids();
- cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
- cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
- dstStore.checkAndPutResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer);
- logger.info("Cleared segments for " + cubeName + ", since segments has not been copied");
- break;
- }
- case PURGE_AND_DISABLE: {
- String cubeName = (String) opt.params[0];
- String cubeResPath = CubeInstance.concatResourcePath(cubeName);
- Serializer<CubeInstance> cubeSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
- CubeInstance cube = srcStore.getResource(cubeResPath, cubeSerializer);
- cube.getSegments().clear();
- cube.setStatus(RealizationStatusEnum.DISABLED);
- srcStore.checkAndPutResource(cubeResPath, cube, cubeSerializer);
- logger.info("Cube " + cubeName + " is purged and disabled in " + srcConfig.getMetadataUrl());
-
- break;
- }
- default: {
- //do nothing
- break;
- }
+ break;
+ }
+ default: {
+ //do nothing
+ break;
+ }
}
}
@@ -582,53 +491,38 @@ public class CubeMigrationCLI extends AbstractApplication {
logger.info("Undo operation: " + opt.toString());
switch (opt.type) {
- case CHANGE_HTABLE_HOST: {
- String tableName = (String) opt.params[0];
- HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- hbaseAdmin.disableTable(tableName);
- desc.setValue(IRealizationConstants.HTableTag, srcConfig.getMetadataUrlPrefix());
- hbaseAdmin.modifyTable(tableName, desc);
- hbaseAdmin.enableTable(tableName);
- break;
- }
- case COPY_FILE_IN_META: {
- // no harm
- logger.info("Undo for COPY_FILE_IN_META is ignored");
- String item = (String) opt.params[0];
-
- if (item.startsWith(ACL_PREFIX) && doAclCopy) {
- logger.info("Remove acl record");
- dstStore.deleteResource(item);
+ case COPY_FILE_IN_META: {
+ // no harm
+ logger.info("Undo for COPY_FILE_IN_META is ignored");
+ String item = (String) opt.params[0];
+
+ if (item.startsWith(ACL_PREFIX) && doAclCopy) {
+ logger.info("Remove acl record");
+ dstStore.deleteResource(item);
+ }
+ break;
}
- break;
- }
- case COPY_DICT_OR_SNAPSHOT: {
- // no harm
- logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
- break;
- }
- case RENAME_FOLDER_IN_HDFS: {
- String srcPath = (String) opt.params[1];
- String dstPath = (String) opt.params[0];
-
- if (hdfsFS.exists(new Path(srcPath)) && !hdfsFS.exists(new Path(dstPath))) {
- renameHDFSPath(srcPath, dstPath);
- logger.info("HDFS Folder renamed from " + srcPath + " to " + dstPath);
+ case COPY_DICT_OR_SNAPSHOT: {
+ // no harm
+ logger.info("Undo for COPY_DICT_OR_SNAPSHOT is ignored");
+ break;
+ }
+ case ADD_INTO_PROJECT: {
+ logger.info("Undo for ADD_INTO_PROJECT is ignored");
+ break;
+ }
+ case PURGE_AND_DISABLE: {
+ logger.info("Undo for PURGE_AND_DISABLE is not supported");
+ break;
+ }
+ case COPY_PARQUET_FILE: {
+ logger.info("Undo for COPY_PARQUET_FILE is ignored");
+ break;
+ }
+ default: {
+ //do nothing
+ break;
}
- break;
- }
- case ADD_INTO_PROJECT: {
- logger.info("Undo for ADD_INTO_PROJECT is ignored");
- break;
- }
- case PURGE_AND_DISABLE: {
- logger.info("Undo for PURGE_AND_DISABLE is not supported");
- break;
- }
- default: {
- //do nothing
- break;
- }
}
}
@@ -660,17 +554,4 @@ public class CubeMigrationCLI extends AbstractApplication {
}
}
}
-
- private void renameHDFSPath(String srcPath, String dstPath) throws IOException, InterruptedException {
- int nRetry = 0;
- int sleepTime = 5000;
- while (!hdfsFS.rename(new Path(srcPath), new Path(dstPath))) {
- ++nRetry;
- if (nRetry > 3) {
- throw new InterruptedException("Cannot rename folder " + srcPath + " to folder " + dstPath);
- } else {
- Thread.sleep((long) sleepTime * nRetry * nRetry);
- }
- }
- }
-}
+}
\ No newline at end of file