You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:50 UTC

[38/50] [abbrv] incubator-kylin git commit: KYLIN-760 Improve the hasing performance in Sampling cuboid size

KYLIN-760 Improve the hasing performance in Sampling cuboid size


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e1e1aea9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e1e1aea9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e1e1aea9

Branch: refs/heads/streaming-localdict
Commit: e1e1aea9f90116ba46f4399acb47dcaeecd8d14a
Parents: d939eb1
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Wed May 13 10:09:42 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Wed May 13 10:10:25 2015 +0800

----------------------------------------------------------------------
 .../cube/FactDistinctHiveColumnsMapper.java     |  28 +++-
 .../job/hadoop/cubev2/CubeSamplingTest.java     | 135 +++++++++++++++++++
 2 files changed, 157 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e1e1aea9/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
index 20df4c4..a884465 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -22,11 +22,12 @@ import com.google.common.collect.Lists;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.data.schema.HCatSchema;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
@@ -58,6 +59,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     private HashFunction hf = null;
     private int rowCount = 0;
     private int SAMPING_PERCENTAGE = 5;
+    private ByteArray[] row_hashcodes = null;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -83,7 +85,11 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
                 allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
             }
 
-            hf = Hashing.md5();
+            hf = Hashing.murmur3_32();
+            row_hashcodes = new ByteArray[nRowKey];
+            for (int i = 0; i < nRowKey; i++) {
+                row_hashcodes[i] = new ByteArray();
+            }
         }
     }
 
@@ -96,7 +102,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
         int position = 0;
         for (int i = 0; i < nRowKey; i++) {
             if ((mask & cuboidId) > 0) {
-                indice[position] = intermediateTableDesc.getRowKeyColumnIndexes()[i];
+                indice[position] = i;
                 position++;
             }
             mask = mask >> 1;
@@ -136,12 +142,22 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
     }
 
     private void putRowKeyToHLL(List<String> row) {
+
+        //generate hash for each row key column
+        for (int i = 0; i < nRowKey; i++) {
+            Hasher hc = hf.newHasher();
+            if (row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]) != null) {
+                row_hashcodes[i].set(hc.putString(row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i])).hash().asBytes());
+            } else {
+                row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+            }
+        }
+
+        // user the row key column hash to get a consolidated hash for each cuboid
         for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
             Hasher hc = hf.newHasher();
             for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
-                if (row.get(allCuboidsBitSet[i][position]) != null)
-                    hc.putString(row.get(allCuboidsBitSet[i][position]));
-                hc.putString(",");
+                hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
             }
 
             allCuboidsHLL[i].add(hc.hash().asBytes());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e1e1aea9/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
new file mode 100644
index 0000000..f321fc3
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+/**
+ * Created by shaoshi on 5/12/15.
+ */
+public class CubeSamplingTest {
+
+    private static final int ROW_LENGTH = 10;
+
+    private final List<String> row = new ArrayList<String>(ROW_LENGTH);
+    private final ByteArray[] row_index = new ByteArray[ROW_LENGTH];
+
+    private Integer[][] allCuboidsBitSet;
+    private HashFunction hf = null;
+    private long baseCuboidId;
+    private HyperLogLogPlusCounter[] allCuboidsHLL = null;
+    private final byte[] seperator = Bytes.toBytes(",");
+
+    @Before
+    public void setup() {
+
+        baseCuboidId = (1l << ROW_LENGTH) - 1;
+        List<Long> allCuboids = Lists.newArrayList();
+        List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+        for (long i = 1; i < baseCuboidId; i++) {
+            allCuboids.add(i);
+            addCuboidBitSet(i, allCuboidsBitSetList);
+        }
+
+        allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]);
+        System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids.");
+        allCuboidsHLL = new HyperLogLogPlusCounter[allCuboids.size()];
+        for (int i = 0; i < allCuboids.size(); i++) {
+            allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
+        }
+
+        //  hf = Hashing.goodFastHash(32);
+//        hf = Hashing.md5();
+        hf = Hashing.murmur3_32();
+
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            row_index[i] = new ByteArray();
+        }
+    }
+
+    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
+        BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+        Integer[] indice = new Integer[bitSet.cardinality()];
+
+        long mask = Long.highestOneBit(baseCuboidId);
+        int position = 0;
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            if ((mask & cuboidId) > 0) {
+                indice[position] = i;
+                position++;
+            }
+            mask = mask >> 1;
+        }
+
+        allCuboidsBitSet.add(indice);
+
+    }
+
+    @Test
+    public void test() {
+
+        long start = System.currentTimeMillis();
+        List<String> row;
+        for (int i = 0; i < 10000; i++) {
+            row = getRandomRow();
+            putRowKeyToHLL(row);
+        }
+
+        long duration = System.currentTimeMillis() - start;
+        System.out.println("The test takes " + duration / 1000 + "seconds.");
+    }
+
+    private void putRowKeyToHLL(List<String> row) {
+        int x = 0;
+        for (String field : row) {
+            Hasher hc = hf.newHasher();
+            row_index[x++].set(hc.putString(field).hash().asBytes());
+        }
+
+        for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+            Hasher hc = hf.newHasher();
+            for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+                hc.putBytes(row_index[allCuboidsBitSet[i][position]].array());
+                hc.putBytes(seperator);
+            }
+            allCuboidsHLL[i].add(hc.hash().asBytes());
+        }
+    }
+
+    private List<String> getRandomRow() {
+        row.clear();
+        for (int i = 0; i < ROW_LENGTH; i++) {
+            row.add(RandomStringUtils.random(10));
+        }
+        return row;
+    }
+}