You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/03/21 03:40:40 UTC

[kylin] branch 2.3.x updated: KYLIN-3270: add integration test for optimize job

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch 2.3.x
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/2.3.x by this push:
     new 97e572c  KYLIN-3270: add integration test for optimize job
97e572c is described below

commit 97e572c48338678f13f6da3b560dce0328570f06
Author: Zhong <nj...@apache.org>
AuthorDate: Thu Mar 1 20:08:01 2018 +0800

    KYLIN-3270: add integration test for optimize job
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../java/org/apache/kylin/cube/CubeManager.java    |  16 ++-
 .../java/org/apache/kylin/cube/model/CubeDesc.java |  72 +++++++-------
 .../kylin/engine/mr/steps/CopyDictionaryStep.java  |   2 +-
 .../kylin/provision/BuildCubeWithEngine.java       | 109 ++++++++++++++++-----
 4 files changed, 137 insertions(+), 62 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index dc370e2..15bb676 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -348,6 +348,10 @@ public class CubeManager implements IRealizationProvider {
             cube.setCuboids(update.getCuboids());
         }
 
+        if (update.getCuboidsRecommend() != null) {
+            cube.setCuboidsRecommend(update.getCuboidsRecommend());
+        }
+
         try {
             cube = crud.save(cube);
         } catch (IllegalStateException ise) {
@@ -637,19 +641,21 @@ public class CubeManager implements IRealizationProvider {
         }
 
         public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException {
-            checkReadyForOptimize(cube);
+            CubeInstance cubeCopy = cube.latestCopyForWrite(); // get a latest copy
 
-            List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
+            checkReadyForOptimize(cubeCopy);
+
+            List<CubeSegment> readySegments = cubeCopy.getSegments(SegmentStatusEnum.READY);
             CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()];
             int i = 0;
             for (CubeSegment segment : readySegments) {
-                CubeSegment newSegment = newSegment(cube, segment.getTSRange(), null);
-                validateNewSegments(cube, newSegment);
+                CubeSegment newSegment = newSegment(cubeCopy, segment.getTSRange(), null);
+                validateNewSegments(cubeCopy, newSegment);
 
                 optimizeSegments[i++] = newSegment;
             }
 
-            CubeUpdate update = new CubeUpdate(cube);
+            CubeUpdate update = new CubeUpdate(cubeCopy);
             update.setCuboidsRecommend(cuboidsRecommend);
             update.setToAddSegs(optimizeSegments);
             updateCube(update);
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index dbd8708..93c327d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -18,28 +18,16 @@
 
 package org.apache.kylin.cube.model;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.lang.reflect.Method;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
-
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ArrayUtils;
@@ -77,16 +65,27 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonInclude;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.lang.reflect.Method;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeSet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
  */
@@ -677,12 +676,18 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
     }
 
     private void initMandatoryCuboids() {
+        this.mandatoryCuboids.clear();
+        this.mandatoryCuboids.addAll(generateMandatoryCuboids(this.mandatoryDimensionSetList));
+    }
+
+    public Set<Long> generateMandatoryCuboids(List<Set<String>> mandatoryDimensionSetList) {
         Map<String, RowKeyColDesc> rowKeyColDescMap = Maps.newHashMap();
         for (RowKeyColDesc entry : getRowkey().getRowKeyColumns()) {
             rowKeyColDescMap.put(entry.getColumn(), entry);
         }
 
-        for (Set<String> mandatoryDimensionSet : this.mandatoryDimensionSetList) {
+        Set<Long> mandatoryCuboids = Sets.newHashSetWithExpectedSize(mandatoryDimensionSetList.size());
+        for (Set<String> mandatoryDimensionSet : mandatoryDimensionSetList) {
             long cuboid = 0L;
             for (String columnName : mandatoryDimensionSet) {
                 TblColRef tblColRef = model.findColumn(columnName);
@@ -697,6 +702,7 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
             }
             mandatoryCuboids.add(cuboid);
         }
+        return mandatoryCuboids;
     }
 
     public CuboidScheduler getInitialCuboidScheduler() {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
index c0b3c99..dbabc12 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
@@ -44,7 +44,7 @@ public class CopyDictionaryStep extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         final CubeManager mgr = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams())).latestCopyForWrite();
         final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
 
         CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment);
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 18a07cf..810be0a 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -18,19 +18,9 @@
 
 package org.apache.kylin.provision;
 
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,12 +34,15 @@ import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.job.DeployUtil;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.CheckpointExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -62,7 +55,20 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 public class BuildCubeWithEngine {
 
@@ -102,7 +108,7 @@ public class BuildCubeWithEngine {
     public static void beforeClass() throws Exception {
         beforeClass(HBaseMetadataTestCase.SANDBOX_TEST_DATA);
     }
-    
+
     public static void beforeClass(String confDir) throws Exception {
         logger.info("Adding to classpath: " + new File(confDir).getAbsolutePath());
         ClassUtil.addClasspath(new File(confDir).getAbsolutePath());
@@ -163,7 +169,8 @@ public class BuildCubeWithEngine {
         }
         cubeManager = CubeManager.getInstance(kylinConfig);
         for (String jobId : jobService.getAllJobIds()) {
-            if (jobService.getJob(jobId) instanceof CubingJob) {
+            AbstractExecutable executable = jobService.getJob(jobId);
+            if (executable instanceof CubingJob || executable instanceof CheckpointExecutable) {
                 jobService.deleteJob(jobId);
             }
         }
@@ -220,7 +227,7 @@ public class BuildCubeWithEngine {
             for (int i = 0; i < tasks.size(); ++i) {
                 Future<Boolean> task = tasks.get(i);
                 final Boolean result = task.get();
-                if (result == false) {
+                if (!result) {
                     throw new RuntimeException("The test '" + testCase[i] + "' is failed.");
                 }
             }
@@ -281,10 +288,12 @@ public class BuildCubeWithEngine {
         if (fastBuildMode) {
             return buildSegment(cubeName, date1, date4);
         } else {
-            if (buildSegment(cubeName, date1, date2) == true) {
-                if (buildSegment(cubeName, date2, date3) == true) {
-                    if (buildSegment(cubeName, date3, date4) == true) {
-                        return mergeSegment(cubeName, date1, date3); // don't merge all segments
+            if (buildSegment(cubeName, date1, date2)) {
+                if (buildSegment(cubeName, date2, date3)) {
+                    if (optimizeCube(cubeName)) {
+                        if (buildSegment(cubeName, date3, date4)) {
+                            return mergeSegment(cubeName, date1, date3); // don't merge all segments
+                        }
                     }
                 }
             }
@@ -309,7 +318,7 @@ public class BuildCubeWithEngine {
         if (fastBuildMode) {
             return buildSegment(cubeName, date1, date3);
         } else {
-            if (buildSegment(cubeName, date1, date2) == true) { // all-in-one build
+            if (buildSegment(cubeName, date1, date2)) { // all-in-one build
                 return buildSegment(cubeName, date2, date3); // empty segment
             }
         }
@@ -325,6 +334,24 @@ public class BuildCubeWithEngine {
         }
     }
 
+    private Boolean optimizeCube(String cubeName) throws Exception {
+        CubeInstance cubeInstance = cubeManager.getCube(cubeName);
+        Set<Long> cuboidsRecommend = mockRecommendCuboids(cubeInstance, 0.05, 255);
+        CubeSegment[] optimizeSegments = cubeManager.optimizeSegments(cubeInstance, cuboidsRecommend);
+        List<AbstractExecutable> optimizeJobList = Lists.newArrayListWithExpectedSize(optimizeSegments.length);
+        for (CubeSegment optimizeSegment : optimizeSegments) {
+            DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, "TEST");
+            jobService.addJob(optimizeJob);
+            optimizeJobList.add(optimizeJob);
+            optimizeSegment.setLastBuildJobID(optimizeJob.getId());
+        }
+        CheckpointExecutable checkpointJob = new BatchOptimizeJobCheckpointBuilder(cubeInstance, "TEST").build();
+        checkpointJob.addTaskListForCheck(optimizeJobList);
+        jobService.addJob(checkpointJob);
+        ExecutableState state = waitForJob(checkpointJob.getId());
+        return Boolean.valueOf(ExecutableState.SUCCEED == state);
+    }
+
     private void clearSegment(String cubeName) throws Exception {
         CubeInstance cube = cubeManager.getCube(cubeName);
         cubeManager.updateCubeDropSegments(cube, cube.getSegments());
@@ -347,9 +374,45 @@ public class BuildCubeWithEngine {
         return Boolean.valueOf(ExecutableState.SUCCEED == state);
     }
 
+    private Set<Long> mockRecommendCuboids(CubeInstance cubeInstance, double maxRatio, int maxNumber) {
+        Preconditions.checkArgument(maxRatio > 0.0 && maxRatio < 1.0);
+        Preconditions.checkArgument(maxNumber > 0);
+        Set<Long> cuboidsRecommend;
+        Random rnd = new Random();
+
+        // add some mandatory cuboids that are required by other tests:
+        // - org.apache.kylin.query.ITCombinationTest.testLimitEnabled
+        // - org.apache.kylin.query.ITFailfastQueryTest.testPartitionNotExceedMaxScanBytes
+        // - org.apache.kylin.query.ITFailfastQueryTest.testQueryNotExceedMaxScanBytes
+        List<Set<String>> mandatoryDimensionSetList = Lists.newLinkedList();
+        mandatoryDimensionSetList.add(Sets.newHashSet("CAL_DT"));
+        mandatoryDimensionSetList.add(Sets.newHashSet("seller_id", "CAL_DT"));
+        mandatoryDimensionSetList.add(Sets.newHashSet("LSTG_FORMAT_NAME", "slr_segment_cd"));
+        Set<Long> mandatoryCuboids = cubeInstance.getDescriptor().generateMandatoryCuboids(mandatoryDimensionSetList);
+
+        CuboidScheduler cuboidScheduler = cubeInstance.getCuboidScheduler();
+        Set<Long> cuboidsCurrent = cuboidScheduler.getAllCuboidIds();
+        long baseCuboid = cuboidScheduler.getBaseCuboidId();
+        do {
+            cuboidsRecommend = Sets.newHashSet();
+            cuboidsRecommend.add(baseCuboid);
+            cuboidsRecommend.addAll(mandatoryCuboids);
+            for (long i = 1; i < baseCuboid; i++) {
+                if (rnd.nextDouble() < maxRatio) { // pick each cuboid with probability maxRatio
+                    cuboidsRecommend.add(i);
+                }
+                if (cuboidsRecommend.size() > maxNumber) {
+                    break;
+                }
+            }
+        } while (cuboidsRecommend.equals(cuboidsCurrent));
+
+        return cuboidsRecommend;
+    }
+
     @SuppressWarnings("unused")
     private int cleanupOldStorage() throws Exception {
-        String[] args = { "--delete", "true" };
+        String[] args = {"--delete", "true"};
 
         StorageCleanupJob cli = new StorageCleanupJob();
         cli.execute(args);

-- 
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.