You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/08 16:15:53 UTC
[15/18] kylin git commit: KYLIN-1851:unfinished add
TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey
KYLIN-1851:unfinished add TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey
Signed-off-by: Yang Li <li...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f0804f95
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f0804f95
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f0804f95
Branch: refs/heads/KYLIN-2006
Commit: f0804f95ae59ef7adcc0e6e4fe9a3b3620586b96
Parents: ddec049
Author: xiefan46 <95...@qq.com>
Authored: Mon Nov 7 14:37:22 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Tue Nov 8 23:23:34 2016 +0800
----------------------------------------------------------------------
.../org/apache/kylin/dict/ByteComparator.java | 44 ++
.../kylin/dict/NumberDictionaryForest.java | 278 ++++++++
.../dict/NumberDictionaryForestBuilder.java | 58 ++
.../apache/kylin/dict/TrieDictionaryForest.java | 406 ++++++++++++
.../kylin/dict/TrieDictionaryForestBuilder.java | 125 ++++
.../kylin/dict/TrieDictionaryForestTest.java | 657 +++++++++++++++++++
.../fdc2/FactDistinctColumnPartitioner2.java | 47 ++
.../fdc2/FactDistinctColumnsCombiner2.java | 44 ++
.../mr/steps/fdc2/FactDistinctColumnsJob2.java | 149 +++++
.../fdc2/FactDistinctColumnsMapperBase2.java | 102 +++
.../fdc2/FactDistinctHiveColumnsMapper2.java | 232 +++++++
.../mr/steps/fdc2/SelfDefineSortableKey.java | 130 ++++
.../kylin/engine/mr/steps/fdc2/TypeFlag.java | 28 +
.../mr/steps/NumberDictionaryForestTest.java | 214 ++++++
.../mr/steps/SelfDefineSortableKeyTest.java | 228 +++++++
15 files changed, 2742 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java
new file mode 100644
index 0000000..74d5ec5
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.ByteArray;
+
+import java.util.Comparator;
+
+/**
+ * Created by xiefan on 16-10-28.
+ */
+public class ByteComparator<T> implements Comparator<T> {
+    private BytesConverter<T> converter;
+
+    public ByteComparator(BytesConverter<T> converter) {
+        this.converter = converter;
+    }
+
+    /**
+     * Orders two values by the byte sequences the converter produces for them;
+     * the comparison itself is delegated to ByteArray.compareTo.
+     */
+    @Override
+    public int compare(T o1, T o2) {
+        byte[] leftBytes = converter.convertToBytes(o1);
+        byte[] rightBytes = converter.convertToBytes(o2);
+        ByteArray left = new ByteArray(leftBytes, 0, leftBytes.length);
+        ByteArray right = new ByteArray(rightBytes, 0, rightBytes.length);
+        return left.compareTo(right);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java
new file mode 100644
index 0000000..8caa4b6
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.PrintStream;
+
+/**
+ * Created by xiefan on 16-11-1.
+ * <p>
+ * Note: NumberDictionaryForest currently cannot handle
+ * very large or very small double and float values such as 4.9E-324
+ */
+public class NumberDictionaryForest<T> extends Dictionary<T> {
+
+ public static final int MAX_DIGITS_BEFORE_DECIMAL_POINT = 19;
+
+ // encode a number into an order preserving byte sequence
+ // for positives -- padding '0'
+ // for negatives -- '-' sign, padding '9', invert digits, and terminate by ';'
+ static class NumberBytesCodec {
+ int maxDigitsBeforeDecimalPoint;
+ byte[] buf;
+ int bufOffset;
+ int bufLen;
+
+ NumberBytesCodec(int maxDigitsBeforeDecimalPoint) {
+ this.maxDigitsBeforeDecimalPoint = maxDigitsBeforeDecimalPoint;
+ this.buf = new byte[maxDigitsBeforeDecimalPoint * 3];
+ this.bufOffset = 0;
+ this.bufLen = 0;
+ }
+
+ /**
+  * Encodes the textual number in value[offset, offset + len) into the
+  * order preserving form described on this class. The result is written
+  * right-aligned into {@code buf}; on return {@code bufOffset} and
+  * {@code bufLen} describe where the encoded bytes are.
+  * <p>
+  * An empty input (len == 0) encodes as an empty sequence.
+  *
+  * @throws IllegalArgumentException if the input is longer than the internal
+  *         buffer, has more than maxDigitsBeforeDecimalPoint digits before
+  *         the decimal point, or the padded result does not fit in the buffer
+  */
+ void encodeNumber(byte[] value, int offset, int len) {
+ if (len == 0) {
+ bufOffset = 0;
+ bufLen = 0;
+ return;
+ }
+
+ if (len > buf.length) {
+ throw new IllegalArgumentException("Too many digits for NumberDictionary: " + Bytes.toString(value, offset, len) + ". Internal buffer is only " + buf.length + " bytes");
+ }
+
+ // a leading '-' is the only thing that marks the input as negative
+ boolean negative = value[offset] == '-';
+
+ // the input is placed at the right end of buf; a negative number is shifted
+ // one byte to the left so that the last byte can hold the ';' terminator
+ int start = buf.length - len;
+ int end = buf.length;
+ if (negative) {
+ start--;
+ end--;
+ buf[end] = ';';
+ }
+
+ // copy the input (including any '-' sign) and remember the position of the
+ // first '.'; without a '.', decimalPoint stays at end
+ int decimalPoint = end;
+ for (int i = start, j = offset; i < end; i++, j++) {
+ buf[i] = value[j];
+ if (buf[i] == '.' && i < decimalPoint) {
+ decimalPoint = i;
+ }
+ }
+ // step over the copied '-' sign; it is overwritten by the padding below
+ if (negative) {
+ start++;
+ }
+
+ // left-pad the integer part with '0' up to maxDigitsBeforeDecimalPoint digits;
+ // the "+ 1" keeps one more byte free for the leading '-' / '0' written below
+ int nZeroPadding = maxDigitsBeforeDecimalPoint - (decimalPoint - start);
+ if (nZeroPadding < 0 || nZeroPadding + 1 > start)
+ throw new IllegalArgumentException("Too many digits for NumberDictionary: " + Bytes.toString(value, offset, len) + ". Expect " + maxDigitsBeforeDecimalPoint + " digits before decimal point at max.");
+ for (int i = 0; i < nZeroPadding; i++) {
+ buf[--start] = '0';
+ }
+
+ // negatives: put '-' in front and replace every digit d after it by 9 - d
+ // (so the '0' padding becomes '9'); non-digit bytes such as '.' and ';' are kept.
+ // positives: put one extra '0' in front
+ if (negative) {
+ buf[--start] = '-';
+ for (int i = start + 1; i < buf.length; i++) {
+ int c = buf[i];
+ if (c >= '0' && c <= '9') {
+ buf[i] = (byte) ('9' - (c - '0'));
+ }
+ }
+ } else {
+ buf[--start] = '0';
+ }
+
+ // the encoded bytes run from start to the end of buf
+ bufOffset = start;
+ bufLen = buf.length - start;
+ }
+
+ /**
+  * Decodes the encoded bytes in buf[bufOffset, bufOffset + bufLen) back into
+  * plain number text, writing it into returnValue starting at offset.
+  * No bounds check is done on returnValue here, so the caller has to pass
+  * an array with enough room.
+  *
+  * @return the number of bytes written; 0 when bufLen is 0
+  */
+ int decodeNumber(byte[] returnValue, int offset) {
+ if (bufLen == 0) {
+ return 0;
+ }
+
+ int in = bufOffset;
+ int end = bufOffset + bufLen;
+ int out = offset;
+
+ // sign: emit '-', skip it in the input and drop the last encoded byte
+ // (encodeNumber terminates negatives with ';')
+ boolean negative = buf[in] == '-';
+ if (negative) {
+ returnValue[out++] = '-';
+ in++;
+ end--;
+ }
+
+ // remove padding: '9' for negatives (the inverted '0'), '0' for positives
+ byte padding = (byte) (negative ? '9' : '0');
+ for (; in < end; in++) {
+ if (buf[in] != padding)
+ break;
+ }
+
+ // everything before the end or before a non-digit (e.g. '.') was padding:
+ // emit a single '0' so the integer part is not empty
+ if (in == end || !(buf[in] >= '0' && buf[in] <= '9')) {
+ returnValue[out++] = '0';
+ }
+
+ // copy the rest; digits of a negative number are inverted back (d -> 9 - d),
+ // other bytes are copied unchanged
+ if (negative) {
+ for (; in < end; in++, out++) {
+ int c = buf[in];
+ if (c >= '0' && c <= '9') {
+ c = '9' - (c - '0');
+ }
+ returnValue[out] = (byte) c;
+ }
+ } else {
+ System.arraycopy(buf, in, returnValue, out, end - in);
+ out += end - in;
+ }
+
+ return out - offset;
+ }
+ }
+
+ static ThreadLocal<NumberBytesCodec> localCodec =
+ new ThreadLocal<NumberBytesCodec>();
+
+ // ============================================================================
+
+ private TrieDictionaryForest<T> dict;
+
+ private BytesConverter<T> converter;
+
+ public NumberDictionaryForest() {
+ }
+
+ public NumberDictionaryForest(TrieDictionaryForest<T> dict, BytesConverter<T> converter) {
+ this.dict = dict;
+ this.converter = converter;
+ }
+
+    /**
+     * Returns the codec kept in the localCodec thread-local for the calling
+     * thread, creating and storing it on first use.
+     */
+    protected NumberBytesCodec getCodec() {
+        NumberBytesCodec cached = localCodec.get();
+        if (cached != null) {
+            return cached;
+        }
+        NumberBytesCodec created = new NumberBytesCodec(MAX_DIGITS_BEFORE_DECIMAL_POINT);
+        localCodec.set(created);
+        return created;
+    }
+
+ @Override
+ public int getMinId() {
+ return dict.getMinId();
+ }
+
+ @Override
+ public int getMaxId() {
+ return dict.getMaxId();
+ }
+
+ @Override
+ public int getSizeOfId() {
+ return dict.getSizeOfId();
+ }
+
+ @Override
+ public int getSizeOfValue() {
+ return dict.getSizeOfValue();
+ }
+
+ @Override
+ public boolean contains(Dictionary<?> another) {
+ return dict.contains(another);
+ }
+
+ @Override
+ protected int getIdFromValueImpl(T value, int roundingFlag) {
+ if (value == null) return -1;
+ byte[] data = converter.convertToBytes(value);
+ return getIdFromValueBytesImpl(data, 0, data.length, roundingFlag);
+ }
+
+ @Override
+ protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
+ NumberBytesCodec codec = getCodec();
+ codec.encodeNumber(value, offset, len);
+ return this.dict.getIdFromValueBytesImpl(codec.buf, codec.bufOffset, codec.bufLen, roundingFlag);
+ }
+
+ @Override
+ protected T getValueFromIdImpl(int id) {
+ byte[] data = getValueBytesFromIdImpl(id);
+ if (data == null) return null;
+ else return converter.convertFromBytes(data, 0, data.length);
+ }
+
+    /**
+     * Returns the decoded number text for the given id in an array of exactly
+     * the decoded length.
+     * <p>
+     * The decoding itself is done by the three-argument overload. The decoded
+     * length is kept in a local variable here; it used to be stored in the
+     * shared thread-local codec's bufLen, which left the codec describing a
+     * range unrelated to the contents of its buffer.
+     */
+    @Override
+    protected byte[] getValueBytesFromIdImpl(int id) {
+        // scratch buffer sized by the largest value the underlying forest reports
+        byte[] buf = new byte[dict.getSizeOfValue()];
+        int decodedLen = getValueBytesFromIdImpl(id, buf, 0);
+
+        if (decodedLen == buf.length) {
+            return buf;
+        }
+        // trim the scratch buffer down to the bytes actually written
+        byte[] result = new byte[decodedLen];
+        System.arraycopy(buf, 0, result, 0, decodedLen);
+        return result;
+    }
+
+ @Override
+ protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset) {
+ NumberBytesCodec codec = getCodec();
+ codec.bufOffset = 0;
+ codec.bufLen = this.dict.getValueBytesFromIdImpl(id, codec.buf, 0);
+ return codec.decodeNumber(returnValue, offset);
+ }
+
+ @Override
+ public void dump(PrintStream out) {
+ dict.dump(out);
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ dict.write(out);
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.dict = new TrieDictionaryForest<>();
+ dict.readFields(in);
+ this.converter = this.dict.getBytesConvert();
+ }
+
+ public BytesConverter<T> getConverter() {
+ return converter;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
new file mode 100644
index 0000000..5444bb7
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+
+/**
+ * Created by xiefan on 16-11-2.
+ */
+public class NumberDictionaryForestBuilder<T> {
+
+ private TrieDictionaryForestBuilder<T> trieBuilder;
+
+ private BytesConverter<T> bytesConverter;
+
+ private NumberDictionaryForest.NumberBytesCodec codec = new NumberDictionaryForest.NumberBytesCodec(
+ NumberDictionaryForest.MAX_DIGITS_BEFORE_DECIMAL_POINT);
+
+ public NumberDictionaryForestBuilder(BytesConverter<T> bytesConverter) {
+ this(bytesConverter, 0);
+ }
+
+ public NumberDictionaryForestBuilder(BytesConverter<T> bytesConverter, int baseId) {
+ this.trieBuilder = new TrieDictionaryForestBuilder<T>(bytesConverter, baseId);
+ this.bytesConverter = bytesConverter;
+ }
+
+    /**
+     * Adds a value given as an object; it is converted to bytes with the
+     * builder's converter first. A null value is ignored, the same way
+     * TrieDictionaryForestBuilder.addValue ignores it.
+     */
+    public void addValue(T value) {
+        if (value == null) {
+            return;
+        }
+        addValue(bytesConverter.convertToBytes(value));
+    }
+
+    /**
+     * Adds a number given as its text bytes. The bytes are run through the
+     * number codec and the encoded form is handed to the trie forest builder.
+     * A null array is ignored.
+     */
+    public void addValue(byte[] value) {
+        if (value == null) {
+            return;
+        }
+        codec.encodeNumber(value, 0, value.length);
+        // codec.buf is overwritten by the next encodeNumber call, so pass on a copy
+        byte[] copy = Bytes.copy(codec.buf, codec.bufOffset, codec.bufLen);
+        this.trieBuilder.addValue(copy);
+    }
+
+ public NumberDictionaryForest<T> build() {
+ TrieDictionaryForest<T> forest = trieBuilder.build();
+ return new NumberDictionaryForest<T>(forest, bytesConverter);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
new file mode 100755
index 0000000..e9ccc56
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+
+/**
+ * use trie forest to optimize trie dictionary
+ * <p>
+ * the input data must be in increasing order (sorted by org.apache.kylin.dict.ByteComparator)
+ * <p>
+ * Created by xiefan on 16-10-26.
+ */
+public class TrieDictionaryForest<T> extends Dictionary<T> {
+
+ private static final Logger logger = LoggerFactory.getLogger(TrieDictionaryForest.class);
+
+ private ArrayList<TrieDictionary<T>> trees;
+
+ //private ArrayList<byte[]> valueDivide; //find tree
+
+ private ArrayList<ByteArray> valueDivide;
+
+ private ArrayList<Integer> accuOffset; //find tree
+
+ private BytesConverter<T> bytesConvert;
+
+ private int baseId;
+
+ /*public AtomicLong getValueIndexTime = new AtomicLong(0);
+
+ public AtomicLong getValueTime = new AtomicLong(0);
+
+ public AtomicLong binarySearchTime = new AtomicLong(0);
+
+ public AtomicLong copyTime = new AtomicLong(0);
+
+ public AtomicLong getValueIndexTime2 = new AtomicLong(0);
+
+ public AtomicLong getValueTime2 = new AtomicLong(0);*/
+
+ public TrieDictionaryForest() { // default constructor for Writable interface
+
+ }
+
+ public TrieDictionaryForest(ArrayList<TrieDictionary<T>> trees,
+ ArrayList<ByteArray> valueDivide, ArrayList<Integer> accuOffset, BytesConverter<T> bytesConverter, int baseId) {
+ this.trees = trees;
+ this.valueDivide = valueDivide;
+ this.accuOffset = accuOffset;
+ this.bytesConvert = bytesConverter;
+ this.baseId = baseId;
+ }
+
+
+ @Override
+ public int getMinId() {
+ if (trees.isEmpty()) return -1;
+ return trees.get(0).getMinId() + baseId;
+ }
+
+ @Override
+ public int getMaxId() {
+ if (trees.isEmpty()) return -1;
+ int index = trees.size() - 1;
+ int id = accuOffset.get(index) + trees.get(index).getMaxId() + baseId;
+ return id;
+ }
+
+ @Override
+ public int getSizeOfId() {
+ if (trees.isEmpty()) return -1;
+ int maxOffset = accuOffset.get(accuOffset.size() - 1);
+ TrieDictionary<T> lastTree = trees.get(trees.size() - 1);
+ int sizeOfId = BytesUtil.sizeForValue(baseId + maxOffset + lastTree.getMaxId() + 1);
+ return sizeOfId;
+ }
+
+ @Override
+ public int getSizeOfValue() {
+ int maxValue = -1;
+ for (TrieDictionary<T> tree : trees)
+ maxValue = Math.max(maxValue, tree.getSizeOfValue());
+ return maxValue;
+ }
+
+ //value --> id
+ @Override
+ protected int getIdFromValueImpl(T value, int roundingFlag)
+ throws IllegalArgumentException {
+ byte[] valueBytes = bytesConvert.convertToBytes(value);
+ return getIdFromValueBytesImpl(valueBytes, 0, valueBytes.length, roundingFlag);
+ }
+
+
+    /**
+     * Looks up the id of a value given as bytes in value[offset, offset + len).
+     * <p>
+     * The tree is picked by searching valueDivide; the returned id is
+     * tree_inner_offset + accumulate_offset + baseId.
+     *
+     * @throws IllegalArgumentException if no tree can be picked for the value (index &lt; 0)
+     */
+    @Override
+    protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag)
+            throws IllegalArgumentException {
+        int index = findIndexByValue(new ByteArray(value, offset, len));
+        if (index < 0) {
+            // Report exactly the bytes that were searched for. The previous
+            // Arrays.copyOfRange(value, offset, len) passed len as the end index,
+            // which is wrong (or throws) as soon as offset is not 0.
+            throw new IllegalArgumentException("Tree Not Found. index < 0.Value:" + new String(value, offset, len));
+        }
+        TrieDictionary<T> tree = trees.get(index);
+        int id = tree.getIdFromValueBytes(value, offset, len, roundingFlag);
+        return id + accuOffset.get(index) + baseId;
+    }
+
+    /**
+     * id --> value: fetches the value bytes for the id and converts them with
+     * bytesConvert; yields null when no bytes come back.
+     */
+    @Override
+    protected T getValueFromIdImpl(int id) throws IllegalArgumentException {
+        byte[] valueBytes = getValueBytesFromIdImpl(id);
+        if (valueBytes == null) {
+            return null;
+        }
+        return bytesConvert.convertFromBytes(valueBytes, 0, valueBytes.length);
+    }
+
+    /**
+     * Writes the value bytes for the given id into returnValue starting at
+     * offset and returns the size reported by the owning tree.
+     *
+     * @throws IllegalArgumentException if no tree can be found for the id
+     *         (index &lt; 0), matching the one-argument overload
+     */
+    @Override
+    protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset)
+            throws IllegalArgumentException {
+        int index = findIndexById(id);
+        if (index < 0) {
+            // without this check the negative index would reach accuOffset.get(index)
+            throw new IllegalArgumentException("Tree Not Found. index < 0");
+        }
+        int treeInnerOffset = getTreeInnerOffset(id, index);
+        TrieDictionary<T> tree = trees.get(index);
+        return tree.getValueBytesFromIdImpl(treeInnerOffset, returnValue, offset);
+    }
+
+
+    /**
+     * Returns the value bytes for the given id, as produced by the tree the id falls into.
+     *
+     * @throws IllegalArgumentException if no tree can be found for the id (index &lt; 0)
+     */
+    @Override
+    protected byte[] getValueBytesFromIdImpl(int id) throws IllegalArgumentException {
+        int treeIndex = findIndexById(id);
+        if (treeIndex < 0) {
+            throw new IllegalArgumentException("Tree Not Found. index < 0");
+        }
+        // translate the forest-wide id into the id used inside that tree
+        int idInTree = getTreeInnerOffset(id, treeIndex);
+        return trees.get(treeIndex).getValueBytesFromId(idInTree);
+    }
+
+
+ private int getTreeInnerOffset(int id, int index) {
+ id -= baseId;
+ id = id - accuOffset.get(index);
+ return id;
+ }
+
+    /**
+     * Dumps every tree of the forest to the given stream, each preceded by a
+     * header line with its index. The header goes to {@code out} as well; it
+     * used to be printed to System.out regardless of the stream passed in.
+     */
+    @Override
+    public void dump(PrintStream out) {
+        for (int i = 0; i < trees.size(); i++) {
+            out.println("----tree " + i + "--------");
+            trees.get(i).dump(out);
+        }
+    }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ writeHead(out);
+ writeBody(out);
+ }
+
+ private void writeHead(DataOutput out) throws IOException {
+ ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+ DataOutputStream headOut = new DataOutputStream(byteBuf);
+ headOut.writeInt(baseId);
+ headOut.writeUTF(bytesConvert == null ? "" : bytesConvert.getClass().getName());
+ //write accuOffset
+ headOut.writeInt(accuOffset.size());
+ for (int i = 0; i < accuOffset.size(); i++)
+ headOut.writeInt(accuOffset.get(i));
+ //write valueDivide
+ headOut.writeInt(valueDivide.size());
+ for (int i = 0; i < valueDivide.size(); i++) {
+ ByteArray ba = valueDivide.get(i);
+ byte[] byteStr = ba.toBytes();
+ headOut.writeInt(byteStr.length);
+ headOut.write(byteStr);
+ }
+ //write tree size
+ headOut.writeInt(trees.size());
+ headOut.close();
+ byte[] head = byteBuf.toByteArray();
+ //output
+ out.writeInt(head.length);
+ out.write(head);
+ }
+
+
+ private void writeBody(DataOutput out) throws IOException {
+ for (int i = 0; i < trees.size(); i++) {
+ TrieDictionary<T> tree = trees.get(i);
+ tree.write(out);
+ }
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ try {
+ int headSize = in.readInt();
+ this.baseId = in.readInt();
+ String converterName = in.readUTF();
+ if (converterName.isEmpty() == false)
+ this.bytesConvert = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+ //init accuOffset
+ int accuSize = in.readInt();
+ this.accuOffset = new ArrayList<>();
+ for (int i = 0; i < accuSize; i++) {
+ accuOffset.add(in.readInt());
+ }
+ //init valueDivide
+ int valueDivideSize = in.readInt();
+ this.valueDivide = new ArrayList<>();
+ for (int i = 0; i < valueDivideSize; i++) {
+ int length = in.readInt();
+ byte[] buffer = new byte[length];
+ in.readFully(buffer);
+ valueDivide.add(new ByteArray(buffer, 0, buffer.length));
+ }
+ int treeSize = in.readInt();
+ this.trees = new ArrayList<>();
+ for (int i = 0; i < treeSize; i++) {
+ TrieDictionary<T> dict = new TrieDictionary<>();
+ dict.readFields(in);
+ trees.add(dict);
+ }
+ } catch (Exception e) {
+ if (e instanceof RuntimeException)
+ throw (RuntimeException) e;
+ else
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public boolean contains(Dictionary other) {
+ if (other.getSize() > this.getSize()) {
+ return false;
+ }
+
+ for (int i = other.getMinId(); i <= other.getMaxId(); ++i) {
+ T v = (T) other.getValueFromId(i);
+ if (!this.containsValue(v)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public List<TrieDictionary<T>> getTrees() {
+ return Collections.unmodifiableList(this.trees);
+ }
+
+ private boolean onlyOneTree() {
+ return trees.size() == 1;
+ }
+
+ private int findIndexByValue(T value) {
+ byte[] valueBytes = bytesConvert.convertToBytes(value);
+ return findIndexByValue(new ByteArray(valueBytes, 0, valueBytes.length));
+ }
+
+ private int findIndexByValue(ByteArray value) {
+ int index = lowerBound(value, new Comparator<ByteArray>() {
+ @Override
+ public int compare(ByteArray o1, ByteArray o2) {
+ return o1.compareTo(o2);
+ }
+ }, this.valueDivide);
+ return index;
+ }
+
+ private int findIndexById(Integer id) {
+ id -= baseId;
+ int index = lowerBound(id, new Comparator<Integer>() {
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return o1.compareTo(o2);
+ }
+ }, this.accuOffset);
+ return index;
+ }
+
+
+ /**
+  * Binary search used to pick a tree; assumes list is sorted ascending
+  * according to comparator.
+  * <p>
+  * Returns 0 for a null or empty list, and the index of the element when an
+  * equal one is found. Without an exact match the loop ends with
+  * left == right + 1, so Math.min(left, right) is the index of the last
+  * element smaller than lookfor, or -1 when lookfor is smaller than every
+  * element. Callers have to handle the -1.
+  */
+ private static <K> int lowerBound(K lookfor, Comparator<K> comparator, ArrayList<K> list) {
+ if (list == null || list.isEmpty())
+ return 0; //return the first tree
+ int left = 0;
+ int right = list.size() - 1;
+ int mid = 0;
+ boolean found = false;
+ int comp = 0;
+ while (!found && left <= right) {
+ mid = left + (right - left) / 2;
+ comp = comparator.compare(lookfor, list.get(mid));
+ if (comp < 0)
+ right = mid - 1;
+ else if (comp > 0)
+ left = mid + 1;
+ else
+ found = true;
+ }
+ if (found) {
+ // exact match
+ return mid;
+ } else {
+ // no exact match: fall back to the element just below lookfor (may be -1)
+ return Math.min(left, right); //value may be bigger than the right tree
+ }
+ }
+
+ public static void main(String[] args) {
+ /*ArrayList<Integer> list = new ArrayList<>();
+ list.add(3);
+ list.add(10);
+ list.add(15);
+ Comparator<Integer> comp = new Comparator<Integer>() {
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return o1.compareTo(o2);
+ }
+ };
+ int[] nums = {-1,0,1,2,3,4,13,15,16};
+ for(int i : nums){
+ System.out.println("found value:"+i+" index:"+lowerBound(i,comp,list));
+ }*/
+ ArrayList<String> list = new ArrayList<>();
+ list.add("\u4e00");
+ list.add("\u4e8c");
+ list.add("\u4e09");
+ list.add("");
+ list.add("part");
+ list.add("par");
+ list.add("partition");
+ list.add("party");
+ list.add("parties");
+ list.add("paint");
+ Collections.sort(list);
+ for (String str : list) {
+ System.out.println("found value:" + str + " index:" + lowerBound(str, new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ return o1.compareTo(o2);
+ }
+ }, list));
+ }
+ //System.out.println(BytesUtil.safeCompareBytes("\u4e8c".getBytes(),"\u4e09".getBytes()));
+ }
+
+ public BytesConverter<T> getBytesConvert() {
+ return bytesConvert;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
new file mode 100755
index 0000000..3c03c08
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+
+
+public class TrieDictionaryForestBuilder<T> {
+
+ public static int MaxTrieTreeSize = 1024 * 1024;//1M
+
+ private BytesConverter<T> bytesConverter;
+
+ private int curTreeSize = 0;
+
+ private TrieDictionaryBuilder<T> trieBuilder;
+
+ private ArrayList<TrieDictionary<T>> trees = new ArrayList<>();
+
+ private ArrayList<ByteArray> valueDivide = new ArrayList<>(); //find tree
+
+ private ArrayList<Integer> accuOffset = new ArrayList<>(); //find tree
+
+ private ByteArray previousValue = null; //value use for remove duplicate
+
+ private static final Logger logger = LoggerFactory.getLogger(TrieDictionaryForestBuilder.class);
+
+ private int baseId;
+
+ private int curOffset;
+
+
+ public TrieDictionaryForestBuilder(BytesConverter<T> bytesConverter) {
+ this(bytesConverter, 0);
+ }
+
+ public TrieDictionaryForestBuilder(BytesConverter<T> bytesConverter, int baseId) {
+ this.bytesConverter = bytesConverter;
+ this.trieBuilder = new TrieDictionaryBuilder<T>(bytesConverter);
+ this.baseId = baseId;
+ curOffset = 0;
+ //stringComparator = new ByteComparator<>(new StringBytesConverter());
+ }
+
+ public void addValue(T value) {
+ if (value == null) return;
+ byte[] valueBytes = bytesConverter.convertToBytes(value);
+ addValue(new ByteArray(valueBytes, 0, valueBytes.length));
+ }
+
+ public void addValue(byte[] value) {
+ if (value == null) return;
+ ByteArray array = new ByteArray(value, 0, value.length);
+ addValue(array);
+ }
+
+    /**
+     * Adds one value to the tree currently being built. A null value and a
+     * value equal to the previously added one are ignored. Once the bytes
+     * added to the current tree reach MaxTrieTreeSize, that tree is built,
+     * appended to the forest and a new tree is started.
+     * <p>
+     * NOTE: a value that sorts before the previous one is still accepted
+     * without any check or warning, as before.
+     */
+    public void addValue(ByteArray value) {
+        if (value == null) return;
+        if (previousValue != null && previousValue.compareTo(value) == 0) {
+            return; //duplicate value
+        }
+        // Pass only the bytes this ByteArray covers. array() hands out the whole
+        // backing array and would ignore the ByteArray's offset and length, while
+        // curTreeSize below is already counted with value.length().
+        this.trieBuilder.addValue(value.toBytes());
+        previousValue = value;
+        this.curTreeSize += value.length();
+        if (curTreeSize >= MaxTrieTreeSize) {
+            TrieDictionary<T> tree = trieBuilder.build(0);
+            addTree(tree);
+            reset();
+        }
+    }
+
+ public TrieDictionaryForest<T> build() {
+ if (curTreeSize != 0) { //last tree
+ TrieDictionary<T> tree = trieBuilder.build(0);
+ addTree(tree);
+ reset();
+ }
+ TrieDictionaryForest<T> forest = new TrieDictionaryForest<T>(this.trees,
+ this.valueDivide, this.accuOffset, this.bytesConverter, baseId);
+ return forest;
+ }
+
+ private void addTree(TrieDictionary<T> tree) {
+ trees.add(tree);
+ int minId = tree.getMinId();
+ accuOffset.add(curOffset);
+ byte[] valueBytes = tree.getValueBytesFromId(minId);
+ valueDivide.add(new ByteArray(valueBytes, 0, valueBytes.length));
+ curOffset += (tree.getMaxId() + 1);
+ //System.out.println(" curOffset:"+ curOffset);
+ }
+
+ private void reset() {
+ curTreeSize = 0;
+ trieBuilder = new TrieDictionaryBuilder<T>(bytesConverter);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java
new file mode 100755
index 0000000..624d6ba
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.dict;
+
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by xiefan on 16-10-26.
+ */
+
+public class TrieDictionaryForestTest {
+
+
+ @Test
+ public void testBasicFound() {
+ ArrayList<String> strs = new ArrayList<String>();
+ strs.add("part");
+ strs.add("par");
+ strs.add("partition");
+ strs.add("party");
+ strs.add("parties");
+ strs.add("paint");
+ Collections.sort(strs);
+ int baseId = 0;
+ TrieDictionaryForestBuilder<String> builder = newDictBuilder(strs, baseId);
+ TrieDictionaryForest<String> dict = builder.build();
+ dict.dump(System.out);
+ int expectId = baseId;
+ for (String s : strs) {
+ System.out.println("value:" + s + " expect id:" + expectId);
+ assertEquals(expectId, dict.getIdFromValue(s));
+ expectId++;
+ }
+ System.out.println("test ok");
+ }
+
+ @Test //one string one tree
+ public void testMultiTree() {
+ ArrayList<String> strs = new ArrayList<String>();
+ strs.add("part");
+ strs.add("par");
+ strs.add("partition");
+ strs.add("party");
+ strs.add("parties");
+ strs.add("paint");
+ strs.add("\u4e00\u4e8c\u4e09"); //Chinese test
+ strs.add("\u56db\u4e94\u516d");
+ strs.add("");
+ Collections.sort(strs, new ByteComparator<String>(new StringBytesConverter()));
+ int baseId = 5;
+ int maxTreeSize = 0;
+ TrieDictionaryForestBuilder<String> builder = newDictBuilder(strs, baseId, maxTreeSize);
+ TrieDictionaryForest<String> dict = builder.build();
+ dict.dump(System.out);
+ assertEquals(strs.size(), dict.getTrees().size());
+ int expectId = baseId;
+ for (String s : strs) {
+ System.out.println("value:" + s + " expect id:" + expectId);
+ assertEquals(expectId, dict.getIdFromValue(s));
+ expectId++;
+ }
+ System.out.println("test ok");
+ }
+
+ // Placeholder for a duplicate-value test: it carries no @Test annotation and has no body yet.
+ public void duplicateDataTest() {
+ //todo
+ }
+
+ /**
+  * Builds a forest over one million random strings, with the tree size limit set to
+  * a tenth of the total encoded size, and checks id<==>value lookups for every entry.
+  */
+ @Test
+ public void testBigDataSet() {
+     // generate data and sum up its encoded size
+     final StringBytesConverter converter = new StringBytesConverter();
+     ArrayList<String> strs = new ArrayList<>();
+     int totalSize = 0;
+     for (String str : new RandomStrings(100 * 10000)) {
+         byte[] data = converter.convertToBytes(str);
+         if (data != null) {
+             totalSize += data.length;
+         }
+         strs.add(str);
+     }
+     Collections.sort(strs);
+     int baseId = 20;
+     int maxTreeSize = totalSize / 10;
+     System.out.println("data size:" + totalSize / 1024 + "KB max tree size:" + maxTreeSize / 1024 + "KB");
+     // expected answers: the i-th sorted value gets id baseId + i
+     Map<String, Integer> idMap = rightIdMap(baseId, strs);
+     // build the forest
+     TrieDictionaryForest<String> dict = newDictBuilder(strs, baseId, maxTreeSize).build();
+     System.out.println("tree num:" + dict.getTrees().size());
+     // verify both lookup directions
+     for (Map.Entry<String, Integer> entry : idMap.entrySet()) {
+         String value = entry.getKey();
+         Integer expectedId = entry.getValue();
+         assertEquals(0, dict.getIdFromValue(value) - expectedId);
+         assertEquals(value, dict.getValueFromId(expectedId));
+     }
+ }
+
+ // Feeds the forest builder two very long strings (beyond the 255-length node part
+ // limit mentioned below) next to short ones and checks that ids are strictly
+ // increasing in sorted value order and that id ==> value round-trips.
+ // NOTE(review): the list is passed to the builder without sorting it first.
+ @Test
+ public void partOverflowTest() {
+ ArrayList<String> str = new ArrayList<String>();
+ // str.add("");
+ str.add("part");
+ str.add("par");
+ str.add("partition");
+ str.add("party");
+ str.add("parties");
+ str.add("paint");
+ String longStr = "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+ + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk";
+ System.out.println("The length of the long string is " + longStr.length());
+ str.add(longStr);
+
+ str.add("zzzzzz" + longStr);// another long string
+ int baseId = 10;
+ // 100 MB size limit, far larger than the data added above
+ int maxSize = 100 * 1024 * 1024;
+ TrieDictionaryForestBuilder<String> b = newDictBuilder(str, baseId, maxSize);
+ TrieDictionaryForest<String> dict = b.build();
+ // sorted, de-duplicated view of the input values
+ TreeSet<String> set = new TreeSet<String>();
+ for (String s : str) {
+ set.add(s);
+ }
+ // test basic id<==>value
+ Iterator<String> it = set.iterator();
+ // id counts from 0 (not from baseId), so the ">= id" check below is only a lower bound
+ int id = 0;
+ int previousId = -1;
+ for (; it.hasNext(); id++) {
+ String value = it.next();
+
+ // in case of overflow parts, there exist interpolation nodes
+ // they exist to make sure that any node's part is shorter than 255
+ int actualId = dict.getIdFromValue(value);
+ assertTrue(actualId >= id);
+ assertTrue(actualId > previousId);
+ previousId = actualId;
+
+ assertEquals(value, dict.getValueFromId(actualId));
+ }
+ }
+
+ /** Runs the shared dictionary checks with a list of values that must not be found. */
+ @Test
+ public void notFoundTest() {
+     ArrayList<String> present = new ArrayList<String>(Arrays.asList("part", "par", "partition", "party", "parties", "paint"));
+     Collections.sort(present, new ByteComparator<String>(new StringBytesConverter()));
+
+     ArrayList<String> absent = new ArrayList<String>(Arrays.asList("", "p", "pa", "pb", "parti", "partz", "partyz"));
+
+     testStringDictionary(present, absent);
+ }
+
+
+ /**
+  * Builds a dictionary and a second one over the same values plus "py", both with
+  * the same base id, and expects contains() to hold only from the larger one to
+  * the smaller one.
+  */
+ @Test
+ public void dictionaryContainTest() {
+     ArrayList<String> str = new ArrayList<String>();
+     str.add("part");
+     str.add("part"); // meant to be dup
+     str.add("par");
+     str.add("partition");
+     str.add("party");
+     str.add("parties");
+     str.add("paint");
+     Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+     int baseId = new Random().nextInt(100);
+     TrieDictionaryForest<String> dict = newDictBuilder(str, baseId).build();
+
+     // second dictionary: same values plus one more. The original re-rolled baseId
+     // after this builder had been created, so the new value was never used; that
+     // dead store is removed here.
+     str.add("py");
+     Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+     TrieDictionaryForest<String> dict2 = newDictBuilder(str, baseId).build();
+
+     assertTrue(dict2.contains(dict));
+     assertFalse(dict.contains(dict2));
+ }
+
+ /** Runs the shared dictionary checks over the English word list resource file. */
+ @Test
+ public void englishWordsTest() throws Exception {
+     ArrayList<String> words = loadStrings(new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt"));
+     Collections.sort(words, new ByteComparator<String>(new StringBytesConverter()));
+     testStringDictionary(words, null);
+ }
+
+ /** Runs the shared dictionary checks over the category grouping names resource file. */
+ @Test
+ public void categoryNamesTest() throws Exception {
+     ArrayList<String> names = loadStrings(new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat"));
+     Collections.sort(names, new ByteComparator<String>(new StringBytesConverter()));
+     testStringDictionary(names, null);
+ }
+
+ @Test
+ public void serializeTest() {
+ ArrayList<String> testData = getTestData(10);
+ TrieDictionaryForestBuilder<String> b = newDictBuilder(testData, 10, 0);
+ TrieDictionaryForest<String> dict = b.build();
+ dict = testSerialize(dict);
+ dict.dump(System.out);
+ for (String str : testData) {
+ assertEquals(str, dict.getValueFromId(dict.getIdFromValue(str)));
+ }
+ }
+
+
+ /**
+  * Writes the dictionary into an in-memory buffer and reads it back into a fresh
+  * instance. Any IOException is rethrown wrapped in a RuntimeException.
+  */
+ private static TrieDictionaryForest<String> testSerialize(TrieDictionaryForest<String> dict) {
+     try {
+         ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+         DataOutputStream out = new DataOutputStream(buffer);
+         dict.write(out);
+         out.close();
+
+         DataInputStream in = new DataInputStream(new ByteArrayInputStream(buffer.toByteArray()));
+         TrieDictionaryForest<String> copy = new TrieDictionaryForest<>();
+         copy.readFields(in);
+         in.close();
+         return copy;
+     } catch (IOException e) {
+         throw new RuntimeException(e);
+     }
+ }
+
+ /*@Test
+ public void getIdFromValueBytesTest() throws Exception{
+ String value = "\u4e00\u4e8c\u4e09";
+ BytesConverter<String> converter = new StringBytesConverter();
+ TrieDictionaryForestBuilder<String> b = new TrieDictionaryForestBuilder<>(converter,0);
+ b.addValue(value);
+ TrieDictionaryForest<String> dict = b.build();
+ dict.dump(System.out);
+ byte[] data = converter.convertToBytes(value);
+ int id = dict.getIdFromValueBytes(data,0,data.length);
+
+ }*/
+
+ //benchmark
+ /**
+  * Manual benchmark (not annotated with @Test): fills a plain TrieDictionaryBuilder
+  * with random strings and prints the difference of the values reported by
+  * MemoryBudgetController.gcAndGetSystemAvailMB() before and after.
+  */
+ @Deprecated
+ public void memoryUsageBenchmarkTest() throws Exception {
+     //create data
+     ArrayList<String> testData = getTestData((int) (Integer.MAX_VALUE * 0.8 / 640));
+     int testTimes = 1;
+     System.out.println("start memory:" + Runtime.getRuntime().maxMemory());
+     System.out.println("start memory:" + Runtime.getRuntime().totalMemory());
+     int round = 0;
+     while (round < testTimes) {
+         long before = MemoryBudgetController.gcAndGetSystemAvailMB();
+         TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<>(new StringBytesConverter());
+         for (String str : testData) {
+             b.addValue(str);
+         }
+         long after = MemoryBudgetController.gcAndGetSystemAvailMB();
+         System.out.println("object trie memory usage:" + (after - before) + "MB");
+         System.out.println("start memory:" + Runtime.getRuntime().maxMemory());
+         System.out.println("start memory:" + Runtime.getRuntime().totalMemory());
+         round++;
+     }
+ }
+
+ /**
+  * Requests a GC, waits one second and returns the currently used heap in bytes
+  * (total memory minus free memory as reported by Runtime).
+  */
+ @Deprecated
+ private long getSystemCurUsedMemory() throws Exception {
+     System.gc();
+     // sleep() is a static method; call it on the class rather than through an instance
+     Thread.sleep(1000);
+     Runtime runtime = Runtime.getRuntime();
+     long totalMem = runtime.totalMemory();
+     return totalMem - runtime.freeMemory();
+ }
+
+ //@Test
+ /**
+  * Manual benchmark (its @Test annotation is commented out): builds the same random
+  * data set several times with TrieDictionaryBuilder and with
+  * TrieDictionaryForestBuilder and prints the accumulated build time of each.
+  */
+ public void buildTimeBenchmarkTest() throws Exception {
+     //create data
+     ArrayList<String> testData = getTestData((int) (Integer.MAX_VALUE * 0.8 / 640));
+     //build time compare
+     int testTimes = 5;
+     long oldDictTotalBuildTime = 0;
+     long newDictTotalBuildTime = 0;
+
+     //old dict
+     System.gc();
+     Thread.sleep(1000); // static method, called on the class instead of via currentThread()
+     for (int i = 0; i < testTimes; i++) {
+         int keep = 0;
+         long startTime = System.currentTimeMillis();
+         TrieDictionaryBuilder<String> oldTrieBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+         for (String str : testData)
+             oldTrieBuilder.addValue(str);
+         TrieDictionary<String> oldDict = oldTrieBuilder.build(0);
+         // one lookup so that the built dictionary is actually used
+         keep |= oldDict.getIdFromValue(testData.get(0));
+         oldDictTotalBuildTime += (System.currentTimeMillis() - startTime);
+         System.out.println("times:" + i);
+     }
+
+     //new dict
+     System.gc();
+     Thread.sleep(1000);
+     for (int i = 0; i < testTimes; i++) {
+         int keep = 0;
+         long startTime = System.currentTimeMillis();
+         BytesConverter<String> converter = new StringBytesConverter();
+         TrieDictionaryForestBuilder<String> newTrieBuilder = new TrieDictionaryForestBuilder<String>(converter, 0);
+         for (String str : testData)
+             newTrieBuilder.addValue(str);
+         TrieDictionaryForest<String> newDict = newTrieBuilder.build();
+         keep |= newDict.getIdFromValue(testData.get(0));
+         newDictTotalBuildTime += (System.currentTimeMillis() - startTime);
+         System.out.println("times:" + i);
+     }
+
+     System.out.println("compare build time. Old trie : " + oldDictTotalBuildTime / 1000.0 + "s.New trie : " + newDictTotalBuildTime / 1000.0 + "s");
+ }
+
+
+ /** Runs the lookup benchmark over (int) (Integer.MAX_VALUE * 0.8 / 640) random strings. */
+ @Test
+ public void queryTimeBenchmarkTest() throws Exception {
+     benchmarkStringDictionary(new RandomStrings((int) (Integer.MAX_VALUE * 0.8 / 640)));
+ }
+
+
+ /**
+  * Prints the total size of the given strings in MB.
+  * Sizes are measured in UTF-8 so the figure does not depend on the platform
+  * default charset.
+  */
+ private void evaluateDataSize(ArrayList<String> list) {
+     long size = 0;
+     for (String str : list)
+         size += str.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
+     System.out.println("test data size : " + size / (1024 * 1024) + " MB");
+ }
+
+ /**
+  * Generates {@code count} random strings and prints their total size in MB.
+  * Sizes are measured in UTF-8 so the figure does not depend on the platform
+  * default charset.
+  */
+ private void evaluateDataSize(int count) {
+     long bytesCount = 0;
+     for (String s : new RandomStrings(count))
+         bytesCount += s.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
+     System.out.println("test data size : " + bytesCount / (1024 * 1024) + " MB");
+ }
+
+ /**
+  * Builds a forest (base id 0) over the given strings, prepares the reference
+  * structures used for comparison (value ==> id map, id ==> value array and the
+  * encoded bytes per id) and runs the benchmark twice: once as warm-up, once for real.
+  */
+ private static void benchmarkStringDictionary(Iterable<String> str) throws IOException {
+     ArrayList<String> testData = new ArrayList<>();
+     for (String s : str) {
+         testData.add(s);
+     }
+     Collections.sort(testData);
+     TrieDictionaryForest<String> dict = newDictBuilder(testData, 0).build();
+     System.out.println("tree size:" + dict.getTrees().size());
+     BytesConverter<String> converter = new StringBytesConverter();
+     TreeSet<String> set = new TreeSet<String>(testData);
+
+     // prepare id==>value array and value==>id map
+     HashMap<String, Integer> map = new HashMap<String, Integer>();
+     String[] strArray = new String[set.size()];
+     byte[][] array = new byte[set.size()][];
+     int id = 0;
+     for (String value : set) {
+         map.put(value, id);
+         strArray[id] = value;
+         array[id] = converter.convertToBytes(value);
+         id++;
+     }
+
+     // warm-up, said that code only got JIT after run 1k-10k times,
+     // following jvm options may help
+     // -XX:CompileThreshold=1500
+     // -XX:+PrintCompilation
+     System.out.println("Benchmark awaitig...");
+     benchmark("Warm up", dict, set, map, strArray, array);
+     benchmark("Benchmark", dict, set, map, strArray, array);
+ }
+
+ // Times four lookup loops and prints each elapsed time in milliseconds:
+ // value==>id via HashMap, value==>id via the dictionary, id==>value via a plain
+ // array, and id==>value via the dictionary. Every loop runs `times` passes over
+ // all n values. The accumulated `keep` value is returned so the calls under test
+ // have an observable result.
+ private static int benchmark(String msg, TrieDictionaryForest<String> dict, TreeSet<String> set, HashMap<String, Integer> map, String[] strArray, byte[][] array) {
+ int n = set.size();
+ int times = Math.max(10 * 1000 * 1000 / n, 1); // run 10 million lookups
+ int keep = 0; // make sure JIT don't OPT OUT function calls under test
+ // scratch buffer sized by dict.getSizeOfValue(), reused for every id==>value lookup
+ byte[] valueBytes = new byte[dict.getSizeOfValue()];
+ long start;
+
+ // benchmark value==>id, via HashMap
+ System.out.println(msg + " HashMap lookup value==>id");
+ start = System.currentTimeMillis();
+ for (int i = 0; i < times; i++) {
+ for (int j = 0; j < n; j++) {
+ keep |= map.get(strArray[j]);
+ }
+ }
+ long timeValueToIdByMap = System.currentTimeMillis() - start;
+ System.out.println(timeValueToIdByMap);
+
+ // benchmark value==>id, via Dict
+ System.out.println(msg + " Dictionary lookup value==>id");
+ //dict.dump(System.out);
+
+ start = System.currentTimeMillis();
+ for (int i = 0; i < times; i++) {
+ for (int j = 0; j < n; j++) {
+ //System.out.println("looking for value:"+new String(array[j]));
+ keep |= dict.getIdFromValueBytes(array[j], 0, array[j].length);
+ }
+ }
+ long timeValueToIdByDict = System.currentTimeMillis() - start;
+ System.out.println(timeValueToIdByDict);
+ /*System.out.println("detail time. get index time"+dict.getValueIndexTime.get()+" get value time"+
+ dict.getValueTime.get() +" binary search time:"+dict.binarySearchTime.get() + " copy time:"+
+ dict.copyTime.get());*/
+
+ // benchmark id==>value, via Array
+ System.out.println(msg + " Array lookup id==>value");
+ start = System.currentTimeMillis();
+ for (int i = 0; i < times; i++) {
+ for (int j = 0; j < n; j++) {
+ keep |= strArray[j].length();
+ }
+ }
+ long timeIdToValueByArray = System.currentTimeMillis() - start;
+ System.out.println(timeIdToValueByArray);
+
+ // benchmark id==>value, via Dict
+ // ids 0..n-1 are looked up directly; the caller in this file builds the dictionary with base id 0
+ System.out.println(msg + " Dictionary lookup id==>value");
+ start = System.currentTimeMillis();
+ for (int i = 0; i < times; i++) {
+ for (int j = 0; j < n; j++) {
+ keep |= dict.getValueBytesFromId(j, valueBytes, 0);
+ }
+ }
+ long timeIdToValueByDict = System.currentTimeMillis() - start;
+ System.out.println(timeIdToValueByDict);
+ /*System.out.println("detail time. get index time"+dict.getValueIndexTime2.get()+" get value time"+
+ dict.getValueTime2.get());*/
+
+ return keep;
+ }
+
+ /**
+  * Shared checks for a string dictionary built with a random base id and a tree
+  * size limit of 2:
+  * every distinct value maps to consecutive ids starting at the base id and back;
+  * every value in {@code notFound} (may be null) raises IllegalArgumentException;
+  * ids outside the dictionary raise IllegalArgumentException;
+  * the null value round-trips through both the object and the byte API.
+  */
+ private static void testStringDictionary(ArrayList<String> str, ArrayList<String> notFound) {
+     int baseId = new Random().nextInt(100);
+     TrieDictionaryForestBuilder<String> b = newDictBuilder(str, baseId, 2);
+     TrieDictionaryForest<String> dict = b.build();
+     // sorted, de-duplicated view of the input
+     TreeSet<String> set = new TreeSet<String>(str);
+
+     // test basic id<==>value
+     int id = baseId;
+     for (String value : set) {
+         assertEquals(id, dict.getIdFromValue(value));
+         assertEquals(value, dict.getValueFromId(id));
+         id++;
+     }
+
+     //test not found value
+     if (notFound != null) {
+         for (String s : notFound) {
+             try {
+                 int nullId = dict.getIdFromValue(s);
+                 System.out.println("null value id:" + nullId);
+                 fail("For not found value '" + s + "', IllegalArgumentException is expected");
+             } catch (IllegalArgumentException e) {
+                 // good
+             }
+         }
+     }
+
+     // ids that cannot exist; Integer.MIN_VALUE is written directly because
+     // negating it overflows back to the same value
+     int maxId = dict.getMaxId();
+     int[] notExistIds = {-10, -20, Integer.MIN_VALUE, -Integer.MAX_VALUE, maxId + 1, maxId + 2};
+     for (int i : notExistIds) {
+         try {
+             dict.getValueFromId(i);
+             fail("For not found id '" + i + "', IllegalArgumentException is expected");
+         } catch (IllegalArgumentException e) {
+             // good
+         }
+     }
+
+     // test null value
+     int nullId = dict.getIdFromValue(null);
+     assertNull(dict.getValueFromId(nullId));
+     int nullId2 = dict.getIdFromValueBytes(null, 0, 0);
+     // expected value first, so a failure message reads correctly
+     assertEquals(-1, dict.getValueBytesFromId(nullId2, null, 0));
+     assertEquals(nullId, nullId2);
+ }
+
+ /** Maps the i-th string of the list to the expected id {@code baseId + i}. */
+ private Map<String, Integer> rightIdMap(int baseId, ArrayList<String> strs) {
+     Map<String, Integer> idByValue = new HashMap<>();
+     for (int i = 0; i < strs.size(); i++) {
+         idByValue.put(strs.get(i), baseId + i);
+     }
+     return idByValue;
+ }
+
+ /** Creates a forest builder with the given base id and feeds it all strings in iteration order. */
+ private static TrieDictionaryForestBuilder<String> newDictBuilder(Iterable<String> strs, int baseId) {
+     TrieDictionaryForestBuilder<String> builder = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), baseId);
+     Iterator<String> it = strs.iterator();
+     while (it.hasNext()) {
+         builder.addValue(it.next());
+     }
+     return builder;
+ }
+
+ /**
+  * Creates a forest builder with the given base id, sets the tree size limit and
+  * feeds the builder all strings in iteration order.
+  * NOTE(review): the limit is written to the static
+  * TrieDictionaryForestBuilder.MaxTrieTreeSize and is not restored afterwards, so
+  * it stays in effect for builders created later - confirm this is intended.
+  */
+ private static TrieDictionaryForestBuilder<String> newDictBuilder(Iterable<String> strs, int baseId, int treeSize) {
+     TrieDictionaryForestBuilder<String> builder = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), baseId);
+     TrieDictionaryForestBuilder.MaxTrieTreeSize = treeSize;
+     Iterator<String> it = strs.iterator();
+     while (it.hasNext()) {
+         builder.addValue(it.next());
+     }
+     return builder;
+ }
+
+ /**
+  * Iterable that yields {@code size} random strings, each made of 64 lower-case
+  * hexadecimal characters. Every iterator owns a Random seeded with the current
+  * time, so the sequence differs between runs.
+  */
+ private static class RandomStrings implements Iterable<String> {
+     private static final int STRING_LENGTH = 64;
+
+     final private int size;
+
+     public RandomStrings(int size) {
+         this.size = size;
+     }
+
+     @Override
+     public Iterator<String> iterator() {
+         return new Iterator<String>() {
+             Random rand = new Random(System.currentTimeMillis());
+             int i = 0; // number of strings handed out so far
+
+             @Override
+             public boolean hasNext() {
+                 return i < size;
+             }
+
+             @Override
+             public String next() {
+                 if (!hasNext())
+                     throw new NoSuchElementException();
+
+                 i++;
+                 return nextString();
+             }
+
+             // builds one string of random hex digits; StringBuilder is enough
+             // because the buffer never leaves this method
+             private String nextString() {
+                 StringBuilder buf = new StringBuilder(STRING_LENGTH);
+                 for (int k = 0; k < STRING_LENGTH; k++) {
+                     int v = rand.nextInt(16);
+                     char c = (v <= 9) ? (char) ('0' + v) : (char) ('a' + v - 10);
+                     buf.append(c);
+                 }
+                 return buf.toString();
+             }
+
+             @Override
+             public void remove() {
+                 throw new UnsupportedOperationException();
+             }
+         };
+     }
+ }
+
+ /**
+  * Reads the stream as UTF-8 text and returns every non-empty, trimmed line.
+  * The reader and the stream are closed before returning.
+  */
+ private static ArrayList<String> loadStrings(InputStream is) throws Exception {
+     ArrayList<String> lines = new ArrayList<String>();
+     BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+     try {
+         for (String line = reader.readLine(); line != null; line = reader.readLine()) {
+             String word = line.trim();
+             if (!word.isEmpty())
+                 lines.add(word);
+         }
+     } finally {
+         reader.close();
+         is.close();
+     }
+     return lines;
+ }
+
+
+ /**
+  * Generates {@code count} random strings, sorts them with ByteComparator, prints
+  * their total size and returns the sorted list.
+  */
+ private ArrayList<String> getTestData(int count) {
+     ArrayList<String> testData = new ArrayList<>();
+     for (String s : new RandomStrings(count)) {
+         testData.add(s);
+     }
+     Collections.sort(testData, new ByteComparator<String>(new StringBytesConverter()));
+     evaluateDataSize(testData);
+     return testData;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
new file mode 100644
index 0000000..dfc6b2c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ */
+public class FactDistinctColumnPartitioner2 extends Partitioner<SelfDefineSortableKey, Text> {
+    private Configuration conf;
+
+    /**
+     * Chooses the reducer from the first byte of the key's text:
+     * the HLL marker goes to the last reducer, the partition-column marker to the
+     * second-to-last reducer, and any other first byte is used as the reducer index.
+     */
+    @Override
+    public int getPartition(SelfDefineSortableKey key, Text value, int numReduceTasks) {
+        // fetch the key bytes once instead of once per comparison
+        byte[] keyBytes = key.getText().getBytes();
+        byte marker = keyBytes[0];
+
+        if (marker == FactDistinctHiveColumnsMapper2.MARK_FOR_HLL) {
+            // the last reducer is for merging hll
+            return numReduceTasks - 1;
+        }
+        if (marker == FactDistinctHiveColumnsMapper2.MARK_FOR_PARTITION_COL) {
+            // the second-to-last reducer is for the partition column
+            return numReduceTasks - 2;
+        }
+        // otherwise the first byte carries the column index, which selects the reducer
+        return BytesUtil.readUnsigned(keyBytes, 0, 1);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
new file mode 100644
index 0000000..6ff07f0
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.KylinReducer;
+
+import java.io.IOException;
+
+/**
+ * @author yangli9
+ */
+public class FactDistinctColumnsCombiner2 extends KylinReducer<SelfDefineSortableKey, Text, SelfDefineSortableKey, Text> {
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+    }
+
+    /**
+     * Local combine step: forwards each key once, with the first of its values.
+     * The key is emitted unchanged because a combiner has to produce the same
+     * key/value classes as the map output, and FactDistinctColumnsJob2 declares
+     * SelfDefineSortableKey as the map output key class.
+     */
+    @Override
+    public void reduce(SelfDefineSortableKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
+        // for hll, each key only has one output, no need to do local combine;
+        // for normal col, values are empty text
+        context.write(key, values.iterator().next());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
new file mode 100644
index 0000000..4d26402
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ */
+public class FactDistinctColumnsJob2 extends AbstractHadoopJob {
+ protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsJob2.class);
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_CUBING_JOB_ID);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_STATISTICS_ENABLED);
+ options.addOption(OPTION_STATISTICS_OUTPUT);
+ options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
+ parseOptions(options, args);
+
+ job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ String job_id = getOptionValue(OPTION_CUBING_JOB_ID);
+ job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id);
+ String cubeName = getOptionValue(OPTION_CUBE_NAME);
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+ String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+ String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
+ String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
+ String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+
+ // ----------------------------------------------------------------------------
+ // add metadata to distributed cache
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
+
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+ job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
+ job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
+ job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+ logger.info("Starting: " + job.getJobName());
+ logger.info("using FactDistinctColumnsJob2");
+
+ setJobClasspath(job, cube.getConfig());
+
+ CubeSegment segment = cube.getSegmentById(segmentID);
+ if (segment == null) {
+ logger.error("Failed to find {} in cube {}", segmentID, cube);
+ System.out.println("Failed to find {} in cube {} " + segmentID + "," + cube);
+ for (CubeSegment s : cube.getSegments()) {
+ logger.error(s.getName() + " with status " + s.getStatus());
+ System.out.println(s.getName() + " with status " + s.getStatus());
+ }
+ throw new IllegalStateException();
+ } else {
+ logger.info("Found segment: " + segment);
+ System.out.println("Found segment " + segment);
+ }
+ setupMapper(cube.getSegmentById(segmentID));
+ setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
+
+ attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+ return waitForCompletion(job);
+
+ } finally {
+ if (job != null)
+ cleanupTempConfFile(job.getConfiguration());
+ }
+
+ }
+
+ private void setupMapper(CubeSegment cubeSeg) throws IOException {
+ IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+ flatTableInputFormat.configureJob(job);
+
+ job.setMapperClass(FactDistinctHiveColumnsMapper2.class);
+ job.setCombinerClass(FactDistinctColumnsCombiner2.class);
+ job.setMapOutputKeyClass(SelfDefineSortableKey.class);
+ job.setMapOutputValueClass(Text.class);
+ }
+
+ private void setupReducer(Path output, int numberOfReducers) throws IOException {
+ job.setReducerClass(FactDistinctColumnsReducer.class); //reducer do not need to change
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setPartitionerClass(FactDistinctColumnPartitioner2.class);
+ job.setNumReduceTasks(numberOfReducers);
+
+ FileOutputFormat.setOutputPath(job, output);
+ job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+ deletePath(job.getConfiguration(), output);
+ }
+
+ public static void main(String[] args) throws Exception {
+ FactDistinctColumnsJob2 job = new FactDistinctColumnsJob2();
+ int exitCode = ToolRunner.run(job, args);
+ System.exit(exitCode);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java
new file mode 100644
index 0000000..2e9a2dc
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ */
+public class FactDistinctColumnsMapperBase2<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> {
+
+ protected String cubeName;
+ protected CubeInstance cube;
+ protected CubeSegment cubeSeg;
+ protected CubeDesc cubeDesc;
+ protected long baseCuboidId;
+ protected List<TblColRef> factDictCols;
+ protected IMRTableInputFormat flatTableInputFormat;
+
+ protected Text outputKey = new Text();
+ protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+ protected Text outputValue = new Text();
+ protected int errorRecordCounter = 0;
+
+ protected CubeJoinedFlatTableEnrich intermediateTableDesc;
+ protected int[] dictionaryColumnIndex;
+
+ @Override
+ protected void setup(Context context) throws IOException {
+ Configuration conf = context.getConfiguration();
+ bindCurrentConfiguration(conf);
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+ cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+ cube = CubeManager.getInstance(config).getCube(cubeName);
+ cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
+ cubeDesc = cube.getDescriptor();
+ baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc);
+
+ flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+
+ intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
+ dictionaryColumnIndex = new int[factDictCols.size()];
+ for (int i = 0; i < factDictCols.size(); i++) {
+ TblColRef colRef = factDictCols.get(i);
+ int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+ dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
+ }
+
+ }
+
+ protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
+
+ System.err.println("Insane record: " + Arrays.toString(record));
+ ex.printStackTrace(System.err);
+
+ errorRecordCounter++;
+ if (errorRecordCounter > BatchConstants.ERROR_RECORD_LOG_THRESHOLD) {
+ if (ex instanceof IOException)
+ throw (IOException) ex;
+ else if (ex instanceof RuntimeException)
+ throw (RuntimeException) ex;
+ else
+ throw new RuntimeException("", ex);
+ }
+ }
+}