You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/29 10:35:09 UTC

kylin git commit: minor improvement on logging for shard/reducer sizing

Repository: kylin
Updated Branches:
  refs/heads/yang22 3e725f50d -> f75d467c6


minor improvement on logging for shard/reducer sizing

temp


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f75d467c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f75d467c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f75d467c

Branch: refs/heads/yang22
Commit: f75d467c6055bd27a1e3c77853c96bfa9fbec18d
Parents: 3e725f5
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Dec 29 14:13:36 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Dec 29 18:30:15 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/engine/mr/steps/CuboidJob.java |  2 +-
 .../kylin/engine/mr/steps/CuboidReducer.java    |  2 +-
 .../engine/mr/steps/InMemCuboidReducer.java     |  2 +-
 .../engine/mr/steps/LayerReducerNumSizing.java  | 84 ++++++++++++++++++++
 .../engine/mr/steps/LayerReduerNumSizing.java   | 82 -------------------
 .../kylin/engine/mr/steps/MergeCuboidJob.java   |  2 +-
 6 files changed, 88 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index d3cb494..9528a75 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -131,7 +131,7 @@ public class CuboidJob extends AbstractHadoopJob {
             // add metadata to distributed cache
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
-            LayerReduerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel);
+            LayerReducerNumSizing.setReduceTaskNum(job, segment, getTotalMapInputMB(), nCuboidLevel);
 
             this.deletePath(job.getConfiguration(), output);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index b1d4aaa..afd29e3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -91,7 +91,7 @@ public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
 
         for (Text value : values) {
             if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
-                logger.info("Handling value with ordinal: " + vcounter);
+                logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
             }
             codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), input);
             if (cuboidLevel > 0) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index 04c9e90..244889f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -80,7 +80,7 @@ public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArra
 
         for (ByteArrayWritable value : values) {
             if (vcounter++ % BatchConstants.NORMAL_RECORD_LOG_THRESHOLD == 0) {
-                logger.info("Handling value with ordinal: " + vcounter);
+                logger.info("Handling value with ordinal (This is not KV number!): " + vcounter);
             }
             codec.decode(value.asBuffer(), input);
             aggs.aggregate(input);

http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
new file mode 100644
index 0000000..713a95c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReducerNumSizing.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LayerReducerNumSizing {
+
+    private static final Logger logger = LoggerFactory.getLogger(LayerReducerNumSizing.class);
+
+    public static void setReduceTaskNum(Job job, CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
+        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+        KylinConfig kylinConfig = cubeDesc.getConfig();
+
+        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
+
+        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
+
+        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+        if (level == -1) {
+            //merge case
+            double estimatedSize = cubeStatsReader.estimateCubeSize();
+            adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
+            logger.info("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst);
+        } else if (level == 0) {
+            //base cuboid case TODO: the estimation could be very WRONG because it has no correction
+            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+            logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+        } else {
+            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+            logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+        }
+
+        // number of reduce tasks
+        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
+
+        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+        if (cubeDesc.hasMemoryHungryMeasures()) {
+            logger.info("Multiply reducer num by 4 to boost performance for memory hungry measures");
+            numReduceTasks = numReduceTasks * 4;
+        }
+
+        // at least 1 reducer by default
+        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+        // no more than 500 reducer by default
+        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+        job.setNumReduceTasks(numReduceTasks);
+
+        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
deleted file mode 100644
index 6bddcbd..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/LayerReduerNumSizing.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *  
- *     http://www.apache.org/licenses/LICENSE-2.0
- *  
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class LayerReduerNumSizing {
-
-    private static final Logger logger = LoggerFactory.getLogger(LayerReduerNumSizing.class);
-
-    public static void setReduceTaskNum(Job job, CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
-        CubeDesc cubeDesc = cubeSegment.getCubeDesc();
-        KylinConfig kylinConfig = cubeDesc.getConfig();
-
-        double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-        double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
-        logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
-
-        CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
-
-        double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
-
-        if (level == -1) {
-            //merge case
-            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateCubeSize();
-            logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
-        } else if (level == 0) {
-            //base cuboid case
-            adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
-            logger.info("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
-        } else {
-            parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
-            currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
-            adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
-            logger.info("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
-        }
-
-        // number of reduce tasks
-        int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio);
-
-        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
-        if (cubeDesc.hasMemoryHungryMeasures()) {
-            numReduceTasks = numReduceTasks * 4;
-        }
-
-        // at least 1 reducer by default
-        numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
-        // no more than 500 reducer by default
-        numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
-        job.setNumReduceTasks(numReduceTasks);
-
-        logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/f75d467c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
index e805d25..8af3a4d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidJob.java
@@ -81,7 +81,7 @@ public class MergeCuboidJob extends CuboidJob {
             // add metadata to distributed cache
             attachKylinPropsAndMetadata(cube, job.getConfiguration());
 
-            LayerReduerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1);
+            LayerReducerNumSizing.setReduceTaskNum(job, cube.getSegmentById(segmentID), getTotalMapInputMB(), -1);
 
             this.deletePath(job.getConfiguration(), output);