You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:50 UTC

[13/50] incubator-kylin git commit: Collect cuboid statistics while fetching distinct columns

Collect cuboid statistics while fetching distinct columns.

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d4a271df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d4a271df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d4a271df

Branch: refs/heads/streaming-localdict
Commit: d4a271df9d9b055e44d1a6fc1e3cc3055e14c2bd
Parents: 7360f5b
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Thu Mar 26 23:15:57 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Thu Mar 26 23:15:57 2015 +0800

----------------------------------------------------------------------
 .../cube/FactDistinctColumnsMapperBase.java     |  5 +-
 .../hadoop/cube/FactDistinctColumnsReducer.java | 53 ++++++++++++++------
 .../cube/FactDistinctHiveColumnsMapper.java     | 51 +++++++++++++------
 3 files changed, 77 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d4a271df/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
index c0455ff..9945769 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapperBase.java
@@ -5,6 +5,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hive.hcatalog.data.HCatRecord;
@@ -23,7 +24,7 @@ import org.apache.kylin.metadata.model.TblColRef;
 /**
  * Created by Hongbin Ma(Binmahone) on 3/26/15.
  */
-public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, ShortWritable, Text> {
+public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, LongWritable, Text> {
 
     protected String cubeName;
     protected CubeInstance cube;
@@ -32,7 +33,7 @@ public class FactDistinctColumnsMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
     protected List<TblColRef> columns;
     protected ArrayList<Integer> factDictCols;
 
-    protected ShortWritable outputKey = new ShortWritable();
+    protected LongWritable outputKey = new LongWritable();
     protected Text outputValue = new Text();
     protected int errorRecordCounter =0;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d4a271df/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index 2052d08..e1529d3 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -19,13 +19,14 @@
 package org.apache.kylin.job.hadoop.cube;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.kylin.common.KylinConfig;
@@ -44,17 +45,20 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 
 /**
  * @author yangli9
  */
-public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
+public class FactDistinctColumnsReducer extends KylinReducer<LongWritable, Text, NullWritable, Text> {
 
     private List<TblColRef> columnList = new ArrayList<TblColRef>();
     private boolean collectStatistics = false;
     private String statisticsOutput = null;
     private List<Long> rowKeyCountInMappers;
-    private HyperLogLogPlusCounter totalHll;
+    private Map<Long, Long> rowKeyCountInCuboids;
+    protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
+    protected long baseCuboidId;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -66,23 +70,24 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
         CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
         CubeDesc cubeDesc = cube.getDescriptor();
 
-        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
         columnList = baseCuboid.getColumns();
         collectStatistics = Boolean.parseBoolean(conf.get(BatchConstants.CFG_STATISTICS_ENABLED));
         statisticsOutput = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
 
         if (collectStatistics) {
-            totalHll = new HyperLogLogPlusCounter(16);
             rowKeyCountInMappers = Lists.newArrayList();
+            rowKeyCountInCuboids = Maps.newHashMap();
+            cuboidHLLMap = Maps.newHashMap();
         }
     }
 
     @Override
-    public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+    public void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
 
         if (key.get() >= 0) {
-            TblColRef col = columnList.get(key.get());
+            TblColRef col = columnList.get((int) key.get());
 
             HashSet<ByteArray> set = new HashSet<ByteArray>();
             for (Text textValue : values) {
@@ -105,26 +110,38 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
             }
         } else {
             // for hll
+            long cuboidId = 0 - key.get();
+
             for (Text value : values) {
                 HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(16);
                 ByteArray byteArray = new ByteArray(value.getBytes());
                 hll.readRegisters(byteArray.asBuffer());
 
-                rowKeyCountInMappers.add(hll.getCountEstimate());
-                // merge the hll with total hll
-                totalHll.merge(hll);
+                if (cuboidId > baseCuboidId) {
+                    // if this is the summary info from a mapper, record the number before merge
+                    rowKeyCountInMappers.add(hll.getCountEstimate());
+                }
+
+                if (cuboidHLLMap.get(cuboidId) != null) {
+                    hll.merge(cuboidHLLMap.get(cuboidId));
+                }
+                cuboidHLLMap.put(cuboidId, hll);
             }
         }
 
     }
 
     protected void cleanup(Reducer.Context context) throws IOException, InterruptedException {
+
+        for (Long cuboidId : cuboidHLLMap.keySet()) {
+            rowKeyCountInCuboids.put(cuboidId, cuboidHLLMap.get(cuboidId).getCountEstimate());
+        }
+
         //output the hll info;
         if (collectStatistics) {
             Configuration conf = context.getConfiguration();
             FileSystem fs = FileSystem.get(conf);
-            String outputPath = conf.get(BatchConstants.CFG_STATISTICS_OUTPUT);
-            FSDataOutputStream out = fs.create(new Path(outputPath, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION));
+            FSDataOutputStream out = fs.create(new Path(statisticsOutput, BatchConstants.CFG_STATISTICS_CUBE_ESTIMATION));
 
             try {
                 long totalSum = 0;
@@ -141,13 +158,21 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
                 out.write('\n');
 
 
-                msg = "The merged cube segment has " + totalHll.getCountEstimate() + " rows.";
+                long grantTotal = rowKeyCountInCuboids.get(baseCuboidId + 1);
+                msg = "The merged cube has " + grantTotal + " rows.";
                 out.write(msg.getBytes());
                 out.write('\n');
 
-                msg = "The compaction rate is " + (totalHll.getCountEstimate()) + "/" + totalSum + " = " + (totalHll.getCountEstimate() * 100.0) / totalSum + "%.";
+                msg = "The compaction rate is " + (grantTotal) + "/" + totalSum + " = " + (grantTotal * 100.0) / totalSum + "%.";
                 out.write(msg.getBytes());
                 out.write('\n');
+                out.write('\n');
+                
+                for (long i = 0; i < baseCuboidId; i++) {
+                    msg = "Cuboid " + i + " has " + rowKeyCountInCuboids.get(i) + " rows.";
+                    out.write(msg.getBytes());
+                    out.write('\n');
+                }
 
             } finally {
                 out.close();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d4a271df/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
index 64ae353..9e9c096 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -18,11 +18,8 @@
 
 package org.apache.kylin.job.hadoop.cube;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hive.hcatalog.data.HCatRecord;
@@ -30,12 +27,17 @@ import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dict.lookup.HiveTableReader;
 import org.apache.kylin.job.constant.BatchConstants;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 
 /**
  * @author yangli9
@@ -48,7 +50,8 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     protected boolean collectStatistics = false;
     protected CuboidScheduler cuboidScheduler = null;
     protected List<String> rowKeyValues = null;
-    protected HyperLogLogPlusCounter hll;
+    protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = null;
+    protected HyperLogLogPlusCounter totalHll = null;
     protected int nRowKey;
 
     @Override
@@ -58,11 +61,11 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
         schema = HCatInputFormat.getTableSchema(context.getConfiguration());
         intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
 
-
         collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
         if (collectStatistics) {
             cuboidScheduler = new CuboidScheduler(cubeDesc);
-            hll = new HyperLogLogPlusCounter(16);
+            cuboidHLLMap = Maps.newHashMap();
+            totalHll = new HyperLogLogPlusCounter(16);
             rowKeyValues = Lists.newArrayList();
             nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
         }
@@ -74,7 +77,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
             int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
             HCatFieldSchema fieldSchema;
             for (int i : factDictCols) {
-                outputKey.set((short) i);
+                outputKey.set((long) i);
                 fieldSchema = schema.get(flatTableIndexes[i]);
                 Object fieldValue = record.get(fieldSchema.getName(), schema);
                 if (fieldValue == null)
@@ -103,8 +106,13 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
             mask = mask >> 1;
         }
 
-        String key = StringUtils.join(rowKeyValues, ",");
-        hll.add(key);
+        HyperLogLogPlusCounter hll = cuboidHLLMap.get(cuboidId);
+        if (hll == null) {
+            hll = new HyperLogLogPlusCounter(16);
+            cuboidHLLMap.put(cuboidId, hll);
+        }
+
+        hll.add(StringUtils.join(rowKeyValues, ","));
 
         Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
         for (Long childId : children) {
@@ -116,11 +124,22 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
         if (collectStatistics) {
-            // output hll to reducer, key is -1
-            // keyBuf = Bytes.toBytes(-1);
-            outputKey.set((short) -1);
+
+            // output each cuboid's hll to reducer, key is 0 - cuboidId
+            for (Long cuboidId : cuboidHLLMap.keySet()) {
+                HyperLogLogPlusCounter hll = cuboidHLLMap.get(cuboidId);
+                totalHll.merge(hll); // merge each cuboid's counter to the total hll
+                outputKey.set(0 - cuboidId);
+                ByteBuffer hllBuf = ByteBuffer.allocate(64 * 1024);
+                hll.writeRegisters(hllBuf);
+                outputValue.set(hllBuf.array());
+                context.write(outputKey, outputValue);
+            }
+
+            //output the total hll for this mapper;
+            outputKey.set(0 - baseCuboidId - 1);
             ByteBuffer hllBuf = ByteBuffer.allocate(64 * 1024);
-            hll.writeRegisters(hllBuf);
+            totalHll.writeRegisters(hllBuf);
             outputValue.set(hllBuf.array());
             context.write(outputKey, outputValue);
         }