You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/05/15 05:06:50 UTC
[38/50] [abbrv] incubator-kylin git commit: KYLIN-760 Improve the
hashing performance in Sampling cuboid size
KYLIN-760 Improve the hashing performance in Sampling cuboid size
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e1e1aea9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e1e1aea9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e1e1aea9
Branch: refs/heads/streaming-localdict
Commit: e1e1aea9f90116ba46f4399acb47dcaeecd8d14a
Parents: d939eb1
Author: Shao Feng, Shi <sh...@ebay.com>
Authored: Wed May 13 10:09:42 2015 +0800
Committer: Shao Feng, Shi <sh...@ebay.com>
Committed: Wed May 13 10:10:25 2015 +0800
----------------------------------------------------------------------
.../cube/FactDistinctHiveColumnsMapper.java | 28 +++-
.../job/hadoop/cubev2/CubeSamplingTest.java | 135 +++++++++++++++++++
2 files changed, 157 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e1e1aea9/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
index 20df4c4..a884465 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctHiveColumnsMapper.java
@@ -22,11 +22,12 @@ import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
-import org.apache.kylin.common.util.Bytes;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
@@ -58,6 +59,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
private HashFunction hf = null;
private int rowCount = 0;
private int SAMPING_PERCENTAGE = 5;
+ private ByteArray[] row_hashcodes = null;
@Override
protected void setup(Context context) throws IOException {
@@ -83,7 +85,11 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
}
- hf = Hashing.md5();
+ hf = Hashing.murmur3_32();
+ row_hashcodes = new ByteArray[nRowKey];
+ for (int i = 0; i < nRowKey; i++) {
+ row_hashcodes[i] = new ByteArray();
+ }
}
}
@@ -96,7 +102,7 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
int position = 0;
for (int i = 0; i < nRowKey; i++) {
if ((mask & cuboidId) > 0) {
- indice[position] = intermediateTableDesc.getRowKeyColumnIndexes()[i];
+ indice[position] = i;
position++;
}
mask = mask >> 1;
@@ -136,12 +142,22 @@ public class FactDistinctHiveColumnsMapper<KEYIN> extends FactDistinctColumnsMap
}
private void putRowKeyToHLL(List<String> row) {
+
+ //generate hash for each row key column
+ for (int i = 0; i < nRowKey; i++) {
+ Hasher hc = hf.newHasher();
+ if (row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]) != null) {
+ row_hashcodes[i].set(hc.putString(row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i])).hash().asBytes());
+ } else {
+ row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
+ }
+ }
+
+ // user the row key column hash to get a consolidated hash for each cuboid
for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
Hasher hc = hf.newHasher();
for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
- if (row.get(allCuboidsBitSet[i][position]) != null)
- hc.putString(row.get(allCuboidsBitSet[i][position]));
- hc.putString(",");
+ hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array());
}
allCuboidsHLL[i].add(hc.hash().asBytes());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e1e1aea9/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
new file mode 100644
index 0000000..f321fc3
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cubev2/CubeSamplingTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+/**
+ * Created by shaoshi on 5/12/15.
+ */
+public class CubeSamplingTest {
+
+ private static final int ROW_LENGTH = 10;
+
+ private final List<String> row = new ArrayList<String>(ROW_LENGTH);
+ private final ByteArray[] row_index = new ByteArray[ROW_LENGTH];
+
+ private Integer[][] allCuboidsBitSet;
+ private HashFunction hf = null;
+ private long baseCuboidId;
+ private HyperLogLogPlusCounter[] allCuboidsHLL = null;
+ private final byte[] seperator = Bytes.toBytes(",");
+
+ @Before
+ public void setup() {
+
+ baseCuboidId = (1l << ROW_LENGTH) - 1;
+ List<Long> allCuboids = Lists.newArrayList();
+ List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+ for (long i = 1; i < baseCuboidId; i++) {
+ allCuboids.add(i);
+ addCuboidBitSet(i, allCuboidsBitSetList);
+ }
+
+ allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[allCuboidsBitSetList.size()][]);
+ System.out.println("Totally have " + allCuboidsBitSet.length + " cuboids.");
+ allCuboidsHLL = new HyperLogLogPlusCounter[allCuboids.size()];
+ for (int i = 0; i < allCuboids.size(); i++) {
+ allCuboidsHLL[i] = new HyperLogLogPlusCounter(14);
+ }
+
+ // hf = Hashing.goodFastHash(32);
+// hf = Hashing.md5();
+ hf = Hashing.murmur3_32();
+
+ for (int i = 0; i < ROW_LENGTH; i++) {
+ row_index[i] = new ByteArray();
+ }
+ }
+
+ private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet) {
+ BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+ Integer[] indice = new Integer[bitSet.cardinality()];
+
+ long mask = Long.highestOneBit(baseCuboidId);
+ int position = 0;
+ for (int i = 0; i < ROW_LENGTH; i++) {
+ if ((mask & cuboidId) > 0) {
+ indice[position] = i;
+ position++;
+ }
+ mask = mask >> 1;
+ }
+
+ allCuboidsBitSet.add(indice);
+
+ }
+
+ @Test
+ public void test() {
+
+ long start = System.currentTimeMillis();
+ List<String> row;
+ for (int i = 0; i < 10000; i++) {
+ row = getRandomRow();
+ putRowKeyToHLL(row);
+ }
+
+ long duration = System.currentTimeMillis() - start;
+ System.out.println("The test takes " + duration / 1000 + "seconds.");
+ }
+
+ private void putRowKeyToHLL(List<String> row) {
+ int x = 0;
+ for (String field : row) {
+ Hasher hc = hf.newHasher();
+ row_index[x++].set(hc.putString(field).hash().asBytes());
+ }
+
+ for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) {
+ Hasher hc = hf.newHasher();
+ for (int position = 0; position < allCuboidsBitSet[i].length; position++) {
+ hc.putBytes(row_index[allCuboidsBitSet[i][position]].array());
+ hc.putBytes(seperator);
+ }
+ allCuboidsHLL[i].add(hc.hash().asBytes());
+ }
+ }
+
+ private List<String> getRandomRow() {
+ row.clear();
+ for (int i = 0; i < ROW_LENGTH; i++) {
+ row.add(RandomStringUtils.random(10));
+ }
+ return row;
+ }
+}