You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ka...@apache.org on 2017/04/13 11:22:13 UTC
[47/50] [abbrv] kylin git commit: KYLIN-2506 Refactor Global
Dictionary
http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java
new file mode 100644
index 0000000..dd9593a
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictNode.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+
+public class DictNode {
+ public byte[] part;
+ public int id = -1;
+ public boolean isEndOfValue;
+ public ArrayList<DictNode> children = new ArrayList<>();
+
+ public int nValuesBeneath;
+ public DictNode parent;
+ public int childrenCount = 1;
+
+ DictNode(byte[] value, boolean isEndOfValue) {
+ reset(value, isEndOfValue);
+ }
+
+ DictNode(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
+ reset(value, isEndOfValue, children);
+ }
+
+ void reset(byte[] value, boolean isEndOfValue) {
+ reset(value, isEndOfValue, new ArrayList<DictNode>());
+ }
+
+ void reset(byte[] value, boolean isEndOfValue, ArrayList<DictNode> children) {
+ this.part = value;
+ this.isEndOfValue = isEndOfValue;
+ clearChild();
+ for (DictNode child : children) {
+ addChild(child);
+ }
+ this.id = -1;
+ }
+
+ void clearChild() {
+ this.children.clear();
+ int childrenCountDelta = this.childrenCount - 1;
+ for (DictNode p = this; p != null; p = p.parent) {
+ p.childrenCount -= childrenCountDelta;
+ }
+ }
+
+ void addChild(DictNode child) {
+ addChild(-1, child);
+ }
+
+ void addChild(int index, DictNode child) {
+ child.parent = this;
+ if (index < 0) {
+ this.children.add(child);
+ } else {
+ this.children.add(index, child);
+ }
+ for (DictNode p = this; p != null; p = p.parent) {
+ p.childrenCount += child.childrenCount;
+ }
+ }
+
+ private DictNode removeChild(int index) {
+ DictNode child = children.remove(index);
+ child.parent = null;
+ for (DictNode p = this; p != null; p = p.parent) {
+ p.childrenCount -= child.childrenCount;
+ }
+ return child;
+ }
+
+ private DictNode duplicateNode() {
+ DictNode newChild = new DictNode(part, false);
+ newChild.parent = parent;
+ if (parent != null) {
+ int index = parent.children.indexOf(this);
+ parent.addChild(index + 1, newChild);
+ }
+ return newChild;
+ }
+
+ public byte[] firstValue() {
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ DictNode p = this;
+ while (true) {
+ bytes.write(p.part, 0, p.part.length);
+ if (p.isEndOfValue || p.children.size() == 0) {
+ break;
+ }
+ p = p.children.get(0);
+ }
+ return bytes.toByteArray();
+ }
+
+ public static DictNode splitNodeTree(final DictNode splitNode) {
+ if (splitNode == null) {
+ return null;
+ }
+ DictNode current = splitNode;
+ DictNode p = current.parent;
+ while (p != null) {
+ int index = p.children.indexOf(current);
+ assert index != -1;
+ DictNode newParent = p.duplicateNode();
+ for (int i = p.children.size() - 1; i >= index; i--) {
+ DictNode child = p.removeChild(i);
+ newParent.addChild(0, child);
+ }
+ current = newParent;
+ p = p.parent;
+ }
+ return current;
+ }
+
+ public byte[] buildTrieBytes() {
+ Stats stats = Stats.stats(this);
+ int sizeChildOffset = stats.mbpn_sizeChildOffset;
+ int sizeId = stats.mbpn_sizeId;
+
+ // write head
+ byte[] head;
+ try {
+ ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+ DataOutputStream headOut = new DataOutputStream(byteBuf);
+ headOut.write(AppendTrieDictionary.HEAD_MAGIC);
+ headOut.writeShort(0); // head size, will back fill
+ headOut.writeInt(stats.mbpn_footprint); // body size
+ headOut.writeInt(stats.nValues);
+ headOut.write(sizeChildOffset);
+ headOut.write(sizeId);
+ headOut.close();
+ head = byteBuf.toByteArray();
+ BytesUtil.writeUnsigned(head.length, head, AppendTrieDictionary.HEAD_SIZE_I, 2);
+ } catch (IOException e) {
+ throw new RuntimeException(e); // shall not happen, as we are
+ }
+
+ byte[] trieBytes = new byte[stats.mbpn_footprint + head.length];
+ System.arraycopy(head, 0, trieBytes, 0, head.length);
+
+ LinkedList<DictNode> open = new LinkedList<DictNode>();
+ IdentityHashMap<DictNode, Integer> offsetMap = new IdentityHashMap<DictNode, Integer>();
+
+ // write body
+ int o = head.length;
+ offsetMap.put(this, o);
+ o = build_writeNode(this, o, true, sizeChildOffset, sizeId, trieBytes);
+ if (this.children.isEmpty() == false)
+ open.addLast(this);
+
+ while (open.isEmpty() == false) {
+ DictNode parent = open.removeFirst();
+ build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
+ for (int i = 0; i < parent.children.size(); i++) {
+ DictNode c = parent.children.get(i);
+ boolean isLastChild = (i == parent.children.size() - 1);
+ offsetMap.put(c, o);
+ o = build_writeNode(c, o, isLastChild, sizeChildOffset, sizeId, trieBytes);
+ if (c.children.isEmpty() == false)
+ open.addLast(c);
+ }
+ }
+
+ if (o != trieBytes.length)
+ throw new RuntimeException();
+ return trieBytes;
+ }
+
+ private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
+ int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
+ BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
+ trieBytes[parentOffset] |= flags;
+ }
+
+ private int build_writeNode(DictNode n, int offset, boolean isLastChild, int sizeChildOffset, int sizeId, byte[] trieBytes) {
+ int o = offset;
+
+ // childOffset
+ if (isLastChild)
+ trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
+ if (n.isEndOfValue)
+ trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
+ o += sizeChildOffset;
+
+ // nValueBytes
+ if (n.part.length > 255)
+ throw new RuntimeException("Value length is " + n.part.length + " and larger than 255: " + Bytes.toStringBinary(n.part));
+ BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
+ o++;
+
+ // valueBytes
+ System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
+ o += n.part.length;
+
+ if (n.isEndOfValue) {
+ checkValidId(n.id);
+ BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId);
+ o += sizeId;
+ }
+
+ return o;
+ }
+
+ // The dict id starts from 1 to 2147483647 and 2147483648 to -2, leave 0 and -1 used for uninitialized state
+ private void checkValidId(int id) {
+ if (id == 0 || id == -1) {
+ throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return String.format("DictNode[root=%s, nodes=%d, firstValue=%s]", Bytes.toStringBinary(part), childrenCount, Bytes.toStringBinary(firstValue()));
+ }
+
+ static class Stats {
+ public interface Visitor {
+ void visit(DictNode n, int level);
+ }
+
+ private static void traverseR(DictNode node, Visitor visitor, int level) {
+ visitor.visit(node, level);
+ for (DictNode c : node.children)
+ traverseR(c, visitor, level + 1);
+ }
+
+ private static void traversePostOrderR(DictNode node, Visitor visitor, int level) {
+ for (DictNode c : node.children)
+ traversePostOrderR(c, visitor, level + 1);
+ visitor.visit(node, level);
+ }
+
+ public int nValues; // number of values in total
+ public int nValueBytesPlain; // number of bytes for all values
+ // uncompressed
+ public int nValueBytesCompressed; // number of values bytes in Trie
+ // (compressed)
+ public int maxValueLength; // size of longest value in bytes
+
+ // the trie is multi-byte-per-node
+ public int mbpn_nNodes; // number of nodes in trie
+ public int mbpn_trieDepth; // depth of trie
+ public int mbpn_maxFanOut; // the maximum no. children
+ public int mbpn_nChildLookups; // number of child lookups during lookup
+ // every value once
+ public int mbpn_nTotalFanOut; // the sum of fan outs during lookup every
+ // value once
+ public int mbpn_sizeValueTotal; // the sum of value space in all nodes
+ public int mbpn_sizeNoValueBytes; // size of field noValueBytes
+ public int mbpn_sizeChildOffset; // size of field childOffset, points to
+ // first child in flattened array
+ public int mbpn_sizeId; // size of id value, always be 4
+ public int mbpn_footprint; // MBPN footprint in bytes
+
+ /**
+ * out print some statistics of the trie and the dictionary built from it
+ */
+ public static Stats stats(DictNode root) {
+ // calculate nEndValueBeneath
+ traversePostOrderR(root, new Visitor() {
+ @Override
+ public void visit(DictNode n, int level) {
+ n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
+ for (DictNode c : n.children)
+ n.nValuesBeneath += c.nValuesBeneath;
+ }
+ }, 0);
+
+ // run stats
+ final Stats s = new Stats();
+ final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
+ traverseR(root, new Visitor() {
+ @Override
+ public void visit(DictNode n, int level) {
+ if (n.isEndOfValue)
+ s.nValues++;
+ s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
+ s.nValueBytesCompressed += n.part.length;
+ s.mbpn_nNodes++;
+ if (s.mbpn_trieDepth < level + 1)
+ s.mbpn_trieDepth = level + 1;
+ if (n.children.size() > 0) {
+ if (s.mbpn_maxFanOut < n.children.size())
+ s.mbpn_maxFanOut = n.children.size();
+ int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
+ s.mbpn_nChildLookups += childLookups;
+ s.mbpn_nTotalFanOut += childLookups * n.children.size();
+ }
+
+ if (level < lenAtLvl.size())
+ lenAtLvl.set(level, n.part.length);
+ else
+ lenAtLvl.add(n.part.length);
+ int lenSoFar = 0;
+ for (int i = 0; i <= level; i++)
+ lenSoFar += lenAtLvl.get(i);
+ if (lenSoFar > s.maxValueLength)
+ s.maxValueLength = lenSoFar;
+ }
+ }, 0);
+
+ // flatten trie footprint calculation, case of Multi-Byte-Per-DictNode
+ s.mbpn_sizeId = 4;
+ s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId;
+ s.mbpn_sizeNoValueBytes = 1;
+ s.mbpn_sizeChildOffset = 5;
+ s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset);
+ while (true) { // minimize the offset size to match the footprint
+ int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1);
+ // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
+ // expand t to long before *4, avoiding exceed Integer.MAX_VALUE
+ if (BytesUtil.sizeForValue((long) t * 4) <= s.mbpn_sizeChildOffset - 1) {
+ s.mbpn_sizeChildOffset--;
+ s.mbpn_footprint = t;
+ } else
+ break;
+ }
+
+ return s;
+ }
+
+ /**
+ * out print trie for debug
+ */
+ public void print(DictNode root) {
+ print(root, System.out);
+ }
+
+ public void print(DictNode root, final PrintStream out) {
+ traverseR(root, new Visitor() {
+ @Override
+ public void visit(DictNode n, int level) {
+ try {
+ for (int i = 0; i < level; i++)
+ out.print(" ");
+ out.print(new String(n.part, "UTF-8"));
+ out.print(" - ");
+ if (n.nValuesBeneath > 0)
+ out.print(n.nValuesBeneath);
+ if (n.isEndOfValue)
+ out.print("* [" + n.id + "]");
+ out.print("\n");
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ }
+ }, 0);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java
new file mode 100644
index 0000000..3ca0d8f
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSlice.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+
+public class DictSlice {
+ static final byte[] HEAD_MAGIC = new byte[] { 0x41, 0x70, 0x70, 0x65, 0x63, 0x64, 0x54, 0x72, 0x69, 0x65, 0x44, 0x69, 0x63, 0x74 }; // "AppendTrieDict"
+ static final int HEAD_SIZE_I = HEAD_MAGIC.length;
+ static final int BIT_IS_LAST_CHILD = 0x80;
+ static final int BIT_IS_END_OF_VALUE = 0x40;
+
+ private byte[] trieBytes;
+
+ // non-persistent part
+ transient private int headSize;
+ transient private int bodyLen;
+ transient private int sizeChildOffset;
+
+ transient private int nValues;
+ transient private int sizeOfId;
+ // mask MUST be long, since childOffset maybe 5 bytes at most
+ transient private long childOffsetMask;
+ transient private int firstByteOffset;
+
+ public DictSlice(byte[] bytes) {
+ this.trieBytes = bytes;
+ init();
+ }
+
+ private void init() {
+ if (BytesUtil.compareBytes(HEAD_MAGIC, 0, trieBytes, 0, HEAD_MAGIC.length) != 0)
+ throw new IllegalArgumentException("Wrong file type (magic does not match)");
+
+ try {
+ DataInputStream headIn = new DataInputStream(new ByteArrayInputStream(trieBytes, HEAD_SIZE_I, trieBytes.length - HEAD_SIZE_I));
+ this.headSize = headIn.readShort();
+ this.bodyLen = headIn.readInt();
+ this.nValues = headIn.readInt();
+ this.sizeChildOffset = headIn.read();
+ this.sizeOfId = headIn.read();
+
+ this.childOffsetMask = ~(((long) (BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE)) << ((sizeChildOffset - 1) * 8));
+ this.firstByteOffset = sizeChildOffset + 1; // the offset from begin of node to its first value byte
+ } catch (Exception e) {
+ if (e instanceof RuntimeException)
+ throw (RuntimeException) e;
+ else
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static DictSlice deserializeFrom(DataInput in) throws IOException {
+ byte[] headPartial = new byte[HEAD_MAGIC.length + Short.SIZE / Byte.SIZE + Integer.SIZE / Byte.SIZE];
+ in.readFully(headPartial);
+
+ if (BytesUtil.compareBytes(HEAD_MAGIC, 0, headPartial, 0, HEAD_MAGIC.length) != 0)
+ throw new IllegalArgumentException("Wrong file type (magic does not match)");
+
+ DataInputStream headIn = new DataInputStream(//
+ new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I));
+ int headSize = headIn.readShort();
+ int bodyLen = headIn.readInt();
+ headIn.close();
+
+ byte[] all = new byte[headSize + bodyLen];
+ System.arraycopy(headPartial, 0, all, 0, headPartial.length);
+ in.readFully(all, headPartial.length, all.length - headPartial.length);
+
+ return new DictSlice(all);
+ }
+
+ public byte[] getFirstValue() {
+ int nodeOffset = headSize;
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ while (true) {
+ int valueLen = BytesUtil.readUnsigned(trieBytes, nodeOffset + firstByteOffset - 1, 1);
+ bytes.write(trieBytes, nodeOffset + firstByteOffset, valueLen);
+ if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) {
+ break;
+ }
+ nodeOffset = headSize + (int) (BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask);
+ if (nodeOffset == headSize) {
+ break;
+ }
+ }
+ return bytes.toByteArray();
+ }
+
+ /**
+ * returns a code point from [0, nValues), preserving order of value
+ *
+ * @param n -- the offset of current node
+ * @param inp -- input value bytes to lookup
+ * @param o -- offset in the input value bytes matched so far
+ * @param inpEnd -- end of input
+ * @param roundingFlag -- =0: return -1 if not found
+ * -- <0: return closest smaller if not found, return -1
+ * -- >0: return closest bigger if not found, return nValues
+ */
+ private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) {
+ while (true) {
+ // match the current node
+ int p = n + firstByteOffset; // start of node's value
+ int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value
+ for (; p < end && o < inpEnd; p++, o++) { // note matching start from [0]
+ if (trieBytes[p] != inp[o]) {
+ return -1; // mismatch
+ }
+ }
+
+ // node completely matched, is input all consumed?
+ boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+ if (o == inpEnd) {
+ return p == end && isEndOfValue ? BytesUtil.readUnsigned(trieBytes, end, sizeOfId) : -1;
+ }
+
+ // find a child to continue
+ int c = headSize + (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
+ if (c == headSize) // has no children
+ return -1;
+ byte inpByte = inp[o];
+ int comp;
+ while (true) {
+ p = c + firstByteOffset;
+ comp = BytesUtil.compareByteUnsigned(trieBytes[p], inpByte);
+ if (comp == 0) { // continue in the matching child, reset n and loop again
+ n = c;
+ break;
+ } else if (comp < 0) { // try next child
+ if (checkFlag(c, BIT_IS_LAST_CHILD))
+ return -1;
+ c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1) + (checkFlag(c, BIT_IS_END_OF_VALUE) ? sizeOfId : 0);
+ } else { // children are ordered by their first value byte
+ return -1;
+ }
+ }
+ }
+ }
+
+ private boolean checkFlag(int offset, int bit) {
+ return (trieBytes[offset] & bit) > 0;
+ }
+
+ public int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
+ int id = lookupSeqNoFromValue(headSize, value, offset, offset + len, roundingFlag);
+ return id;
+ }
+
+ public DictNode rebuildTrieTree() {
+ return rebuildTrieTreeR(headSize, null);
+ }
+
+ private DictNode rebuildTrieTreeR(int n, DictNode parent) {
+ DictNode root = null;
+ while (true) {
+ int p = n + firstByteOffset;
+ int childOffset = (int) (BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
+ int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+ boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
+
+ byte[] value = new byte[parLen];
+ System.arraycopy(trieBytes, p, value, 0, parLen);
+
+ DictNode node = new DictNode(value, isEndOfValue);
+ if (isEndOfValue) {
+ int id = BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
+ node.id = id;
+ }
+
+ if (parent == null) {
+ root = node;
+ } else {
+ parent.addChild(node);
+ }
+
+ if (childOffset != 0) {
+ rebuildTrieTreeR(childOffset + headSize, node);
+ }
+
+ if (checkFlag(n, BIT_IS_LAST_CHILD)) {
+ break;
+ } else {
+ n += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
+ }
+ }
+ return root;
+ }
+
+ public boolean doCheck() {
+ int offset = headSize;
+ HashSet<Integer> parentSet = new HashSet<>();
+ boolean lastChild = false;
+
+ while (offset < trieBytes.length) {
+ if (lastChild) {
+ boolean contained = parentSet.remove(offset - headSize);
+ // Can't find parent, the data is corrupted
+ if (!contained) {
+ return false;
+ }
+ lastChild = false;
+ }
+ int p = offset + firstByteOffset;
+ int childOffset = (int) (BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask);
+ int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+ boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE);
+
+ // Copy value overflow, the data is corrupted
+ if (trieBytes.length < p + parLen) {
+ return false;
+ }
+
+ // Check id is fine
+ if (isEndOfValue) {
+ BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
+ }
+
+ // Record it if has children
+ if (childOffset != 0) {
+ parentSet.add(childOffset);
+ }
+
+ // brothers done, move to next parent
+ if (checkFlag(offset, BIT_IS_LAST_CHILD)) {
+ lastChild = true;
+ }
+
+ // move to next node
+ offset += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
+ }
+
+ // ParentMap is empty, meaning all nodes has parent, the data is correct
+ return parentSet.isEmpty();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("DictSlice[firstValue=%s, values=%d, bytes=%d]", Bytes.toStringBinary(getFirstValue()), nValues, bodyLen);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(trieBytes);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof DictSlice)) {
+ return false;
+ }
+ DictSlice that = (DictSlice) o;
+ return Arrays.equals(this.trieBytes, that.trieBytes);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java
new file mode 100644
index 0000000..8fc3f78
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictSliceKey.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+public class DictSliceKey implements Comparable<DictSliceKey> {
+ static final DictSliceKey START_KEY = DictSliceKey.wrap(new byte[0]);
+
+ byte[] key;
+
+ public static DictSliceKey wrap(byte[] key) {
+ DictSliceKey dictKey = new DictSliceKey();
+ dictKey.key = key;
+ return dictKey;
+ }
+
+ @Override
+ public String toString() {
+ return Bytes.toStringBinary(key);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(key);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o instanceof DictSliceKey) {
+ DictSliceKey that = (DictSliceKey) o;
+ return Arrays.equals(this.key, that.key);
+ }
+ return false;
+ }
+
+ @Override
+ public int compareTo(DictSliceKey that) {
+ return Bytes.compareTo(key, that.key);
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(key.length);
+ out.write(key);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ key = new byte[in.readInt()];
+ in.readFully(key);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
new file mode 100644
index 0000000..d9030d3
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictHDFSStore.java
@@ -0,0 +1,420 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+public class GlobalDictHDFSStore extends GlobalDictStore {
+
+ static final Logger logger = LoggerFactory.getLogger(GlobalDictHDFSStore.class);
+ static final String V1_INDEX_NAME = ".index";
+ static final String V2_INDEX_NAME = ".index_v2";
+ static final String VERSION_PREFIX = "version_";
+ static final int BUFFER_SIZE = 8 * 1024 * 1024;
+
+ private final Path basePath;
+ private final Configuration conf;
+ private final FileSystem fileSystem;
+
+ protected GlobalDictHDFSStore(String baseDir) throws IOException {
+ super(baseDir);
+ this.basePath = new Path(baseDir);
+ this.conf = HadoopUtil.getCurrentConfiguration();
+ this.fileSystem = HadoopUtil.getFileSystem(baseDir);
+
+ if (!fileSystem.exists(basePath)) {
+ logger.info("Global dict at {} doesn't exist, create a new one", basePath);
+ fileSystem.mkdirs(basePath);
+ }
+
+ migrateOldLayout();
+ }
+
+ // Previously we put slice files and index file directly in base directory,
+ // should migrate to the new versioned layout
+ private void migrateOldLayout() throws IOException {
+ FileStatus[] sliceFiles = fileSystem.listStatus(basePath, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(IndexFormatV1.SLICE_PREFIX);
+ }
+ });
+ Path indexFile = new Path(basePath, V1_INDEX_NAME);
+
+ if (fileSystem.exists(indexFile) && sliceFiles.length > 0) { // old layout
+ final long version = System.currentTimeMillis();
+ Path tempDir = new Path(basePath, "tmp_" + VERSION_PREFIX + version);
+ Path versionDir = getVersionDir(version);
+
+ logger.info("Convert global dict at {} to new layout with version {}", basePath, version);
+
+ fileSystem.mkdirs(tempDir);
+ // convert to new layout
+ try {
+ // copy index and slice files to temp
+ FileUtil.copy(fileSystem, indexFile, fileSystem, tempDir, false, conf);
+ for (FileStatus sliceFile : sliceFiles) {
+ FileUtil.copy(fileSystem, sliceFile.getPath(), fileSystem, tempDir, false, conf);
+ }
+ // rename
+ fileSystem.rename(tempDir, versionDir);
+ // delete index and slices files in base dir
+ fileSystem.delete(indexFile, false);
+ for (FileStatus sliceFile : sliceFiles) {
+ fileSystem.delete(sliceFile.getPath(), true);
+ }
+
+ } finally {
+ if (fileSystem.exists(tempDir)) {
+ fileSystem.delete(tempDir, true);
+ }
+ }
+ }
+ }
+
+ @Override
+ void prepareForWrite(String workingDir) throws IOException {
+ // TODO create lock file
+ Path working = new Path(workingDir);
+
+ if (fileSystem.exists(working)) {
+ fileSystem.delete(working, true);
+ logger.info("Working directory {} exits, delete it first", working);
+ }
+
+ // when build dict, copy all data into working dir and work on it, avoiding suddenly server crash made data corrupt
+ Long[] versions = listAllVersions();
+ if (versions.length > 0) {
+ Path latestVersion = getVersionDir(versions[versions.length - 1]);
+ FileUtil.copy(fileSystem, latestVersion, fileSystem, working, false, true, conf);
+ } else {
+ fileSystem.mkdirs(working);
+ }
+ }
+
+ @Override
+ Long[] listAllVersions() throws IOException {
+ FileStatus[] versionDirs = fileSystem.listStatus(basePath, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(VERSION_PREFIX);
+ }
+ });
+ TreeSet<Long> versions = new TreeSet<>();
+ for (int i = 0; i < versionDirs.length; i++) {
+ Path path = versionDirs[i].getPath();
+ versions.add(Long.parseLong(path.getName().substring(VERSION_PREFIX.length())));
+ }
+ return versions.toArray(new Long[versions.size()]);
+ }
+
+ @Override
+ public Path getVersionDir(long version) {
+ return new Path(basePath, VERSION_PREFIX + version);
+ }
+
+ @Override
+ GlobalDictMetadata getMetadata(long version) throws IOException {
+ Path versionDir = getVersionDir(version);
+ FileStatus[] indexFiles = fileSystem.listStatus(versionDir, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ return path.getName().startsWith(V1_INDEX_NAME);
+ }
+ });
+ checkState(indexFiles.length == 1, "zero or more than one index file found: %s", Arrays.toString(indexFiles));
+
+ IndexFormat format;
+ String indexFile = indexFiles[0].getPath().getName();
+ if (V2_INDEX_NAME.equals(indexFile)) {
+ format = new IndexFormatV2(fileSystem, conf);
+ } else if (V1_INDEX_NAME.equals(indexFile)) {
+ format = new IndexFormatV1(fileSystem, conf);
+ } else {
+ throw new RuntimeException("Unknown index file: " + indexFile);
+ }
+
+ return format.readIndexFile(versionDir);
+ }
+
+ @Override
+ DictSlice readSlice(String directory, String sliceFileName) {
+ Path path = new Path(directory, sliceFileName);
+ logger.info("read slice from {}", path);
+ try (FSDataInputStream input = fileSystem.open(path, BUFFER_SIZE)) {
+ return DictSlice.deserializeFrom(input);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("read slice %s failed", path), e);
+ }
+ }
+
+ @Override
+ String writeSlice(String workingDir, DictSliceKey key, DictNode slice) {
+ //write new slice
+ String sliceFile = IndexFormatV2.sliceFileName(key);
+ Path path = new Path(workingDir, sliceFile);
+
+ logger.info("write slice with key {} into file {}", key, path);
+ try (FSDataOutputStream out = fileSystem.create(path, true, BUFFER_SIZE)) {
+ byte[] bytes = slice.buildTrieBytes();
+ out.write(bytes);
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("write slice with key %s into file %s failed", key, path), e);
+ }
+ return sliceFile;
+ }
+
+ @Override
+ void deleteSlice(String workingDir, String sliceFileName) {
+ Path path = new Path(workingDir, sliceFileName);
+ logger.info("delete slice at {}", path);
+ try {
+ if (fileSystem.exists(path)) {
+ fileSystem.delete(path, false);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(String.format("delete slice at %s failed", path), e);
+ }
+ }
+
+ @Override
+ void commit(String workingDir, GlobalDictMetadata metadata) throws IOException {
+ Path workingPath = new Path(workingDir);
+
+ // delete v1 index file
+ Path oldIndexFile = new Path(workingPath, V1_INDEX_NAME);
+ if (fileSystem.exists(oldIndexFile)) {
+ fileSystem.delete(oldIndexFile, false);
+ }
+ // write v2 index file
+ IndexFormat index = new IndexFormatV2(fileSystem, conf);
+ index.writeIndexFile(workingPath, metadata);
+ index.sanityCheck(workingPath, metadata);
+
+ // copy working dir to newVersion dir
+ Path newVersionPath = new Path(basePath, VERSION_PREFIX + System.currentTimeMillis());
+ fileSystem.rename(workingPath, newVersionPath);
+
+ cleanUp();
+ }
+
+ // Check versions count, delete expired versions
+ private void cleanUp() throws IOException {
+ Long[] versions = listAllVersions();
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < versions.length - maxVersions; i++) {
+ if (versions[i] + versionTTL < timestamp) {
+ fileSystem.delete(getVersionDir(versions[i]), true);
+ }
+ }
+ }
+
+ @Override
+ String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
+ checkArgument(baseDir.startsWith(srcConfig.getHdfsWorkingDirectory()), "Please check why current directory {} doesn't belong to source working directory {}", baseDir, srcConfig.getHdfsWorkingDirectory());
+
+ final String dstBaseDir = baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory());
+
+ Long[] versions = listAllVersions();
+ if (versions.length == 0) { // empty dict, nothing to copy
+ return dstBaseDir;
+ }
+
+ Path srcVersionDir = getVersionDir(versions[versions.length - 1]);
+ Path dstVersionDir = new Path(srcVersionDir.toString().replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()));
+ FileSystem dstFS = dstVersionDir.getFileSystem(conf);
+ if (dstFS.exists(dstVersionDir)) {
+ dstFS.delete(dstVersionDir, true);
+ }
+ FileUtil.copy(fileSystem, srcVersionDir, dstFS, dstVersionDir, false, true, conf);
+
+ return dstBaseDir;
+ }
+
+ public interface IndexFormat {
+ GlobalDictMetadata readIndexFile(Path dir) throws IOException;
+
+ void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException;
+
+ void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException;
+ }
+
+ public static class IndexFormatV1 implements IndexFormat {
+ static final String SLICE_PREFIX = "cached_";
+
+ protected final FileSystem fs;
+ protected final Configuration conf;
+
+ protected IndexFormatV1(FileSystem fs, Configuration conf) {
+ this.fs = fs;
+ this.conf = conf;
+ }
+
+ @Override
+ public GlobalDictMetadata readIndexFile(Path dir) throws IOException {
+ Path indexFile = new Path(dir, V1_INDEX_NAME);
+ try (FSDataInputStream in = fs.open(indexFile)) {
+ int baseId = in.readInt();
+ int maxId = in.readInt();
+ int maxValueLength = in.readInt();
+ int nValues = in.readInt();
+ String converterName = in.readUTF();
+ BytesConverter converter;
+ try {
+ converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e);
+ }
+
+ int nSlices = in.readInt();
+ TreeMap<DictSliceKey, String> sliceFileMap = new TreeMap<>();
+ for (int i = 0; i < nSlices; i++) {
+ DictSliceKey key = new DictSliceKey();
+ key.readFields(in);
+ sliceFileMap.put(key, sliceFileName(key));
+ }
+ // make sure first key is always ""
+ String firstFile = sliceFileMap.remove(sliceFileMap.firstKey());
+ sliceFileMap.put(DictSliceKey.START_KEY, firstFile);
+
+ return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, sliceFileMap);
+ }
+ }
+
+ //only for test
+ @Override
+ public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException {
+ Path indexFile = new Path(dir, V1_INDEX_NAME);
+ try (FSDataOutputStream out = fs.create(indexFile, true)) {
+ out.writeInt(metadata.baseId);
+ out.writeInt(metadata.maxId);
+ out.writeInt(metadata.maxValueLength);
+ out.writeInt(metadata.nValues);
+ out.writeUTF(metadata.bytesConverter.getClass().getName());
+ out.writeInt(metadata.sliceFileMap.size());
+ for (Map.Entry<DictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+ entry.getKey().write(out);
+ }
+ }
+ }
+
+ @Override
+ public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException {
+ throw new UnsupportedOperationException("sanityCheck V1 format is no longer supported");
+ }
+
+ public static String sliceFileName(DictSliceKey key) {
+ return SLICE_PREFIX + key;
+ }
+ }
+
+ public static class IndexFormatV2 extends IndexFormatV1 {
+ static final String SLICE_PREFIX = "cached_";
+ static final int MINOR_VERSION_V1 = 0x01;
+
+ protected IndexFormatV2(FileSystem fs, Configuration conf) {
+ super(fs, conf);
+ }
+
+ @Override
+ public GlobalDictMetadata readIndexFile(Path dir) throws IOException {
+ Path indexFile = new Path(dir, V2_INDEX_NAME);
+ try (FSDataInputStream in = fs.open(indexFile)) {
+ byte minorVersion = in.readByte(); // include a header to allow minor format changes
+ if (minorVersion != MINOR_VERSION_V1) {
+ throw new RuntimeException("Unsupported minor version " + minorVersion);
+ }
+ int baseId = in.readInt();
+ int maxId = in.readInt();
+ int maxValueLength = in.readInt();
+ int nValues = in.readInt();
+ String converterName = in.readUTF();
+ BytesConverter converter;
+ try {
+ converter = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to instantiate BytesConverter: " + converterName, e);
+ }
+
+ int nSlices = in.readInt();
+ TreeMap<DictSliceKey, String> sliceFileMap = new TreeMap<>();
+ for (int i = 0; i < nSlices; i++) {
+ DictSliceKey key = new DictSliceKey();
+ key.readFields(in);
+ String sliceFileName = in.readUTF();
+ sliceFileMap.put(key, sliceFileName);
+ }
+
+ return new GlobalDictMetadata(baseId, maxId, maxValueLength, nValues, converter, sliceFileMap);
+ }
+ }
+
+ @Override
+ public void writeIndexFile(Path dir, GlobalDictMetadata metadata) throws IOException {
+ Path indexFile = new Path(dir, V2_INDEX_NAME);
+ try (FSDataOutputStream out = fs.create(indexFile, true)) {
+ out.writeByte(MINOR_VERSION_V1);
+ out.writeInt(metadata.baseId);
+ out.writeInt(metadata.maxId);
+ out.writeInt(metadata.maxValueLength);
+ out.writeInt(metadata.nValues);
+ out.writeUTF(metadata.bytesConverter.getClass().getName());
+ out.writeInt(metadata.sliceFileMap.size());
+ for (Map.Entry<DictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+ entry.getKey().write(out);
+ out.writeUTF(entry.getValue());
+ }
+ }
+ }
+
+ @Override
+ public void sanityCheck(Path dir, GlobalDictMetadata metadata) throws IOException {
+ for (Map.Entry<DictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+ if (!fs.exists(new Path(dir, entry.getValue()))) {
+ throw new RuntimeException("The slice file " + entry.getValue() + " for the key: " + entry.getKey() + " must be existed!");
+ }
+ }
+ }
+
+ public static String sliceFileName(DictSliceKey key) {
+ return String.format("%s%d_%d", SLICE_PREFIX, System.currentTimeMillis(), key.hashCode());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java
new file mode 100644
index 0000000..65c80ca
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictMetadata.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import com.google.common.base.Preconditions;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Encapsulates the metadata for a particular version of the global dictionary.
+ * Usually each version of a global dictionary stores its metadata in an index file.
+ */
+public class GlobalDictMetadata {
+ final int baseId;
+ final int maxId;
+ final int maxValueLength;
+ final int nValues;
+ final BytesConverter bytesConverter;
+ final TreeMap<DictSliceKey, String> sliceFileMap; // slice key -> slice file name
+
+ public GlobalDictMetadata(int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, NavigableMap<DictSliceKey, String> sliceFileMap) {
+
+ Preconditions.checkNotNull(bytesConverter, "bytesConverter");
+ Preconditions.checkNotNull(sliceFileMap, "sliceFileMap");
+
+ this.baseId = baseId;
+ this.maxId = maxId;
+ this.maxValueLength = maxValueLength;
+ this.nValues = nValues;
+ this.bytesConverter = bytesConverter;
+ this.sliceFileMap = new TreeMap<>(sliceFileMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
new file mode 100644
index 0000000..5817868
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictStore.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+
+import java.io.IOException;
+
+public abstract class GlobalDictStore {
+
+ protected final String baseDir; // base directory containing all versions of this global dict
+ protected final int maxVersions;
+ protected final int versionTTL;
+
+ protected GlobalDictStore(String baseDir) {
+ this.baseDir = Preconditions.checkNotNull(baseDir, "baseDir");
+ this.maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
+ this.versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
+ }
+
+ // workingDir should be an absolute path, will create if not exists
+ abstract void prepareForWrite(String workingDir) throws IOException;
+
+ /**
+ * @return all versions of this dictionary in ascending order
+ * @throws IOException on I/O error
+ */
+ abstract Long[] listAllVersions() throws IOException;
+
+ // return the path of specified version dir
+ abstract Path getVersionDir(long version);
+
+ /**
+ * Get the metadata for a particular version of the dictionary.
+ * @param version version number
+ * @return <i>GlobalDictMetadata</i> for the specified version
+ * @throws IOException on I/O error
+ */
+ abstract GlobalDictMetadata getMetadata(long version) throws IOException;
+
+ /**
+ * Read a <i>DictSlice</i> from a slice file.
+ * @param workingDir directory of the slice file
+ * @param sliceFileName file name of the slice
+ * @return a <i>DictSlice</i>
+ * @throws IOException on I/O error
+ */
+ abstract DictSlice readSlice(String workingDir, String sliceFileName);
+
+ /**
+ * Write a slice with the given key to the specified directory.
+ * @param workingDir where to write the slice, should exist
+ * @param key slice key
+ * @param slice slice to write
+ * @return file name of the new written slice
+ * @throws IOException on I/O error
+ */
+ abstract String writeSlice(String workingDir, DictSliceKey key, DictNode slice);
+
+ /**
+ * Delete a slice with the specified file name.
+ * @param workingDir directory of the slice file, should exist
+ * @param sliceFileName file name of the slice, should exist
+ * @throws IOException on I/O error
+ */
+ abstract void deleteSlice(String workingDir, String sliceFileName);
+
+ /**
+ * commit the <i>DictSlice</i> and <i>GlobalDictMetadata</i> in workingDir to new versionDir
+ * @param workingDir where store the tmp slice and index, should exist
+ * @param globalDictMetadata the metadata of global dict
+ * @throws IOException on I/O error
+ */
+ abstract void commit(String workingDir, GlobalDictMetadata globalDictMetadata) throws IOException;
+
+ /**
+ * Copy the latest version of this dict to another meta. The source is unchanged.
+ * @param srcConfig config of source meta
+ * @param dstConfig config of destination meta
+ * @return the new base directory for destination meta
+ * @throws IOException on I/O error
+ */
+ abstract String copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index cda3c2b..7921980 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict;
import java.io.IOException;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
/**
@@ -28,7 +29,7 @@ import org.apache.kylin.common.util.Dictionary;
* Created by sunyerui on 16/5/24.
*/
public class GlobalDictionaryBuilder implements IDictionaryBuilder {
- AppendTrieDictionary.Builder<String> builder;
+ AppendTrieDictionaryBuilder builder;
int baseId;
@Override
@@ -36,19 +37,20 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
if (dictInfo == null) {
throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo");
}
- this.builder = AppendTrieDictionary.Builder.getInstance(dictInfo.getResourceDir());
+
+ int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
+ this.builder = new AppendTrieDictionaryBuilder(dictInfo.getResourceDir(), maxEntriesPerSlice);
this.baseId = baseId;
}
-
+
@Override
public boolean addValue(String value) {
if (value == null)
return false;
-
builder.addValue(value);
return true;
}
-
+
@Override
public Dictionary<String> build() throws IOException {
return builder.build(baseId);
http://git-wip-us.apache.org/repos/asf/kylin/blob/ce8b24f6/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index e2af338..9da5071 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -18,6 +18,7 @@
package org.apache.kylin.dict;
+import static org.apache.kylin.dict.GlobalDictHDFSStore.V2_INDEX_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
@@ -29,62 +30,76 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-/**
- * Created by sunyerui on 16/4/28.
- */
public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
- public static final String BASE_DIR = "file:///tmp/kylin_append_dict";
- public static final String RESOURCE_DIR = "/dict/append_dict_test";
+ private static final UUID uuid = UUID.randomUUID();
+ private static final String RESOURCE_DIR = "/dict/append_dict_test/" + uuid;
+ private static final String HDFS_DIR = "file:///tmp/kylin_append_dict";
+ private static String BASE_DIR;
+ private static String LOCAL_BASE_DIR = "/tmp/kylin_append_dict/kylin_metadata/resources/GlobalDict" + RESOURCE_DIR + "/";
@Before
- public void setUp() {
+ public void beforeTest() {
staticCreateTestMetadata();
- System.setProperty("kylin.dictionary.append-entry-size", "50000");
- System.setProperty("kylin.env.hdfs-working-dir", BASE_DIR);
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "50000");
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.env.hdfs-working-dir", HDFS_DIR);
+ BASE_DIR = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "/resources/GlobalDict" + RESOURCE_DIR + "/";
}
@After
- public void after() {
+ public void afterTest() {
cleanup();
staticCleanupTestMetadata();
}
- public static void cleanup() {
- Path basePath = new Path(BASE_DIR);
+ private void cleanup() {
+ Path basePath = new Path(HDFS_DIR);
try {
HadoopUtil.getFileSystem(basePath).delete(basePath, true);
- } catch (IOException e) {}
+ } catch (IOException e) {
+ }
}
- public static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "\u5b57\u5178", "\u5b57\u5178\u6811", "\u5b57\u6bcd", // non-ascii characters
+ private static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "\u5b57\u5178", "\u5b57\u5178\u6811", "\u5b57\u6bcd", // non-ascii characters
"", // empty
- "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
- "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
+ "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii", "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
"paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
- + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk",
+ + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk",
"paint", "tar", "try", // some dup
};
+ private AppendTrieDictionaryBuilder createBuilder(String resourceDir) throws IOException {
+ int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
+ return new AppendTrieDictionaryBuilder(resourceDir, maxEntriesPerSlice);
+ }
+
@Test
public void testStringRepeatly() throws IOException {
ArrayList<String> list = new ArrayList<>();
@@ -94,20 +109,22 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
notfound.add("pars");
notfound.add("tri");
notfound.add("\u5b57");
- for (int i = 0; i < 100; i++) {
+ for (int i = 0; i < 50; i++) {
testStringDictAppend(list, notfound, true);
+ //to speed up the test
+ cleanup();
}
}
@Test
- public void englishWordsTest() throws Exception {
+ public void testEnglishWords() throws Exception {
InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
ArrayList<String> str = loadStrings(is);
testStringDictAppend(str, null, false);
}
@Test
- public void categoryNamesTest() throws Exception {
+ public void testCategoryNames() throws Exception {
InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat");
ArrayList<String> str = loadStrings(is);
testStringDictAppend(str, null, true);
@@ -133,7 +150,8 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
@Ignore("need huge key set")
@Test
public void testHugeKeySet() throws IOException {
- AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+
AppendTrieDictionary<String> dict = null;
InputStream is = new FileInputStream("src/test/resources/dict/huge_key");
@@ -143,17 +161,17 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
while ((word = reader.readLine()) != null) {
word = word.trim();
if (!word.isEmpty())
- b.addValue(word);
+ builder.addValue(word);
}
} finally {
reader.close();
is.close();
}
- dict = b.build(0);
+ dict = builder.build(0);
dict.dump(System.out);
}
- private static void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException {
+ private void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException {
Random rnd = new Random(System.currentTimeMillis());
ArrayList<String> strList = new ArrayList<String>();
strList.addAll(list);
@@ -162,8 +180,8 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
}
BytesConverter converter = new StringBytesConverter();
- AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
- AppendTrieDictionary<String> dict = null;
+ AppendTrieDictionaryBuilder b = createBuilder(RESOURCE_DIR);
+
TreeMap<Integer, String> checkMap = new TreeMap<>();
int firstAppend = rnd.nextInt(strList.size() / 2);
int secondAppend = firstAppend + rnd.nextInt((strList.size() - firstAppend) / 2);
@@ -173,7 +191,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
for (; appendIndex < firstAppend; appendIndex++) {
b.addValue(strList.get(appendIndex));
}
- dict = b.build(0);
+ AppendTrieDictionary<String> dict = b.build(0);
dict.dump(System.out);
for (; checkIndex < firstAppend; checkIndex++) {
String str = strList.get(checkIndex);
@@ -185,13 +203,13 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
}
// reopen dict and append
-// b = AppendTrieDictionary.Builder.create(dict);
- b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
+ b = createBuilder(RESOURCE_DIR);
+
for (; appendIndex < secondAppend; appendIndex++) {
b.addValue(strList.get(appendIndex));
}
- AppendTrieDictionary newDict = b.build(0);
- assert newDict == dict;
+ AppendTrieDictionary<String> newDict = b.build(0);
+ assert newDict.equals(dict);
dict = newDict;
dict.dump(System.out);
checkIndex = 0;
@@ -210,12 +228,13 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
}
// reopen dict and append rest str
- b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
+ b = createBuilder(RESOURCE_DIR);
+
for (; appendIndex < strList.size(); appendIndex++) {
b.addValue(strList.get(appendIndex));
}
newDict = b.build(0);
- assert newDict == dict;
+ assert newDict.equals(dict);
dict = newDict;
dict.dump(System.out);
checkIndex = 0;
@@ -268,7 +287,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
@Test
public void testMaxInteger() throws IOException {
- AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
builder.setMaxId(Integer.MAX_VALUE - 2);
builder.addValue("a");
builder.addValue("ab");
@@ -284,7 +303,7 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
@Ignore("Only occurred when value is very long (>8000 bytes)")
@Test
public void testSuperLongValue() throws IOException {
- AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
String value = "a";
for (int i = 0; i < 10000; i++) {
value += "a";
@@ -299,17 +318,15 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
dictionary.getMaxId();
}
- private static class SharedBuilderThread extends Thread {
+ private class SharedBuilderThread extends Thread {
CountDownLatch startLatch;
CountDownLatch finishLatch;
- String resourcePath;
String prefix;
int count;
- SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String resourcePath, String prefix, int count) {
+ SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String prefix, int count) {
this.startLatch = startLatch;
this.finishLatch = finishLatch;
- this.resourcePath = resourcePath;
this.prefix = prefix;
this.count = count;
}
@@ -317,27 +334,28 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
@Override
public void run() {
try {
- AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
startLatch.countDown();
for (int i = 0; i < count; i++) {
builder.addValue(prefix + i);
}
builder.build(0);
finishLatch.countDown();
- } catch (IOException e) {}
+ } catch (IOException e) {
+ }
}
}
+ @Ignore
@Test
public void testSharedBuilder() throws IOException, InterruptedException {
- String resourcePath = "shared_builder";
final CountDownLatch startLatch = new CountDownLatch(3);
final CountDownLatch finishLatch = new CountDownLatch(3);
- AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
- Thread t1 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t1_", 10000);
- Thread t2 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t2_", 10);
- Thread t3 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t3_", 100000);
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+ Thread t1 = new SharedBuilderThread(startLatch, finishLatch, "t1_", 10000);
+ Thread t2 = new SharedBuilderThread(startLatch, finishLatch, "t2_", 10);
+ Thread t3 = new SharedBuilderThread(startLatch, finishLatch, "t3_", 100000);
t1.start();
t2.start();
t3.start();
@@ -345,23 +363,228 @@ public class AppendTrieDictionaryTest extends LocalFileMetadataTestCase {
AppendTrieDictionary dict = builder.build(0);
assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS));
assertEquals(110010, dict.getMaxId());
- try {
- builder.addValue("fail");
- fail("Builder should be closed");
- } catch (Exception e) {}
- builder = AppendTrieDictionary.Builder.getInstance(resourcePath, dict);
+ builder = createBuilder(RESOURCE_DIR);
builder.addValue("success");
+ builder.addValue("s");
dict = builder.build(0);
- for (int i = 0; i < 10000; i ++) {
+ for (int i = 0; i < 10000; i++) {
assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
}
- for (int i = 0; i < 10; i ++) {
+ for (int i = 0; i < 10; i++) {
assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
}
- for (int i = 0; i < 100000; i ++) {
+ for (int i = 0; i < 100000; i++) {
assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
}
assertEquals(110011, dict.getIdFromValue("success"));
+ assertEquals(110012, dict.getIdFromValue("s"));
+ }
+
+ @Test
+ public void testSplitContainSuperLongValue() throws IOException {
+ String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B";
+
+ createAppendTrieDict(Arrays.asList("a", superLongValue));
+ }
+
+ @Test
+ public void testSuperLongValueAsFileName() throws IOException {
+ String superLongValue = "%5Cx1A%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%7E%29%5CxEF%5CxBF%5CxBD%5Cx1B+%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5Cx13%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5CxEF%5CxBF%5CxBD%5B";
+
+ createAppendTrieDict(Arrays.asList("a", superLongValue));
+ }
+
+ @Test
+ public void testIllegalFileNameValue() throws IOException {
+ createAppendTrieDict(Arrays.asList("::", ":"));
+ }
+
+ @Test
+ public void testSkipAddValue() throws IOException {
+ createAppendTrieDict(new ArrayList<String>());
+ }
+
+ private void createAppendTrieDict(List<String> valueList) throws IOException {
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "1");
+
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+
+ for (String value : valueList) {
+ builder.addValue(value);
+ }
+
+ builder.build(0);
+ }
+
+ private static class CachedFileFilter implements FileFilter {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getName().startsWith("cached_");
+ }
+ }
+
+ private static class VersionFilter implements FileFilter {
+ @Override
+ public boolean accept(File pathname) {
+ return pathname.getName().startsWith(GlobalDictHDFSStore.VERSION_PREFIX);
+ }
+ }
+
+ @Test
+ public void testMultiVersions() throws IOException, InterruptedException {
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("a");
+ builder.addValue("b");
+ builder.addValue("c");
+ builder.addValue("d");
+ builder.addValue("e");
+ builder.addValue("f");
+ AppendTrieDictionary dict = builder.build(0);
+
+ assertEquals(2, dict.getIdFromValue("b"));
+
+ // re-open dict, append new data
+ builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("g");
+
+ // new data is not visible
+ try {
+ dict.getIdFromValue("g");
+ fail("Value 'g' (g) not exists!");
+ } catch (IllegalArgumentException e) {
+
+ }
+
+ // append data, and be visible for new immutable map
+ builder.addValue("h");
+
+ AppendTrieDictionary newDict = builder.build(0);
+ assert newDict.equals(dict);
+
+ assertEquals(7, newDict.getIdFromValue("g"));
+ assertEquals(8, newDict.getIdFromValue("h"));
+
+ // Check versions retention
+ File dir = new File(LOCAL_BASE_DIR);
+ assertEquals(2, dir.listFiles(new VersionFilter()).length);
+ }
+
+ @Test
+ public void testVersionRetention() throws IOException, InterruptedException {
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-max-versions", "1");
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-version-ttl", "1000");
+
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("a");
+
+ //version 1
+ builder.build(0);
+
+ // Check versions retention
+ File dir = new File(LOCAL_BASE_DIR);
+ assertEquals(1, dir.listFiles(new VersionFilter()).length);
+
+ // sleep to make version 1 expired
+ Thread.sleep(1200);
+
+ //version 2
+ builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("");
+ builder.build(0);
+
+ // Check versions retention
+ assertEquals(1, dir.listFiles(new VersionFilter()).length);
+ }
+
+ @Test
+ public void testOldDirFormat() throws IOException {
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("a");
+ builder.addValue("b");
+ builder.addValue("c");
+ builder.addValue("d");
+ builder.addValue("e");
+ builder.addValue("f");
+ builder.build(0);
+
+ convertDirToOldFormat(BASE_DIR);
+
+ File dir = new File(LOCAL_BASE_DIR);
+ assertEquals(0, dir.listFiles(new VersionFilter()).length);
+ assertEquals(3, dir.listFiles(new CachedFileFilter()).length);
+
+ //convert older format to new format when builder init
+ builder = createBuilder(RESOURCE_DIR);
+ builder.build(0);
+
+ assertEquals(1, dir.listFiles(new VersionFilter()).length);
+ }
+
+ private void convertDirToOldFormat(String baseDir) throws IOException {
+ Path basePath = new Path(baseDir);
+ FileSystem fs = HadoopUtil.getFileSystem(basePath);
+
+ // move version dir to base dir, to simulate the older format
+ GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir);
+ Long[] versions = store.listAllVersions();
+ Path versionPath = store.getVersionDir(versions[versions.length - 1]);
+ Path tmpVersionPath = new Path(versionPath.getParent().getParent(), versionPath.getName());
+ fs.rename(versionPath, tmpVersionPath);
+ fs.delete(new Path(baseDir), true);
+ fs.rename(tmpVersionPath, new Path(baseDir));
+ }
+
+ @Test
+ public void testOldIndexFormat() throws IOException {
+ KylinConfig.getInstanceFromEnv().setProperty("kylin.dictionary.append-entry-size", "4");
+
+ AppendTrieDictionaryBuilder builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("a");
+ builder.addValue("b");
+ builder.addValue("c");
+ builder.addValue("d");
+ builder.addValue("e");
+ builder.addValue("f");
+ builder.build(0);
+
+ convertIndexToOldFormat(BASE_DIR);
+
+ builder = createBuilder(RESOURCE_DIR);
+ builder.addValue("g");
+ builder.addValue("h");
+ builder.addValue("i");
+ AppendTrieDictionary dict = builder.build(0);
+
+ assertEquals(1, dict.getIdFromValue("a"));
+ assertEquals(7, dict.getIdFromValue("g"));
+ }
+
+ private void convertIndexToOldFormat(String baseDir) throws IOException {
+ Path basePath = new Path(baseDir);
+ FileSystem fs = HadoopUtil.getFileSystem(basePath);
+
+ GlobalDictHDFSStore store = new GlobalDictHDFSStore(baseDir);
+ Long[] versions = store.listAllVersions();
+ GlobalDictMetadata metadata = store.getMetadata(versions[versions.length - 1]);
+
+ //convert v2 index to v1 index
+ Path versionPath = store.getVersionDir(versions[versions.length - 1]);
+ Path v2IndexFile = new Path(versionPath, V2_INDEX_NAME);
+
+ fs.delete(v2IndexFile, true);
+ GlobalDictHDFSStore.IndexFormat indexFormatV1 = new GlobalDictHDFSStore.IndexFormatV1(fs, HadoopUtil.getCurrentConfiguration());
+ indexFormatV1.writeIndexFile(versionPath, metadata);
+
+ //convert v2 fileName format to v1 fileName format
+ for (Map.Entry<DictSliceKey, String> entry : metadata.sliceFileMap.entrySet()) {
+ fs.rename(new Path(versionPath, entry.getValue()), new Path(versionPath, "cached_" + entry.getKey()));
+ }
}
+
}