You are viewing a plain-text version of this content. (The canonical link, originally a hyperlink on the word "here", was lost in the plain-text conversion.)
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/10/22 09:08:30 UTC
[1/2] incubator-kylin git commit: optimizing spark sampling process
Repository: incubator-kylin
Updated Branches:
refs/heads/2.x-staging 801a7921f -> a8ead00fa
optimizing spark sampling process
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a8ead00f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a8ead00f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a8ead00f
Branch: refs/heads/2.x-staging
Commit: a8ead00fa55b95bce3a69b1e1b88a0d1c1327d1a
Parents: a2425c0
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Oct 15 16:13:05 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Oct 21 14:47:20 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/engine/spark/SparkCubing.java | 76 ++++++++++----------
.../storage/hbase/steps/CreateHTableJob.java | 13 ++--
2 files changed, 45 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a8ead00f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index dd1fd99..4df77e2 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.primitives.UnsignedBytes;
@@ -202,34 +203,53 @@ public class SparkCubing extends AbstractSparkApplication {
}
private Map<Long, HyperLogLogPlusCounter> sampling(final JavaRDD<List<String>> rowJavaRDD, final String cubeName) throws Exception {
- final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
- HashMap<Long, HyperLogLogPlusCounter> zeroValue = Maps.newHashMap();
- for (Long id : new CuboidScheduler(cubeInstance.getDescriptor()).getAllCuboidIds()) {
+ CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+ CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+ List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
+ final HashMap<Long, HyperLogLogPlusCounter> zeroValue = Maps.newHashMap();
+ for (Long id : allCuboidIds) {
zeroValue.put(id, new HyperLogLogPlusCounter(14));
}
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- CubeDesc cubeDesc = CubeManager.getInstance(kylinConfig).getCube(cubeName).getDescriptor();
CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
- CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+
final int[] rowKeyColumnIndexes = flatTableDesc.getRowKeyColumnIndexes();
final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- final List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
-
+
+ final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+ for (Long cuboidId : allCuboidIds) {
+ BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+ Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ int position = 0;
+ for (int i = 0; i < nRowKey; i++) {
+ if ((mask & cuboidId) > 0) {
+ cuboidBitSet[position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+ allCuboidsBitSet.put(cuboidId, cuboidBitSet);
+ }
+ final ByteArray[] row_hashcodes = new ByteArray[nRowKey];
+ for (int i = 0; i < nRowKey; ++i) {
+ row_hashcodes[i] = new ByteArray();
+ }
+
final HashMap<Long, HyperLogLogPlusCounter> samplingResult = rowJavaRDD.aggregate(zeroValue,
new Function2<HashMap<Long, HyperLogLogPlusCounter>,
List<String>,
HashMap<Long, HyperLogLogPlusCounter>>() {
+
+ final HashFunction hashFunction = Hashing.murmur3_32();
@Override
public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, List<String> v2) throws Exception {
- ByteArray[] row_hashcodes = new ByteArray[nRowKey];
- for (int i = 0; i < nRowKey; ++i) {
- row_hashcodes[i] = new ByteArray();
- }
for (int i = 0; i < nRowKey; i++) {
- Hasher hc = Hashing.murmur3_32().newHasher();
+ Hasher hc = hashFunction.newHasher();
String colValue = v2.get(rowKeyColumnIndexes[i]);
if (colValue != null) {
row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
@@ -238,27 +258,10 @@ public class SparkCubing extends AbstractSparkApplication {
}
}
- final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
- for (Long cuboidId : allCuboidIds) {
- BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
- Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
-
- long mask = Long.highestOneBit(baseCuboidId);
- int position = 0;
- for (int i = 0; i < nRowKey; i++) {
- if ((mask & cuboidId) > 0) {
- cuboidBitSet[position] = i;
- position++;
- }
- mask = mask >> 1;
- }
- allCuboidsBitSet.put(cuboidId, cuboidBitSet);
- }
-
- for (Long cuboidId : allCuboidIds) {
- Hasher hc = Hashing.murmur3_32().newHasher();
- HyperLogLogPlusCounter counter = v1.get(cuboidId);
- final Integer[] cuboidBitSet = allCuboidsBitSet.get(cuboidId);
+ for (Map.Entry<Long, Integer[]> entry : allCuboidsBitSet.entrySet()) {
+ Hasher hc = hashFunction.newHasher();
+ HyperLogLogPlusCounter counter = v1.get(entry.getKey());
+ final Integer[] cuboidBitSet = entry.getValue();
for (int position = 0; position < cuboidBitSet.length; position++) {
hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
}
@@ -277,9 +280,7 @@ public class SparkCubing extends AbstractSparkApplication {
for (Map.Entry<Long, HyperLogLogPlusCounter> entry : v1.entrySet()) {
final HyperLogLogPlusCounter counter1 = entry.getValue();
final HyperLogLogPlusCounter counter2 = v2.get(entry.getKey());
- if (counter2 != null) {
- counter1.merge(counter2);
- }
+ counter1.merge(Preconditions.checkNotNull(counter2, "counter cannot be null"));
}
return v1;
}
@@ -446,6 +447,7 @@ public class SparkCubing extends AbstractSparkApplication {
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
final Map<Long, Long> cubeSizeMap = CreateHTableJob.getCubeRowCountMapFromCuboidStatistics(samplingResult, 100);
+ System.out.println("cube size estimation:" + cubeSizeMap);
final byte[][] splitKeys = CreateHTableJob.getSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment);
CubeHTableUtil.createHTable(cubeDesc, cubeSegment.getStorageLocationIdentifier(), splitKeys);
System.out.println(cubeSegment.getStorageLocationIdentifier() + " table created");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a8ead00f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 1e61d04..eee201b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -173,24 +173,23 @@ public class CreateHTableJob extends AbstractHadoopJob {
IOUtils.closeStream(is);
IOUtils.closeStream(tempFileStream);
}
-
- Map<Long, Long> cuboidSizeMap = Maps.newHashMap();
+ Map<Long, HyperLogLogPlusCounter> counterMap = Maps.newHashMap();
+
FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+ int samplingPercentage = 25;
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- int samplingPercentage = 25;
while (reader.next(key, value)) {
- if (key.get() == 0l) {
+ if (key.get() == 0L) {
samplingPercentage = Bytes.toInt(value.getBytes());
} else {
HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
ByteArray byteArray = new ByteArray(value.getBytes());
hll.readRegisters(byteArray.asBuffer());
-
- cuboidSizeMap.put(key.get(), hll.getCountEstimate() * 100 / samplingPercentage);
+ counterMap.put(key.get(), hll);
}
}
@@ -201,7 +200,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
IOUtils.closeStream(reader);
tempFile.delete();
}
- return cuboidSizeMap;
+ return getCubeRowCountMapFromCuboidStatistics(counterMap, samplingPercentage);
}
public static Map<Long, Long> getCubeRowCountMapFromCuboidStatistics(Map<Long, HyperLogLogPlusCounter> counterMap, final int samplingPercentage) throws IOException {
[2/2] incubator-kylin git commit: minor set MR sampling default percentage to 100
Posted by qh...@apache.org.
minor set MR sampling default percentage to 100
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a2425c0b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a2425c0b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a2425c0b
Branch: refs/heads/2.x-staging
Commit: a2425c0b4be8eb581e2916a493b8f897437a0372
Parents: 801a792
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Oct 15 16:07:09 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Wed Oct 21 14:47:20 2015 +0800
----------------------------------------------------------------------
build/conf/kylin.properties | 4 ++--
.../src/main/java/org/apache/kylin/common/KylinConfig.java | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a2425c0b/build/conf/kylin.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties
index 5b56f31..fe953fc 100644
--- a/build/conf/kylin.properties
+++ b/build/conf/kylin.properties
@@ -51,8 +51,8 @@ kylin.job.cubing.inMem=true
#default compression codec for htable,snappy,lzo,gzip,lz4
kylin.hbase.default.compression.codec=snappy
-#the percentage of the sampling, default 25%
-kylin.job.cubing.inMem.sampling.percent=25
+#the percentage of the sampling, default 100%
+kylin.job.cubing.inMem.sampling.percent=100
# The cut size for hbase region, in GB.
# E.g, for cube whose capacity be marked as "SMALL", split region per 10GB by default
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a2425c0b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 5f8fb07..709e5aa 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -505,7 +505,7 @@ public class KylinConfig implements Serializable {
}
public int getCubingInMemSamplingPercent() {
- int percent = Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT, "5"));
+ int percent = Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT, "100"));
percent = Math.max(percent, 1);
percent = Math.min(percent, 100);
return percent;