You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by lu...@apache.org on 2015/01/07 15:46:55 UTC
[34/51] [partial] incubator-kylin git commit: migrate repo from
github.com to apache git
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/NumberDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/NumberDictionary.java b/dictionary/src/main/java/com/kylinolap/dict/NumberDictionary.java
new file mode 100644
index 0000000..fb3c2c1
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/NumberDictionary.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.dict;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * @author yangli9
+ *
+ */
+public class NumberDictionary<T> extends TrieDictionary<T> {
+
+ public static final int MAX_DIGITS_BEFORE_DECIMAL_POINT = 16;
+
+ // encode a number into an order preserving byte sequence
+ // for positives -- padding '0'
+ // for negatives -- '-' sign, padding '9', invert digits, and terminate by ';'
+ static class NumberBytesCodec {
+
+ byte[] buf = new byte[MAX_DIGITS_BEFORE_DECIMAL_POINT * 2];
+ int bufOffset = 0;
+ int bufLen = 0;
+
+ /**
+ * Encodes the number text in value[offset, offset + len) into buf as an order
+ * preserving byte sequence. The result is located by bufOffset and bufLen.
+ * An empty input (len == 0) produces an empty encoding.
+ *
+ * @throws IllegalArgumentException
+ * if the text does not fit the internal buffer, or has more than
+ * MAX_DIGITS_BEFORE_DECIMAL_POINT digits before the decimal point
+ */
+ void encodeNumber(byte[] value, int offset, int len) {
+ if (len == 0) {
+ bufOffset = 0;
+ bufLen = 0;
+ return;
+ }
+
+ boolean negative = value[offset] == '-';
+
+ // a negative number needs one extra byte for the ';' terminator; reject it
+ // here, otherwise the copy below would start at index -1
+ int capacity = negative ? buf.length - 1 : buf.length;
+ if (len > capacity) {
+ throw new IllegalArgumentException("Too many digits for NumberDictionary: " + Bytes.toString(value, offset, len) + ". Internal buffer is only " + buf.length + " bytes");
+ }
+
+ // terminate negative ';'
+ int start = buf.length - len;
+ int end = buf.length;
+ if (negative) {
+ start--;
+ end--;
+ buf[end] = ';';
+ }
+
+ // copy & find decimal point
+ int decimalPoint = end;
+ for (int i = start, j = offset; i < end; i++, j++) {
+ buf[i] = value[j];
+ if (buf[i] == '.' && i < decimalPoint) {
+ decimalPoint = i;
+ }
+ }
+ // remove '-' sign
+ if (negative) {
+ start++;
+ }
+
+ // prepend '0'; one more byte must stay free in front for the sign marker
+ int nZeroPadding = MAX_DIGITS_BEFORE_DECIMAL_POINT - (decimalPoint - start);
+ if (nZeroPadding < 0 || nZeroPadding + 1 > start)
+ throw new IllegalArgumentException("Too many digits for NumberDictionary: " + Bytes.toString(value, offset, len) + ". Expect " + MAX_DIGITS_BEFORE_DECIMAL_POINT + " digits before decimal point at max.");
+ for (int i = 0; i < nZeroPadding; i++) {
+ buf[--start] = '0';
+ }
+
+ // consider negative: lead with '-' and invert every digit (d -> 9 - d)
+ if (negative) {
+ buf[--start] = '-';
+ for (int i = start + 1; i < buf.length; i++) {
+ int c = buf[i];
+ if (c >= '0' && c <= '9') {
+ buf[i] = (byte) ('9' - (c - '0'));
+ }
+ }
+ } else {
+ buf[--start] = '0';
+ }
+
+ bufOffset = start;
+ bufLen = buf.length - start;
+ }
+
+ /**
+ * Decodes the encoded bytes held in buf[bufOffset, bufOffset + bufLen) back
+ * into number text, written to returnValue starting at offset.
+ *
+ * @return the number of bytes written to returnValue; 0 if bufLen is 0
+ */
+ int decodeNumber(byte[] returnValue, int offset) {
+ if (bufLen == 0) {
+ return 0;
+ }
+
+ int in = bufOffset;
+ int end = bufOffset + bufLen;
+ int out = offset;
+
+ // sign; for a negative number also drop the last byte, which is the ';'
+ // terminator appended by encodeNumber
+ boolean negative = buf[in] == '-';
+ if (negative) {
+ returnValue[out++] = '-';
+ in++;
+ end--;
+ }
+
+ // remove padding ('9' is the inverted form of '0' used for negatives)
+ byte padding = (byte) (negative ? '9' : '0');
+ for (; in < end; in++) {
+ if (buf[in] != padding)
+ break;
+ }
+
+ // all paddings before '.', special case for '0'
+ if (in == end || !(buf[in] >= '0' && buf[in] <= '9')) {
+ returnValue[out++] = '0';
+ }
+
+ // copy the rest, inverting the digits back for a negative number
+ if (negative) {
+ for (; in < end; in++, out++) {
+ int c = buf[in];
+ if (c >= '0' && c <= '9') {
+ c = '9' - (c - '0');
+ }
+ returnValue[out] = (byte) c;
+ }
+ } else {
+ System.arraycopy(buf, in, returnValue, out, end - in);
+ out += end - in;
+ }
+
+ return out - offset;
+ }
+ }
+
+ static ThreadLocal<NumberBytesCodec> localCodec = new ThreadLocal<NumberBytesCodec>();
+
+ // ============================================================================
+
+ public NumberDictionary() { // default constructor for Writable interface
+ super();
+ }
+
+ public NumberDictionary(byte[] trieBytes) {
+ super(trieBytes);
+ }
+
+ /** Returns the codec held in localCodec for the calling thread, creating and storing it on first use. */
+ private NumberBytesCodec getCodec() {
+ NumberBytesCodec threadCodec = localCodec.get();
+ if (threadCodec != null) {
+ return threadCodec;
+ }
+ threadCodec = new NumberBytesCodec();
+ localCodec.set(threadCodec);
+ return threadCodec;
+ }
+
+ /**
+ * Encodes the given number text with the calling thread's codec, then looks
+ * the encoded bytes up through the superclass implementation.
+ */
+ @Override
+ protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
+ NumberBytesCodec codec = getCodec();
+ codec.encodeNumber(value, offset, len);
+ return super.getIdFromValueBytesImpl(codec.buf, codec.bufOffset, codec.bufLen, roundingFlag);
+ }
+
+ /**
+ * Reads the encoded bytes of the given id into the calling thread's codec
+ * buffer (starting at index 0), then decodes them into returnValue.
+ *
+ * @return the number of bytes written to returnValue, as reported by decodeNumber
+ */
+ @Override
+ protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset) {
+ NumberBytesCodec codec = getCodec();
+ codec.bufOffset = 0;
+ codec.bufLen = super.getValueBytesFromIdImpl(id, codec.buf, 0);
+ return codec.decodeNumber(returnValue, offset);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/NumberDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/NumberDictionaryBuilder.java b/dictionary/src/main/java/com/kylinolap/dict/NumberDictionaryBuilder.java
new file mode 100644
index 0000000..9f0c931
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/NumberDictionaryBuilder.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.dict;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * @author yangli9
+ *
+ */
+public class NumberDictionaryBuilder<T> extends TrieDictionaryBuilder<T> {
+
+ // encoder whose internal buffer is overwritten by every addValue() call
+ NumberDictionary.NumberBytesCodec codec = new NumberDictionary.NumberBytesCodec();
+
+ public NumberDictionaryBuilder(BytesConverter<T> bytesConverter) {
+ super(bytesConverter);
+ }
+
+ /**
+ * Encodes the number text with the codec and adds the encoded bytes to the trie.
+ */
+ @Override
+ public void addValue(byte[] value) {
+ codec.encodeNumber(value, 0, value.length);
+ // pass on a copy of the encoded range, because the codec buffer is reused
+ super.addValue(Bytes.copy(codec.buf, codec.bufOffset, codec.bufLen));
+ }
+
+ /** Builds the trie bytes for the given base id and wraps them in a NumberDictionary. */
+ public NumberDictionary<T> build(int baseId) {
+ return new NumberDictionary<T>(buildTrieBytes(baseId));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/StringBytesConverter.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/StringBytesConverter.java b/dictionary/src/main/java/com/kylinolap/dict/StringBytesConverter.java
new file mode 100644
index 0000000..f99b750
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/StringBytesConverter.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.dict;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * BytesConverter for String values that delegates to the HBase Bytes utility.
+ * NOTE(review): the charset is whatever Bytes.toBytes / Bytes.toString use,
+ * presumably UTF-8 -- confirm against the HBase version in use.
+ */
+public class StringBytesConverter implements BytesConverter<String> {
+
+ /** Converts the string to bytes via Bytes.toBytes. */
+ @Override
+ public byte[] convertToBytes(String v) {
+ return Bytes.toBytes(v);
+ }
+
+ /** Converts b[offset, offset + length) back to a string via Bytes.toString. */
+ @Override
+ public String convertFromBytes(byte[] b, int offset, int length) {
+ return Bytes.toString(b, offset, length);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/TrieDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/TrieDictionary.java b/dictionary/src/main/java/com/kylinolap/dict/TrieDictionary.java
new file mode 100644
index 0000000..c348dbb
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/TrieDictionary.java
@@ -0,0 +1,478 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.dict;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.ref.SoftReference;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import com.kylinolap.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A dictionary based on Trie data structure that maps enumerations of byte[] to
+ * int IDs.
+ *
+ * With a Trie the memory footprint of the mapping is minimized at the cost of
+ * CPU, compared to a HashMap of ID arrays. Performance tests show the Trie is
+ * roughly 10 times slower, so a cache layer is overlaid on top of the Trie; the
+ * cache is held through a soft reference so lookups gracefully fall back to the Trie.
+ *
+ * The implementation is thread-safe.
+ *
+ * @author yangli9
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class TrieDictionary<T> extends Dictionary<T> {
+
+ public static final byte[] HEAD_MAGIC = new byte[] { 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // "TrieDict"
+ public static final int HEAD_SIZE_I = HEAD_MAGIC.length;
+
+ public static final int BIT_IS_LAST_CHILD = 0x80;
+ public static final int BIT_IS_END_OF_VALUE = 0x40;
+
+ private static final Logger logger = LoggerFactory.getLogger(TrieDictionary.class);
+
+ private byte[] trieBytes;
+
+ // non-persistent part
+ transient private int headSize;
+ @SuppressWarnings("unused")
+ transient private int bodyLen;
+ transient private int sizeChildOffset;
+ transient private int sizeNoValuesBeneath;
+ transient private int baseId;
+ transient private int maxValueLength;
+ transient private BytesConverter<T> bytesConvert;
+
+ transient private int nValues;
+ transient private int sizeOfId;
+ transient private int childOffsetMask;
+ transient private int firstByteOffset;
+
+ transient private boolean enableCache = true;
+ transient private SoftReference<HashMap> valueToIdCache;
+ transient private SoftReference<Object[]> idToValueCache;
+
+ public TrieDictionary() { // default constructor for Writable interface
+ }
+
+ public TrieDictionary(byte[] trieBytes) {
+ init(trieBytes);
+ }
+
+ /**
+ * Binds this dictionary to the given serialized trie: verifies the magic,
+ * parses the head fields, derives the node layout constants and creates the
+ * caches when caching is enabled.
+ *
+ * @throws IllegalArgumentException
+ * if trieBytes does not start with HEAD_MAGIC
+ */
+ private void init(byte[] trieBytes) {
+ this.trieBytes = trieBytes;
+ if (BytesUtil.compareBytes(HEAD_MAGIC, 0, trieBytes, 0, HEAD_MAGIC.length) != 0)
+ throw new IllegalArgumentException("Wrong file type (magic does not match)");
+
+ try {
+ // the head fields follow the magic, in the order they are read below
+ DataInputStream headIn = new DataInputStream( //
+ new ByteArrayInputStream(trieBytes, HEAD_SIZE_I, trieBytes.length - HEAD_SIZE_I));
+ this.headSize = headIn.readShort();
+ this.bodyLen = headIn.readInt();
+ this.sizeChildOffset = headIn.read();
+ this.sizeNoValuesBeneath = headIn.read();
+ this.baseId = headIn.readShort();
+ this.maxValueLength = headIn.readShort();
+
+ // an empty converter name leaves bytesConvert unset
+ String converterName = headIn.readUTF();
+ if (converterName.isEmpty() == false)
+ this.bytesConvert = (BytesConverter<T>) Class.forName(converterName).newInstance();
+
+ // the root node starts at headSize; its noValuesBeneath field is the total value count
+ this.nValues = BytesUtil.readUnsigned(trieBytes, headSize + sizeChildOffset, sizeNoValuesBeneath);
+ // note baseId could raise 1 byte in ID space, +1 to reserve all 0xFF for NULL case
+ this.sizeOfId = BytesUtil.sizeForValue(baseId + nValues + 1);
+ // clears the two flag bits kept in the most significant byte of the child offset
+ this.childOffsetMask = ~((BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE) << ((sizeChildOffset - 1) * 8));
+ // the offset from begin of node to its first value byte
+ this.firstByteOffset = sizeChildOffset + sizeNoValuesBeneath + 1;
+ } catch (Exception e) {
+ if (e instanceof RuntimeException)
+ throw (RuntimeException) e;
+ else
+ throw new RuntimeException(e);
+ }
+
+ if (enableCache) {
+ valueToIdCache = new SoftReference<HashMap>(new HashMap());
+ idToValueCache = new SoftReference<Object[]>(new Object[nValues]);
+ }
+ }
+
+ /** Returns the smallest ID in this dictionary, which is baseId. */
+ @Override
+ public int getMinId() {
+ return baseId;
+ }
+
+ /** Returns the largest ID in this dictionary, which is baseId + nValues - 1. */
+ @Override
+ public int getMaxId() {
+ return baseId + nValues - 1;
+ }
+
+ /** Returns sizeOfId as computed by init() from baseId and the number of values. */
+ @Override
+ public int getSizeOfId() {
+ return sizeOfId;
+ }
+
+ /** Returns maxValueLength as read from the dictionary head. */
+ @Override
+ public int getSizeOfValue() {
+ return maxValueLength;
+ }
+
+ /**
+ * Maps a value to its ID. Exact lookups (roundingFlag == 0) go through the
+ * value-to-ID cache when caching is enabled and the cache is still
+ * reachable; everything else is resolved from the value bytes directly.
+ */
+ @Override
+ final protected int getIdFromValueImpl(T value, int roundingFlag) {
+ HashMap cache = null;
+ if (enableCache && roundingFlag == 0) {
+ // SoftReference to skip cache gracefully when short of memory
+ cache = valueToIdCache.get();
+ if (cache != null) {
+ Integer cachedId = (Integer) cache.get(value);
+ if (cachedId != null)
+ return cachedId.intValue();
+ }
+ }
+
+ byte[] bytes = bytesConvert.convertToBytes(value);
+ int id = getIdFromValueBytes(bytes, 0, bytes.length, roundingFlag);
+ if (cache != null)
+ cache.put(value, id);
+ return id;
+ }
+
+ /**
+ * Looks up the sequence number of value[offset, offset + len) in the trie and
+ * converts it to an ID.
+ *
+ * @throws IllegalArgumentException
+ * if the resulting sequence number is outside [0, nValues)
+ */
+ @Override
+ protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
+ int seq = lookupSeqNoFromValue(headSize, value, offset, offset + len, roundingFlag);
+ int id = calcIdFromSeqNo(seq);
+ if (id < 0)
+ throw new IllegalArgumentException("Not a valid value: " + bytesConvert.convertFromBytes(value, offset, len));
+ return id;
+ }
+
+ /**
+ * returns a code point from [0, nValues), preserving order of value
+ *
+ * @param n
+ * -- the offset of current node
+ * @param inp
+ * -- input value bytes to lookup
+ * @param o
+ * -- offset in the input value bytes matched so far
+ * @param inpEnd
+ * -- end of input
+ * @param roundingFlag
+ * -- =0: return -1 if not found -- <0: return closest smaller if
+ * not found, might be -1 -- >0: return closest bigger if not
+ * found, might be nValues
+ */
+ private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) {
+ // special 'empty' value; test the [o, inpEnd) slice rather than inp.length,
+ // because a caller may pass a zero-length slice of a larger reused buffer
+ if (o == inpEnd)
+ return checkFlag(headSize, BIT_IS_END_OF_VALUE) ? 0 : roundSeqNo(roundingFlag, -1, -1, 0);
+
+ int seq = 0; // the sequence no under track
+
+ while (true) {
+ // match the current node, note [0] of node's value has been matched
+ // when this node is selected by its parent
+ int p = n + firstByteOffset; // start of node's value
+ int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value
+ for (p++; p < end && o < inpEnd; p++, o++) { // note matching start from [1]
+ if (trieBytes[p] != inp[o]) {
+ int comp = BytesUtil.compareByteUnsigned(trieBytes[p], inp[o]);
+ if (comp < 0) {
+ // every value beneath this node is smaller than the input
+ seq += BytesUtil.readUnsigned(trieBytes, n + sizeChildOffset, sizeNoValuesBeneath);
+ }
+ return roundSeqNo(roundingFlag, seq - 1, -1, seq); // mismatch
+ }
+ }
+
+ // node completely matched, is input all consumed?
+ boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+ if (o == inpEnd) {
+ return p == end && isEndOfValue ? seq : roundSeqNo(roundingFlag, seq - 1, -1, seq); // input all matched
+ }
+ if (isEndOfValue)
+ seq++;
+
+ // find a child to continue
+ int c = headSize + (BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask);
+ if (c == headSize) // has no children
+ return roundSeqNo(roundingFlag, seq - 1, -1, seq); // input only partially matched
+ byte inpByte = inp[o];
+ int comp;
+ while (true) {
+ p = c + firstByteOffset;
+ comp = BytesUtil.compareByteUnsigned(trieBytes[p], inpByte);
+ if (comp == 0) { // continue in the matching child, reset n and loop again
+ n = c;
+ o++;
+ break;
+ } else if (comp < 0) { // try next child
+ seq += BytesUtil.readUnsigned(trieBytes, c + sizeChildOffset, sizeNoValuesBeneath);
+ if (checkFlag(c, BIT_IS_LAST_CHILD))
+ return roundSeqNo(roundingFlag, seq - 1, -1, seq); // no child can match the next byte of input
+ // the next sibling starts right after this child's value bytes
+ c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+ } else { // children are ordered by their first value byte
+ return roundSeqNo(roundingFlag, seq - 1, -1, seq); // no child can match the next byte of input
+ }
+ }
+ }
+ }
+
+ /**
+ * Picks one of three candidate results by the sign of roundingFlag:
+ * i when negative, j when zero, k when positive.
+ */
+ private int roundSeqNo(int roundingFlag, int i, int j, int k) {
+ if (roundingFlag > 0)
+ return k;
+ if (roundingFlag < 0)
+ return i;
+ return j;
+ }
+
+ /**
+ * Maps an ID back to its value. When caching is enabled and the cache is
+ * still reachable, the decoded value is served from and stored into the
+ * ID-to-value cache; otherwise it is decoded from the trie on every call.
+ *
+ * @throws IllegalArgumentException
+ * if the cache is in use and the ID maps outside [0, nValues)
+ */
+ @Override
+ final protected T getValueFromIdImpl(int id) {
+ // SoftReference to skip cache gracefully when short of memory
+ Object[] cache = enableCache ? idToValueCache.get() : null;
+ int seq = -1;
+ if (cache != null) {
+ seq = calcSeqNoFromId(id);
+ if (seq < 0 || seq >= nValues)
+ throw new IllegalArgumentException("Not a valid ID: " + id);
+ Object hit = cache[seq];
+ if (hit != null)
+ return (T) hit;
+ }
+
+ byte[] buffer = new byte[getSizeOfValue()];
+ int length = getValueBytesFromId(id, buffer, 0);
+ T result = bytesConvert.convertFromBytes(buffer, 0, length);
+ if (cache != null)
+ cache[seq] = result;
+ return result;
+ }
+
+ /**
+ * Writes the value bytes of the given ID into returnValue starting at offset.
+ *
+ * @return the result of lookupValueFromSeqNo for the ID's sequence number
+ * @throws IllegalArgumentException
+ * if id is outside [baseId, baseId + nValues)
+ */
+ @Override
+ protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset) {
+ if (id < baseId || id >= baseId + nValues)
+ throw new IllegalArgumentException("Not a valid ID: " + id);
+
+ int seq = calcSeqNoFromId(id);
+
+ return lookupValueFromSeqNo(headSize, seq, returnValue, offset);
+ }
+
+ /**
+ * Walks the trie from node n down to the value with the given sequence
+ * number, appending the value bytes of every node on the path to returnValue.
+ *
+ * @param n
+ * -- the offset of current node
+ * @param seq
+ * -- the code point under track
+ * @param returnValue
+ * -- where return value is written to
+ * @param offset
+ * -- position in returnValue where writing starts
+ * @return the number of bytes written to returnValue, or -1 if the walk runs
+ * out of children (corrupted dictionary)
+ */
+ private int lookupValueFromSeqNo(int n, int seq, byte[] returnValue, int offset) {
+ int o = offset;
+ while (true) {
+ // write current node value
+ int p = n + firstByteOffset;
+ int len = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+ System.arraycopy(trieBytes, p, returnValue, o, len);
+ o += len;
+
+ // if the value is ended
+ boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+ if (isEndOfValue) {
+ seq--;
+ if (seq < 0)
+ return o - offset;
+ }
+
+ // find a child to continue
+ int c = headSize + (BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask);
+ if (c == headSize) // has no children
+ return -1; // no child? corrupted dictionary!
+ int nValuesBeneath;
+ while (true) {
+ nValuesBeneath = BytesUtil.readUnsigned(trieBytes, c + sizeChildOffset, sizeNoValuesBeneath);
+ if (seq - nValuesBeneath < 0) { // value is under this child,
+ // reset n and loop again
+ n = c;
+ break;
+ } else { // go to next child
+ seq -= nValuesBeneath;
+ if (checkFlag(c, BIT_IS_LAST_CHILD))
+ return -1; // no more child? corrupted dictionary!
+ // the next sibling starts right after this child's value bytes
+ p = c + firstByteOffset;
+ c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+ }
+ }
+ }
+ }
+
+ /** Tests whether the given flag bit is set in the trie byte at offset. */
+ private boolean checkFlag(int offset, int bit) {
+ return (trieBytes[offset] & bit) > 0;
+ }
+
+ /** Converts a sequence number to an ID; returns -1 when seq is outside [0, nValues). */
+ private int calcIdFromSeqNo(int seq) {
+ boolean inRange = seq >= 0 && seq < nValues;
+ return inRange ? baseId + seq : -1;
+ }
+
+ /** Converts an ID to its sequence number by subtracting baseId; no range check is done here. */
+ private int calcSeqNoFromId(int id) {
+ return id - baseId;
+ }
+
+ /** Writes the complete trie bytes (head and body) to out. */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.write(trieBytes);
+ }
+
+ /**
+ * Reads a dictionary previously written by write(). The leading magic,
+ * headSize (short) and bodyLen (int) are read first to learn the total
+ * length, then the remainder is read and the dictionary is initialized.
+ *
+ * @throws IllegalArgumentException
+ * if the input does not start with HEAD_MAGIC
+ * @throws IOException
+ * if the input ends early or cannot be read
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ // Short.SIZE and Integer.SIZE are counts of bits, so convert them to bytes;
+ // using them directly would read 56 bytes up front instead of 14
+ byte[] headPartial = new byte[HEAD_MAGIC.length + Short.SIZE / Byte.SIZE + Integer.SIZE / Byte.SIZE];
+ in.readFully(headPartial);
+
+ if (BytesUtil.compareBytes(HEAD_MAGIC, 0, headPartial, 0, HEAD_MAGIC.length) != 0)
+ throw new IllegalArgumentException("Wrong file type (magic does not match)");
+
+ DataInputStream headIn = new DataInputStream( //
+ new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I));
+ int headSize = headIn.readShort();
+ int bodyLen = headIn.readInt();
+ headIn.close();
+
+ // re-assemble the whole dictionary: the part already consumed plus the rest
+ byte[] all = new byte[headSize + bodyLen];
+ System.arraycopy(headPartial, 0, all, 0, headPartial.length);
+ in.readFully(all, headPartial.length, all.length - headPartial.length);
+
+ init(all);
+ }
+
+ /** Prints the number of values, then one line per value with its ID in decimal and hex. */
+ @Override
+ public void dump(PrintStream out) {
+ out.println("Total " + nValues + " values");
+ int seq = 0;
+ while (seq < nValues) {
+ int id = calcIdFromSeqNo(seq++);
+ T value = getValueFromId(id);
+ String hex = Integer.toHexString(id);
+ out.println(id + " (" + hex + "): " + value);
+ }
+ }
+
+ /** Hashes the serialized trie bytes, consistent with equals(). */
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(trieBytes);
+ }
+
+ /** Two trie dictionaries are equal when their serialized trie bytes are equal. */
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof TrieDictionary) {
+ TrieDictionary other = (TrieDictionary) o;
+ return Arrays.equals(this.trieBytes, other.trieBytes);
+ }
+ logger.info("Equals return false because o is not TrieDictionary");
+ return false;
+ }
+
+ /**
+ * Ad-hoc manual check: builds a dictionary of three values and prints the IDs
+ * found for a value that was not added, once rounding down and once rounding up.
+ */
+ public static void main(String[] args) throws Exception {
+ TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter());
+ // b.addValue("part");
+ // b.print();
+ // b.addValue("part");
+ // b.print();
+ // b.addValue("par");
+ // b.print();
+ // b.addValue("partition");
+ // b.print();
+ // b.addValue("party");
+ // b.print();
+ // b.addValue("parties");
+ // b.print();
+ // b.addValue("paint");
+ // b.print();
+ b.addValue("-000000.41");
+ b.addValue("0000101.81");
+ b.addValue("6779331");
+ String t = "0000001.6131";
+ TrieDictionary<String> dict = b.build(0);
+
+ // roundingFlag < 0 asks for the closest smaller value, > 0 for the closest bigger
+ System.out.println(dict.getIdFromValue(t, -1));
+ System.out.println(dict.getIdFromValue(t, 1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/TrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/TrieDictionaryBuilder.java b/dictionary/src/main/java/com/kylinolap/dict/TrieDictionaryBuilder.java
new file mode 100644
index 0000000..9a6e2ce
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/TrieDictionaryBuilder.java
@@ -0,0 +1,536 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.kylinolap.dict;
+
+import static com.kylinolap.common.util.BytesUtil.*;
+import static com.kylinolap.dict.TrieDictionary.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+
+import com.kylinolap.common.util.BytesUtil;
+
+/**
+ * Builds a dictionary using Trie structure. All values are taken in byte[] form
+ * and organized in a Trie with ordering. Then numeric IDs are assigned in
+ * sequence.
+ *
+ * @author yangli9
+ */
+public class TrieDictionaryBuilder<T> {
+
+ /**
+ * A trie node under construction: a run of value bytes plus its child nodes,
+ * which addValueR keeps ordered by their first byte.
+ */
+ public static class Node {
+ // the value bytes held by this node
+ public byte[] part;
+ // true when an added value ends exactly at the end of this node's part
+ public boolean isEndOfValue;
+ public ArrayList<Node> children;
+
+ public int nValuesBeneath; // only present after stats()
+
+ /** Creates a node with the given part and an empty child list. */
+ Node(byte[] value, boolean isEndOfValue) {
+ reset(value, isEndOfValue);
+ }
+
+ /** Creates a node with the given part that adopts the given child list. */
+ Node(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
+ reset(value, isEndOfValue, children);
+ }
+
+ /** Re-initializes this node with a new part and a fresh, empty child list. */
+ void reset(byte[] value, boolean isEndOfValue) {
+ reset(value, isEndOfValue, new ArrayList<Node>());
+ }
+
+ /** Re-initializes all three fields of this node; nValuesBeneath is left untouched. */
+ void reset(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
+ this.part = value;
+ this.isEndOfValue = isEndOfValue;
+ this.children = children;
+ }
+ }
+
+ /** Callback used by the traverse methods; level is 0 for the root and grows by one per generation. */
+ public static interface Visitor {
+ void visit(Node n, int level);
+ }
+
+ // ============================================================================
+
+ private Node root;
+ private BytesConverter<T> bytesConverter;
+
+ /** Creates a builder whose trie consists of just an empty root node that is not an end of value. */
+ public TrieDictionaryBuilder(BytesConverter<T> bytesConverter) {
+ this.root = new Node(new byte[0], false);
+ this.bytesConverter = bytesConverter;
+ }
+
+ /** Converts the value to bytes with the configured converter and adds those bytes. */
+ public void addValue(T value) {
+ addValue(bytesConverter.convertToBytes(value));
+ }
+
+ /** Adds the value bytes to the trie, starting the match at the root. */
+ public void addValue(byte[] value) {
+ addValueR(root, value, 0);
+ }
+
+ /**
+ * Recursively inserts value[start, value.length) beneath the given node,
+ * splitting the node or adding a child as needed.
+ *
+ * @param node
+ * -- the node whose part is matched against the value
+ * @param value
+ * -- the complete value bytes being added
+ * @param start
+ * -- position in value where matching against node.part begins
+ */
+ private void addValueR(Node node, byte[] value, int start) {
+ // match the value part of current node
+ int i = 0, j = start;
+ int n = node.part.length, nn = value.length;
+ int comp = 0;
+ for (; i < n && j < nn; i++, j++) {
+ comp = compareByteUnsigned(node.part[i], value[j]);
+ if (comp != 0)
+ break;
+ }
+
+ // if value fully matched within the current node
+ if (j == nn) {
+ // if equals to current node, just mark end of value
+ if (i == n) {
+ node.isEndOfValue = true;
+ }
+ // otherwise, split the current node into two
+ else {
+ // the tail of the part moves to a child that inherits the old flag and children
+ Node c = new Node(subarray(node.part, i, n), node.isEndOfValue, node.children);
+ node.reset(subarray(node.part, 0, i), true);
+ node.children.add(c);
+ }
+ return;
+ }
+
+ // if partially matched the current, split the current node, add the new
+ // value, make a 3-way
+ if (i < n) {
+ Node c1 = new Node(subarray(node.part, i, n), node.isEndOfValue, node.children);
+ Node c2 = new Node(subarray(value, j, nn), true);
+ node.reset(subarray(node.part, 0, i), false);
+ // comp still holds the comparison at the mismatching byte; keep children ordered
+ if (comp < 0) {
+ node.children.add(c1);
+ node.children.add(c2);
+ } else {
+ node.children.add(c2);
+ node.children.add(c1);
+ }
+ return;
+ }
+
+ // out matched the current, binary search the next byte for a child node
+ // to continue
+ byte lookfor = value[j];
+ int lo = 0;
+ int hi = node.children.size() - 1;
+ int mid = 0;
+ boolean found = false;
+ comp = 0;
+ while (!found && lo <= hi) {
+ mid = lo + (hi - lo) / 2;
+ comp = compareByteUnsigned(lookfor, node.children.get(mid).part[0]);
+ if (comp < 0)
+ hi = mid - 1;
+ else if (comp > 0)
+ lo = mid + 1;
+ else
+ found = true;
+ }
+ // found a child node matching the first byte, continue in that child
+ if (found) {
+ addValueR(node.children.get(mid), value, j);
+ }
+ // otherwise, make the value a new child
+ else {
+ // insert before or after the last probed child, by the last comparison result
+ Node c = new Node(subarray(value, j, nn), true);
+ node.children.add(comp <= 0 ? mid : mid + 1, c);
+ }
+ }
+
+ /** Visits every node in pre-order (parent before children), starting at the root with level 0. */
+ public void traverse(Visitor visitor) {
+ traverseR(root, visitor, 0);
+ }
+
+ /** Pre-order recursion: visits node first, then each child at level + 1. */
+ private void traverseR(Node node, Visitor visitor, int level) {
+ visitor.visit(node, level);
+ for (Node c : node.children)
+ traverseR(c, visitor, level + 1);
+ }
+
+ /** Visits every node in post-order (children before parent), starting at the root with level 0. */
+ public void traversePostOrder(Visitor visitor) {
+ traversePostOrderR(root, visitor, 0);
+ }
+
+ /** Post-order recursion: visits each child at level + 1 first, then node itself. */
+ private void traversePostOrderR(Node node, Visitor visitor, int level) {
+ for (Node c : node.children)
+ traversePostOrderR(c, visitor, level + 1);
+ visitor.visit(node, level);
+ }
+
+ /** Plain holder of statistics about the trie, with a print() that dumps them to System.out. */
+ public static class Stats {
+ public int nValues; // number of values in total
+ // number of bytes for all values uncompressed
+ public int nValueBytesPlain;
+ // number of values bytes in Trie (compressed)
+ public int nValueBytesCompressed;
+ public int maxValueLength; // size of longest value in bytes
+
+ // the trie is multi-byte-per-node
+ public int mbpn_nNodes; // number of nodes in trie
+ public int mbpn_trieDepth; // depth of trie
+ public int mbpn_maxFanOut; // the maximum no. children
+ // number of child lookups during lookup every value once
+ public int mbpn_nChildLookups;
+ // the sum of fan outs during lookup every value once
+ public int mbpn_nTotalFanOut;
+ public int mbpn_sizeValueTotal; // the sum of value space in all nodes
+ public int mbpn_sizeNoValueBytes; // size of field noValueBytes
+ // size of field noValuesBeneath, depends on cardinality
+ public int mbpn_sizeNoValueBeneath;
+ // size of field childOffset, points to first child in flattened array
+ public int mbpn_sizeChildOffset;
+ public int mbpn_footprint; // MBPN footprint in bytes
+
+ // stats for one-byte-per-node as well, so there's comparison
+ public int obpn_sizeValue; // size of value per node, always 1
+ // size of field noValuesBeneath, depends on cardinality
+ public int obpn_sizeNoValuesBeneath;
+ // size of field childCount, enables binary search among children
+ public int obpn_sizeChildCount;
+ // size of field childOffset, points to first child in flattened array
+ public int obpn_sizeChildOffset;
+ public int obpn_nNodes; // no. nodes in OBPN trie
+ public int obpn_footprint; // OBPN footprint in bytes
+
+ /** Prints all statistics to System.out in three sections: general, OBPN and MBPN. */
+ public void print() {
+ PrintStream out = System.out;
+ out.println("============================================================================");
+ out.println("No. values: " + nValues);
+ out.println("No. bytes raw: " + nValueBytesPlain);
+ out.println("No. bytes in trie: " + nValueBytesCompressed);
+ out.println("Longest value length: " + maxValueLength);
+
+ // flatten trie footprint calculation, case of One-Byte-Per-Node
+ out.println("----------------------------------------------------------------------------");
+ out.println("OBPN node size: " + (obpn_sizeValue + obpn_sizeNoValuesBeneath + obpn_sizeChildCount + obpn_sizeChildOffset) + " = " + obpn_sizeValue + " + " + obpn_sizeNoValuesBeneath + " + " + obpn_sizeChildCount + " + " + obpn_sizeChildOffset);
+ out.println("OBPN no. nodes: " + obpn_nNodes);
+ out.println("OBPN trie depth: " + maxValueLength);
+ out.println("OBPN footprint: " + obpn_footprint + " in bytes");
+
+ // flatten trie footprint calculation, case of Multi-Byte-Per-Node
+ out.println("----------------------------------------------------------------------------");
+ out.println("MBPN max fan out: " + mbpn_maxFanOut);
+ out.println("MBPN no. child lookups: " + mbpn_nChildLookups);
+ out.println("MBPN total fan out: " + mbpn_nTotalFanOut);
+ // floating-point division: prints NaN when there were no child lookups at all
+ out.println("MBPN average fan out: " + (double) mbpn_nTotalFanOut / mbpn_nChildLookups);
+ out.println("MBPN values size total: " + mbpn_sizeValueTotal);
+ out.println("MBPN node size: " + (mbpn_sizeNoValueBytes + mbpn_sizeNoValueBeneath + mbpn_sizeChildOffset) + " = " + mbpn_sizeNoValueBytes + " + " + mbpn_sizeNoValueBeneath + " + " + mbpn_sizeChildOffset);
+ out.println("MBPN no. nodes: " + mbpn_nNodes);
+ out.println("MBPN trie depth: " + mbpn_trieDepth);
+ out.println("MBPN footprint: " + mbpn_footprint + " in bytes");
+ }
+ }
+
+ /**
+  * Computes statistics of the trie and of the two flattened layouts
+  * (One-Byte-Per-Node and Multi-Byte-Per-Node) that could be built from it.
+  * Nothing is printed here; the caller can print the returned object.
+  * <p>
+  * Side effect: nValuesBeneath is recomputed on every node visited by the
+  * post-order traversal.
+  *
+  * @return a newly created and populated Stats object
+  */
+ public Stats stats() {
+ // calculate nEndValueBeneath
+ // (post-order, so every child's count is final before its parent sums it)
+ traversePostOrder(new Visitor() {
+ @Override
+ public void visit(Node n, int level) {
+ n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
+ for (Node c : n.children)
+ n.nValuesBeneath += c.nValuesBeneath;
+ }
+ });
+
+ // run stats
+ final Stats s = new Stats();
+ // part length of the most recently visited node at each level; used to
+ // sum up the length of the path leading to the current node
+ // NOTE(review): this relies on traverse() visiting a parent before its
+ // children (presumably depth-first pre-order) -- confirm
+ final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
+ traverse(new Visitor() {
+ @Override
+ public void visit(Node n, int level) {
+ if (n.isEndOfValue)
+ s.nValues++;
+ // in plain form the bytes of this part are repeated once for every
+ // value beneath; in the trie they are stored only once
+ s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
+ s.nValueBytesCompressed += n.part.length;
+ s.mbpn_nNodes++;
+ if (s.mbpn_trieDepth < level + 1)
+ s.mbpn_trieDepth = level + 1;
+ if (n.children.size() > 0) {
+ if (s.mbpn_maxFanOut < n.children.size())
+ s.mbpn_maxFanOut = n.children.size();
+ // values that end at this node itself do not descend to a child
+ int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
+ s.mbpn_nChildLookups += childLookups;
+ s.mbpn_nTotalFanOut += childLookups * n.children.size();
+ }
+
+ // record this node's part length at its level, then add up levels
+ // 0..level to get the length of the value prefix ending here
+ if (level < lenAtLvl.size())
+ lenAtLvl.set(level, n.part.length);
+ else
+ lenAtLvl.add(n.part.length);
+ int lenSoFar = 0;
+ for (int i = 0; i <= level; i++)
+ lenSoFar += lenAtLvl.get(i);
+ if (lenSoFar > s.maxValueLength)
+ s.maxValueLength = lenSoFar;
+ }
+ });
+
+ // flatten trie footprint calculation, case of One-Byte-Per-Node
+ s.obpn_sizeValue = 1;
+ s.obpn_sizeNoValuesBeneath = BytesUtil.sizeForValue(s.nValues);
+ s.obpn_sizeChildCount = 1;
+ s.obpn_sizeChildOffset = 4; // MSB used as isEndOfValue flag
+ // no. nodes is the total number of compressed bytes in OBPN
+ s.obpn_nNodes = s.nValueBytesCompressed;
+ s.obpn_footprint = s.obpn_nNodes * (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset);
+ while (true) { // minimize the offset size to match the footprint
+ // t is the footprint if the child offset field were one byte smaller
+ int t = s.obpn_nNodes * (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset - 1);
+ // *2 because MSB of offset is used for isEndOfValue flag
+ // NOTE(review): t * 2 is plain int arithmetic and could overflow for a
+ // very large trie -- confirm whether that size is reachable
+ if (BytesUtil.sizeForValue(t * 2) <= s.obpn_sizeChildOffset - 1) {
+ s.obpn_sizeChildOffset--;
+ s.obpn_footprint = t;
+ } else
+ break;
+ }
+
+ // flatten trie footprint calculation, case of Multi-Byte-Per-Node
+ s.mbpn_sizeValueTotal = s.nValueBytesCompressed;
+ s.mbpn_sizeNoValueBytes = 1;
+ s.mbpn_sizeNoValueBeneath = BytesUtil.sizeForValue(s.nValues);
+ s.mbpn_sizeChildOffset = 4;
+ s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset);
+ while (true) { // minimize the offset size to match the footprint
+ // t is the footprint if the child offset field were one byte smaller
+ int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset - 1);
+ // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
+ if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) {
+ s.mbpn_sizeChildOffset--;
+ s.mbpn_footprint = t;
+ } else
+ break;
+ }
+
+ return s;
+ }
+
+ /** Prints the trie to standard output for debugging; delegates to {@link #print(PrintStream)}. */
+ public void print() {
+ print(System.out);
+ }
+
+ /**
+  * Prints the trie to the given stream for debugging, one node per line.
+  * Each line shows the node's part decoded as UTF-8, then " - ", then the
+  * number of values beneath (only when greater than zero) and a "*" when a
+  * value ends at the node. Lines are prefixed with one space per level.
+  *
+  * @param out the stream to print to
+  */
+ public void print(final PrintStream out) {
+     // Resolve the charset once. Passing a Charset rather than a charset name
+     // also removes the checked UnsupportedEncodingException, which could
+     // never occur for UTF-8 and was previously swallowed via printStackTrace.
+     final java.nio.charset.Charset utf8 = java.nio.charset.Charset.forName("UTF-8");
+     traverse(new Visitor() {
+         @Override
+         public void visit(Node n, int level) {
+             for (int i = 0; i < level; i++)
+                 out.print(" ");
+             out.print(new String(n.part, utf8));
+             out.print(" - ");
+             if (n.nValuesBeneath > 0)
+                 out.print(n.nValuesBeneath);
+             if (n.isEndOfValue)
+                 out.print("*");
+             out.print("\n");
+         }
+     });
+ }
+
+ private CompleteParts completeParts = new CompleteParts();
+
+ /**
+  * A growable byte buffer used as a stack of node parts: parts are appended
+  * at the end and later withdrawn by size.
+  */
+ private class CompleteParts {
+     byte[] data = new byte[4096];
+     int current = 0;
+
+     /** Appends the given bytes at the current position, growing the buffer as needed. */
+     public void append(byte[] part) {
+         final int end = current + part.length;
+         while (end > data.length) {
+             expand();
+         }
+         System.arraycopy(part, 0, data, current, part.length);
+         current = end;
+     }
+
+     /** Drops the last {@code size} bytes by moving the current position back. */
+     public void withdraw(int size) {
+         current = current - size;
+     }
+
+     /** Returns a copy of the bytes held so far. */
+     public byte[] retrieve() {
+         return Arrays.copyOf(data, current);
+     }
+
+     /** Doubles the capacity, keeping the existing content. */
+     private void expand() {
+         data = Arrays.copyOf(data, 2 * data.length);
+     }
+ }
+
+ // there is a 255 limitation of length for each node's part.
+ // we interpolate nodes to satisfy this when a node's part becomes
+ // too long(overflow)
+ //
+ // Walks the subtree under the given node recursively. While walking,
+ // completeParts holds the concatenated parts of the nodes on the path
+ // above the node being processed.
+ private void checkOverflowParts(Node node) {
+ // iterate over a copy, because addValue() below may change node.children
+ LinkedList<Node> childrenCopy = new LinkedList<Node>(node.children);
+ for (Node child : childrenCopy) {
+ if (child.part.length > 255) {
+ byte[] first255 = Arrays.copyOf(child.part, 255);
+
+ // build the full value that ends 255 bytes into the child's part:
+ // path so far + this node's part + first 255 bytes of the child
+ completeParts.append(node.part);
+ completeParts.append(first255);
+ byte[] visited = completeParts.retrieve();
+ // presumably adding this prefix as a value makes the trie split the
+ // over-long child at that point -- confirm against addValue()
+ this.addValue(visited);
+ // restore completeParts to the state it had before this child
+ completeParts.withdraw(255);
+ completeParts.withdraw(node.part.length);
+ }
+ }
+
+ completeParts.append(node.part);// by here the node.children may have
+ // been changed
+ for (Node child : node.children) {
+ checkOverflowParts(child);
+ }
+ // pop this node's part again before returning to the parent
+ completeParts.withdraw(node.part.length);
+ }
+
+ /**
+  * Flattens the trie into a byte array for a minimized memory footprint and
+  * wraps it in a dictionary. Lookup remains fast; the cost is that the
+  * result can no longer be modified (it becomes immutable).
+  * <p>
+  * The flattened structure is HEAD + NODEs. Each node consists of:
+  * <ul>
+  * <li>o bytes, offset to child node, o = stats.mbpn_sizeChildOffset
+  * <ul>
+  * <li>1 bit, isLastChild flag, the 1st MSB of o</li>
+  * <li>1 bit, isEndOfValue flag, the 2nd MSB of o</li>
+  * </ul>
+  * </li>
+  * <li>c bytes, number of values beneath, c = stats.mbpn_sizeNoValueBeneath</li>
+  * <li>1 byte, number of value bytes</li>
+  * <li>n bytes, value bytes</li>
+  * </ul>
+  *
+  * @param baseId the base id written into the head of the flattened trie
+  * @return a dictionary backed by the flattened trie bytes
+  */
+ public TrieDictionary<T> build(int baseId) {
+     return new TrieDictionary<T>(buildTrieBytes(baseId));
+ }
+
+ /**
+  * Serializes the trie into its flattened form: a head followed by all
+  * nodes written level by level (the children of a node are contiguous).
+  * <p>
+  * Over-long node parts are split first via checkOverflowParts(), then the
+  * field sizes and the total footprint are taken from stats().
+  *
+  * @param baseId the base id stored in the head
+  * @return the head and body bytes of the flattened trie
+  * @throws IllegalStateException if the number of bytes written does not
+  *         match the footprint computed by stats()
+  */
+ protected byte[] buildTrieBytes(int baseId) {
+     checkOverflowParts(this.root);
+
+     Stats stats = stats();
+     int sizeNoValuesBeneath = stats.mbpn_sizeNoValueBeneath;
+     int sizeChildOffset = stats.mbpn_sizeChildOffset;
+
+     // write head
+     byte[] head;
+     try {
+         ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+         DataOutputStream headOut = new DataOutputStream(byteBuf);
+         headOut.write(HEAD_MAGIC);
+         headOut.writeShort(0); // head size, will back fill
+         headOut.writeInt(stats.mbpn_footprint); // body size
+         headOut.write(sizeChildOffset);
+         headOut.write(sizeNoValuesBeneath);
+         // NOTE(review): writeShort keeps only the low 16 bits, so a baseId or
+         // maxValueLength above 65535 is silently truncated -- confirm that
+         // callers never exceed this
+         headOut.writeShort(baseId);
+         headOut.writeShort(stats.maxValueLength);
+         headOut.writeUTF(bytesConverter == null ? "" : bytesConverter.getClass().getName());
+         headOut.close();
+         head = byteBuf.toByteArray();
+         BytesUtil.writeUnsigned(head.length, head, HEAD_SIZE_I, 2);
+     } catch (IOException e) {
+         throw new RuntimeException(e); // shall not happen, as we are
+                                        // writing in memory
+     }
+
+     byte[] trieBytes = new byte[stats.mbpn_footprint + head.length];
+     System.arraycopy(head, 0, trieBytes, 0, head.length);
+
+     // nodes whose children still have to be written, in FIFO order
+     LinkedList<Node> open = new LinkedList<Node>();
+     // absolute position in trieBytes where each node was written
+     IdentityHashMap<Node, Integer> offsetMap = new IdentityHashMap<Node, Integer>();
+
+     // write body
+     int o = head.length;
+     offsetMap.put(root, o);
+     o = build_writeNode(root, o, true, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
+     if (!root.children.isEmpty())
+         open.addLast(root);
+
+     while (!open.isEmpty()) {
+         Node parent = open.removeFirst();
+         // the first child is written at the current position; store that
+         // position, relative to the start of the body, in the parent
+         build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
+         for (int i = 0; i < parent.children.size(); i++) {
+             Node c = parent.children.get(i);
+             boolean isLastChild = (i == parent.children.size() - 1);
+             offsetMap.put(c, o);
+             o = build_writeNode(c, o, isLastChild, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
+             if (!c.children.isEmpty())
+                 open.addLast(c);
+         }
+     }
+
+     // sanity check: the nodes must fill the pre-computed footprint exactly
+     if (o != trieBytes.length)
+         throw new IllegalStateException("Flattened trie size mismatch: wrote up to offset " + o + " but expected " + trieBytes.length + " bytes (head " + head.length + " + body " + stats.mbpn_footprint + ")");
+     return trieBytes;
+ }
+
+ private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
+ int flags = (int) trieBytes[parentOffset] & (BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE);
+ BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
+ trieBytes[parentOffset] |= flags;
+ }
+
+ /**
+  * Writes one node into trieBytes starting at the given offset. The layout
+  * is: child offset field (only the flag bits are set here, the offset
+  * itself is filled in later), number of values beneath, one byte holding
+  * the part length, and the part bytes.
+  *
+  * @return the offset right after the written node
+  * @throws IllegalStateException if the node's part is longer than 255
+  *         bytes and therefore cannot be described by the one-byte length
+  */
+ private int build_writeNode(Node n, int offset, boolean isLastChild, int sizeNoValuesBeneath, int sizeChildOffset, byte[] trieBytes) {
+     // validate before touching the output; the length must fit in one byte
+     if (n.part.length > 255)
+         throw new IllegalStateException("Node part is " + n.part.length + " bytes long, exceeding the limit of 255 bytes per node");
+
+     int o = offset;
+
+     // childOffset
+     if (isLastChild)
+         trieBytes[o] |= BIT_IS_LAST_CHILD;
+     if (n.isEndOfValue)
+         trieBytes[o] |= BIT_IS_END_OF_VALUE;
+     o += sizeChildOffset;
+
+     // nValuesBeneath
+     BytesUtil.writeUnsigned(n.nValuesBeneath, trieBytes, o, sizeNoValuesBeneath);
+     o += sizeNoValuesBeneath;
+
+     // nValueBytes
+     BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
+     o++;
+
+     // valueBytes
+     System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
+     o += n.part.length;
+
+     return o;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/lookup/DictionaryLookupTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/DictionaryLookupTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/DictionaryLookupTable.java
new file mode 100644
index 0000000..4d7686f
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/DictionaryLookupTable.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.dict.lookup;
+
+import java.io.IOException;
+
+import com.kylinolap.dict.Dictionary;
+import com.kylinolap.metadata.model.schema.TableDesc;
+
+/**
+ * An in-memory lookup table indexed by a dictionary column. The column value
+ * must be unique within the table.
+ * <p>
+ * Rows are kept in an array; the slot of a row is the dictionary id of its
+ * key minus the minimum id of the dictionary.
+ *
+ * @author yangli9
+ */
+public class DictionaryLookupTable {
+
+    private static final int MAX_CARDINALITY = 1000000;
+
+    private TableDesc tableDesc;
+    private String keyCol; // whose value must be unique across table
+    private Dictionary<String> dict;
+    private String tablePath;
+
+    private int keyColIndex;
+    private String[][] table;
+
+    /**
+     * Creates the lookup table and loads all rows from the file at tablePath.
+     *
+     * @throws IOException if reading the table file fails
+     * @throws IllegalStateException if the dictionary has more than
+     *         MAX_CARDINALITY ids or a key appears more than once
+     */
+    public DictionaryLookupTable(TableDesc tableDesc, String keyCol, Dictionary<String> dict, String tablePath) throws IOException {
+        this.tableDesc = tableDesc;
+        this.keyCol = keyCol;
+        this.dict = dict;
+        this.tablePath = tablePath;
+        init();
+    }
+
+    private void init() throws IOException {
+        keyColIndex = tableDesc.findColumnByName(keyCol).getZeroBasedIndex();
+
+        // Check the cardinality BEFORE allocating, so that an oversized
+        // dictionary is rejected instead of first allocating a huge array.
+        // Computed in long so that max - min + 1 cannot overflow.
+        long cardinality = (long) dict.getMaxId() - dict.getMinId() + 1;
+        if (cardinality > MAX_CARDINALITY) // 1 million
+            throw new IllegalStateException("Too high cardinality of table " + tableDesc + " as an in-mem lookup: " + cardinality);
+
+        table = new String[(int) cardinality][];
+
+        TableReader reader = new FileTable(tablePath, tableDesc.getColumnCount()).getReader();
+        try {
+            while (reader.next()) {
+                String[] cols = reader.getRow();
+                String key = cols[keyColIndex];
+                int rowNo = getRowNoByValue(key);
+
+                if (table[rowNo] != null) // dup key
+                    throw new IllegalStateException("Dup key found, key=" + key + ", value1=" + toString(table[rowNo]) + ", value2=" + toString(cols));
+
+                table[rowNo] = cols;
+            }
+        } finally {
+            reader.close();
+        }
+    }
+
+    /** Returns the row stored for the given dictionary id, or null if no row was loaded for it. */
+    public String[] getRow(int id) {
+        return table[getRowNoByID(id)];
+    }
+
+    /** Returns the row stored for the given key value, or null if no row was loaded for it. */
+    public String[] getRow(String key) {
+        return table[getRowNoByValue(key)];
+    }
+
+    private int getRowNoByValue(String key) {
+        return getRowNoByID(dict.getIdFromValue(key));
+    }
+
+    private int getRowNoByID(int id) {
+        return id - dict.getMinId();
+    }
+
+    /** Prints every dictionary value together with its row to standard output. */
+    public void dump() {
+        for (int i = 0; i < table.length; i++) {
+            String key = dict.getValueFromId(i + dict.getMinId());
+            System.out.println(key + " => " + toString(table[i]));
+        }
+    }
+
+    private String toString(String[] cols) {
+        // a dictionary id without a matching row leaves its slot null
+        if (cols == null)
+            return "null";
+
+        StringBuilder b = new StringBuilder();
+        b.append("[");
+        for (int i = 0; i < cols.length; i++) {
+            if (i > 0)
+                b.append(",");
+            b.append(cols[i]);
+        }
+        b.append("]");
+        return b.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/lookup/FileTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/FileTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/FileTable.java
new file mode 100644
index 0000000..f502adf
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/FileTable.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.dict.lookup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.kylinolap.common.util.HadoopUtil;
+
+/**
+ * @author yangli9
+ *
+ */
+public class FileTable implements ReadableTable {
+
+ String path;
+ String delim;
+ int nColumns;
+
+ public FileTable(String path, int nColumns) {
+ this(path, ReadableTable.DELIM_AUTO, nColumns);
+ }
+
+ public FileTable(String path, String delim, int nColumns) {
+ this.path = path;
+ this.delim = delim;
+ this.nColumns = nColumns;
+ }
+
+ @Override
+ public String getColumnDelimeter() {
+ return delim;
+ }
+
+ @Override
+ public TableReader getReader() throws IOException {
+ return new FileTableReader(path, delim, nColumns);
+ }
+
+ @Override
+ public TableSignature getSignature() throws IOException {
+ FileSystem fs = HadoopUtil.getFileSystem(path);
+ FileStatus status = fs.getFileStatus(new Path(path));
+ return new TableSignature(path, status.getLen(), status.getModificationTime());
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/lookup/FileTableReader.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/FileTableReader.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/FileTableReader.java
new file mode 100644
index 0000000..e113bc5
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/FileTableReader.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.dict.lookup;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import org.apache.commons.lang.StringEscapeUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.kylinolap.common.util.HadoopUtil;
+import com.kylinolap.common.util.StringSplitter;
+
+/**
+ * Reads a table file row by row. Tables are typically CSV or SEQ file: the
+ * file is first opened as a Hadoop SequenceFile and, if it turns out not to
+ * be one, read as UTF-8 text lines instead.
+ *
+ * @author yangli9
+ */
+public class FileTableReader implements TableReader {
+
+    private static final Logger logger = LoggerFactory.getLogger(FileTableReader.class);
+    private static final char CSV_QUOTE = '"';
+    private static final String[] DETECT_DELIMS = new String[] { "\177", "|", "\t", "," };
+
+    private String filePath;
+    private String delim;
+    private RowReader reader;
+
+    private String curLine;
+    private String[] curColumns;
+    private int expectedColumnNumber = -1; // helps delimiter detection
+
+    /**
+     * Opens the file for reading.
+     *
+     * @param filePath path of the table file
+     * @param delim column delimiter, or ReadableTable.DELIM_AUTO to detect it
+     *        from the first row that is split
+     * @param expectedColumnNumber number of columns used by the delimiter
+     *        detection; detection is skipped when it is not positive
+     * @throws IOException if the file cannot be opened
+     */
+    public FileTableReader(String filePath, String delim, int expectedColumnNumber) throws IOException {
+        this.filePath = filePath;
+        this.delim = delim;
+        this.expectedColumnNumber = expectedColumnNumber;
+
+        FileSystem fs = HadoopUtil.getFileSystem(filePath);
+
+        try {
+            this.reader = new SeqRowReader(HadoopUtil.getDefaultConfiguration(), fs, filePath);
+
+        } catch (IOException e) {
+            // only fall back to text when the failure says "not a SEQ file"
+            if (isExceptionSayingNotSeqFile(e) == false)
+                throw e;
+
+            this.reader = new CsvRowReader(fs, filePath);
+        }
+    }
+
+    private boolean isExceptionSayingNotSeqFile(IOException e) {
+        if (e.getMessage() != null && e.getMessage().contains("not a SequenceFile"))
+            return true;
+
+        if (e instanceof EOFException) // in case the file is very very small
+            return true;
+
+        return false;
+    }
+
+    @Override
+    public void setExpectedColumnNumber(int expectedColumnNumber) {
+        this.expectedColumnNumber = expectedColumnNumber;
+    }
+
+    /** Advances to the next line; returns false once the end of the file is reached. */
+    @Override
+    public boolean next() throws IOException {
+        curLine = reader.nextLine();
+        curColumns = null;
+        return curLine != null;
+    }
+
+    /** Returns the current line unsplit, as read by the last call to next(). */
+    public String getLine() {
+        return curLine;
+    }
+
+    /**
+     * Returns the current line split into columns. The split is done lazily
+     * and cached until next() is called again. With DELIM_AUTO the delimiter
+     * is detected on first use and then kept for all following rows.
+     */
+    @Override
+    public String[] getRow() {
+        if (curColumns == null) {
+            if (ReadableTable.DELIM_AUTO.equals(delim))
+                delim = autoDetectDelim(curLine);
+
+            if (delim == null)
+                curColumns = new String[] { curLine };
+            else
+                curColumns = split(curLine, delim);
+        }
+        return curColumns;
+    }
+
+    private String[] split(String line, String delim) {
+        // FIXME CSV line should be parsed considering escapes
+        String str[] = StringSplitter.split(line, delim);
+
+        // un-escape CSV
+        if (ReadableTable.DELIM_COMMA.equals(delim)) {
+            for (int i = 0; i < str.length; i++) {
+                str[i] = unescapeCsv(str[i]);
+            }
+        }
+
+        return str;
+    }
+
+    private String unescapeCsv(String str) {
+        if (str == null || str.length() < 2)
+            return str;
+
+        str = StringEscapeUtils.unescapeCsv(str);
+
+        // unescapeCsv may not remove the outer most quotes.
+        // The length must be re-checked here: un-escaping can shrink the text
+        // to a single quote character (e.g. the field """" becomes "), in
+        // which case first and last char are the same quote and stripping
+        // would call substring(1, 0) and throw.
+        if (str.length() >= 2 && str.charAt(0) == CSV_QUOTE && str.charAt(str.length() - 1) == CSV_QUOTE)
+            str = str.substring(1, str.length() - 1);
+
+        return str;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (reader != null)
+            reader.close();
+    }
+
+    /**
+     * Tries each candidate delimiter and picks the first one that splits the
+     * line into exactly the expected number of columns; returns null when
+     * none matches, meaning the whole line is taken as a single value.
+     */
+    private String autoDetectDelim(String line) {
+        if (expectedColumnNumber > 0) {
+            for (String delim : DETECT_DELIMS) {
+                if (StringSplitter.split(line, delim).length == expectedColumnNumber) {
+                    logger.info("Auto detect delim to be '" + delim + "', split line to " + expectedColumnNumber + " columns -- " + line);
+                    return delim;
+                }
+            }
+        }
+
+        logger.info("Auto detect delim to be null, will take THE-WHOLE-LINE as a single value, for " + filePath);
+        return null;
+    }
+
+    // ============================================================================
+
+    private interface RowReader extends Closeable {
+        String nextLine() throws IOException; // return null on EOF
+    }
+
+    // static: the row readers do not use any state of the enclosing reader
+    private static class SeqRowReader implements RowReader {
+        Reader reader;
+        Writable key;
+        Text value;
+
+        SeqRowReader(Configuration hconf, FileSystem fs, String path) throws IOException {
+            reader = new Reader(hconf, SequenceFile.Reader.file(new Path(path)));
+            key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
+            value = new Text();
+        }
+
+        @Override
+        public String nextLine() throws IOException {
+            boolean hasNext = reader.next(key, value);
+            if (hasNext)
+                // only the first getLength() bytes of the Text buffer are valid
+                return Bytes.toString(value.getBytes(), 0, value.getLength());
+            else
+                return null;
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+    }
+
+    private static class CsvRowReader implements RowReader {
+        BufferedReader reader;
+
+        CsvRowReader(FileSystem fs, String path) throws IOException {
+            FSDataInputStream in = fs.open(new Path(path));
+            reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
+        }
+
+        @Override
+        public String nextLine() throws IOException {
+            return reader.readLine();
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
new file mode 100644
index 0000000..25d2a87
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/HiveTable.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.dict.lookup;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.util.HadoopUtil;
+import com.kylinolap.common.util.CliCommandExecutor;
+import com.kylinolap.metadata.MetadataManager;
+
+/**
+ * A readable table backed by a hive table. The HDFS location of the table is
+ * resolved lazily (from a configured override or by asking the hive command
+ * line) and the actual reading is delegated to a FileTable on that location.
+ *
+ * @author yangli9
+ */
+public class HiveTable implements ReadableTable {
+
+    private static final Logger logger = LoggerFactory.getLogger(HiveTable.class);
+
+    // extracts the location from the output of "describe extended"
+    private static final Pattern LOCATION_PATTERN = Pattern.compile("location:(.*?),");
+
+    private String hiveTable;
+    private int nColumns;
+    // the two kinds of location are cached separately, because a request
+    // for the directory must not be answered with a cached file path or
+    // vice versa
+    private String hdfsDirLocation;
+    private String hdfsFileLocation;
+    private FileTable fileTable;
+
+    public HiveTable(MetadataManager metaMgr, String table) {
+        this.hiveTable = table;
+        this.nColumns = metaMgr.getTableDesc(table).getColumnCount();
+    }
+
+    @Override
+    public String getColumnDelimeter() throws IOException {
+        return getFileTable().getColumnDelimeter();
+    }
+
+    @Override
+    public TableReader getReader() throws IOException {
+        return getFileTable().getReader();
+    }
+
+    @Override
+    public TableSignature getSignature() throws IOException {
+        return getFileTable().getSignature();
+    }
+
+    private FileTable getFileTable() throws IOException {
+        if (fileTable == null) {
+            fileTable = new FileTable(getHDFSLocation(true), nColumns);
+        }
+        return fileTable;
+    }
+
+    /**
+     * Returns the HDFS location of the hive table; the result is cached.
+     *
+     * @param needFilePath true to get the path of the single data file under
+     *        the table directory, false to get the table directory itself
+     * @throws IOException if the location cannot be determined
+     */
+    public String getHDFSLocation(boolean needFilePath) throws IOException {
+        if (needFilePath) {
+            if (hdfsFileLocation == null) {
+                hdfsFileLocation = computeHDFSLocation(true);
+            }
+            return hdfsFileLocation;
+        }
+
+        if (hdfsDirLocation == null) {
+            hdfsDirLocation = computeHDFSLocation(false);
+        }
+        return hdfsDirLocation;
+    }
+
+    private String computeHDFSLocation(boolean needFilePath) throws IOException {
+
+        // a configured override wins and is returned as is
+        String override = KylinConfig.getInstanceFromEnv().getOverrideHiveTableLocation(hiveTable);
+        if (override != null) {
+            logger.debug("Override hive table location " + hiveTable + " -- " + override);
+            return override;
+        }
+
+        // NOTE(review): the table name is concatenated into a shell command;
+        // this assumes table names come from trusted metadata -- confirm
+        String cmd = "hive -e \"describe extended " + hiveTable + ";\"";
+        CliCommandExecutor exec = KylinConfig.getInstanceFromEnv().getCliCommandExecutor();
+        String output = exec.execute(cmd);
+
+        Matcher m = LOCATION_PATTERN.matcher(output);
+        if (m.find() == false)
+            throw new IOException("Failed to find HDFS location for hive table " + hiveTable + " from output -- " + output);
+
+        String hdfsDir = m.group(1);
+
+        if (needFilePath) {
+            FileSystem fs = HadoopUtil.getFileSystem(hdfsDir);
+            FileStatus file = findOnlyFile(hdfsDir, fs);
+            return file.getPath().toString();
+        } else {
+            return hdfsDir;
+        }
+    }
+
+    /**
+     * Returns the one non-empty file directly under the given directory.
+     *
+     * @throws IllegalStateException if there is not exactly one such file
+     */
+    private FileStatus findOnlyFile(String hdfsDir, FileSystem fs) throws FileNotFoundException, IOException {
+        FileStatus[] files = fs.listStatus(new Path(hdfsDir));
+        ArrayList<FileStatus> nonZeroFiles = Lists.newArrayList();
+        for (FileStatus f : files) {
+            if (f.getLen() > 0)
+                nonZeroFiles.add(f);
+        }
+        if (nonZeroFiles.size() != 1)
+            throw new IllegalStateException("Expect 1 and only 1 non-zero file under " + hdfsDir + ", but find " + nonZeroFiles.size());
+        return nonZeroFiles.get(0);
+    }
+
+    @Override
+    public String toString() {
+        return "hive:" + hiveTable;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupBytesTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupBytesTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupBytesTable.java
new file mode 100644
index 0000000..25f771f
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupBytesTable.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.dict.lookup;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.kylinolap.common.util.ByteArray;
+import com.kylinolap.metadata.model.schema.TableDesc;
+
+/**
+ * A lookup table whose cells are byte arrays, converted from the string
+ * columns of the underlying table.
+ *
+ * @author yangli9
+ */
+public class LookupBytesTable extends LookupTable<ByteArray> {
+
+    public LookupBytesTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
+        super(tableDesc, keyColumns, table);
+    }
+
+    /** Converts every column to a ByteArray; a null column stays a null cell. */
+    @Override
+    protected ByteArray[] convertRow(String[] cols) {
+        ByteArray[] r = new ByteArray[cols.length];
+        for (int i = 0; i < cols.length; i++) {
+            r[i] = cols[i] == null ? null : new ByteArray(Bytes.toBytes(cols[i]));
+        }
+        return r;
+    }
+
+    /** Converts a cell back to a string; returns null for a null cell. */
+    @Override
+    protected String toString(ByteArray cell) {
+        // convertRow() produces null cells for null columns, so a null cell
+        // must not be dereferenced here
+        return cell == null ? null : Bytes.toString(cell.data);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupStringTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupStringTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupStringTable.java
new file mode 100644
index 0000000..0b6cc86
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupStringTable.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.dict.lookup;
+
+import java.io.IOException;
+
+import com.kylinolap.metadata.model.schema.TableDesc;
+
+/**
+ * A lookup table whose cells are plain strings; rows and cells of the
+ * underlying table are used as they are, without conversion.
+ *
+ * @author yangli9
+ */
+public class LookupStringTable extends LookupTable<String> {
+
+    public LookupStringTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
+        super(tableDesc, keyColumns, table);
+    }
+
+    /** Returns the given row itself, as the cells are already strings. */
+    @Override
+    protected String[] convertRow(String[] row) {
+        return row;
+    }
+
+    /** Returns the given cell itself. */
+    @Override
+    protected String toString(String value) {
+        return value;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupTable.java
new file mode 100644
index 0000000..19d40ca
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/LookupTable.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.dict.lookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Sets;
+import com.kylinolap.common.util.Array;
+import com.kylinolap.metadata.model.schema.TableDesc;
+
+/**
+ * An in-memory lookup table, in which each cell is an object of type T. The
+ * table is indexed by specified PK for fast lookup.
+ * <p>
+ * All rows are loaded eagerly by the constructor, which calls {@link #init()}
+ * and therefore {@link #convertRow(String[])} before any subclass constructor
+ * body has run. Implementations of {@code convertRow} and
+ * {@code toString(T)} must not depend on subclass fields.
+ *
+ * @author yangli9
+ */
+public abstract class LookupTable<T extends Comparable<T>> {
+
+    protected TableDesc tableDesc;
+    protected String[] keyColumns;
+    protected ReadableTable table;
+    protected ConcurrentHashMap<Array<T>, T[]> data;
+
+    /**
+     * Stores the arguments and immediately loads every row of {@code table}.
+     *
+     * @param tableDesc  description used to resolve column names to indexes
+     * @param keyColumns names of the columns that together form the key
+     * @param table      source the rows are read from
+     * @throws IOException if reading the table fails
+     * @throws IllegalStateException if two rows share the same key
+     */
+    public LookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
+        this.tableDesc = tableDesc;
+        this.keyColumns = keyColumns;
+        this.table = table;
+        this.data = new ConcurrentHashMap<Array<T>, T[]>();
+        init();
+    }
+
+    /**
+     * Reads all rows from the table and indexes them by the key columns. The
+     * reader is always closed, even when loading fails.
+     */
+    protected void init() throws IOException {
+        int[] keyIndex = new int[keyColumns.length];
+        for (int i = 0; i < keyColumns.length; i++) {
+            keyIndex[i] = indexOfColumn(keyColumns[i]);
+        }
+
+        TableReader reader = table.getReader();
+        try {
+            while (reader.next()) {
+                initRow(reader.getRow(), keyIndex);
+            }
+        } finally {
+            reader.close();
+        }
+    }
+
+    /** Resolves a column name to its zero-based position in a row. */
+    private int indexOfColumn(String name) {
+        return tableDesc.findColumnByName(name).getZeroBasedIndex();
+    }
+
+    /**
+     * Converts one raw row, extracts its key and stores it.
+     *
+     * @throws IllegalStateException if a row with the same key was already stored
+     */
+    @SuppressWarnings("unchecked")
+    private void initRow(String[] cols, int[] keyIndex) {
+        T[] value = convertRow(cols);
+
+        // Take the element type from the row array itself rather than from
+        // value[0]: the first cell may be null, or the row may be empty.
+        T[] keyCols = (T[]) java.lang.reflect.Array.newInstance(value.getClass().getComponentType(), keyIndex.length);
+        for (int i = 0; i < keyCols.length; i++) {
+            keyCols[i] = value[keyIndex[i]];
+        }
+
+        Array<T> key = new Array<T>(keyCols);
+
+        // putIfAbsent never overwrites, so the first row survives a duplicate
+        // and the check-then-insert happens in a single map operation.
+        T[] existing = data.putIfAbsent(key, value);
+        if (existing != null) {
+            throw new IllegalStateException("Dup key found, key=" + toString(keyCols) + ", value1=" + toString(existing) + ", value2=" + toString(value));
+        }
+    }
+
+    /** Converts the raw string cells of one row into cells of type T. */
+    abstract protected T[] convertRow(String[] cols);
+
+    /** Returns the row stored under {@code key}, or {@code null} if there is none. */
+    public T[] getRow(Array<T> key) {
+        return data.get(key);
+    }
+
+    /** Returns a live view of all stored rows (not a copy). */
+    public Collection<T[]> getAllRows() {
+        return data.values();
+    }
+
+    /**
+     * Collects the {@code returnCol} cell of every row whose {@code col} cell
+     * is contained in {@code values}. Duplicates are kept.
+     */
+    public List<T> scan(String col, List<T> values, String returnCol) {
+        List<T> result = new ArrayList<T>();
+        int colIdx = indexOfColumn(col);
+        int returnIdx = indexOfColumn(returnCol);
+        for (T[] row : data.values()) {
+            if (values.contains(row[colIdx])) {
+                result.add(row[returnIdx]);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Maps an inclusive range on {@code col} to the smallest and largest
+     * {@code returnCol} values among the matching rows.
+     *
+     * @param beginValue lower bound, or {@code null} for no lower bound
+     * @param endValue   upper bound, or {@code null} for no upper bound
+     * @return the (min, max) pair, or {@code null} when no row contributes a
+     *         non-null return value
+     */
+    public Pair<T, T> mapRange(String col, T beginValue, T endValue, String returnCol) {
+        int colIdx = indexOfColumn(col);
+        int returnIdx = indexOfColumn(returnCol);
+        T returnBegin = null;
+        T returnEnd = null;
+        for (T[] row : data.values()) {
+            if (!between(beginValue, row[colIdx], endValue)) {
+                continue;
+            }
+            T returnValue = row[returnIdx];
+            if (returnValue == null) {
+                // A null cell cannot be ordered, so it cannot widen the range.
+                continue;
+            }
+            if (returnBegin == null || returnValue.compareTo(returnBegin) < 0) {
+                returnBegin = returnValue;
+            }
+            if (returnEnd == null || returnValue.compareTo(returnEnd) > 0) {
+                returnEnd = returnValue;
+            }
+        }
+        if (returnBegin == null && returnEnd == null) {
+            return null;
+        }
+        return new Pair<T, T>(returnBegin, returnEnd);
+    }
+
+    /**
+     * Collects the distinct {@code returnCol} cells of every row whose
+     * {@code col} cell is contained in {@code values}.
+     */
+    public Set<T> mapValues(String col, Set<T> values, String returnCol) {
+        int colIdx = indexOfColumn(col);
+        int returnIdx = indexOfColumn(returnCol);
+        Set<T> result = Sets.newHashSetWithExpectedSize(values.size());
+        for (T[] row : data.values()) {
+            if (values.contains(row[colIdx])) {
+                result.add(row[returnIdx]);
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Tests {@code beginValue <= v <= endValue}, where a {@code null} bound
+     * means unbounded on that side. A {@code null} cell matches only when both
+     * bounds are {@code null}, because it cannot be compared against a bound.
+     */
+    private boolean between(T beginValue, T v, T endValue) {
+        if (v == null) {
+            return beginValue == null && endValue == null;
+        }
+        return (beginValue == null || beginValue.compareTo(v) <= 0) && (endValue == null || v.compareTo(endValue) <= 0);
+    }
+
+    @Override
+    public String toString() {
+        return "LookupTable [path=" + table + "]";
+    }
+
+    /** Renders a row as {@code [cell,cell,...]} using {@link #toString(Comparable)} per cell. */
+    protected String toString(T[] cols) {
+        StringBuilder b = new StringBuilder();
+        b.append("[");
+        for (int i = 0; i < cols.length; i++) {
+            if (i > 0) {
+                b.append(",");
+            }
+            b.append(toString(cols[i]));
+        }
+        b.append("]");
+        return b.toString();
+    }
+
+    /** Renders a single cell for diagnostics and error messages. */
+    abstract protected String toString(T cell);
+
+    /** Prints every {@code key => row} mapping to standard output. */
+    public void dump() {
+        // Walk the entries directly instead of looking each key up again.
+        for (java.util.Map.Entry<Array<T>, T[]> entry : data.entrySet()) {
+            System.out.println(toString(entry.getKey().data) + " => " + toString(entry.getValue()));
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/4b631f92/dictionary/src/main/java/com/kylinolap/dict/lookup/ReadableTable.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/com/kylinolap/dict/lookup/ReadableTable.java b/dictionary/src/main/java/com/kylinolap/dict/lookup/ReadableTable.java
new file mode 100644
index 0000000..4f5c22f
--- /dev/null
+++ b/dictionary/src/main/java/com/kylinolap/dict/lookup/ReadableTable.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2013-2014 eBay Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kylinolap.dict.lookup;
+
+import java.io.IOException;
+
+/**
+ * A table that hands out a {@link TableReader} over its content, together
+ * with a signature and a column delimiter.
+ *
+ * @author yangli9
+ */
+public interface ReadableTable {
+
+    // Interface members are implicitly public, and fields are implicitly
+    // static final, so those modifiers are omitted below.
+
+    /** Delimiter constant {@code "auto"}; presumably requests auto-detection -- confirm against implementations. */
+    String DELIM_AUTO = "auto";
+
+    /** Delimiter constant for a comma. */
+    String DELIM_COMMA = ",";
+
+    /** Returns a reader over this table. */
+    TableReader getReader() throws IOException;
+
+    /** Returns the signature of this table. */
+    TableSignature getSignature() throws IOException;
+
+    /** Returns the column delimiter; the method name's spelling is kept for compatibility. */
+    String getColumnDelimeter() throws IOException;
+
+}