You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/03/21 03:40:40 UTC
[kylin] branch 2.3.x updated: KYLIN-3270: add integration test for optimize job
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch 2.3.x
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/2.3.x by this push:
new 97e572c KYLIN-3270: add integration test for optimize job
97e572c is described below
commit 97e572c48338678f13f6da3b560dce0328570f06
Author: Zhong <nj...@apache.org>
AuthorDate: Thu Mar 1 20:08:01 2018 +0800
KYLIN-3270: add integration test for optimize job
Signed-off-by: shaofengshi <sh...@apache.org>
---
.../java/org/apache/kylin/cube/CubeManager.java | 16 ++-
.../java/org/apache/kylin/cube/model/CubeDesc.java | 72 +++++++-------
.../kylin/engine/mr/steps/CopyDictionaryStep.java | 2 +-
.../kylin/provision/BuildCubeWithEngine.java | 109 ++++++++++++++++-----
4 files changed, 137 insertions(+), 62 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index dc370e2..15bb676 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -348,6 +348,10 @@ public class CubeManager implements IRealizationProvider {
cube.setCuboids(update.getCuboids());
}
+ if (update.getCuboidsRecommend() != null) {
+ cube.setCuboidsRecommend(update.getCuboidsRecommend());
+ }
+
try {
cube = crud.save(cube);
} catch (IllegalStateException ise) {
@@ -637,19 +641,21 @@ public class CubeManager implements IRealizationProvider {
}
public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException {
- checkReadyForOptimize(cube);
+ CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy
- List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
+ checkReadyForOptimize(cubeCopy);
+
+ List<CubeSegment> readySegments = cubeCopy.getSegments(SegmentStatusEnum.READY);
CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()];
int i = 0;
for (CubeSegment segment : readySegments) {
- CubeSegment newSegment = newSegment(cube, segment.getTSRange(), null);
- validateNewSegments(cube, newSegment);
+ CubeSegment newSegment = newSegment(cubeCopy, segment.getTSRange(), null);
+ validateNewSegments(cubeCopy, newSegment);
optimizeSegments[i++] = newSegment;
}
- CubeUpdate update = new CubeUpdate(cube);
+ CubeUpdate update = new CubeUpdate(cubeCopy);
update.setCuboidsRecommend(cuboidsRecommend);
update.setToAddSegs(optimizeSegments);
updateCube(update);
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index dbd8708..93c327d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -18,28 +18,16 @@
package org.apache.kylin.cube.model;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.lang.reflect.Method;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
-
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.ArrayUtils;
@@ -77,16 +65,27 @@ import org.apache.kylin.metadata.realization.RealizationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.lang.reflect.Method;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
/**
*/
@@ -677,12 +676,18 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
}
private void initMandatoryCuboids() {
+ this.mandatoryCuboids.clear();
+ this.mandatoryCuboids.addAll(generateMandatoryCuboids(this.mandatoryDimensionSetList));
+ }
+
+ public Set<Long> generateMandatoryCuboids(List<Set<String>> mandatoryDimensionSetList) {
Map<String, RowKeyColDesc> rowKeyColDescMap = Maps.newHashMap();
for (RowKeyColDesc entry : getRowkey().getRowKeyColumns()) {
rowKeyColDescMap.put(entry.getColumn(), entry);
}
- for (Set<String> mandatoryDimensionSet : this.mandatoryDimensionSetList) {
+ Set<Long> mandatoryCuboids = Sets.newHashSetWithExpectedSize(mandatoryDimensionSetList.size());
+ for (Set<String> mandatoryDimensionSet : mandatoryDimensionSetList) {
long cuboid = 0L;
for (String columnName : mandatoryDimensionSet) {
TblColRef tblColRef = model.findColumn(columnName);
@@ -697,6 +702,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
}
mandatoryCuboids.add(cuboid);
}
+ return mandatoryCuboids;
}
public CuboidScheduler getInitialCuboidScheduler() {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
index c0b3c99..dbabc12 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
@@ -44,7 +44,7 @@ public class CopyDictionaryStep extends AbstractExecutable {
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
final CubeManager mgr = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment);
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 18a07cf..810be0a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -18,19 +18,9 @@
package org.apache.kylin.provision;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -44,12 +34,15 @@ import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.CheckpointExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
@@ -62,7 +55,20 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
public class BuildCubeWithEngine {
@@ -102,7 +108,7 @@ public class BuildCubeWithEngine {
public static void beforeClass() throws Exception {
beforeClass(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
}
-
+
public static void beforeClass(String confDir) throws Exception {
logger.info("Adding to classpath: " + new File(confDir).getAbsolutePath());
ClassUtil.addClasspath(new File(confDir).getAbsolutePath());
@@ -163,7 +169,8 @@ public class BuildCubeWithEngine {
}
cubeManager = CubeManager.getInstance(kylinConfig);
for (String jobId : jobService.getAllJobIds()) {
- if (jobService.getJob(jobId) instanceof CubingJob) {
+ AbstractExecutable executable = jobService.getJob(jobId);
+ if (executable instanceof CubingJob || executable instanceof CheckpointExecutable) {
jobService.deleteJob(jobId);
}
}
@@ -220,7 +227,7 @@ public class BuildCubeWithEngine {
for (int i = 0; i < tasks.size(); ++i) {
Future<Boolean> task = tasks.get(i);
final Boolean result = task.get();
- if (result == false) {
+ if (!result) {
throw new RuntimeException("The test '" + testCase[i] + "' is failed.");
}
}
@@ -281,10 +288,12 @@ public class BuildCubeWithEngine {
if (fastBuildMode) {
return buildSegment(cubeName, date1, date4);
} else {
- if (buildSegment(cubeName, date1, date2) == true) {
- if (buildSegment(cubeName, date2, date3) == true) {
- if (buildSegment(cubeName, date3, date4) == true) {
- return mergeSegment(cubeName, date1, date3); // don't merge all segments
+ if (buildSegment(cubeName, date1, date2)) {
+ if (buildSegment(cubeName, date2, date3)) {
+ if (optimizeCube(cubeName)) {
+ if (buildSegment(cubeName, date3, date4)) {
+ return mergeSegment(cubeName, date1, date3); // don't merge all segments
+ }
}
}
}
@@ -309,7 +318,7 @@ public class BuildCubeWithEngine {
if (fastBuildMode) {
return buildSegment(cubeName, date1, date3);
} else {
- if (buildSegment(cubeName, date1, date2) == true) { // all-in-one build
+ if (buildSegment(cubeName, date1, date2)) { // all-in-one build
return buildSegment(cubeName, date2, date3); // empty segment
}
}
@@ -325,6 +334,24 @@ public class BuildCubeWithEngine {
}
}
+ private Boolean optimizeCube(String cubeName) throws Exception {
+ CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+ Set<Long> cuboidsRecommend = mockRecommendCuboids(cubeInstance, 0.05, 255);
+ CubeSegment[] optimizeSegments = cubeManager.optimizeSegments(cubeInstance, cuboidsRecommend);
+ List<AbstractExecutable> optimizeJobList = Lists.newArrayListWithExpectedSize(optimizeSegments.length);
+ for (CubeSegment optimizeSegment : optimizeSegments) {
+ DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, "TEST");
+ jobService.addJob(optimizeJob);
+ optimizeJobList.add(optimizeJob);
+ optimizeSegment.setLastBuildJobID(optimizeJob.getId());
+ }
+ CheckpointExecutable checkpointJob = new BatchOptimizeJobCheckpointBuilder(cubeInstance, "TEST").build();
+ checkpointJob.addTaskListForCheck(optimizeJobList);
+ jobService.addJob(checkpointJob);
+ ExecutableState state = waitForJob(checkpointJob.getId());
+ return Boolean.valueOf(ExecutableState.SUCCEED == state);
+ }
+
private void clearSegment(String cubeName) throws Exception {
CubeInstance cube = cubeManager.getCube(cubeName);
cubeManager.updateCubeDropSegments(cube, cube.getSegments());
@@ -347,9 +374,45 @@ public class BuildCubeWithEngine {
return Boolean.valueOf(ExecutableState.SUCCEED == state);
}
+ private Set<Long> mockRecommendCuboids(CubeInstance cubeInstance, double maxRatio, int maxNumber) {
+ Preconditions.checkArgument(maxRatio > 0.0 && maxRatio < 1.0);
+ Preconditions.checkArgument(maxNumber > 0);
+ Set<Long> cuboidsRecommend;
+ Random rnd = new Random();
+
+ // add some mandatory cuboids which are for other unit test
+ // - org.apache.kylin.query.ITCombinationTest.testLimitEnabled
+ // - org.apache.kylin.query.ITFailfastQueryTest.testPartitionNotExceedMaxScanBytes
+ // - org.apache.kylin.query.ITFailfastQueryTest.testQueryNotExceedMaxScanBytes
+ List<Set<String>> mandatoryDimensionSetList = Lists.newLinkedList();
+ mandatoryDimensionSetList.add(Sets.newHashSet("CAL_DT"));
+ mandatoryDimensionSetList.add(Sets.newHashSet("seller_id", "CAL_DT"));
+ mandatoryDimensionSetList.add(Sets.newHashSet("LSTG_FORMAT_NAME", "slr_segment_cd"));
+ Set<Long> mandatoryCuboids = cubeInstance.getDescriptor().generateMandatoryCuboids(mandatoryDimensionSetList);
+
+ CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler();
+ Set<Long> cuboidsCurrent = cuboidScheduler.getAllCuboidIds();
+ long baseCuboid = cuboidScheduler.getBaseCuboidId();
+ do {
+ cuboidsRecommend = Sets.newHashSet();
+ cuboidsRecommend.add(baseCuboid);
+ cuboidsRecommend.addAll(mandatoryCuboids);
+ for (long i = 1; i < baseCuboid; i++) {
+ if (rnd.nextDouble() < maxRatio) { // add 5% cuboids
+ cuboidsRecommend.add(i);
+ }
+ if (cuboidsRecommend.size() > maxNumber) {
+ break;
+ }
+ }
+ } while (cuboidsRecommend.equals(cuboidsCurrent));
+
+ return cuboidsRecommend;
+ }
+
@SuppressWarnings("unused")
private int cleanupOldStorage() throws Exception {
- String[] args = { "--delete", "true" };
+ String[] args = {"--delete", "true"};
StorageCleanupJob cli = new StorageCleanupJob();
cli.execute(args);
--
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.