You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/29 10:35:09 UTC
kylin git commit: minor improvement on logging for shard/reducer sizing
Repository: kylin
Updated Branches:
refs/heads/yang22 3e725f50d -> f75d467c6
minor improvement on logging for shard/reducer sizing
temp
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f75d467c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f75d467c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f75d467c
Branch: refs/heads/yang22
Commit: f75d467c6055bd27a1e3c77853c96bfa9fbec18d
Parents: 3e725f5
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Dec 29 14:13:36 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Dec 29 18:30:15 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/engine/mr/steps/CuboidJob.java | 2 +-
.../kylin/engine/mr/steps/CuboidReducer.java | 2 +-
.../engine/mr/steps/InMemCuboidReducer.java | 2 +-
.../engine/mr/steps/LayerReducerNumSizing.java | 84 ++++++++++++++++++++
.../engine/mr/steps/LayerReduerNumSizing.java | 82 -------------------
.../kylin/engine/mr/steps/MergeCuboidJob.java | 2 +-
6 files changed, 88 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index d3cb494..9528a75 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -131,7 +131,7 @@ public class CuboidJob extends AbstractHadoopJob {
// add metadata to distributed cache
attachKylinPropsAndMetadata(cube, job.getConfiguration());
- LayerReduerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel);
+ LayerReducerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel);
this.deletePath(job.getConfiguration(), output);
http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index b1d4aaa..afd29e3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -91,7 +91,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
for (Text value : values) {
if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handling value with ordinal: " + vcounter);
+ logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
}
codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
if (cuboidLevel > 0) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index 04c9e90..244889f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -80,7 +80,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
for (ByteArrayWritable value : values) {
if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
- logger.info("Handling value with ordinal: " + vcounter);
+ logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
}
codec.decode(value.asBuffer(), input);
aggs.aggregate(input);
http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
new file mode 100644
index 0000000..713a95c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LayerReducerNumSizing {
+
+ private static final Logger logger = LoggerFactory.getLogger(LayerReducerNumSizing.class);
+
+ public static void setReduceTaskNum(Job job, CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
+ CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+ KylinConfig kylinConfig = cubeDesc.getConfig();
+
+ double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+ double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+ logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
+
+ CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
+
+ double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+ if (level == -1) {
+ //merge case
+ double estimatedSize = cubeStatsReader.estimateCubeSize();
+ adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
+ logger.info("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst);
+ } else if (level == 0) {
+ //base cuboid case TODO: the estimation could be very WRONG because it has no correction
+ adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+ logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+ } else {
+ parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+ currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+ adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+ logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+ }
+
+ // number of reduce tasks
+ int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
+
+ // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+ if (cubeDesc.hasMemoryHungryMeasures()) {
+ logger.info("Multiply reducer num by 4 to boost performance for memory hungry measures");
+ numReduceTasks = numReduceTasks * 4;
+ }
+
+ // at least 1 reducer by default
+ numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+ // no more than 500 reducer by default
+ numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+ job.setNumReduceTasks(numReduceTasks);
+
+ logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
deleted file mode 100644
index 6bddcbd..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LayerReduerNumSizing {
-
- private static final Logger logger = LoggerFactory.getLogger(LayerReduerNumSizing.class);
-
- public static void setReduceTaskNum(Job job, CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
- CubeDesc cubeDesc = cubeSegment.getCubeDesc();
- KylinConfig kylinConfig = cubeDesc.getConfig();
-
- double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
- double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
- logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
-
- CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
-
- double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
-
- if (level == -1) {
- //merge case
- adjustedCurrentLayerSizeEst = cubeStatsReader.estimateCubeSize();
- logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
- } else if (level == 0) {
- //base cuboid case
- adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
- logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
- } else {
- parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
- currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
- adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
- logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
- }
-
- // number of reduce tasks
- int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio);
-
- // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
- if (cubeDesc.hasMemoryHungryMeasures()) {
- numReduceTasks = numReduceTasks * 4;
- }
-
- // at least 1 reducer by default
- numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
- // no more than 500 reducer by default
- numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
- job.setNumReduceTasks(numReduceTasks);
-
- logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index e805d25..8af3a4d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -81,7 +81,7 @@ public class MergeCuboidJob extends CuboidJob {
// add metadata to distributed cache
attachKylinPropsAndMetadata(cube, job.getConfiguration());
- LayerReduerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1);
+ LayerReducerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1);
this.deletePath(job.getConfiguration(), output);