You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by nj...@apache.org on 2017/12/02 17:23:59 UTC
[06/19] kylin git commit: APACHE-KYLIN-2733: Introduce optimize job
for adjusting cuboid set
APACHE-KYLIN-2733: Introduce optimize job for adjusting cuboid set
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/99fbd755
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/99fbd755
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/99fbd755
Branch: refs/heads/master
Commit: 99fbd755e167cfe0b4ef137dac66f28020d50415
Parents: 4cfffc2
Author: Zhong <nj...@apache.org>
Authored: Wed Aug 30 11:17:43 2017 +0800
Committer: Zhong <nj...@apache.org>
Committed: Sat Dec 2 23:21:43 2017 +0800
----------------------------------------------------------------------
.../org/apache/kylin/cube/CubeInstance.java | 12 +-
.../java/org/apache/kylin/cube/CubeManager.java | 84 ++++++-
.../java/org/apache/kylin/cube/CubeUpdate.java | 14 +-
.../kylin/cube/common/RowKeySplitter.java | 2 +-
.../org/apache/kylin/cube/cuboid/Cuboid.java | 5 +
.../apache/kylin/cube/cuboid/CuboidUtil.java | 48 ++++
.../cube/cuboid/TreeCuboidSchedulerManager.java | 6 +-
.../cube/inmemcubing/InMemCubeBuilder.java | 2 +-
.../org/apache/kylin/cube/kv/RowKeyDecoder.java | 4 +-
.../org/apache/kylin/engine/EngineFactory.java | 4 +
.../apache/kylin/engine/IBatchCubingEngine.java | 3 +
.../kylin/job/constant/ExecutableConstants.java | 5 +
.../org/apache/kylin/job/dao/ExecutableDao.java | 18 +-
.../org/apache/kylin/job/dao/ExecutablePO.java | 11 +
.../kylin/job/execution/ExecutableManager.java | 41 +++-
.../job/impl/threadpool/DefaultScheduler.java | 7 +-
.../engine/mr/BatchOptimizeJobBuilder2.java | 227 +++++++++++++++++
.../mr/BatchOptimizeJobCheckpointBuilder.java | 89 +++++++
.../org/apache/kylin/engine/mr/CubingJob.java | 7 +-
.../org/apache/kylin/engine/mr/IMROutput2.java | 29 ++-
.../kylin/engine/mr/JobBuilderSupport.java | 54 ++++-
.../kylin/engine/mr/MRBatchCubingEngine.java | 5 +
.../kylin/engine/mr/MRBatchCubingEngine2.java | 5 +
.../java/org/apache/kylin/engine/mr/MRUtil.java | 4 +
.../engine/mr/common/AbstractHadoopJob.java | 24 +-
.../kylin/engine/mr/common/BatchConstants.java | 1 +
.../kylin/engine/mr/common/CubeStatsReader.java | 126 ++++++----
.../engine/mr/common/CuboidRecommenderUtil.java | 14 +-
.../engine/mr/common/CuboidSchedulerUtil.java | 54 +++++
.../engine/mr/common/CuboidStatsReaderUtil.java | 5 +-
.../kylin/engine/mr/common/MapReduceUtil.java | 117 +++++++++
.../mr/common/StatisticsDecisionUtil.java | 2 +-
.../engine/mr/steps/BaseCuboidMapperBase.java | 2 +-
.../steps/CalculateStatsFromBaseCuboidJob.java | 116 +++++++++
.../CalculateStatsFromBaseCuboidMapper.java | 201 ++++++++++++++++
.../CalculateStatsFromBaseCuboidReducer.java | 112 +++++++++
.../engine/mr/steps/CopyDictionaryStep.java | 70 ++++++
.../engine/mr/steps/CubingExecutableUtil.java | 9 +
.../apache/kylin/engine/mr/steps/CuboidJob.java | 28 ++-
.../mr/steps/FactDistinctColumnsMapper.java | 37 +--
.../mr/steps/FactDistinctColumnsReducer.java | 1 +
.../mr/steps/FilterRecommendCuboidDataJob.java | 103 ++++++++
.../steps/FilterRecommendCuboidDataMapper.java | 107 ++++++++
.../mr/steps/InMemCuboidFromBaseCuboidJob.java | 154 ++++++++++++
.../steps/InMemCuboidFromBaseCuboidMapper.java | 96 ++++++++
.../steps/InMemCuboidFromBaseCuboidReducer.java | 23 ++
.../kylin/engine/mr/steps/InMemCuboidJob.java | 4 +-
.../engine/mr/steps/InMemCuboidMapperBase.java | 10 +-
.../kylin/engine/mr/steps/KVGTRecordWriter.java | 2 +-
.../engine/mr/steps/MergeCuboidMapper.java | 2 +-
.../mr/steps/MergeStatisticsWithOldStep.java | 144 +++++++++++
.../kylin/engine/mr/steps/NDCuboidMapper.java | 12 +-
.../kylin/engine/mr/steps/ReducerNumSizing.java | 106 --------
.../UpdateCubeInfoAfterCheckpointStep.java | 69 ++++++
.../steps/UpdateCubeInfoAfterOptimizeStep.java | 72 ++++++
.../mr/steps/UpdateOldCuboidShardJob.java | 105 ++++++++
.../mr/steps/UpdateOldCuboidShardMapper.java | 142 +++++++++++
.../kylin/engine/spark/SparkCubingByLayer.java | 2 +-
.../kylin/rest/controller/CubeController.java | 63 +++++
.../kylin/rest/request/JobOptimizeRequest.java | 34 +++
.../apache/kylin/rest/service/CubeService.java | 4 +
.../apache/kylin/rest/service/JobService.java | 241 ++++++++++++++++++-
.../storage/hbase/steps/CreateHTableJob.java | 19 +-
.../hbase/steps/HBaseMROutput2Transition.java | 41 +++-
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 82 +++++++
65 files changed, 2987 insertions(+), 255 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index f6eceb6..462223a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -148,6 +148,16 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return segments.getMergingSegments(mergedSegment);
}
+ public CubeSegment getOriginalSegmentToOptimize(CubeSegment optimizedSegment) {
+ for (CubeSegment segment : this.getSegments(SegmentStatusEnum.READY)) {
+ if (!optimizedSegment.equals(segment) //
+ && optimizedSegment.getSegRange().equals(segment.getSegRange())) {
+ return segment;
+ }
+ }
+ return null;
+ }
+
public CubeDesc getDescriptor() {
return CubeDescManager.getInstance(config).getCubeDesc(descName);
}
@@ -353,7 +363,7 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return cuboidsRecommend;
case RECOMMEND_MISSING_WITH_BASE:
cuboidsRecommend.removeAll(currentCuboids);
- currentCuboids.add(getCuboidScheduler().getBaseCuboidId());
+ cuboidsRecommend.add(getCuboidScheduler().getBaseCuboidId());
return cuboidsRecommend;
default:
return null;
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 388c840..5e72721 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -30,6 +30,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -76,6 +77,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
* @author yangli9
@@ -406,6 +408,10 @@ public class CubeManager implements IRealizationProvider {
cube.setCuboids(update.getCuboids());
}
+ if (update.getCuboidsRecommend() != null) {
+ cube.setCuboidsRecommend(update.getCuboidsRecommend());
+ }
+
try {
getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER);
} catch (IllegalStateException ise) {
@@ -483,8 +489,7 @@ public class CubeManager implements IRealizationProvider {
return newSegment;
}
- public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange)
- throws IOException {
+ public CubeSegment refreshSegment(CubeInstance cube, TSRange tsRange, SegmentRange segRange) throws IOException {
checkInputRanges(tsRange, segRange);
checkBuildingSegment(cube);
@@ -519,6 +524,27 @@ public class CubeManager implements IRealizationProvider {
return newSegment;
}
+ public CubeSegment[] optimizeSegments(CubeInstance cube, Set<Long> cuboidsRecommend) throws IOException {
+ checkReadyForOptimize(cube);
+
+ List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
+ CubeSegment[] optimizeSegments = new CubeSegment[readySegments.size()];
+ int i = 0;
+ for (CubeSegment segment : readySegments) {
+ CubeSegment newSegment = newSegment(cube, segment.getTSRange(), null);
+ validateNewSegments(cube, newSegment);
+
+ optimizeSegments[i++] = newSegment;
+ }
+
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setCuboidsRecommend(cuboidsRecommend);
+ cubeBuilder.setToAddSegs(optimizeSegments);
+ updateCube(cubeBuilder);
+
+ return optimizeSegments;
+ }
+
public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
throws IOException {
if (cube.getSegments().isEmpty())
@@ -594,8 +620,15 @@ public class CubeManager implements IRealizationProvider {
}
private void checkBuildingSegment(CubeInstance cube) {
- int maxBuldingSeg = cube.getConfig().getMaxBuildingSegments();
- if (cube.getBuildingSegments().size() >= maxBuldingSeg) {
+ checkBuildingSegment(cube, cube.getConfig().getMaxBuildingSegments());
+ }
+
+ public void checkReadyForOptimize(CubeInstance cube) {
+ checkBuildingSegment(cube, 1);
+ }
+
+ private void checkBuildingSegment(CubeInstance cube, int maxBuildingSeg) {
+ if (cube.getBuildingSegments().size() >= maxBuildingSeg) {
throw new IllegalStateException(
"There is already " + cube.getBuildingSegments().size() + " building segment; ");
}
@@ -725,6 +758,49 @@ public class CubeManager implements IRealizationProvider {
updateCube(cubeBuilder);
}
+ public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException {
+ for (CubeSegment seg : optimizedSegments) {
+ seg.setStatus(SegmentStatusEnum.READY_PENDING);
+ }
+
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToUpdateSegs(optimizedSegments);
+ updateCube(cubeBuilder);
+ }
+
+ public void promoteCheckpointOptimizeSegments(CubeInstance cube, Map<Long, Long> recommendCuboids,
+ CubeSegment... optimizedSegments) throws IOException {
+ if (cube.getSegments().size() != optimizedSegments.length * 2) {
+ throw new IllegalStateException("For cube " + cube
+ + ", every READY segment should be optimized and all segments should be READY before optimizing");
+ }
+ CubeSegment[] originalSegments = new CubeSegment[optimizedSegments.length];
+ int i = 0;
+ for (CubeSegment seg : optimizedSegments) {
+ originalSegments[i++] = cube.getOriginalSegmentToOptimize(seg);
+
+ if (StringUtils.isBlank(seg.getStorageLocationIdentifier()))
+ throw new IllegalStateException(
+ "For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier");
+
+ if (StringUtils.isBlank(seg.getLastBuildJobID()))
+ throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID");
+
+ seg.setStatus(SegmentStatusEnum.READY);
+ }
+
+ logger.info("Promoting cube " + cube + ", new segments " + Arrays.toString(optimizedSegments)
+ + ", to remove segments " + originalSegments);
+
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToRemoveSegs(originalSegments) //
+ .setToUpdateSegs(optimizedSegments) //
+ .setStatus(RealizationStatusEnum.READY) //
+ .setCuboids(recommendCuboids) //
+ .setCuboidsRecommend(Sets.<Long> newHashSet());
+ updateCube(cubeBuilder);
+ }
+
public void validateNewSegments(CubeInstance cube, CubeSegment newSegments) {
List<CubeSegment> tobe = cube.calculateToBeSegments(newSegments);
List<CubeSegment> newList = Arrays.asList(newSegments);
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
index fae20dc..2e1d652 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@ -19,6 +19,7 @@
package org.apache.kylin.cube;
import java.util.Map;
+import java.util.Set;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
@@ -34,6 +35,7 @@ public class CubeUpdate {
private String owner;
private int cost = -1;
private Map<Long, Long> cuboids = null;
+ private Set<Long> cuboidsRecommend = null;
public CubeUpdate(CubeInstance cubeInstance) {
this.cubeInstance = cubeInstance;
@@ -106,7 +108,17 @@ public class CubeUpdate {
return cuboids;
}
- public void setCuboids(Map<Long, Long> cuboids) {
+ public CubeUpdate setCuboids(Map<Long, Long> cuboids) {
this.cuboids = cuboids;
+ return this;
+ }
+
+ public Set<Long> getCuboidsRecommend() {
+ return cuboidsRecommend;
+ }
+
+ public CubeUpdate setCuboidsRecommend(Set<Long> cuboidsRecommend) {
+ this.cuboidsRecommend = cuboidsRecommend;
+ return this;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index cd26347..0c54ecf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -115,7 +115,7 @@ public class RowKeySplitter implements java.io.Serializable {
offset += RowConstants.ROWKEY_CUBOIDID_LEN;
long lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
- Cuboid cuboid = Cuboid.findById(cubeSegment, lastSplittedCuboidId);
+ Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, lastSplittedCuboidId);
// rowkey columns
for (int i = 0; i < cuboid.getColumns().size(); i++) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index efd2e2e..3c4fceb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -88,6 +88,11 @@ public class Cuboid implements Comparable<Cuboid>, Serializable {
return cuboidID;
}
+ // for mandatory cuboid, no need to translate cuboid
+ public static Cuboid findForMandatory(CubeDesc cube, long cuboidID) {
+ return new Cuboid(cube, cuboidID, cuboidID);
+ }
+
public static Cuboid findById(CuboidScheduler cuboidScheduler, byte[] cuboidID) {
return findById(cuboidScheduler, Bytes.toLong(cuboidID));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
new file mode 100644
index 0000000..a84f153
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidUtil.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.cube.cuboid;
+
+import com.google.common.base.Preconditions;
+
+public class CuboidUtil {
+
+ public static Integer[][] getCuboidBitSet(Long[] cuboidIds, int nRowKey) {
+ Preconditions.checkArgument(nRowKey < Long.SIZE,
+ "the size of row key could not be large than " + (Long.SIZE - 1));
+
+ Integer[][] allCuboidsBitSet = new Integer[cuboidIds.length][];
+
+ for (int j = 0; j < cuboidIds.length; j++) {
+ Long cuboidId = cuboidIds[j];
+
+ allCuboidsBitSet[j] = new Integer[Long.bitCount(cuboidId)];
+
+ long mask = 1L << (nRowKey - 1);
+ int position = 0;
+ for (int i = 0; i < nRowKey; i++) {
+ if ((mask & cuboidId) > 0) {
+ allCuboidsBitSet[j][position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+ }
+ return allCuboidsBitSet;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
index 5e8d965..22e636b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/TreeCuboidSchedulerManager.java
@@ -64,7 +64,7 @@ public class TreeCuboidSchedulerManager {
* @param cubeName
* @return null if the cube has no pre-built cuboids
*/
- public static TreeCuboidScheduler getTreeCuboidScheduler(String cubeName) {
+ public TreeCuboidScheduler getTreeCuboidScheduler(String cubeName) {
TreeCuboidScheduler result = cache.get(cubeName);
if (result == null) {
CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
@@ -83,14 +83,14 @@ public class TreeCuboidSchedulerManager {
return result;
}
- public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, Map<Long, Long> cuboidsWithRowCnt) {
+ public TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, Map<Long, Long> cuboidsWithRowCnt) {
if (cuboidsWithRowCnt == null || cuboidsWithRowCnt.isEmpty()) {
return null;
}
return getTreeCuboidScheduler(cubeDesc, Lists.newArrayList(cuboidsWithRowCnt.keySet()), cuboidsWithRowCnt);
}
- public static TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, List<Long> cuboidIds,
+ public TreeCuboidScheduler getTreeCuboidScheduler(CubeDesc cubeDesc, List<Long> cuboidIds,
Map<Long, Long> cuboidsWithRowCnt) {
if (cuboidIds == null || cuboidsWithRowCnt == null) {
return null;
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index f63b53f..97bb1de 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -108,7 +108,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
}
private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
- GTInfo info = CubeGridTable.newGTInfo(Cuboid.findById(cuboidScheduler, cuboidID),
+ GTInfo info = CubeGridTable.newGTInfo(Cuboid.findForMandatory(cubeDesc, cuboidID),
new CubeDimEncMap(cubeDesc, dictionaryMap)
);
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 5a1f668..bb03c4c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -36,7 +36,6 @@ import org.apache.kylin.metadata.model.TblColRef;
*/
public class RowKeyDecoder {
- private final CubeSegment cubeSegment;
private final CubeDesc cubeDesc;
private final RowKeyColumnIO colIO;
private final RowKeySplitter rowKeySplitter;
@@ -45,7 +44,6 @@ public class RowKeyDecoder {
private List<String> values;
public RowKeyDecoder(CubeSegment cubeSegment) {
- this.cubeSegment = cubeSegment;
this.cubeDesc = cubeSegment.getCubeDesc();
this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 255);
this.colIO = new RowKeyColumnIO(cubeSegment.getDimensionEncodingMap());
@@ -75,7 +73,7 @@ public class RowKeyDecoder {
if (this.cuboid != null && this.cuboid.getId() == cuboidID) {
return;
}
- this.cuboid = Cuboid.findById(cubeSegment, cuboidID);
+ this.cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
}
private void collectValue(TblColRef col, byte[] valueBytes, int length) throws IOException {
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
index 78b1efe..03d986b 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/EngineFactory.java
@@ -61,4 +61,8 @@ public class EngineFactory {
return batchEngine(mergeSegment).createBatchMergeJob(mergeSegment, submitter);
}
+ /** Optimize a segment based on the cuboid recommend list produced by the cube planner. */
+ public static DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
+ return batchEngine(optimizeSegment).createBatchOptimizeJob(optimizeSegment, submitter);
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
index 754dbde..a618eac 100644
--- a/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ b/core-job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -37,6 +37,9 @@ public interface IBatchCubingEngine {
/** Merge multiple small segments into a big one. */
public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
+ /** Optimize a segment based on the cuboid recommend list produced by the cube planner. */
+ public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter);
+
public Class<?> getSourceInterface();
public Class<?> getStorageInterface();
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 36496fe..f3caf3b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -37,6 +37,9 @@ public final class ExecutableConstants {
public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
+ public static final String STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID = "Calculate Statistics from Base Cuboid";
+ public static final String STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION = "Filter Recommend Cuboid Data for Optimization";
+ public static final String STEP_NAME_UPDATE_OLD_CUBOID_SHARD = "Update Old Cuboid Shard for Optimization";
public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid";
public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube In-Mem";
public static final String STEP_NAME_BUILD_SPARK_CUBE = "Build Cube with Spark";
@@ -45,8 +48,10 @@ public final class ExecutableConstants {
public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
+ public static final String STEP_NAME_COPY_DICTIONARY = "Copy dictionary from Old Segment";
public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
+ public static final String STEP_NAME_MERGE_STATISTICS_WITH_OLD = "Merge Cuboid Statistics with Old for Optimization";
public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
index 16875b1..9cfc61a 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
@@ -92,8 +92,8 @@ public class ExecutableDao {
return store.getResource(path, ExecutablePO.class, JOB_SERIALIZER);
}
- private void writeJobResource(String path, ExecutablePO job) throws IOException {
- store.putResource(path, job, JOB_SERIALIZER);
+ private long writeJobResource(String path, ExecutablePO job) throws IOException {
+ return store.putResource(path, job, JOB_SERIALIZER);
}
private ExecutableOutputPO readJobOutputResource(String path) throws IOException {
@@ -179,6 +179,20 @@ public class ExecutableDao {
}
}
+ public ExecutablePO updateJob(ExecutablePO job) throws PersistentException {
+ try {
+ if (getJob(job.getUuid()) == null) {
+ throw new IllegalArgumentException("job id:" + job.getUuid() + " does not exist");
+ }
+ final long ts = writeJobResource(pathOfJob(job), job);
+ job.setLastModified(ts);
+ return job;
+ } catch (IOException e) {
+ logger.error("error update job:" + job.getUuid(), e);
+ throw new PersistentException(e);
+ }
+ }
+
public void deleteJob(String uuid) throws PersistentException {
try {
store.deleteResource(pathOfJob(uuid));
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
index 75717e0..f48c876 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
@@ -39,6 +39,9 @@ public class ExecutablePO extends RootPersistentEntity {
@JsonProperty("tasks")
private List<ExecutablePO> tasks;
+ @JsonProperty("tasks_check")
+ private List<ExecutablePO> tasksForCheck;
+
@JsonProperty("type")
private String type;
@@ -61,6 +64,14 @@ public class ExecutablePO extends RootPersistentEntity {
this.tasks = tasks;
}
+ public List<ExecutablePO> getTasksForCheck() {
+ return tasksForCheck;
+ }
+
+ public void setTasksForCheck(List<ExecutablePO> tasksForCheck) {
+ this.tasksForCheck = tasksForCheck;
+ }
+
public String getType() {
return type;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index bab8c30..bc38fff 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -96,6 +96,13 @@ public class ExecutableManager {
}
result.setTasks(tasks);
}
+ if (executable instanceof CheckpointExecutable) {
+ List<ExecutablePO> tasksForCheck = Lists.newArrayList();
+ for (AbstractExecutable taskForCheck : ((CheckpointExecutable) executable).getSubTasksForCheck()) {
+ tasksForCheck.add(parse(taskForCheck));
+ }
+ result.setTasksForCheck(tasksForCheck);
+ }
return result;
}
@@ -121,6 +128,23 @@ public class ExecutableManager {
}
}
+ public void updateCheckpointJob(String jobId, List<AbstractExecutable> subTasksForCheck) {
+ try {
+ final ExecutablePO job = executableDao.getJob(jobId);
+ Preconditions.checkArgument(job != null, "there is no related job for job id:" + jobId);
+
+ List<ExecutablePO> tasksForCheck = Lists.newArrayListWithExpectedSize(subTasksForCheck.size());
+ for (AbstractExecutable taskForCheck : subTasksForCheck) {
+ tasksForCheck.add(parse(taskForCheck));
+ }
+ job.setTasksForCheck(tasksForCheck);
+ executableDao.updateJob(job);
+ } catch (PersistentException e) {
+ logger.error("fail to update checkpoint job:" + jobId, e);
+ throw new RuntimeException(e);
+ }
+ }
+
//for ut
public void deleteJob(String jobId) {
try {
@@ -349,7 +373,15 @@ public class ExecutableManager {
if (job == null) {
return;
}
-
+ if (job.getStatus().isFinalState()) {
+ if (job.getStatus() != ExecutableState.DISCARDED) {
+ logger.warn("The status of job " + jobId + " is " + job.getStatus().toString()
+ + ". It's final state and cannot be transfer to be discarded!!!");
+ } else {
+ logger.warn("The job " + jobId + " has been discarded.");
+ }
+ return;
+ }
if (job instanceof DefaultChainedExecutable) {
List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
for (AbstractExecutable task : tasks) {
@@ -501,6 +533,13 @@ public class ExecutableManager {
((ChainedExecutable) result).addTask(parseTo(subTask));
}
}
+ List<ExecutablePO> tasksForCheck = executablePO.getTasksForCheck();
+ if (tasksForCheck != null && !tasksForCheck.isEmpty()) {
+ Preconditions.checkArgument(result instanceof CheckpointExecutable);
+ for (ExecutablePO subTaskForCheck : tasksForCheck) {
+ ((CheckpointExecutable) result).addTaskForCheck(parseTo(subTaskForCheck));
+ }
+ }
return result;
} catch (ReflectiveOperationException e) {
throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e);
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
index 64a7db7..6ef9c81 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DefaultScheduler.java
@@ -88,8 +88,9 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
nRunning++;
continue;
}
- final Output output = executableManager.getOutput(id);
- if ((output.getState() != ExecutableState.READY)) {
+ final AbstractExecutable executable = executableManager.getJob(id);
+ if (!executable.isReady()) {
+ final Output output = executableManager.getOutput(id);
// logger.debug("Job id:" + id + " not runnable");
if (output.getState() == ExecutableState.DISCARDED) {
nDiscarded++;
@@ -105,10 +106,8 @@ public class DefaultScheduler implements Scheduler<AbstractExecutable>, Connecti
continue;
}
nReady++;
- AbstractExecutable executable = null;
String jobDesc = null;
try {
- executable = executableManager.getJob(id);
jobDesc = executable.toString();
logger.info(jobDesc + " prepare to schedule");
context.addRunningJob(executable);
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
new file mode 100644
index 0000000..a8127cc
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobBuilder2.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CopyDictionaryStep;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.mr.steps.FilterRecommendCuboidDataJob;
+import org.apache.kylin.engine.mr.steps.InMemCuboidFromBaseCuboidJob;
+import org.apache.kylin.engine.mr.steps.MergeStatisticsWithOldStep;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterOptimizeStep;
+import org.apache.kylin.engine.mr.steps.UpdateOldCuboidShardJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class BatchOptimizeJobBuilder2 extends JobBuilderSupport {
+ private static final Logger logger = LoggerFactory.getLogger(BatchOptimizeJobBuilder2.class);
+
+ private final IMROutput2.IMRBatchOptimizeOutputSide2 outputSide;
+
+ public BatchOptimizeJobBuilder2(CubeSegment optimizeSegment, String submitter) {
+ super(optimizeSegment, submitter);
+ this.outputSide = MRUtil.getBatchOptimizeOutputSide2(optimizeSegment);
+ }
+
+ public CubingJob build() {
+ logger.info("MR_V2 new job to OPTIMIZE a segment " + seg);
+
+ final CubingJob result = CubingJob.createOptimizeJob(seg, submitter, config);
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+
+ final String jobId = result.getId();
+ final String cuboidRootPath = getCuboidRootPath(jobId);
+ final String optimizeCuboidRootPath = getOptimizationCuboidPath(jobId);
+
+ CubeSegment oldSegment = seg.getCubeInstance().getOriginalSegmentToOptimize(seg);
+ Preconditions.checkNotNull(oldSegment, "cannot find the original segment to be optimized by " + seg);
+
+ // Phase 1: Prepare base cuboid data from old segment
+ String oldcuboidRootPath = getCuboidRootPath(oldSegment) + "*";
+ result.addTask(createFilterRecommendCuboidDataStep(oldcuboidRootPath, optimizeCuboidRootPath));
+
+ // Phase 2: Prepare dictionary and statistics for new segment
+ result.addTask(createCopyDictionaryStep());
+ String optStatsSourcePath = getBaseCuboidPath(optimizeCuboidRootPath);
+ String optStatsDstPath = getOptimizationStatisticsPath(jobId);
+ result.addTask(createCalculateStatsFromBaseCuboid(optStatsSourcePath, optStatsDstPath,
+ CuboidModeEnum.RECOMMEND_MISSING));
+ result.addTask(createMergeStatisticsWithOldStep(jobId, optStatsDstPath, getStatisticsPath(jobId)));
+ outputSide.addStepPhase2_CreateHTable(result);
+
+ result.addTask(createUpdateShardForOldCuboidDataStep(optimizeCuboidRootPath + "*", cuboidRootPath));
+
+ // Phase 3: Build Cube for Missing Cuboid Data
+ addLayerCubingSteps(result, jobId, CuboidModeEnum.RECOMMEND_MISSING_WITH_BASE, cuboidRootPath); // layer cubing
+ result.addTask(createInMemCubingStep(jobId, CuboidModeEnum.RECOMMEND_MISSING_WITH_BASE, cuboidRootPath));
+
+ outputSide.addStepPhase3_BuildCube(result);
+
+ // Phase 4: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterOptimizeStep(jobId));
+ outputSide.addStepPhase4_Cleanup(result);
+
+ return result;
+ }
+
+ public MapReduceExecutable createFilterRecommendCuboidDataStep(String inputPath, String outputPath) {
+ MapReduceExecutable result = new MapReduceExecutable();
+ result.setName(ExecutableConstants.STEP_NAME_FILTER_RECOMMEND_CUBOID_DATA_FOR_OPTIMIZATION);
+
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+ "Kylin_Filter_Recommend_Cuboid_Data_" + seg.getRealization().getName());
+
+ result.setMapReduceParams(cmd.toString());
+ result.setMapReduceJobClass(FilterRecommendCuboidDataJob.class);
+ return result;
+ }
+
+ public CopyDictionaryStep createCopyDictionaryStep() {
+ CopyDictionaryStep result = new CopyDictionaryStep();
+ result.setName(ExecutableConstants.STEP_NAME_COPY_DICTIONARY);
+
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ return result;
+ }
+
+ private MapReduceExecutable createUpdateShardForOldCuboidDataStep(String inputPath, String outputPath) {
+ MapReduceExecutable result = new MapReduceExecutable();
+ result.setName(ExecutableConstants.STEP_NAME_UPDATE_OLD_CUBOID_SHARD);
+
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+ "Kylin_Update_Old_Cuboid_Shard_" + seg.getRealization().getName());
+
+ result.setMapReduceParams(cmd.toString());
+ result.setMapReduceJobClass(UpdateOldCuboidShardJob.class);
+ return result;
+ }
+
+ private MergeStatisticsWithOldStep createMergeStatisticsWithOldStep(final String jobId, final String optStatsPath,
+ final String mergedStatisticsFolder) {
+ MergeStatisticsWithOldStep result = new MergeStatisticsWithOldStep();
+ result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS_WITH_OLD);
+
+ CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setStatisticsPath(optStatsPath, result.getParams());
+ CubingExecutableUtil.setMergedStatisticsPath(mergedStatisticsFolder, result.getParams());
+
+ return result;
+ }
+
+ private void addLayerCubingSteps(final CubingJob result, final String jobId, final CuboidModeEnum cuboidMode,
+ final String cuboidRootPath) {
+ // Statistics are not yet available here, so the tree cuboid scheduler cannot be determined; the maxLevel is determined at runtime instead
+ final int maxLevel = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+ // Don't need to build base cuboid
+ // n dim cuboid steps
+ for (int i = 1; i <= maxLevel; i++) {
+ String parentCuboidPath = i == 1 ? getBaseCuboidPath(cuboidRootPath)
+ : getCuboidOutputPathsByLevel(cuboidRootPath, i - 1);
+ result.addTask(createNDimensionCuboidStep(parentCuboidPath,
+ getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId, cuboidMode));
+ }
+ }
+
+ private MapReduceExecutable createNDimensionCuboidStep(String parentPath, String outputPath, int level,
+ String jobId, CuboidModeEnum cuboidMode) {
+ // ND cuboid job
+ MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
+
+ ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : level " + level);
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, parentPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+ "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "" + level);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
+
+ ndCuboidStep.setMapReduceParams(cmd.toString());
+ ndCuboidStep.setMapReduceJobClass(getNDCuboidJob());
+ return ndCuboidStep;
+ }
+
+ private MapReduceExecutable createInMemCubingStep(String jobId, CuboidModeEnum cuboidMode, String cuboidRootPath) {
+ MapReduceExecutable cubeStep = new MapReduceExecutable();
+
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd, JobEngineConfig.IN_MEM_JOB_CONF_SUFFIX);
+
+ cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getBaseCuboidPath(cuboidRootPath));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getInMemCuboidPath(cuboidRootPath));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+ "Kylin_Cube_Builder_" + seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
+
+ cubeStep.setMapReduceParams(cmd.toString());
+ cubeStep.setMapReduceJobClass(InMemCuboidFromBaseCuboidJob.class);
+ cubeStep.setCounterSaveAs(
+ CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+ return cubeStep;
+ }
+
+ public UpdateCubeInfoAfterOptimizeStep createUpdateCubeInfoAfterOptimizeStep(String jobId) {
+ final UpdateCubeInfoAfterOptimizeStep result = new UpdateCubeInfoAfterOptimizeStep();
+ result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+
+ CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+ CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
+
+ return result;
+ }
+
+ protected Class<? extends AbstractHadoopJob> getNDCuboidJob() {
+ return NDCuboidJob.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java
new file mode 100644
index 0000000..1a779d2
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchOptimizeJobCheckpointBuilder.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterCheckpointStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.CheckpointExecutable;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+
+import com.google.common.base.Preconditions;
+
+public class BatchOptimizeJobCheckpointBuilder {
+
+ protected static SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+
+ final protected CubeInstance cube;
+ final protected String submitter;
+
+ private final IMROutput2.IMRBatchOptimizeOutputSide2 outputSide;
+
+ public BatchOptimizeJobCheckpointBuilder(CubeInstance cube, String submitter) {
+ this.cube = cube;
+ this.submitter = submitter;
+
+ Preconditions.checkNotNull(cube.getFirstSegment(), "Cube " + cube + " is empty!!!");
+ this.outputSide = MRUtil.getBatchOptimizeOutputSide2(cube.getFirstSegment());
+ }
+
+ public CheckpointExecutable build() {
+ KylinConfig kylinConfig = cube.getConfig();
+ List<ProjectInstance> projList = ProjectManager.getInstance(kylinConfig).findProjects(cube.getType(),
+ cube.getName());
+ if (projList == null || projList.size() == 0) {
+ throw new RuntimeException("Cannot find the project containing the cube " + cube.getName() + "!!!");
+ } else if (projList.size() >= 2) {
+ throw new RuntimeException("Find more than one project containing the cube " + cube.getName()
+ + ". It doesn't meet the uniqueness requirement!!! ");
+ }
+
+ CheckpointExecutable checkpointJob = new CheckpointExecutable();
+ checkpointJob.setSubmitter(submitter);
+ CubingExecutableUtil.setCubeName(cube.getName(), checkpointJob.getParams());
+ checkpointJob.setName(
+ cube.getName() + " - OPTIMIZE CHECKPOINT - " + format.format(new Date(System.currentTimeMillis())));
+ checkpointJob.setDeployEnvName(kylinConfig.getDeployEnv());
+ checkpointJob.setProjectName(projList.get(0).getName());
+
+ // Phase 1: Update cube information
+ checkpointJob.addTask(createUpdateCubeInfoAfterCheckpointStep());
+
+ // Phase 2: Garbage collection
+ outputSide.addStepPhase5_Cleanup(checkpointJob);
+
+ return checkpointJob;
+ }
+
+ private UpdateCubeInfoAfterCheckpointStep createUpdateCubeInfoAfterCheckpointStep() {
+ UpdateCubeInfoAfterCheckpointStep result = new UpdateCubeInfoAfterCheckpointStep();
+ result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
+
+ CubingExecutableUtil.setCubeName(cube.getName(), result.getParams());
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index 4e7bcdd..71b62a0 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -65,7 +65,7 @@ public class CubingJob extends DefaultChainedExecutable {
}
public enum CubingJobTypeEnum {
- BUILD("BUILD"), MERGE("MERGE");
+ BUILD("BUILD"), OPTIMIZE("OPTIMIZE"), MERGE("MERGE");
private final String name;
@@ -106,6 +106,10 @@ public class CubingJob extends DefaultChainedExecutable {
return initCubingJob(seg, CubingJobTypeEnum.BUILD.toString(), submitter, config);
}
+ public static CubingJob createOptimizeJob(CubeSegment seg, String submitter, JobEngineConfig config) {
+ return initCubingJob(seg, CubingJobTypeEnum.OPTIMIZE.toString(), submitter, config);
+ }
+
public static CubingJob createMergeJob(CubeSegment seg, String submitter, JobEngineConfig config) {
return initCubingJob(seg, CubingJobTypeEnum.MERGE.toString(), submitter, config);
}
@@ -135,6 +139,7 @@ public class CubingJob extends DefaultChainedExecutable {
result.setJobType(jobType);
CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), result.getParams());
CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+ CubingExecutableUtil.setSegmentName(seg.getName(), result.getParams());
result.setName(jobType + " CUBE - " + seg.getCubeInstance().getName() + " - " + seg.getName() + " - "
+ format.format(new Date(System.currentTimeMillis())));
result.setSubmitter(submitter);
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 69bba0a..e70b497 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
public interface IMROutput2 {
@@ -67,7 +68,7 @@ public interface IMROutput2 {
public void configureJobInput(Job job, String input) throws Exception;
/** Configure the OutputFormat of given job. */
- public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception;
+ public void configureJobOutput(Job job, String output, CubeSegment segment, CuboidScheduler cuboidScheduler, int level) throws Exception;
}
@@ -113,4 +114,30 @@ public interface IMROutput2 {
public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube);
}
+ public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(CubeSegment seg);
+
+ /**
+ * Participates in the batch cubing flow as the output side. Responsible for saving
+ * the cuboid output to storage at the end of Phase 3.
+ *
+ * - Phase 1: Filter Recommended Cuboid Data
+ * - Phase 2: Copy Dictionary & Calculate Statistics & Update Reused Cuboid Shard
+ * - Phase 3: Build Cube
+ * - Phase 4: Cleanup Optimize
+ * - Phase 5: Update Metadata & Cleanup
+ */
+ public interface IMRBatchOptimizeOutputSide2 {
+
+ /** Create HTable based on recommended cuboids & statistics*/
+ public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow);
+
+ /** Build only missing cuboids*/
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+ /** Cleanup intermediate cuboid data on HDFS*/
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+
+ /** Invoked by Checkpoint job & Cleanup old segments' HTables and related working directory*/
+ public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow);
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 2a51c89..694c936 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -22,14 +22,16 @@ import java.io.IOException;
import java.util.List;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob;
import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
-import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
+import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -48,6 +50,10 @@ public class JobBuilderSupport {
final public static String LayeredCuboidFolderPrefix = "level_";
+ final public static String PathNameCuboidBase = "base_cuboid";
+ final public static String PathNameCuboidOld = "old";
+ final public static String PathNameCuboidInMem = "in_memory";
+
public JobBuilderSupport(CubeSegment seg, String submitter) {
Preconditions.checkNotNull(seg, "segment cannot be null");
this.config = new JobEngineConfig(seg.getConfig());
@@ -98,6 +104,31 @@ public class JobBuilderSupport {
return result;
}
+ public MapReduceExecutable createCalculateStatsFromBaseCuboid(String inputPath, String outputPath) {
+ return createCalculateStatsFromBaseCuboid(inputPath, outputPath, CuboidModeEnum.CURRENT);
+ }
+
+ public MapReduceExecutable createCalculateStatsFromBaseCuboid(String inputPath, String outputPath,
+ CuboidModeEnum cuboidMode) {
+ MapReduceExecutable result = new MapReduceExecutable();
+ result.setName(ExecutableConstants.STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID);
+ result.setMapReduceJobClass(CalculateStatsFromBaseCuboidJob.class);
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, outputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_SAMPLING_PERCENT,
+ String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+ "Calculate_Stats_For_Segment_" + seg.getRealization().getName() + "_Step");
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
+
+ result.setMapReduceParams(cmd.toString());
+ return result;
+ }
+
public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
// base cuboid job
HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
@@ -197,6 +228,18 @@ public class JobBuilderSupport {
return getRealizationRootPath(jobId) + "/dict";
}
+ public String getOptimizationRootPath(String jobId) {
+ return getRealizationRootPath(jobId) + "/optimize";
+ }
+
+ public String getOptimizationStatisticsPath(String jobId) {
+ return getOptimizationRootPath(jobId) + "/statistics";
+ }
+
+ public String getOptimizationCuboidPath(String jobId) {
+ return getOptimizationRootPath(jobId) + "/cuboid/";
+ }
+
// ============================================================================
// static methods also shared by other job flow participant
// ----------------------------------------------------------------------------
@@ -218,10 +261,17 @@ public class JobBuilderSupport {
public static String getCuboidOutputPathsByLevel(String cuboidRootPath, int level) {
if (level == 0) {
- return cuboidRootPath + LayeredCuboidFolderPrefix + "base_cuboid";
+ return cuboidRootPath + LayeredCuboidFolderPrefix + PathNameCuboidBase;
} else {
return cuboidRootPath + LayeredCuboidFolderPrefix + level + "_cuboid";
}
}
+ public static String getBaseCuboidPath(String cuboidRootPath) {
+ return cuboidRootPath + PathNameCuboidBase;
+ }
+
+ public static String getInMemCuboidPath(String cuboidRootPath) {
+ return cuboidRootPath + PathNameCuboidInMem;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
index 681c545..74c9b6d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
@@ -48,6 +48,11 @@ public class MRBatchCubingEngine implements IBatchCubingEngine {
}
@Override
+ public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
+ return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+ }
+
+ @Override
public Class<?> getSourceInterface() {
return IMRInput.class;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
index d9fdcb9..665e791 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
@@ -48,6 +48,11 @@ public class MRBatchCubingEngine2 implements IBatchCubingEngine {
}
@Override
+ public DefaultChainedExecutable createBatchOptimizeJob(CubeSegment optimizeSegment, String submitter) {
+ return new BatchOptimizeJobBuilder2(optimizeSegment, submitter).build();
+ }
+
+ @Override
public Class<?> getSourceInterface() {
return IMRInput.class;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index b2a2ea3..124e5e7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -76,6 +76,10 @@ public class MRUtil {
return SourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchMergeInputSide(seg);
}
+ public static IMROutput2.IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide2(CubeSegment seg) {
+ return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
+ }
+
// use this method instead of ToolRunner.run() because ToolRunner.run() is not thread-safe
// Refer to: http://stackoverflow.com/questions/22462665/is-hadoops-toorunner-thread-safe
public static int runMRJob(Tool tool, String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index e6c208b..b63ef16 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -24,12 +24,14 @@ package org.apache.kylin.engine.mr.common;
*/
import static org.apache.hadoop.util.StringUtils.formatTime;
+import static org.apache.kylin.engine.mr.common.JobRelatedMetaUtil.collectCubeMetadata;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -114,6 +116,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Option OPTION_STATISTICS_SAMPLING_PERCENT = OptionBuilder
.withArgName(BatchConstants.ARG_STATS_SAMPLING_PERCENT).hasArg().isRequired(false)
.withDescription("Statistics sampling percentage").create(BatchConstants.ARG_STATS_SAMPLING_PERCENT);
+ protected static final Option OPTION_CUBOID_MODE = OptionBuilder.withArgName(BatchConstants.ARG_CUBOID_MODE)
+ .hasArg().isRequired(false).withDescription("Cuboid Mode").create(BatchConstants.ARG_CUBOID_MODE);
private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
@@ -492,27 +496,41 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
}
protected void attachCubeMetadata(CubeInstance cube, Configuration conf) throws IOException {
- dumpKylinPropsAndMetadata(cube.getProject(), JobRelatedMetaUtil.collectCubeMetadata(cube), cube.getConfig(),
+ dumpKylinPropsAndMetadata(cube.getProject(), collectCubeMetadata(cube), cube.getConfig(),
conf);
}
protected void attachCubeMetadataWithDict(CubeInstance cube, Configuration conf) throws IOException {
Set<String> dumpList = new LinkedHashSet<>();
- dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(cube));
+ dumpList.addAll(collectCubeMetadata(cube));
for (CubeSegment segment : cube.getSegments()) {
dumpList.addAll(segment.getDictionaryPaths());
}
dumpKylinPropsAndMetadata(cube.getProject(), dumpList, cube.getConfig(), conf);
}
+ protected void attachSegmentsMetadataWithDict(List<CubeSegment> segments, Configuration conf) throws IOException {
+ Set<String> dumpList = new LinkedHashSet<>();
+ CubeInstance cube = segments.get(0).getCubeInstance();
+ dumpList.addAll(collectCubeMetadata(cube));
+ for (CubeSegment segment : segments) {
+ dumpList.addAll(segment.getDictionaryPaths());
+ }
+ dumpKylinPropsAndMetadata(cube.getProject(), dumpList, cube.getConfig(), conf);
+ }
+
protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
+ attachSegmentMetadata(segment, conf, true, false);
+ }
+
+ protected void attachSegmentMetadataWithAll(CubeSegment segment, Configuration conf) throws IOException {
attachSegmentMetadata(segment, conf, true, true);
}
protected void attachSegmentMetadata(CubeSegment segment, Configuration conf, boolean ifDictIncluded,
boolean ifStatsIncluded) throws IOException {
Set<String> dumpList = new LinkedHashSet<>();
- dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
+ dumpList.addAll(collectCubeMetadata(segment.getCubeInstance()));
if (ifDictIncluded) {
dumpList.addAll(segment.getDictionaryPaths());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index aaf2654..1d6a582 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -75,6 +75,7 @@ public interface BatchConstants {
String ARG_INPUT = "input";
String ARG_OUTPUT = "output";
String ARG_PROJECT = "project";
+ String ARG_CUBOID_MODE = "cuboidMode";
String ARG_JOB_NAME = "jobname";
String ARG_CUBING_JOB_ID = "cubingJobId";
String ARG_CUBE_NAME = "cubename";
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index c82d797..3d7d542 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -80,51 +80,52 @@ public class CubeStatsReader {
final CuboidScheduler cuboidScheduler;
public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
+ this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig);
+ }
+
+ /**
+ * @param cuboidScheduler if null, some of this reader's functions (those requiring a scheduler) will not be supported
+ */
+ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig)
+ throws IOException {
ResourceStore store = ResourceStore.getStore(kylinConfig);
- cuboidScheduler = cubeSegment.getCuboidScheduler();
String statsKey = cubeSegment.getStatisticsResourcePath();
File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream);
- Reader reader = null;
-
- try {
- Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
-
- Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
- Option seqInput = SequenceFile.Reader.file(path);
- reader = new SequenceFile.Reader(hadoopConf, seqInput);
-
- int percentage = 100;
- int mapperNumber = 0;
- double mapperOverlapRatio = 0;
- Map<Long, HLLCounter> counterMap = Maps.newHashMap();
-
- LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
- BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf);
- while (reader.next(key, value)) {
- if (key.get() == 0L) {
- percentage = Bytes.toInt(value.getBytes());
- } else if (key.get() == -1) {
- mapperOverlapRatio = Bytes.toDouble(value.getBytes());
- } else if (key.get() == -2) {
- mapperNumber = Bytes.toInt(value.getBytes());
- } else if (key.get() > 0) {
- HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
- ByteArray byteArray = new ByteArray(value.getBytes());
- hll.readRegisters(byteArray.asBuffer());
- counterMap.put(key.get(), hll);
- }
- }
-
- this.seg = cubeSegment;
- this.samplingPercentage = percentage;
- this.mapperNumberOfFirstBuild = mapperNumber;
- this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio;
- this.cuboidRowEstimatesHLL = counterMap;
+ Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath()));
+
+ CubeStatsResult cubeStatsResult = new CubeStatsResult();
+ cubeStatsResult.initialize(path, kylinConfig.getCubeStatsHLLPrecision());
+ tmpSeqFile.delete();
+
+ this.seg = cubeSegment;
+ this.cuboidScheduler = cuboidScheduler;
+ this.samplingPercentage = cubeStatsResult.percentage;
+ this.mapperNumberOfFirstBuild = cubeStatsResult.mapperNumber;
+ this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.mapperOverlapRatio;
+ this.cuboidRowEstimatesHLL = cubeStatsResult.counterMap;
+ }
- } finally {
- IOUtils.closeStream(reader);
- tmpSeqFile.delete();
- }
+ /**
+ * Reads statistics from the given {@code path} rather than from the statistics
+ * resource of {@code cubeSegment}. Since the statistics come from an explicit
+ * path, a cuboid scheduler should be provided by the caller.
+ *
+ * @param cubeSegment the segment the statistics belong to
+ * @param cuboidScheduler scheduler to use; should be supplied when reading from a path
+ * @param path location of the statistics sequence file
+ */
+ public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, Path path)
+ throws IOException {
+ CubeStatsResult cubeStatsResult = new CubeStatsResult();
+ cubeStatsResult.initialize(path, kylinConfig.getCubeStatsHLLPrecision());
+
+ this.seg = cubeSegment;
+ this.cuboidScheduler = cuboidScheduler;
+ this.samplingPercentage = cubeStatsResult.percentage;
+ this.mapperNumberOfFirstBuild = cubeStatsResult.mapperNumber;
+ this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.mapperOverlapRatio;
+ this.cuboidRowEstimatesHLL = cubeStatsResult.counterMap;
}
private File writeTmpSeqFile(InputStream inputStream) throws IOException {
@@ -144,6 +145,10 @@ public class CubeStatsReader {
return this.cuboidRowEstimatesHLL;
}
+ public int getSamplingPercentage() {
+ return samplingPercentage;
+ }
+
public Map<Long, Long> getCuboidRowEstimatesHLL() {
return getCuboidRowCountMapFromSampling(cuboidRowEstimatesHLL, samplingPercentage);
}
@@ -253,6 +258,9 @@ public class CubeStatsReader {
//return MB
public double estimateLayerSize(int level) {
+ if (cuboidScheduler == null) {
+ throw new UnsupportedOperationException("cuboid scheduler is null");
+ }
List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer();
Map<Long, Double> cuboidSizeMap = getCuboidSizeMap();
double ret = 0;
@@ -265,11 +273,17 @@ public class CubeStatsReader {
}
public List<Long> getCuboidsByLayer(int level) {
+ if (cuboidScheduler == null) {
+ throw new UnsupportedOperationException("cuboid scheduler is null");
+ }
List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer();
return layeredCuboids.get(level);
}
private void printCuboidInfoTreeEntry(Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, PrintWriter out) {
+ if (cuboidScheduler == null) {
+ throw new UnsupportedOperationException("cuboid scheduler is null");
+ }
long baseCuboid = Cuboid.getBaseCuboidId(seg.getCubeDesc());
int dimensionCount = Long.bitCount(baseCuboid);
printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out);
@@ -317,6 +331,36 @@ public class CubeStatsReader {
return new DecimalFormat("#.##").format(input);
}
+ private class CubeStatsResult {
+ private int percentage = 100;
+ private double mapperOverlapRatio = 0;
+ private int mapperNumber = 0;
+ Map<Long, HLLCounter> counterMap = Maps.newHashMap();
+
+ void initialize(Path path, int precision) throws IOException {
+ Configuration hadoopConf = HadoopUtil.getCurrentConfiguration();
+ Option seqInput = SequenceFile.Reader.file(path);
+ try (Reader reader = new SequenceFile.Reader(hadoopConf, seqInput)) {
+ LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf);
+ BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf);
+ while (reader.next(key, value)) {
+ if (key.get() == 0L) {
+ percentage = Bytes.toInt(value.getBytes());
+ } else if (key.get() == -1) {
+ mapperOverlapRatio = Bytes.toDouble(value.getBytes());
+ } else if (key.get() == -2) {
+ mapperNumber = Bytes.toInt(value.getBytes());
+ } else if (key.get() > 0) {
+ HLLCounter hll = new HLLCounter(precision);
+ ByteArray byteArray = new ByteArray(value.getBytes());
+ hll.readRegisters(byteArray.asBuffer());
+ counterMap.put(key.get(), hll);
+ }
+ }
+ }
+ }
+ }
+
public static void main(String[] args) throws IOException {
System.out.println("CubeStatsReader is used to read cube statistic saved in metadata store");
KylinConfig config = KylinConfig.getInstanceFromEnv();
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
index ba3f023..649eeb6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidRecommenderUtil.java
@@ -39,20 +39,21 @@ public class CuboidRecommenderUtil {
return null;
}
- CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig());
+ CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig());
if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
|| cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
logger.info("Cuboid Statistics is not enabled.");
return null;
}
- long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId();
+ CubeInstance cube = segment.getCubeInstance();
+ long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
|| cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) {
logger.info("Base cuboid count in cuboid statistics is 0.");
return null;
}
- String key = segment.getCubeInstance().getName();
+ String key = cube.getName();
CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(),
cubeStatsReader.getCuboidSizeMap()).build();
return CuboidRecommender.getInstance().getRecommendCuboidList(cuboidStats, segment.getConfig(), false);
@@ -81,20 +82,21 @@ public class CuboidRecommenderUtil {
return null;
}
- CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, segment.getConfig());
+ CubeStatsReader cubeStatsReader = new CubeStatsReader(segment, null, segment.getConfig());
if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
|| cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
logger.info("Cuboid Statistics is not enabled.");
return null;
}
- long baseCuboid = segment.getCuboidScheduler().getBaseCuboidId();
+ CubeInstance cube = segment.getCubeInstance();
+ long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
if (cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == null
|| cubeStatsReader.getCuboidRowEstimatesHLL().get(baseCuboid) == 0L) {
logger.info("Base cuboid count in cuboid statistics is 0.");
return null;
}
- String key = segment.getCubeInstance().getName() + "-" + segment.getName();
+ String key = cube.getName() + "-" + segment.getName();
CuboidStats cuboidStats = new CuboidStats.Builder(key, baseCuboid, cubeStatsReader.getCuboidRowEstimatesHLL(),
cubeStatsReader.getCuboidSizeMap()).setHitFrequencyMap(hitFrequencyMap)
.setRollingUpCountSourceMap(rollingUpCountSourceMap,
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
new file mode 100644
index 0000000..d684c04
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler;
+import org.apache.kylin.cube.cuboid.TreeCuboidSchedulerManager;
+
+public class CuboidSchedulerUtil {
+
+ public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, String cuboidModeName) {
+ return getCuboidSchedulerByMode(segment, segment.getCubeInstance().getCuboidsByMode(cuboidModeName));
+ }
+
+ public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, CuboidModeEnum cuboidMode) {
+ return getCuboidSchedulerByMode(segment, segment.getCubeInstance().getCuboidsByMode(cuboidMode));
+ }
+
+ public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, Set<Long> cuboidSet) {
+ CuboidScheduler cuboidScheduler;
+ try {
+ cuboidScheduler = TreeCuboidSchedulerManager.getInstance().getTreeCuboidScheduler(segment.getCubeDesc(), //
+ CuboidStatsReaderUtil.readCuboidStatsFromSegment(cuboidSet, segment));
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read cube stats for segment " + segment + " due to " + e);
+ }
+
+ if (cuboidScheduler == null) {
+ cuboidScheduler = new DefaultCuboidScheduler(segment.getCubeDesc());
+ }
+ return cuboidScheduler;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/99fbd755/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
index 68380f3..56ab504 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsReaderUtil.java
@@ -74,7 +74,7 @@ public class CuboidStatsReaderUtil {
Map<Long, HLLCounter> cuboidHLLMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
Map<Long, Double> sizeMapMerged = Maps.newHashMapWithExpectedSize(cuboidSet.size());
for (CubeSegment pSegment : segmentList) {
- CubeStatsReader pReader = new CubeStatsReader(pSegment, pSegment.getConfig());
+ CubeStatsReader pReader = new CubeStatsReader(pSegment, null, pSegment.getConfig());
Map<Long, HLLCounter> pHLLMap = pReader.getCuboidRowHLLCounters();
if (pHLLMap == null || pHLLMap.isEmpty()) {
logger.info("Cuboid Statistics for segment " + pSegment.getName() + " is not enabled.");
@@ -113,7 +113,7 @@ public class CuboidStatsReaderUtil {
return null;
}
- CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cubeSegment.getConfig());
+ CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, null, cubeSegment.getConfig());
if (cubeStatsReader.getCuboidRowEstimatesHLL() == null
|| cubeStatsReader.getCuboidRowEstimatesHLL().isEmpty()) {
logger.info("Cuboid Statistics is not enabled.");
@@ -132,4 +132,5 @@ public class CuboidStatsReaderUtil {
}
return cuboidsWithStats;
}
+
}