Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/31 07:00:09 UTC
[19/21] kylin git commit: KYLIN-2518 Optimize put row key to hll
KYLIN-2518 Optimize put row key to hll
Signed-off-by: Hongbin Ma <ma...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4c218214
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4c218214
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4c218214
Branch: refs/heads/KYLIN-2501
Commit: 4c21821471cb261cfecdf8289c5f8284af817b3e
Parents: b1cc0dd
Author: xiefan46 <95...@qq.com>
Authored: Mon Mar 27 18:13:03 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Mar 29 11:01:24 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/measure/hllc/HLLCounter.java | 54 ++--
.../mr/steps/FactDistinctColumnsMapper.java | 31 +-
.../mr/steps/NewCubeSamplingMethodTest.java | 299 +++++++++++++++++++
3 files changed, 341 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
index 82c881b..b793465 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
@@ -60,7 +60,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
merge(another);
}
- HLLCounter(int p, RegisterType type) {
+ public HLLCounter(int p, RegisterType type) {
this(p, type, Hashing.murmur3_128());
}
@@ -99,6 +99,10 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
add(hashFunc.hashBytes(value, offset, length).asLong());
}
+ public void addHashDirectly(long hash){
+ add(hash);
+ }
+
protected void add(long hash) {
int bucketMask = m - 1;
int bucket = (int) (hash & bucketMask);
@@ -126,7 +130,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
}
private void toDenseIfNeeded() {
- if (register instanceof SparseRegister) {
+ if (register.getRegisterType() == RegisterType.SPARSE) {
if (isDense(register.getSize())) {
register = ((SparseRegister) register).toDense(p);
}
@@ -137,36 +141,36 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
assert this.p == another.p;
assert this.hashFunc == another.hashFunc;
switch (register.getRegisterType()) {
- case SINGLE_VALUE:
- switch (another.getRegisterType()) {
case SINGLE_VALUE:
- if (register.getSize() > 0 && another.register.getSize() > 0) {
- register = ((SingleValueRegister) register).toSparse();
- } else {
- SingleValueRegister sr = (SingleValueRegister) another.register;
- if (sr.getSize() > 0)
- register.set(sr.getSingleValuePos(), sr.getValue());
- return;
+ switch (another.getRegisterType()) {
+ case SINGLE_VALUE:
+ if (register.getSize() > 0 && another.register.getSize() > 0) {
+ register = ((SingleValueRegister) register).toSparse();
+ } else {
+ SingleValueRegister sr = (SingleValueRegister) another.register;
+ if (sr.getSize() > 0)
+ register.set(sr.getSingleValuePos(), sr.getValue());
+ return;
+ }
+ break;
+ case SPARSE:
+ register = ((SingleValueRegister) register).toSparse();
+ break;
+ case DENSE:
+ register = ((SingleValueRegister) register).toDense(this.p);
+ break;
+ default:
+ break;
}
+
break;
case SPARSE:
- register = ((SingleValueRegister) register).toSparse();
- break;
- case DENSE:
- register = ((SingleValueRegister) register).toDense(this.p);
+ if (another.getRegisterType() == RegisterType.DENSE) {
+ register = ((SparseRegister) register).toDense(p);
+ }
break;
default:
break;
- }
-
- break;
- case SPARSE:
- if (another.getRegisterType() == RegisterType.DENSE) {
- register = ((SparseRegister) register).toDense(p);
- }
- break;
- default:
- break;
}
register.merge(another.register);
toDenseIfNeeded();
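
Before the mapper changes below, it may help to see what the new public addHashDirectly(long) entry point buys: a caller can hash a value once with murmur3_128 and hand the 64-bit result straight to the counter, instead of passing raw bytes and paying for hashBytes inside add(). A minimal usage sketch, not part of the commit (class and variable names are made up; it assumes HLLCounter's public add(byte[]) overload, as used by the new test below):

    import java.nio.charset.StandardCharsets;

    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;
    import org.apache.kylin.measure.hllc.HLLCounter;
    import org.apache.kylin.measure.hllc.RegisterType;

    public class AddHashDirectlySketch {
        public static void main(String[] args) {
            HashFunction hf = Hashing.murmur3_128();
            HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);

            byte[] value = "some-row-key-value".getBytes(StandardCharsets.UTF_8);

            // Old style: pass raw bytes, the counter hashes them internally.
            counter.add(value);

            // New style: hash once on the caller side, reuse the 64-bit result,
            // and skip the internal hashBytes call.
            long hash = hf.hashBytes(value).asLong();
            counter.addHashDirectly(hash);

            System.out.println("estimate = " + counter.getCountEstimate());
        }
    }
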
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 9f65163..e6cea2b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -24,13 +24,13 @@ import java.util.Collection;
import java.util.List;
import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -62,7 +62,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
private HashFunction hf = null;
private int rowCount = 0;
private int samplingPercentage;
- private ByteArray[] row_hashcodes = null;
+ //private ByteArray[] row_hashcodes = null;
+ private long[] rowHashCodesLong = null;
private ByteBuffer tmpbuf;
private static final Text EMPTY_TEXT = new Text();
public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
@@ -92,14 +93,11 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
allCuboidsHLL = new HLLCounter[cuboidIds.length];
for (int i = 0; i < cuboidIds.length; i++) {
- allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+ allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
}
- hf = Hashing.murmur3_32();
- row_hashcodes = new ByteArray[nRowKey];
- for (int i = 0; i < nRowKey; i++) {
- row_hashcodes[i] = new ByteArray();
- }
+ hf = Hashing.murmur3_128();
+ rowHashCodesLong = new long[nRowKey];
TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
if (partitionColRef != null) {
@@ -211,26 +209,23 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
}
private void putRowKeyToHLL(String[] row) {
-
//generate hash for each row key column
for (int i = 0; i < nRowKey; i++) {
Hasher hc = hf.newHasher();
String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
- if (colValue != null) {
- row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
- } else {
- row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
- }
+ if (colValue == null)
+ colValue = "0";
+ byte[] bytes = hc.putString(colValue).hash().asBytes();
+ rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//add column ordinal to the hash value to distinguish between (a,b) and (b,a)
}
// use the row key column hash to get a consolidated hash for each cuboid
for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
- Hasher hc = hf.newHasher();
+ long value = 0;
for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
- hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
+ value += rowHashCodesLong[allCuboidsBitSet[i][position]];
}
-
- allCuboidsHLL[i].add(hc.hash().asBytes());
+ allCuboidsHLL[i].addHashDirectly(value);
}
}
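
The net effect of the mapper change: each row key column is hashed once per row to a 64-bit long with murmur3_128 (with the column ordinal folded in so permuted rows like (a,b) and (b,a) still yield different per-cuboid hashes), and each cuboid's hash is then just the sum of its columns' longs, handed to the counter via addHashDirectly. That removes the second murmur pass the old code ran for every cuboid of every row. A simplified, self-contained sketch of that scheme (illustrative only; the class, method, and parameter names here are made up and are not the mapper's):

    import com.google.common.hash.HashFunction;
    import com.google.common.hash.Hashing;
    import org.apache.kylin.common.util.Bytes;
    import org.apache.kylin.measure.hllc.HLLCounter;
    import org.apache.kylin.measure.hllc.RegisterType;

    public class RowKeyToHllSketch {
        private final HashFunction hf = Hashing.murmur3_128();

        void putRowKeyToHLL(String[] rowKeyValues, int[][] cuboidColumnIndexes,
                            HLLCounter[] cuboidCounters) {
            // 1. Hash each row key column once per row; fold in the column ordinal
            //    so that (a, b) and (b, a) produce different per-cuboid sums.
            long[] colHashes = new long[rowKeyValues.length];
            for (int i = 0; i < rowKeyValues.length; i++) {
                String v = rowKeyValues[i] == null ? "0" : rowKeyValues[i];
                byte[] digest = hf.newHasher().putString(v).hash().asBytes();
                colHashes[i] = Bytes.toLong(digest) + i;
            }
            // 2. For each cuboid, combine the column hashes by summation and feed
            //    the 64-bit result straight into the counter, skipping the second
            //    murmur pass the old code paid per cuboid.
            for (int c = 0; c < cuboidColumnIndexes.length; c++) {
                long combined = 0;
                for (int col : cuboidColumnIndexes[c]) {
                    combined += colHashes[col];
                }
                cuboidCounters[c].addHashDirectly(combined);
            }
        }
    }

    // Counters would be created as in the commit, e.g.
    //   new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE)
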
http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
new file mode 100644
index 0000000..f018f28
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@Ignore
+public class NewCubeSamplingMethodTest {
+
+ private static final int ROW_LENGTH = 10;
+
+ private Integer[][] allCuboidsBitSet;
+
+ private long baseCuboidId;
+
+ private final int rowCount = 500000;
+
+ @Before
+ public void setup() {
+ baseCuboidId = (1L << ROW_LENGTH) - 1;
+ createAllCuboidBitSet();
+ System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids.");
+ }
+
+ @Ignore
+ @Test
+ public void testRandomData() throws Exception {
+ List<List<String>> dataSet = getRandomDataset(rowCount);
+ comparePerformanceBasic(dataSet);
+ compareAccuracyBasic(dataSet);
+ }
+
+
+ @Ignore
+ @Test
+ public void testSmallCardData() throws Exception {
+ List<List<String>> dataSet = getSmallCardDataset(rowCount);
+ comparePerformanceBasic(dataSet);
+ compareAccuracyBasic(dataSet);
+ }
+
+
+ public void comparePerformanceBasic(final List<List<String>> rows) throws Exception {
+ //old hash method
+ ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
+ HLLCounter[] cuboidCounters = getNewCuboidCounters(allCuboidsBitSet.length);
+ long start = System.currentTimeMillis();
+ for (List<String> row : rows) {
+ putRowKeyToHLL(row, colHashValues, cuboidCounters, Hashing.murmur3_32());
+ }
+ long totalTime = System.currentTimeMillis() - start;
+ System.out.println("old method cost time : " + totalTime);
+ //new hash method
+ colHashValues = getNewColHashValues(ROW_LENGTH);
+ cuboidCounters = getNewCuboidCounters(allCuboidsBitSet.length);
+ start = System.currentTimeMillis();
+ long[] valueHashLong = new long[allCuboidsBitSet.length];
+ for (List<String> row : rows) {
+ putRowKeyToHLLNew(row, valueHashLong, cuboidCounters, Hashing.murmur3_128());
+ }
+ totalTime = System.currentTimeMillis() - start;
+ System.out.println("new method cost time : " + totalTime);
+ }
+
+ //test accuracy
+ public void compareAccuracyBasic(final List<List<String>> rows) throws Exception {
+ final long realCardinality = countCardinality(rows);
+ System.out.println("real cardinality : " + realCardinality);
+ //test1
+ long t1 = runAndGetTime(new TestCase() {
+ @Override
+ public void run() throws Exception {
+ HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);
+ final ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
+ HashFunction hf = Hashing.murmur3_32();
+ for (List<String> row : rows) {
+
+ int x = 0;
+ for (String field : row) {
+ Hasher hc = hf.newHasher();
+ colHashValues[x++].set(hc.putString(field).hash().asBytes());
+ }
+
+ Hasher hc = hf.newHasher();
+ for (int position = 0; position < colHashValues.length; position++) {
+ hc.putBytes(colHashValues[position].array());
+ }
+ counter.add(hc.hash().asBytes());
+ }
+ long estimate = counter.getCountEstimate();
+ System.out.println("old method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
+ }
+ });
+
+
+ long t2 = runAndGetTime(new TestCase() {
+ @Override
+ public void run() throws Exception {
+ HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);
+ HashFunction hf2 = Hashing.murmur3_128();
+ long[] valueHashLong = new long[allCuboidsBitSet.length];
+ for (List<String> row : rows) {
+
+ int x = 0;
+ for (String field : row) {
+ Hasher hc = hf2.newHasher();
+ byte[] bytes = hc.putString(x + field).hash().asBytes();
+ valueHashLong[x++] = Bytes.toLong(bytes);
+ }
+
+ long value = 0;
+ for (int position = 0; position < row.size(); position++) {
+ value += valueHashLong[position];
+ }
+ counter.addHashDirectly(value);
+ }
+ long estimate = counter.getCountEstimate();
+ System.out.println("new method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
+ }
+ });
+ }
+
+ public void createAllCuboidBitSet() {
+ List<Long> allCuboids = Lists.newArrayList();
+ List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+ for (long i = 1; i < baseCuboidId; i++) {
+ allCuboids.add(i);
+ addCuboidBitSet(i, allCuboidsBitSetList);
+ }
+ allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]);
+ }
+
+ private ByteArray[] getNewColHashValues(int rowLength) {
+ ByteArray[] colHashValues = new ByteArray[rowLength];
+ for (int i = 0; i < rowLength; i++) {
+ colHashValues[i] = new ByteArray();
+ }
+ return colHashValues;
+ }
+
+ private HLLCounter[] getNewCuboidCounters(int cuboidNum) {
+ HLLCounter[] counters = new HLLCounter[cuboidNum];
+ for (int i = 0; i < counters.length; i++)
+ counters[i] = new HLLCounter(14, RegisterType.DENSE);
+ return counters;
+ }
+
+
+ private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
+ Integer[] indice = new Integer[Long.bitCount(cuboidId)];
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ int position = 0;
+ for (int i = 0; i < ROW_LENGTH; i++) {
+ if ((mask & cuboidId) > 0) {
+ indice[position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+
+ allCuboidsBitSet.add(indice);
+
+ }
+
+ private long runAndGetTime(TestCase testCase) throws Exception {
+ long start = System.currentTimeMillis();
+ testCase.run();
+ long totalTime = System.currentTimeMillis() - start;
+ return totalTime;
+ }
+
+ interface TestCase {
+ void run() throws Exception;
+ }
+
+ private void putRowKeyToHLL(List<String> row, ByteArray[] colHashValues, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
+ int x = 0;
+ for (String field : row) {
+ Hasher hc = hashFunction.newHasher();
+ colHashValues[x++].set(hc.putString(field).hash().asBytes());
+ }
+
+ for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+ Hasher hc = hashFunction.newHasher();
+ for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+ hc.putBytes(colHashValues[allCuboidsBitSet[i][position]].array());
+ //hc.putBytes(seperator);
+ }
+ cuboidCounters[i].add(hc.hash().asBytes());
+ }
+ }
+
+ private void putRowKeyToHLLNew(List<String> row, long[] hashValuesLong, HLLCounter[] cuboidCounters, HashFunction hashFunction) {
+ int x = 0;
+ for (String field : row) {
+ Hasher hc = hashFunction.newHasher();
+ byte[] bytes = hc.putString(x + field).hash().asBytes();
+ hashValuesLong[x++] = Bytes.toLong(bytes);
+ }
+
+ for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+ long value = 0;
+ for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+ value += hashValuesLong[allCuboidsBitSet[i][position]];
+ }
+ cuboidCounters[i].addHashDirectly(value);
+ }
+ }
+
+ private List<List<String>> getRandomDataset(int size) {
+ List<List<String>> rows = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ rows.add(getRandomRow());
+ }
+ return rows;
+ }
+
+ private List<List<String>> getSmallCardDataset(int size) {
+ List<List<String>> rows = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ rows.add(getSmallCardRow());
+ }
+ return rows;
+ }
+
+ private List<String> getRandomRow() {
+ List<String> row = new ArrayList<>();
+ for (int i = 0; i < ROW_LENGTH; i++) {
+ row.add(RandomStringUtils.random(10));
+ }
+ return row;
+ }
+
+ private String[] smallCardRow = {"abc", "bcd", "jifea", "feaifj"};
+
+ private Random rand = new Random(System.currentTimeMillis());
+
+ private List<String> getSmallCardRow() {
+ List<String> row = new ArrayList<>();
+ row.add(smallCardRow[rand.nextInt(smallCardRow.length)]);
+ for (int i = 1; i < ROW_LENGTH; i++) {
+ row.add("abc");
+ }
+ return row;
+ }
+
+
+ private int countCardinality(List<List<String>> rows) {
+ Set<String> diffCols = new HashSet<String>();
+ for (List<String> row : rows) {
+ StringBuilder sb = new StringBuilder();
+ for (String str : row) {
+ sb.append(str);
+ }
+ diffCols.add(sb.toString());
+ }
+ return diffCols.size();
+ }
+
+ private double countErrorRate(long estimate, long real) {
+ double rate = Math.abs((estimate - real) * 1.0) / real;
+ return rate;
+ }
+}
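
For reference when reading the error rates this test prints: with precision p = 14 the counters use m = 2^14 registers, and the theoretical HyperLogLog standard error is about 1.04 / sqrt(m), i.e. roughly 0.81%. A tiny back-of-envelope check, not part of the commit:

    public class HllErrorBound {
        public static void main(String[] args) {
            int p = 14;
            double m = Math.pow(2, p);              // 16384 registers
            double stdError = 1.04 / Math.sqrt(m);  // theoretical HLL standard error
            System.out.printf("expected relative error ~= %.4f%n", stdError); // ~0.0081
        }
    }
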