You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/03/21 10:41:51 UTC

[kylin] branch master updated: KYLIN-3270: add integration test for optimize job

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 24042e2  KYLIN-3270: add integration test for optimize job
24042e2 is described below

commit 24042e2209d85b0c8de98a86d9a573aff182d9c9
Author: Zhong <nj...@apache.org>
AuthorDate: Thu Mar 1 20:08:01 2018 +0800

    KYLIN-3270: add integration test for optimize job
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../java/org/apache/kylin/cube/CubeManager.java    | 16 ++--
 .../java/org/apache/kylin/cube/model/CubeDesc.java | 72 ++++++++--------
 .../kylin/engine/mr/steps/CopyDictionaryStep.java  |  2 +-
 .../kylin/engine/spark/SparkCubingByLayer.java     |  3 +-
 .../kylin/provision/BuildCubeWithEngine.java       | 97 ++++++++++++++++++----
 5 files changed, 132 insertions(+), 58 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index dc370e2..15bb676 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -348,6 +348,10 @@ public class CubeManager implements IRealizationProvider {
             cube.setCuboids(update.getCuboids());
         }
 
+        if (update.getCuboidsRecommend() != null) {
+            cube.setCuboidsRecommend(update.getCuboidsRecommend());
+        }
+
         try {
             cube = crud.save(cube);
         } catch (IllegalStateException ise) {
@@ -637,19 +641,21 @@ public class CubeManager implements IRealizationProvider {
         }
 
         public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException {
-            checkReadyForOptimize(cube);
+            CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy
 
-            List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
+            checkReadyForOptimize(cubeCopy);
+
+            List<CubeSegment> readySegments = cubeCopy.getSegments(SegmentStatusEnum.READY);
             CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()];
             int i = 0;
             for (CubeSegment segment : readySegments) {
-                CubeSegment newSegment = newSegment(cube, segment.getTSRange(), null);
-                validateNewSegments(cube, newSegment);
+                CubeSegment newSegment = newSegment(cubeCopy, segment.getTSRange(), null);
+                validateNewSegments(cubeCopy, newSegment);
 
                 optimizeSegments[i++] = newSegment;
             }
 
-            CubeUpdate update = new CubeUpdate(cube);
+            CubeUpdate update = new CubeUpdate(cubeCopy);
             update.setCuboidsRecommend(cuboidsRecommend);
             update.setToAddSegs(optimizeSegments);
             updateCube(update);
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index dbd8708..93c327d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -18,28 +18,16 @@
 
 package org.apache.kylin.cube.model;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.lang.reflect.Method;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
-
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ArrayUtils;
@@ -77,16 +65,27 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.lang.reflect.Method;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
  */
@@ -677,12 +676,18 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
     }
 
     private void initMandatoryCuboids() {
+        this.mandatoryCuboids.clear();
+        this.mandatoryCuboids.addAll(generateMandatoryCuboids(this.mandatoryDimensionSetList));
+    }
+
+    public Set<Long> generateMandatoryCuboids(List<Set<String>> mandatoryDimensionSetList) {
         Map<String, RowKeyColDesc> rowKeyColDescMap = Maps.newHashMap();
         for (RowKeyColDesc entry : getRowkey().getRowKeyColumns()) {
             rowKeyColDescMap.put(entry.getColumn(), entry);
         }
 
-        for (Set<String> mandatoryDimensionSet : this.mandatoryDimensionSetList) {
+        Set<Long> mandatoryCuboids = Sets.newHashSetWithExpectedSize(mandatoryDimensionSetList.size());
+        for (Set<String> mandatoryDimensionSet : mandatoryDimensionSetList) {
             long cuboid = 0L;
             for (String columnName : mandatoryDimensionSet) {
                 TblColRef tblColRef = model.findColumn(columnName);
@@ -697,6 +702,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
             }
             mandatoryCuboids.add(cuboid);
         }
+        return mandatoryCuboids;
     }
 
     public CuboidScheduler getInitialCuboidScheduler() {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
index c0b3c99..dbabc12 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
@@ -44,7 +44,7 @@ public class CopyDictionaryStep extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         final CubeManager mgr = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
         final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
 
         CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment);
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 0d26815..714991d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -45,7 +45,6 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.common.RowKeySplitter;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.cuboid.CuboidUtil;
 import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -184,7 +183,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             reducerFunction2 = new CuboidReducerFunction2(cubeName, metaUrl, sConf, needAggr);
         }
 
-        final int totalLevels = CuboidUtil.getLongestDepth(cubeSegment.getCuboidScheduler().getAllCuboidIds());
+        final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
         JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
         int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index ce7b892..afd9788 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -18,20 +18,9 @@
 
 package org.apache.kylin.provision;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,12 +34,15 @@ import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.job.DeployUtil;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.CheckpointExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -63,7 +55,21 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public class BuildCubeWithEngine {
 
@@ -171,7 +177,8 @@ public class BuildCubeWithEngine {
         }
         cubeManager = CubeManager.getInstance(kylinConfig);
         for (String jobId : jobService.getAllJobIds()) {
-            if (jobService.getJob(jobId) instanceof CubingJob) {
+            AbstractExecutable executable = jobService.getJob(jobId);
+            if (executable instanceof CubingJob || executable instanceof CheckpointExecutable) {
                 jobService.deleteJob(jobId);
             }
         }
@@ -228,7 +235,7 @@ public class BuildCubeWithEngine {
             for (int i = 0; i < tasks.size(); ++i) {
                 Future<Boolean> task = tasks.get(i);
                 final Boolean result = task.get();
-                if (result == false) {
+                if (!result) {
                     throw new RuntimeException("The test '" + testCase[i] + "' is failed.");
                 }
             }
@@ -299,6 +306,8 @@ public class BuildCubeWithEngine {
             return false;
         if (!buildSegment(cubeName, date2, date3))
             return false;
+        if (!optimizeCube(cubeName))
+            return false;
         if (!buildSegment(cubeName, date3, date4))
             return false;
         if (!buildSegment(cubeName, date4, date5)) // one empty segment
@@ -339,6 +348,24 @@ public class BuildCubeWithEngine {
         cubeManager.updateCubeDropSegments(cube, cube.getSegments());
     }
 
+    private Boolean optimizeCube(String cubeName) throws Exception {
+        CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        Set<Long> cuboidsRecommend = mockRecommendCuboids(cubeInstance, 0.05, 255);
+        CubeSegment[] optimizeSegments = cubeManager.optimizeSegments(cubeInstance, cuboidsRecommend);
+        List<AbstractExecutable> optimizeJobList = Lists.newArrayListWithExpectedSize(optimizeSegments.length);
+        for (CubeSegment optimizeSegment : optimizeSegments) {
+            DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, "TEST");
+            jobService.addJob(optimizeJob);
+            optimizeJobList.add(optimizeJob);
+            optimizeSegment.setLastBuildJobID(optimizeJob.getId());
+        }
+        CheckpointExecutable checkpointJob = new BatchOptimizeJobCheckpointBuilder(cubeInstance, "TEST").build();
+        checkpointJob.addTaskListForCheck(optimizeJobList);
+        jobService.addJob(checkpointJob);
+        ExecutableState state = waitForJob(checkpointJob.getId());
+        return Boolean.valueOf(ExecutableState.SUCCEED == state);
+    }
+
     private Boolean mergeSegment(String cubeName, long startDate, long endDate) throws Exception {
         CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), new TSRange(startDate, endDate), null, true);
         DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST");
@@ -356,6 +383,42 @@ public class BuildCubeWithEngine {
         return Boolean.valueOf(ExecutableState.SUCCEED == state);
     }
 
+    private Set<Long> mockRecommendCuboids(CubeInstance cubeInstance, double maxRatio, int maxNumber) {
+        Preconditions.checkArgument(maxRatio > 0.0 && maxRatio < 1.0);
+        Preconditions.checkArgument(maxNumber > 0);
+        Set<Long> cuboidsRecommend;
+        Random rnd = new Random();
+
+        // add some mandatory cuboids which are needed by other tests:
+        // - org.apache.kylin.query.ITCombinationTest.testLimitEnabled
+        // - org.apache.kylin.query.ITFailfastQueryTest.testPartitionNotExceedMaxScanBytes
+        // - org.apache.kylin.query.ITFailfastQueryTest.testQueryNotExceedMaxScanBytes
+        List<Set<String>> mandatoryDimensionSetList = Lists.newLinkedList();
+        mandatoryDimensionSetList.add(Sets.newHashSet("CAL_DT"));
+        mandatoryDimensionSetList.add(Sets.newHashSet("seller_id", "CAL_DT"));
+        mandatoryDimensionSetList.add(Sets.newHashSet("LSTG_FORMAT_NAME", "slr_segment_cd"));
+        Set<Long> mandatoryCuboids = cubeInstance.getDescriptor().generateMandatoryCuboids(mandatoryDimensionSetList);
+
+        CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler();
+        Set<Long> cuboidsCurrent = cuboidScheduler.getAllCuboidIds();
+        long baseCuboid = cuboidScheduler.getBaseCuboidId();
+        do {
+            cuboidsRecommend = Sets.newHashSet();
+            cuboidsRecommend.add(baseCuboid);
+            cuboidsRecommend.addAll(mandatoryCuboids);
+            for (long i = 1; i < baseCuboid; i++) {
+                if (rnd.nextDouble() < maxRatio) { // add 5% cuboids
+                    cuboidsRecommend.add(i);
+                }
+                if (cuboidsRecommend.size() > maxNumber) {
+                    break;
+                }
+            }
+        } while (cuboidsRecommend.equals(cuboidsCurrent));
+
+        return cuboidsRecommend;
+    }
+
     @SuppressWarnings("unused")
     private int cleanupOldStorage() throws Exception {
         String[] args = { "--delete", "true" };

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.