You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/10/22 09:08:30 UTC

[1/2] incubator-kylin git commit: optimizing spark sampling process

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 801a7921f -> a8ead00fa


Optimize the Spark sampling process


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a8ead00f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a8ead00f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a8ead00f

Branch: refs/heads/2.x-staging
Commit: a8ead00fa55b95bce3a69b1e1b88a0d1c1327d1a
Parents: a2425c0
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Oct 15 16:13:05 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Oct 21 14:47:20 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/engine/spark/SparkCubing.java  | 76 ++++++++++----------
 .../storage/hbase/steps/CreateHTableJob.java    | 13 ++--
 2 files changed, 45 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a8ead00f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index dd1fd99..4df77e2 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 import com.google.common.primitives.UnsignedBytes;
@@ -202,34 +203,53 @@ public class SparkCubing extends AbstractSparkApplication {
     }
 
     private Map<Long, HyperLogLogPlusCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName) throws Exception {
-        final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        HashMap<Long, HyperLogLogPlusCounter> zeroValue = Maps.newHashMap();
-        for (Long id : new CuboidScheduler(cubeInstance.getDescriptor()).getAllCuboidIds()) {
+        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+        List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
+        final HashMap<Long, HyperLogLogPlusCounter> zeroValue = Maps.newHashMap();
+        for (Long id : allCuboidIds) {
             zeroValue.put(id, new HyperLogLogPlusCounter(14));
         }
 
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        CubeDesc cubeDesc = CubeManager.getInstance(kylinConfig).getCube(cubeName).getDescriptor();
         CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-        CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+        
         final int[] rowKeyColumnIndexes = flatTableDesc.getRowKeyColumnIndexes();
         final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
         final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        final List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
-
+        
+        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+        for (Long cuboidId : allCuboidIds) {
+            BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+            Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
+
+            long mask = Long.highestOneBit(baseCuboidId);
+            int position = 0;
+            for (int i = 0; i < nRowKey; i++) {
+                if ((mask & cuboidId) > 0) {
+                    cuboidBitSet[position] = i;
+                    position++;
+                }
+                mask = mask >> 1;
+            }
+            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+        }
+        final ByteArray[] row_hashcodes = new ByteArray[nRowKey];
+        for (int i = 0; i < nRowKey; ++i) {
+            row_hashcodes[i] = new ByteArray();
+        }
+        
         final HashMap<Long, HyperLogLogPlusCounter> samplingResult = rowJavaRDD.aggregate(zeroValue,
                 new Function2<HashMap<Long, HyperLogLogPlusCounter>,
                         List<String>,
                         HashMap<Long, HyperLogLogPlusCounter>>() {
+                    
+                    final HashFunction hashFunction = Hashing.murmur3_32();
 
                     @Override
                     public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, List<String> v2) throws Exception {
-                        ByteArray[] row_hashcodes = new ByteArray[nRowKey];
-                        for (int i = 0; i < nRowKey; ++i) {
-                            row_hashcodes[i] = new ByteArray();
-                        }
                         for (int i = 0; i < nRowKey; i++) {
-                            Hasher hc = Hashing.murmur3_32().newHasher();
+                            Hasher hc = hashFunction.newHasher();
                             String colValue = v2.get(rowKeyColumnIndexes[i]);
                             if (colValue != null) {
                                 row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
@@ -238,27 +258,10 @@ public class SparkCubing extends AbstractSparkApplication {
                             }
                         }
 
-                        final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
-                        for (Long cuboidId : allCuboidIds) {
-                            BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
-                            Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
-
-                            long mask = Long.highestOneBit(baseCuboidId);
-                            int position = 0;
-                            for (int i = 0; i < nRowKey; i++) {
-                                if ((mask & cuboidId) > 0) {
-                                    cuboidBitSet[position] = i;
-                                    position++;
-                                }
-                                mask = mask >> 1;
-                            }
-                            allCuboidsBitSet.put(cuboidId, cuboidBitSet);
-                        }
-
-                        for (Long cuboidId : allCuboidIds) {
-                            Hasher hc = Hashing.murmur3_32().newHasher();
-                            HyperLogLogPlusCounter counter = v1.get(cuboidId);
-                            final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
+                        for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
+                            Hasher hc = hashFunction.newHasher();
+                            HyperLogLogPlusCounter counter = v1.get(entry.getKey());
+                            final Integer[] cuboidBitSet = entry.getValue();
                             for (int position = 0; position < cuboidBitSet.length; position++) {
                                 hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
                             }
@@ -277,9 +280,7 @@ public class SparkCubing extends AbstractSparkApplication {
                         for (Map.Entry<Long, HyperLogLogPlusCounter> entry : v1.entrySet()) {
                             final HyperLogLogPlusCounter counter1 = entry.getValue();
                             final HyperLogLogPlusCounter counter2 = v2.get(entry.getKey());
-                            if (counter2 != null) {
-                                counter1.merge(counter2);
-                            }
+                            counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
                         }
                         return v1;
                     }
@@ -446,6 +447,7 @@ public class SparkCubing extends AbstractSparkApplication {
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
         final Map<Long, Long> cubeSizeMap = CreateHTableJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100);
+        System.out.println("cube size estimation:" + cubeSizeMap);
         final byte[][] splitKeys = CreateHTableJob.getSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment);
         CubeHTableUtil.createHTable(cubeDesc, cubeSegment.getStorageLocationIdentifier(), splitKeys);
         System.out.println(cubeSegment.getStorageLocationIdentifier() + " table created");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a8ead00f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 1e61d04..eee201b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -173,24 +173,23 @@ public class CreateHTableJob extends AbstractHadoopJob {
             IOUtils.closeStream(is);
             IOUtils.closeStream(tempFileStream);
         }
-
-        Map<Long, Long> cuboidSizeMap = Maps.newHashMap();
+        Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap();
+        
         FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+        int samplingPercentage = 25;
         SequenceFile.Reader reader = null;
         try {
             reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
             LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
             BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-            int samplingPercentage = 25;
             while (reader.next(key, value)) {
-                if (key.get() == 0l) {
+                if (key.get() == 0L) {
                     samplingPercentage = Bytes.toInt(value.getBytes());
                 } else {
                     HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
                     ByteArray byteArray = new ByteArray(value.getBytes());
                     hll.readRegisters(byteArray.asBuffer());
-
-                    cuboidSizeMap.put(key.get(), hll.getCountEstimate() * 100 / samplingPercentage);
+                    counterMap.put(key.get(), hll);
                 }
 
             }
@@ -201,7 +200,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
             IOUtils.closeStream(reader);
             tempFile.delete();
         }
-        return cuboidSizeMap;
+        return getCubeRowCountMapFromCuboidStatistics(counterMap, samplingPercentage);
     }
 
     public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(Map<Long, HyperLogLogPlusCounter> counterMap, final int samplingPercentage) throws IOException {


[2/2] incubator-kylin git commit: minor set MR sampling default percentage to 100

Posted by qh...@apache.org.
Minor: set the MR sampling default percentage to 100


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a2425c0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a2425c0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a2425c0b

Branch: refs/heads/2.x-staging
Commit: a2425c0b4be8eb581e2916a493b8f897437a0372
Parents: 801a792
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Oct 15 16:07:09 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Oct 21 14:47:20 2015 +0800

----------------------------------------------------------------------
 build/conf/kylin.properties                                      | 4 ++--
 .../src/main/java/org/apache/kylin/common/KylinConfig.java       | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a2425c0b/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 5b56f31..fe953fc 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -51,8 +51,8 @@ kylin.job.cubing.inMem=true
 #default compression codec for htable,snappy,lzo,gzip,lz4
 kylin.hbase.default.compression.codec=snappy
 
-#the percentage of the sampling, default 25%
-kylin.job.cubing.inMem.sampling.percent=25
+#the percentage of the sampling, default 100%
+kylin.job.cubing.inMem.sampling.percent=100
 
 # The cut size for hbase region, in GB.
 # E.g, for cube whose capacity be marked as "SMALL", split region per 10GB by default

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a2425c0b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 5f8fb07..709e5aa 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -505,7 +505,7 @@ public class KylinConfig implements Serializable {
     }
 
     public int getCubingInMemSamplingPercent() {
-        int percent = Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT, "5"));
+        int percent = Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT, "100"));
         percent = Math.max(percent, 1);
         percent = Math.min(percent, 100);
         return percent;