You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/02/04 00:51:45 UTC
[42/50] [abbrv] kylin git commit: KYLIN-2908 Add one option for
migration tool to indicate whether to migrate segment data
KYLIN-2908 Add one option for migration tool to indicate whether to migrate segment data
Signed-off-by: Zhong <nj...@apache.org>
Signed-off-by: lidongsjtu <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c704f7cd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c704f7cd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c704f7cd
Branch: refs/heads/sync
Commit: c704f7cda79921c5a22645492a52c94734847541
Parents: fee5730
Author: Wang Vic <jw...@ebay.com>
Authored: Wed Sep 27 11:18:22 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Tue Jan 30 19:10:51 2018 +0800
----------------------------------------------------------------------
.../org/apache/kylin/cube/CubeInstance.java | 6 +
.../org/apache/kylin/tool/CubeMigrationCLI.java | 114 +++++++++++++------
2 files changed, 87 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c704f7cd/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 27918bc..ea5006e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -383,6 +383,12 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
this.createTimeUTC = createTimeUTC;
}
+ public void clearCuboids() {
+ cuboidBytes = null;
+ cuboidBytesRecommend = null;
+ cuboidLastOptimized = 0L;
+ }
+
public Set<Long> getCuboidsByMode(String cuboidModeName) {
return getCuboidsByMode(cuboidModeName == null ? null : CuboidModeEnum.getByModeName(cuboidModeName));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/c704f7cd/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index f95139e..a4a6ab9 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -90,6 +90,7 @@ public class CubeMigrationCLI extends AbstractApplication {
private HBaseAdmin hbaseAdmin;
protected boolean doAclCopy = false;
protected boolean doOverwrite = false;
+ protected boolean doMigrateSegment = true;
protected String dstProject;
private static final String ACL_PREFIX = "/acl/";
@@ -97,16 +98,20 @@ public class CubeMigrationCLI extends AbstractApplication {
public static void main(String[] args) throws IOException, InterruptedException {
CubeMigrationCLI cli = new CubeMigrationCLI();
- if (args.length != 8) {
+ if (args.length != 8 && args.length != 9) {
cli.usage();
System.exit(1);
}
- cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]);
+ if (args.length == 8) {
+ cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7]);
+ } else if (args.length == 9) {
+ cli.moveCube(args[0], args[1], args[2], args[3], args[4], args[5], args[6], args[7], args[8]);
+ }
}
protected void usage() {
System.out.println(
- "Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute");
+ "Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute migrateSegmentOrNot");
System.out.println("srcKylinConfigUri: The KylinConfig of the cube’s source \n"
+ "dstKylinConfigUri: The KylinConfig of the cube’s new home \n"
+ "cubeName: the name of cube to be migrated. \n"
@@ -114,16 +119,36 @@ public class CubeMigrationCLI extends AbstractApplication {
+ "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n"
+ "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n"
+ "overwriteIfExists: overwrite cube if it already exists in the target environment. \n"
- + "realExecute: if false, just print the operations to take, if true, do the real migration. \n");
+ + "realExecute: if false, just print the operations to take, if true, do the real migration. \n"
+ + "migrateSegmentOrNot:(optional) true or false: whether copy segment data to target environment. \n");
}
- public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl,
+ public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
String purgeAndDisable, String overwriteIfExists, String realExecute)
throws IOException, InterruptedException {
- doAclCopy = Boolean.parseBoolean(copyAcl);
- doOverwrite = Boolean.parseBoolean(overwriteIfExists);
+ moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
+ projectName, Boolean.parseBoolean(copyAcl), Boolean.parseBoolean(purgeAndDisable),
+ Boolean.parseBoolean(overwriteIfExists), Boolean.parseBoolean(realExecute), true);
+ }
+
+ public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
+ String purgeAndDisable, String overwriteIfExists, String realExecute, String migrateSegment)
+ throws IOException, InterruptedException {
+
+ moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
+ projectName, Boolean.parseBoolean(copyAcl), Boolean.parseBoolean(purgeAndDisable),
+ Boolean.parseBoolean(overwriteIfExists), Boolean.parseBoolean(realExecute),
+ Boolean.parseBoolean(migrateSegment));
+ }
+
+ public void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, boolean copyAcl,
+ boolean purgeAndDisable, boolean overwriteIfExists, boolean realExecute, boolean migrateSegment)
+ throws IOException, InterruptedException {
+ doAclCopy = copyAcl;
+ doOverwrite = overwriteIfExists;
+ doMigrateSegment = migrateSegment;
srcConfig = srcCfg;
srcStore = ResourceStore.getStore(srcConfig);
dstConfig = dstCfg;
@@ -134,7 +159,10 @@ public class CubeMigrationCLI extends AbstractApplication {
CubeInstance cube = cubeManager.getCube(cubeName);
logger.info("cube to be moved is : " + cubeName);
- checkCubeState(cube);
+ if (migrateSegment) {
+ checkCubeState(cube);
+ }
+
checkAndGetHbaseUrl();
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
@@ -142,31 +170,29 @@ public class CubeMigrationCLI extends AbstractApplication {
hdfsFS = HadoopUtil.getWorkingFileSystem();
operations = new ArrayList<Opt>();
copyFilesInMetaStore(cube);
- renameFoldersInHdfs(cube);
- changeHtableHost(cube);
+ if (migrateSegment) {
+ renameFoldersInHdfs(cube);
+ changeHtableHost(cube);
+ } else {
+ clearSegments(cubeName); // this should be after copyFilesInMetaStore
+ }
addCubeAndModelIntoProject(cube, cubeName);
- if (Boolean.parseBoolean(purgeAndDisable) == true) {
+ if (migrateSegment && purgeAndDisable) {
purgeAndDisable(cubeName); // this should be the last action
}
- if (Boolean.parseBoolean(realExecute) == true) {
+ if (realExecute) {
doOpts();
- checkMigrationSuccess(dstConfig, cubeName, true);
+ if (migrateSegment) {
+ checkMigrationSuccess(dstConfig, cubeName, true);
+ }
updateMeta(dstConfig);
} else {
showOpts();
}
}
- public void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
- String purgeAndDisable, String overwriteIfExists, String realExecute)
- throws IOException, InterruptedException {
-
- moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
- projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute);
- }
-
public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException {
CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix);
checkCLI.execute(cubeName);
@@ -215,22 +241,28 @@ public class CubeMigrationCLI extends AbstractApplication {
}
}
- protected void copyFilesInMetaStore(CubeInstance cube) throws IOException {
+ protected void clearSegments(String cubeName) throws IOException {
+ operations.add(new Opt(OptType.CLEAR_SEGMENTS, new Object[] { cubeName }));
+ }
- List<String> metaItems = new ArrayList<String>();
- Set<String> dictAndSnapshot = new HashSet<String>();
- listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
+ protected void copyFilesInMetaStore(CubeInstance cube) throws IOException {
if (dstStore.exists(cube.getResourcePath()) && !doOverwrite)
throw new IllegalStateException("The cube named " + cube.getName()
+ " already exists on target metadata store. Use overwriteIfExists to overwrite it");
+ List<String> metaItems = new ArrayList<String>();
+ Set<String> dictAndSnapshot = new HashSet<String>();
+ listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
+
for (String item : metaItems) {
operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item }));
}
- for (String item : dictAndSnapshot) {
- operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
+ if (doMigrateSegment) {
+ for (String item : dictAndSnapshot) {
+ operations.add(new Opt(OptType.COPY_DICT_OR_SNAPSHOT, new Object[] { item, cube.getName() }));
+ }
}
}
@@ -285,10 +317,12 @@ public class CubeMigrationCLI extends AbstractApplication {
metaResource.addAll(getCompatibleTablePath(tblRefs, prj, ResourceStore.TABLE_RESOURCE_ROOT));
metaResource.addAll(getCompatibleTablePath(tblRefs, prj, ResourceStore.TABLE_EXD_RESOURCE_ROOT));
- for (CubeSegment segment : cube.getSegments()) {
- metaResource.add(segment.getStatisticsResourcePath());
- dictAndSnapshot.addAll(segment.getSnapshotPaths());
- dictAndSnapshot.addAll(segment.getDictionaryPaths());
+ if (doMigrateSegment) {
+ for (CubeSegment segment : cube.getSegments()) {
+ metaResource.add(segment.getStatisticsResourcePath());
+ dictAndSnapshot.addAll(segment.getSnapshotPaths());
+ dictAndSnapshot.addAll(segment.getDictionaryPaths());
+ }
}
if (doAclCopy) {
@@ -308,7 +342,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
protected enum OptType {
- COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, PURGE_AND_DISABLE
+ COPY_FILE_IN_META, COPY_DICT_OR_SNAPSHOT, RENAME_FOLDER_IN_HDFS, ADD_INTO_PROJECT, CHANGE_HTABLE_HOST, PURGE_AND_DISABLE, CLEAR_SEGMENTS
}
protected void addOpt(OptType type, Object[] params) {
@@ -490,7 +524,7 @@ public class CubeMigrationCLI extends AbstractApplication {
project.addTable(tableRef.getTableIdentity());
}
- if (project.getModels().contains(modelName) == false)
+ if (!project.getModels().contains(modelName))
project.addModel(modelName);
project.removeRealization(RealizationType.CUBE, cubeName);
project.addRealizationEntry(RealizationType.CUBE, cubeName);
@@ -499,6 +533,20 @@ public class CubeMigrationCLI extends AbstractApplication {
logger.info("Project instance for " + projectName + " is corrected");
break;
}
+ case CLEAR_SEGMENTS: {
+ String cubeName = (String) opt.params[0];
+ String cubeInstancePath = CubeInstance.concatResourcePath(cubeName);
+ Serializer<CubeInstance> cubeInstanceSerializer = new JsonSerializer<CubeInstance>(CubeInstance.class);
+ CubeInstance cubeInstance = dstStore.getResource(cubeInstancePath, CubeInstance.class,
+ cubeInstanceSerializer);
+ cubeInstance.getSegments().clear();
+ cubeInstance.clearCuboids();
+ cubeInstance.setCreateTimeUTC(System.currentTimeMillis());
+ cubeInstance.setStatus(RealizationStatusEnum.DISABLED);
+ dstStore.putResource(cubeInstancePath, cubeInstance, cubeInstanceSerializer);
+ logger.info("Cleared segments for " + cubeName + ", since segments has not been copied");
+ break;
+ }
case PURGE_AND_DISABLE: {
String cubeName = (String) opt.params[0];
String cubeResPath = CubeInstance.concatResourcePath(cubeName);