You are viewing a plain text version of this content; the hyperlink to the canonical version was removed during conversion from HTML.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/12/05 13:03:11 UTC
kylin git commit: KYLIN-2246 redesign the way to decide layer cubing reducer count
Repository: kylin
Updated Branches:
refs/heads/KYLIN-2246 [created] 815887e73
KYLIN-2246 redesign the way to decide layer cubing reducer count
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/815887e7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/815887e7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/815887e7
Branch: refs/heads/KYLIN-2246
Commit: 815887e73a5c3b0852b6cf5650400235797d7ce9
Parents: 59a30f6
Author: Hongbin Ma <ma...@apache.org>
Authored: Mon Dec 5 21:02:36 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Mon Dec 5 21:02:44 2016 +0800
----------------------------------------------------------------------
.../kylin/cube/cuboid/CuboidScheduler.java | 31 +++++++-
.../kylin/engine/mr/common/CubeStatsReader.java | 26 ++++++-
.../apache/kylin/engine/mr/steps/CuboidJob.java | 52 +------------
.../engine/mr/steps/LayerReduerNumSizing.java | 82 ++++++++++++++++++++
.../kylin/engine/mr/steps/MergeCuboidJob.java | 2 +-
5 files changed, 138 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index bd6a37a..733aded 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -18,7 +18,7 @@
package org.apache.kylin.cube.cuboid;
-/**
+/**
*/
import java.util.Collections;
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.cube.model.AggregationGroup;
import org.apache.kylin.cube.model.CubeDesc;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -39,6 +40,7 @@ public class CuboidScheduler {
private final CubeDesc cubeDesc;
private final long max;
private final Map<Long, List<Long>> cache;
+ private List<List<Long>> cuboidsByLayer;
public CuboidScheduler(CubeDesc cubeDesc) {
this.cubeDesc = cubeDesc;
@@ -232,4 +234,31 @@ public class CuboidScheduler {
getSubCuboidIds(cuboidId, result);
}
}
+
+ public List<List<Long>> getCuboidsByLayer() {
+ if (cuboidsByLayer != null) {
+ return cuboidsByLayer;
+ }
+
+ int totalNum = 0;
+ int layerNum = cubeDesc.getBuildLevel();
+ cuboidsByLayer = Lists.newArrayList();
+
+ cuboidsByLayer.add(Collections.singletonList(Cuboid.getBaseCuboidId(cubeDesc)));
+ totalNum++;
+
+ for (int i = 1; i <= layerNum; i++) {
+ List<Long> lastLayer = cuboidsByLayer.get(i - 1);
+ List<Long> newLayer = Lists.newArrayList();
+ for (Long parent : lastLayer) {
+ newLayer.addAll(getSpanningCuboid(parent));
+ }
+ cuboidsByLayer.add(newLayer);
+ totalNum += newLayer.size();
+ }
+
+ int size = getAllCuboidIds().size();
+ Preconditions.checkState(totalNum == size, "total Num: " + totalNum + " actual size: " + size);
+ return cuboidsByLayer;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index c917cfb..1cf5da6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
@@ -75,9 +76,11 @@ public class CubeStatsReader {
final int mapperNumberOfFirstBuild; // becomes meaningless after merge
final double mapperOverlapRatioOfFirstBuild; // becomes meaningless after merge
final Map<Long, HyperLogLogPlusCounter> cuboidRowEstimatesHLL;
+ final CuboidScheduler cuboidScheduler;
public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException {
ResourceStore store = ResourceStore.getStore(kylinConfig);
+ cuboidScheduler = new CuboidScheduler(cubeSegment.getCubeDesc());
String statsKey = cubeSegment.getStatisticsResourcePath();
File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream);
Reader reader = null;
@@ -145,6 +148,10 @@ public class CubeStatsReader {
return getCuboidSizeMapFromRowCount(seg, getCuboidRowEstimatesHLL());
}
+ public double estimateCubeSize() {
+ return SumHelper.sumDouble(getCuboidSizeMap().values());
+ }
+
public int getMapperNumberOfFirstBuild() {
return mapperNumberOfFirstBuild;
}
@@ -248,12 +255,23 @@ public class CubeStatsReader {
out.println("----------------------------------------------------------------------------");
}
+ //return MB
+ public double estimateLayerSize(int level) {
+ List<List<Long>> layeredCuboids = cuboidScheduler.getCuboidsByLayer();
+ Map<Long, Double> cuboidSizeMap = getCuboidSizeMap();
+ double ret = 0;
+ for (Long cuboidId : layeredCuboids.get(level)) {
+ ret += cuboidSizeMap.get(cuboidId);
+ }
+
+ logger.info("Estimating size for layer {}, all cuboids are {}, total size is {}", level, StringUtils.join(layeredCuboids.get(level), ","), ret);
+ return ret;
+ }
+
private void printCuboidInfoTreeEntry(Map<Long, Long> cuboidRows, Map<Long, Double> cuboidSizes, PrintWriter out) {
- CubeDesc cubeDesc = seg.getCubeDesc();
- CuboidScheduler scheduler = new CuboidScheduler(cubeDesc);
- long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc);
+ long baseCuboid = Cuboid.getBaseCuboidId(seg.getCubeDesc());
int dimensionCount = Long.bitCount(baseCuboid);
- printCuboidInfoTree(-1L, baseCuboid, scheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out);
+ printCuboidInfoTree(-1L, baseCuboid, cuboidScheduler, cuboidRows, cuboidSizes, dimensionCount, 0, out);
}
private void printKVInfo(PrintWriter writer) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index ddd21b7..d3cb494 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
@@ -35,14 +34,11 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.CuboidCLI;
-import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.execution.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,6 +96,7 @@ public class CuboidJob extends AbstractHadoopJob {
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);
+ CubeSegment segment = cube.getSegmentById(segmentID);
if (checkSkip(cubingJobId)) {
logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segmentID + "[" + segmentID + "]");
@@ -113,7 +110,7 @@ public class CuboidJob extends AbstractHadoopJob {
setJobClasspath(job, cube.getConfig());
// Mapper
- configureMapperInputFormat(cube.getSegmentById(segmentID));
+ configureMapperInputFormat(segment);
job.setMapperClass(this.mapperClass);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
@@ -134,7 +131,7 @@ public class CuboidJob extends AbstractHadoopJob {
// add metadata to distributed cache
attachKylinPropsAndMetadata(cube, job.getConfiguration());
- setReduceTaskNum(job, cube.getDescriptor(), nCuboidLevel);
+ LayerReduerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel);
this.deletePath(job.getConfiguration(), output);
@@ -163,49 +160,6 @@ public class CuboidJob extends AbstractHadoopJob {
}
}
- protected void setReduceTaskNum(Job job, CubeDesc cubeDesc, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
- KylinConfig kylinConfig = cubeDesc.getConfig();
-
- double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
- double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-
- // total map input MB
- double totalMapInputMB = this.getTotalMapInputMB();
-
- // output / input ratio
- int preLevelCuboids, thisLevelCuboids;
- if (level == 0) { // base cuboid
- preLevelCuboids = thisLevelCuboids = 1;
- } else { // n-cuboid
- int[] allLevelCount = CuboidCLI.calculateAllLevelCount(cubeDesc);
- preLevelCuboids = allLevelCount[level - 1];
- thisLevelCuboids = allLevelCount[level];
- }
-
- // total reduce input MB
- double totalReduceInputMB = totalMapInputMB * thisLevelCuboids / preLevelCuboids;
-
- // number of reduce tasks
- int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
-
- // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
- if (cubeDesc.hasMemoryHungryMeasures()) {
- numReduceTasks = numReduceTasks * 4;
- }
-
- // at least 1 reducer by default
- numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
- // no more than 500 reducer by default
- numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
- job.setNumReduceTasks(numReduceTasks);
-
- logger.info("Having total map input MB " + Math.round(totalMapInputMB));
- logger.info("Having level " + level + ", pre-level cuboids " + preLevelCuboids + ", this level cuboids " + thisLevelCuboids);
- logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio);
- logger.info("Setting " + Context.NUM_REDUCES + "=" + numReduceTasks);
- }
-
/**
* @param mapperClass
* the mapperClass to set
http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
new file mode 100644
index 0000000..6bddcbd
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LayerReduerNumSizing {
+
+ private static final Logger logger = LoggerFactory.getLogger(LayerReduerNumSizing.class);
+
+ public static void setReduceTaskNum(Job job, CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
+ CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+ KylinConfig kylinConfig = cubeDesc.getConfig();
+
+ double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+ double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+ logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
+
+ CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
+
+ double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+ if (level == -1) {
+ //merge case
+ adjustedCurrentLayerSizeEst = cubeStatsReader.estimateCubeSize();
+ logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+ } else if (level == 0) {
+ //base cuboid case
+ adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+ logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+ } else {
+ parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+ currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+ adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+ logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+ }
+
+ // number of reduce tasks
+ int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio);
+
+ // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+ if (cubeDesc.hasMemoryHungryMeasures()) {
+ numReduceTasks = numReduceTasks * 4;
+ }
+
+ // at least 1 reducer by default
+ numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+ // no more than 500 reducer by default
+ numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+ job.setNumReduceTasks(numReduceTasks);
+
+ logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/815887e7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index 810da23..e805d25 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -81,7 +81,7 @@ public class MergeCuboidJob extends CuboidJob {
// add metadata to distributed cache
attachKylinPropsAndMetadata(cube, job.getConfiguration());
- setReduceTaskNum(job, cube.getDescriptor(), 0);
+ LayerReduerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1);
this.deletePath(job.getConfiguration(), output);