You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/31 07:00:09 UTC

[19/21] kylin git commit: KYLIN-2518 Optimize put row key to hll

KYLIN-2518 Optimize put row key to hll

Signed-off-by: Hongbin Ma <ma...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4c218214
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4c218214
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4c218214

Branch: refs/heads/KYLIN-2501
Commit: 4c21821471cb261cfecdf8289c5f8284af817b3e
Parents: b1cc0dd
Author: xiefan46 <95...@qq.com>
Authored: Mon Mar 27 18:13:03 2017 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Mar 29 11:01:24 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/measure/hllc/HLLCounter.java   |  54 ++--
 .../mr/steps/FactDistinctColumnsMapper.java     |  31 +-
 .../mr/steps/NewCubeSamplingMethodTest.java     | 299 +++++++++++++++++++
 3 files changed, 341 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
index 82c881b..b793465 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCounter.java
@@ -60,7 +60,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
         merge(another);
     }
 
-    HLLCounter(int p, RegisterType type) {
+    public HLLCounter(int p, RegisterType type) {
         this(p, type, Hashing.murmur3_128());
     }
 
@@ -99,6 +99,10 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
         add(hashFunc.hashBytes(value, offset, length).asLong());
     }
 
+    public void addHashDirectly(long hash){ // public entry point for callers that already hold a computed hash value
+        add(hash); // forwarded unchanged to add(long); no hashing is done in this method
+    }
+
     protected void add(long hash) {
         int bucketMask = m - 1;
         int bucket = (int) (hash & bucketMask);
@@ -126,7 +130,7 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
     }
 
     private void toDenseIfNeeded() {
-        if (register instanceof SparseRegister) {
+        if (register.getRegisterType() == RegisterType.SPARSE) {
             if (isDense(register.getSize())) {
                 register = ((SparseRegister) register).toDense(p);
             }
@@ -137,36 +141,36 @@ public class HLLCounter implements Serializable, Comparable<HLLCounter> {
         assert this.p == another.p;
         assert this.hashFunc == another.hashFunc;
         switch (register.getRegisterType()) {
-        case SINGLE_VALUE:
-            switch (another.getRegisterType()) {
             case SINGLE_VALUE:
-                if (register.getSize() > 0 && another.register.getSize() > 0) {
-                    register = ((SingleValueRegister) register).toSparse();
-                } else {
-                    SingleValueRegister sr = (SingleValueRegister) another.register;
-                    if (sr.getSize() > 0)
-                        register.set(sr.getSingleValuePos(), sr.getValue());
-                    return;
+                switch (another.getRegisterType()) {
+                    case SINGLE_VALUE:
+                        if (register.getSize() > 0 && another.register.getSize() > 0) {
+                            register = ((SingleValueRegister) register).toSparse();
+                        } else {
+                            SingleValueRegister sr = (SingleValueRegister) another.register;
+                            if (sr.getSize() > 0)
+                                register.set(sr.getSingleValuePos(), sr.getValue());
+                            return;
+                        }
+                        break;
+                    case SPARSE:
+                        register = ((SingleValueRegister) register).toSparse();
+                        break;
+                    case DENSE:
+                        register = ((SingleValueRegister) register).toDense(this.p);
+                        break;
+                    default:
+                        break;
                 }
+
                 break;
             case SPARSE:
-                register = ((SingleValueRegister) register).toSparse();
-                break;
-            case DENSE:
-                register = ((SingleValueRegister) register).toDense(this.p);
+                if (another.getRegisterType() == RegisterType.DENSE) {
+                    register = ((SparseRegister) register).toDense(p);
+                }
                 break;
             default:
                 break;
-            }
-
-            break;
-        case SPARSE:
-            if (another.getRegisterType() == RegisterType.DENSE) {
-                register = ((SparseRegister) register).toDense(p);
-            }
-            break;
-        default:
-            break;
         }
         register.merge(another.register);
         toDenseIfNeeded();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 9f65163..e6cea2b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -24,13 +24,13 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -62,7 +62,8 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
     private HashFunction hf = null;
     private int rowCount = 0;
     private int samplingPercentage;
-    private ByteArray[] row_hashcodes = null;
+    //private ByteArray[] row_hashcodes = null;
+    private long[] rowHashCodesLong = null;
     private ByteBuffer tmpbuf;
     private static final Text EMPTY_TEXT = new Text();
     public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
@@ -92,14 +93,11 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
 
             allCuboidsHLL = new HLLCounter[cuboidIds.length];
             for (int i = 0; i < cuboidIds.length; i++) {
-                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+                allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE);
             }
 
-            hf = Hashing.murmur3_32();
-            row_hashcodes = new ByteArray[nRowKey];
-            for (int i = 0; i < nRowKey; i++) {
-                row_hashcodes[i] = new ByteArray();
-            }
+            hf = Hashing.murmur3_128();
+            rowHashCodesLong = new long[nRowKey];
 
             TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
             if (partitionColRef != null) {
@@ -211,26 +209,23 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
     }
 
     private void putRowKeyToHLL(String[] row) {
-
         //generate hash for each row key column
         for (int i = 0; i < nRowKey; i++) {
             Hasher hc = hf.newHasher();
             String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]];
-            if (colValue != null) {
-                row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
-            } else {
-                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
-            }
+            if (colValue == null)
+                colValue = "0"; // NOTE(review): a null column now hashes exactly like the literal string "0"
+            byte[] bytes = hc.putString(colValue).hash().asBytes();
+            rowHashCodesLong[i] = (Bytes.toLong(bytes) + i);//NOTE(review): the ordinal is added to tell (a,b) from (b,a), but the per-cuboid sum below is commutative, so the offsets cancel and swapped values still collide
         }
 
         // user the row key column hash to get a consolidated hash for each cuboid
         for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
-            Hasher hc = hf.newHasher();
+            long value = 0;
             for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
+                value += rowHashCodesLong[allCuboidsBitSet[i][position]];
             }
-
-            allCuboidsHLL[i].add(hc.hash().asBytes());
+            allCuboidsHLL[i].addHashDirectly(value);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4c218214/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
new file mode 100644
index 0000000..f018f28
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NewCubeSamplingMethodTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.engine.mr.steps;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.measure.hllc.RegisterType;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@Ignore
+public class NewCubeSamplingMethodTest {
+
+    private static final int ROW_LENGTH = 10;
+
+    private Integer[][] allCuboidsBitSet;
+
+    private long baseCuboidId;
+
+    private final int rowCount = 500000;
+
+    @Before
+    public void setup() {
+        baseCuboidId = (1L << ROW_LENGTH) - 1;
+        createAllCuboidBitSet();
+        System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids.");
+    }
+
+    @Ignore
+    @Test
+    public void testRandomData() throws Exception {
+        List<List<String>> dataSet = getRandomDataset(rowCount);
+        comparePerformanceBasic(dataSet);
+        compareAccuracyBasic(dataSet);
+    }
+
+
+    @Ignore
+    @Test
+    public void testSmallCardData() throws Exception {
+        List<List<String>> dataSet = getSmallCardDataset(rowCount);
+        comparePerformanceBasic(dataSet);
+        compareAccuracyBasic(dataSet);
+    }
+
+
+    public void comparePerformanceBasic(final List<List<String>> rows) throws Exception { // prints wall-clock milliseconds for each hashing strategy over the same rows; asserts nothing
+        //old hash method: murmur3_32 per column, then a second murmur3_32 over the selected column hashes for every cuboid
+        ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
+        HLLCounter[] cuboidCounters = getNewCuboidCounters(allCuboidsBitSet.length);
+        long start = System.currentTimeMillis();
+        for (List<String> row : rows) {
+            putRowKeyToHLL(row, colHashValues, cuboidCounters, Hashing.murmur3_32());
+        }
+        long totalTime = System.currentTimeMillis() - start;
+        System.out.println("old method cost time : " + totalTime);
+        //new hash method: murmur3_128 per column, then a plain long sum of the selected column hashes for every cuboid
+        colHashValues = getNewColHashValues(ROW_LENGTH); // reassigned but not read by the new path below
+        cuboidCounters = getNewCuboidCounters(allCuboidsBitSet.length);
+        start = System.currentTimeMillis();
+        long[] valueHashLong = new long[allCuboidsBitSet.length]; // NOTE(review): filled per column inside putRowKeyToHLLNew, yet sized by cuboid count
+        for (List<String> row : rows) {
+            putRowKeyToHLLNew(row, valueHashLong, cuboidCounters, Hashing.murmur3_128());
+        }
+        totalTime = System.currentTimeMillis() - start;
+        System.out.println("new method cost time : " + totalTime);
+    }
+
+    //compares the full-row cardinality estimate of both hashing strategies with the exact distinct count; prints error rates, asserts nothing
+    public void compareAccuracyBasic(final List<List<String>> rows) throws Exception {
+        final long realCardinality = countCardinality(rows);
+        System.out.println("real cardinality : " + realCardinality);
+        //test1: old method - murmur3_32 per column, then murmur3_32 over all column hashes
+        long t1 = runAndGetTime(new TestCase() { // NOTE(review): t1 is captured but never printed or compared
+            @Override
+            public void run() throws Exception {
+                HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);
+                final ByteArray[] colHashValues = getNewColHashValues(ROW_LENGTH);
+                HashFunction hf = Hashing.murmur3_32();
+                for (List<String> row : rows) {
+
+                    int x = 0;
+                    for (String field : row) {
+                        Hasher hc = hf.newHasher();
+                        colHashValues[x++].set(hc.putString(field).hash().asBytes());
+                    }
+
+                    Hasher hc = hf.newHasher();
+                    for (int position = 0; position < colHashValues.length; position++) {
+                        hc.putBytes(colHashValues[position].array());
+                    }
+                    counter.add(hc.hash().asBytes());
+                }
+                long estimate = counter.getCountEstimate();
+                System.out.println("old method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
+            }
+        });
+
+
+        long t2 = runAndGetTime(new TestCase() { // test2: new method - murmur3_128 per column, summed as longs; NOTE(review): t2 is likewise unused
+            @Override
+            public void run() throws Exception {
+                HLLCounter counter = new HLLCounter(14, RegisterType.DENSE);
+                HashFunction hf2 = Hashing.murmur3_128();
+                long[] valueHashLong = new long[allCuboidsBitSet.length]; // NOTE(review): indexed by column below (only the first row.size() slots are used), yet sized by cuboid count
+                for (List<String> row : rows) {
+
+                    int x = 0;
+                    for (String field : row) {
+                        Hasher hc = hf2.newHasher();
+                        byte[] bytes = hc.putString(x + field).hash().asBytes(); // the column index is prepended to the value before hashing
+                        valueHashLong[x++] = Bytes.toLong(bytes);
+                    }
+
+                    long value = 0;
+                    for (int position = 0; position < row.size(); position++) {
+                        value += valueHashLong[position];
+                    }
+                    counter.addHashDirectly(value);
+                }
+                long estimate = counter.getCountEstimate();
+                System.out.println("new method finished. Estimate cardinality : " + estimate + ". Error rate : " + countErrorRate(estimate, realCardinality));
+            }
+        });
+    }
+
+    public void createAllCuboidBitSet() { // fills allCuboidsBitSet with one array of selected column indexes per cuboid id
+        List<Long> allCuboids = Lists.newArrayList(); // NOTE(review): populated below but never read
+        List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+        for (long i = 1; i < baseCuboidId; i++) { // NOTE(review): strict '<' leaves out baseCuboidId itself - confirm this is intended
+            allCuboids.add(i);
+            addCuboidBitSet(i, allCuboidsBitSetList);
+        }
+        allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]);
+    }
+
+    private ByteArray[] getNewColHashValues(int rowLength) {
+        ByteArray[] colHashValues = new ByteArray[rowLength];
+        for (int i = 0; i < rowLength; i++) {
+            colHashValues[i] = new ByteArray();
+        }
+        return colHashValues;
+    }
+
+    private HLLCounter[] getNewCuboidCounters(int cuboidNum) {
+        HLLCounter[] counters = new HLLCounter[cuboidNum];
+        for (int i = 0; i < counters.length; i++)
+            counters[i] = new HLLCounter(14, RegisterType.DENSE);
+        return counters;
+    }
+
+
+    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) { // appends the column indexes whose bits are set in cuboidId; the list parameter shadows the field of the same name
+        Integer[] indice = new Integer[Long.bitCount(cuboidId)]; // one slot per set bit
+
+        long mask = Long.highestOneBit(baseCuboidId); // start at the top bit of baseCuboidId, so index 0 maps to the most significant bit
+        int position = 0;
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            if ((mask & cuboidId) > 0) {
+                indice[position] = i;
+                position++;
+            }
+            mask = mask >> 1; // move on to the next lower bit
+        }
+
+        allCuboidsBitSet.add(indice);
+
+    }
+
+    private long runAndGetTime(TestCase testCase) throws Exception {
+        long start = System.currentTimeMillis();
+        testCase.run();
+        long totalTime = System.currentTimeMillis() - start;
+        return totalTime;
+    }
+
+    interface TestCase {
+        void run() throws Exception;
+    }
+
+    private void putRowKeyToHLL(List<String> row, ByteArray[] colHashValues, HLLCounter[] cuboidCounters, HashFunction hashFunction) { // old strategy: hash every column, then re-hash the selected column hashes for each cuboid
+        int x = 0;
+        for (String field : row) {
+            Hasher hc = hashFunction.newHasher();
+            colHashValues[x++].set(hc.putString(field).hash().asBytes());
+        }
+
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            Hasher hc = hashFunction.newHasher();
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                hc.putBytes(colHashValues[allCuboidsBitSet[i][position]].array());
+                //no separator is written between column hashes (a putBytes(separator) call was left disabled here)
+            }
+            cuboidCounters[i].add(hc.hash().asBytes());
+        }
+    }
+
+    private void putRowKeyToHLLNew(List<String> row, long[] hashValuesLong, HLLCounter[] cuboidCounters, HashFunction hashFunction) { // new strategy: hash every column once, then combine per cuboid by long addition
+        int x = 0;
+        for (String field : row) {
+            Hasher hc = hashFunction.newHasher();
+            byte[] bytes = hc.putString(x + field).hash().asBytes(); // the column index is prepended to the value before hashing
+            hashValuesLong[x++] = Bytes.toLong(bytes);
+        }
+
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            long value = 0;
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                value += hashValuesLong[allCuboidsBitSet[i][position]]; // plain sum of the column hashes this cuboid selects
+            }
+            cuboidCounters[i].addHashDirectly(value);
+        }
+    }
+
+    private List<List<String>> getRandomDataset(int size) {
+        List<List<String>> rows = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            rows.add(getRandomRow());
+        }
+        return rows;
+    }
+
+    private List<List<String>> getSmallCardDataset(int size) {
+        List<List<String>> rows = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            rows.add(getSmallCardRow());
+        }
+        return rows;
+    }
+
+    private List<String> getRandomRow() {
+        List<String> row = new ArrayList<>();
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            row.add(RandomStringUtils.random(10));
+        }
+        return row;
+    }
+
+    private String[] smallCardRow = {"abc", "bcd", "jifea", "feaifj"};
+
+    private Random rand = new Random(System.currentTimeMillis());
+
+    private List<String> getSmallCardRow() {
+        List<String> row = new ArrayList<>();
+        row.add(smallCardRow[rand.nextInt(smallCardRow.length)]);
+        for (int i = 1; i < ROW_LENGTH; i++) {
+            row.add("abc");
+        }
+        return row;
+    }
+
+
+    private int countCardinality(List<List<String>> rows) { // exact count of distinct rows, obtained by collecting each row's concatenated fields in a set
+        Set<String> diffCols = new HashSet<String>();
+        for (List<String> row : rows) {
+            StringBuilder sb = new StringBuilder();
+            for (String str : row) {
+                sb.append(str); // NOTE(review): fields are joined with no delimiter, so ("ab","c") and ("a","bc") would count as one row
+            }
+            diffCols.add(sb.toString());
+        }
+        return diffCols.size();
+    }
+
+    private double countErrorRate(long estimate, long real) { // relative error: |estimate - real| / real
+        double rate = Math.abs((estimate - real) * 1.0) / real; // NOTE(review): real == 0 yields NaN or Infinity rather than an exception
+        return rate;
+    }
+}