You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/27 10:28:22 UTC
[25/50] [abbrv] kylin git commit: KYLIN-2195 All code changes, ready for test
http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
index 916c369..598865b 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java
@@ -1,512 +1,512 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- * Builds a dictionary using Trie structure. All values are taken in byte[] form
- * and organized in a Trie with ordering. Then numeric IDs are assigned in
- * sequence.
- *
- * @author yangli9
- */
-public class TrieDictionaryBuilder<T> {
-
- private static final int _2GB = 2000000000;
-
- public static class Node {
- public byte[] part;
- public boolean isEndOfValue;
- public ArrayList<Node> children;
-
- public int nValuesBeneath; // only present after stats()
-
- Node(byte[] value, boolean isEndOfValue) {
- reset(value, isEndOfValue);
- }
-
- Node(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
- reset(value, isEndOfValue, children);
- }
-
- void reset(byte[] value, boolean isEndOfValue) {
- reset(value, isEndOfValue, new ArrayList<Node>());
- }
-
- void reset(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
- this.part = value;
- this.isEndOfValue = isEndOfValue;
- this.children = children;
- }
- }
-
- public static interface Visitor {
- void visit(Node n, int level);
- }
-
- // ============================================================================
-
- private Node root;
- private BytesConverter<T> bytesConverter;
-
- public TrieDictionaryBuilder(BytesConverter<T> bytesConverter) {
- this.root = new Node(new byte[0], false);
- this.bytesConverter = bytesConverter;
- }
-
- public void addValue(T value) {
- addValue(bytesConverter.convertToBytes(value));
- }
-
- // add a converted value (given in byte[] format), use with care, for internal only
- void addValue(byte[] value) {
- addValueR(root, value, 0);
- }
-
- private void addValueR(Node node, byte[] value, int start) {
- // match the value part of current node
- int i = 0, j = start;
- int n = node.part.length, nn = value.length;
- int comp = 0;
- for (; i < n && j < nn; i++, j++) {
- comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
- if (comp != 0)
- break;
- }
-
- if (j == nn) {
- // if value fully matched within the current node
- if (i == n) {
- // if equals to current node, just mark end of value
- node.isEndOfValue = true;
- } else {
- // otherwise, split the current node into two
- Node c = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
- node.reset(BytesUtil.subarray(node.part, 0, i), true);
- node.children.add(c);
- }
- return;
- }
-
- // if partially matched the current, split the current node, add the new value, make a 3-way
- if (i < n) {
- Node c1 = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
- Node c2 = new Node(BytesUtil.subarray(value, j, nn), true);
- node.reset(BytesUtil.subarray(node.part, 0, i), false);
- if (comp < 0) {
- node.children.add(c1);
- node.children.add(c2);
- } else {
- node.children.add(c2);
- node.children.add(c1);
- }
- return;
- }
-
- // out matched the current, binary search the next byte for a child node to continue
- byte lookfor = value[j];
- int lo = 0;
- int hi = node.children.size() - 1;
- int mid = 0;
- boolean found = false;
- comp = 0;
- while (!found && lo <= hi) {
- mid = lo + (hi - lo) / 2;
- comp = BytesUtil.compareByteUnsigned(lookfor, node.children.get(mid).part[0]);
- if (comp < 0)
- hi = mid - 1;
- else if (comp > 0)
- lo = mid + 1;
- else
- found = true;
- }
- if (found) {
- // found a child node matching the first byte, continue in that child
- addValueR(node.children.get(mid), value, j);
- } else {
- // otherwise, make the value a new child
- Node c = new Node(BytesUtil.subarray(value, j, nn), true);
- node.children.add(comp <= 0 ? mid : mid + 1, c);
- }
- }
-
- public void traverse(Visitor visitor) {
- traverseR(root, visitor, 0);
- }
-
- private void traverseR(Node node, Visitor visitor, int level) {
- visitor.visit(node, level);
- for (Node c : node.children)
- traverseR(c, visitor, level + 1);
- }
-
- public void traversePostOrder(Visitor visitor) {
- traversePostOrderR(root, visitor, 0);
- }
-
- private void traversePostOrderR(Node node, Visitor visitor, int level) {
- for (Node c : node.children)
- traversePostOrderR(c, visitor, level + 1);
- visitor.visit(node, level);
- }
-
- public static class Stats {
- public int nValues; // number of values in total
- public int nValueBytesPlain; // number of bytes for all values
- // uncompressed
- public int nValueBytesCompressed; // number of values bytes in Trie
- // (compressed)
- public int maxValueLength; // size of longest value in bytes
-
- // the trie is multi-byte-per-node
- public int mbpn_nNodes; // number of nodes in trie
- public int mbpn_trieDepth; // depth of trie
- public int mbpn_maxFanOut; // the maximum no. children
- public long mbpn_nChildLookups; // number of child lookups during lookup every value once
- public long mbpn_nTotalFanOut; // the sum of fan outs during lookup every value once
- public int mbpn_sizeValueTotal; // the sum of value space in all nodes
- public int mbpn_sizeNoValueBytes; // size of field noValueBytes
- public int mbpn_sizeNoValueBeneath; // size of field noValuesBeneath, depends on cardinality
- public int mbpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array
- public long mbpn_footprint; // MBPN footprint in bytes
-
- // stats for one-byte-per-node as well, so there's comparison
- public int obpn_sizeValue; // size of value per node, always 1
- public int obpn_sizeNoValuesBeneath; // size of field noValuesBeneath, depends on cardinality
- public int obpn_sizeChildCount; // size of field childCount, enables binary search among children
- public int obpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array
- public int obpn_nNodes; // no. nodes in OBPN trie
- public long obpn_footprint; // OBPN footprint in bytes
-
- public void print() {
- PrintStream out = System.out;
- out.println("============================================================================");
- out.println("No. values: " + nValues);
- out.println("No. bytes raw: " + nValueBytesPlain);
- out.println("No. bytes in trie: " + nValueBytesCompressed);
- out.println("Longest value length: " + maxValueLength);
-
- // flatten trie footprint calculation, case of One-Byte-Per-Node
- out.println("----------------------------------------------------------------------------");
- out.println("OBPN node size: " + (obpn_sizeValue + obpn_sizeNoValuesBeneath + obpn_sizeChildCount + obpn_sizeChildOffset) + " = " + obpn_sizeValue + " + " + obpn_sizeNoValuesBeneath + " + " + obpn_sizeChildCount + " + " + obpn_sizeChildOffset);
- out.println("OBPN no. nodes: " + obpn_nNodes);
- out.println("OBPN trie depth: " + maxValueLength);
- out.println("OBPN footprint: " + obpn_footprint + " in bytes");
-
- // flatten trie footprint calculation, case of Multi-Byte-Per-Node
- out.println("----------------------------------------------------------------------------");
- out.println("MBPN max fan out: " + mbpn_maxFanOut);
- out.println("MBPN no. child lookups: " + mbpn_nChildLookups);
- out.println("MBPN total fan out: " + mbpn_nTotalFanOut);
- out.println("MBPN average fan out: " + (double) mbpn_nTotalFanOut / mbpn_nChildLookups);
- out.println("MBPN values size total: " + mbpn_sizeValueTotal);
- out.println("MBPN node size: " + (mbpn_sizeNoValueBytes + mbpn_sizeNoValueBeneath + mbpn_sizeChildOffset) + " = " + mbpn_sizeNoValueBytes + " + " + mbpn_sizeNoValueBeneath + " + " + mbpn_sizeChildOffset);
- out.println("MBPN no. nodes: " + mbpn_nNodes);
- out.println("MBPN trie depth: " + mbpn_trieDepth);
- out.println("MBPN footprint: " + mbpn_footprint + " in bytes");
- }
- }
-
- /** out print some statistics of the trie and the dictionary built from it */
- public Stats stats() {
- // calculate nEndValueBeneath
- traversePostOrder(new Visitor() {
- @Override
- public void visit(Node n, int level) {
- n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
- for (Node c : n.children)
- n.nValuesBeneath += c.nValuesBeneath;
- }
- });
-
- // run stats
- final Stats s = new Stats();
- final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
- traverse(new Visitor() {
- @Override
- public void visit(Node n, int level) {
- if (n.isEndOfValue)
- s.nValues++;
- s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
- s.nValueBytesCompressed += n.part.length;
- s.mbpn_nNodes++;
- if (s.mbpn_trieDepth < level + 1)
- s.mbpn_trieDepth = level + 1;
- if (n.children.size() > 0) {
- if (s.mbpn_maxFanOut < n.children.size())
- s.mbpn_maxFanOut = n.children.size();
- int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
- s.mbpn_nChildLookups += childLookups;
- s.mbpn_nTotalFanOut += childLookups * n.children.size();
- }
-
- if (level < lenAtLvl.size())
- lenAtLvl.set(level, n.part.length);
- else
- lenAtLvl.add(n.part.length);
- int lenSoFar = 0;
- for (int i = 0; i <= level; i++)
- lenSoFar += lenAtLvl.get(i);
- if (lenSoFar > s.maxValueLength)
- s.maxValueLength = lenSoFar;
- }
- });
-
- // flatten trie footprint calculation, case of One-Byte-Per-Node
- s.obpn_sizeValue = 1;
- s.obpn_sizeNoValuesBeneath = BytesUtil.sizeForValue(s.nValues);
- s.obpn_sizeChildCount = 1;
- s.obpn_sizeChildOffset = 5; // MSB used as isEndOfValue flag
- s.obpn_nNodes = s.nValueBytesCompressed; // no. nodes is the total number of compressed bytes in OBPN
- s.obpn_footprint = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset);
- while (true) { // minimize the offset size to match the footprint
- long t = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset - 1);
- if (BytesUtil.sizeForValue(t * 2) <= s.obpn_sizeChildOffset - 1) { // *2 because MSB of offset is used for isEndOfValue flag
- s.obpn_sizeChildOffset--;
- s.obpn_footprint = t;
- } else
- break;
- }
-
- // flatten trie footprint calculation, case of Multi-Byte-Per-Node
- s.mbpn_sizeValueTotal = s.nValueBytesCompressed;
- s.mbpn_sizeNoValueBytes = 1;
- s.mbpn_sizeNoValueBeneath = BytesUtil.sizeForValue(s.nValues);
- s.mbpn_sizeChildOffset = 5;
- s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset);
- while (true) { // minimize the offset size to match the footprint
- long t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset - 1);
- if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) { // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
- s.mbpn_sizeChildOffset--;
- s.mbpn_footprint = t;
- } else
- break;
- }
-
- return s;
- }
-
- /** out print trie for debug */
- public void print() {
- print(System.out);
- }
-
- public void print(final PrintStream out) {
- traverse(new Visitor() {
- @Override
- public void visit(Node n, int level) {
- try {
- for (int i = 0; i < level; i++)
- out.print(" ");
- out.print(new String(n.part, "UTF-8"));
- out.print(" - ");
- if (n.nValuesBeneath > 0)
- out.print(n.nValuesBeneath);
- if (n.isEndOfValue)
- out.print("*");
- out.print("\n");
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
- });
- }
-
- private CompleteParts completeParts = new CompleteParts();
-
- private class CompleteParts {
- byte[] data = new byte[4096];
- int current = 0;
-
- public void append(byte[] part) {
- while (current + part.length > data.length)
- expand();
-
- System.arraycopy(part, 0, data, current, part.length);
- current += part.length;
- }
-
- public void withdraw(int size) {
- current -= size;
- }
-
- public byte[] retrieve() {
- return Arrays.copyOf(data, current);
- }
-
- private void expand() {
- byte[] temp = new byte[2 * data.length];
- System.arraycopy(data, 0, temp, 0, data.length);
- data = temp;
- }
- }
-
- // there is a 255 limitation of length for each node's part.
- // we interpolate nodes to satisfy this when a node's part becomes
- // too long(overflow)
- private void checkOverflowParts(Node node) {
- LinkedList<Node> childrenCopy = new LinkedList<Node>(node.children);
- for (Node child : childrenCopy) {
- if (child.part.length > 255) {
- byte[] first255 = Arrays.copyOf(child.part, 255);
-
- completeParts.append(node.part);
- completeParts.append(first255);
- byte[] visited = completeParts.retrieve();
- this.addValue(visited);
- completeParts.withdraw(255);
- completeParts.withdraw(node.part.length);
- }
- }
-
- completeParts.append(node.part); // by here the node.children may have been changed
- for (Node child : node.children) {
- checkOverflowParts(child);
- }
- completeParts.withdraw(node.part.length);
- }
-
- /**
- * Flatten the trie into a byte array for a minimized memory footprint.
- * Lookup remains fast. Cost is inflexibility to modify (becomes immutable).
- *
- * Flattened node structure is HEAD + NODEs, for each node:
- * - o byte, offset to child node, o = stats.mbpn_sizeChildOffset
- * - 1 bit, isLastChild flag, the 1st MSB of o
- * - 1 bit, isEndOfValue flag, the 2nd MSB of o
- * - c byte, number of values beneath, c = stats.mbpn_sizeNoValueBeneath
- * - 1 byte, number of value bytes
- * - n byte, value bytes
- */
- public TrieDictionary<T> build(int baseId) {
- byte[] trieBytes = buildTrieBytes(baseId);
- TrieDictionary<T> r = new TrieDictionary<T>(trieBytes);
- return r;
- }
-
- protected byte[] buildTrieBytes(int baseId) {
- checkOverflowParts(this.root);
-
- Stats stats = stats();
- int sizeNoValuesBeneath = stats.mbpn_sizeNoValueBeneath;
- int sizeChildOffset = stats.mbpn_sizeChildOffset;
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.IdentityHashMap;
+import java.util.LinkedList;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Builds a dictionary using Trie structure. All values are taken in byte[] form
+ * and organized in a Trie with ordering. Then numeric IDs are assigned in
+ * sequence.
+ *
+ * @author yangli9
+ */
+public class TrieDictionaryBuilder<T> {
+
+ private static final int _2GB = 2000000000;
+
+ public static class Node {
+ public byte[] part;
+ public boolean isEndOfValue;
+ public ArrayList<Node> children;
+
+ public int nValuesBeneath; // only present after stats()
+
+ Node(byte[] value, boolean isEndOfValue) {
+ reset(value, isEndOfValue);
+ }
+
+ Node(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
+ reset(value, isEndOfValue, children);
+ }
+
+ void reset(byte[] value, boolean isEndOfValue) {
+ reset(value, isEndOfValue, new ArrayList<Node>());
+ }
+
+ void reset(byte[] value, boolean isEndOfValue, ArrayList<Node> children) {
+ this.part = value;
+ this.isEndOfValue = isEndOfValue;
+ this.children = children;
+ }
+ }
+
+ public static interface Visitor {
+ void visit(Node n, int level);
+ }
+
+ // ============================================================================
+
+ private Node root;
+ private BytesConverter<T> bytesConverter;
+
+ public TrieDictionaryBuilder(BytesConverter<T> bytesConverter) {
+ this.root = new Node(new byte[0], false);
+ this.bytesConverter = bytesConverter;
+ }
+
+ public void addValue(T value) {
+ addValue(bytesConverter.convertToBytes(value));
+ }
+
+ // add a converted value (given in byte[] format), use with care, for internal only
+ void addValue(byte[] value) {
+ addValueR(root, value, 0);
+ }
+
+ private void addValueR(Node node, byte[] value, int start) {
+ // match the value part of current node
+ int i = 0, j = start;
+ int n = node.part.length, nn = value.length;
+ int comp = 0;
+ for (; i < n && j < nn; i++, j++) {
+ comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]);
+ if (comp != 0)
+ break;
+ }
+
+ if (j == nn) {
+ // if value fully matched within the current node
+ if (i == n) {
+ // if equals to current node, just mark end of value
+ node.isEndOfValue = true;
+ } else {
+ // otherwise, split the current node into two
+ Node c = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+ node.reset(BytesUtil.subarray(node.part, 0, i), true);
+ node.children.add(c);
+ }
+ return;
+ }
+
+ // if partially matched the current, split the current node, add the new value, make a 3-way
+ if (i < n) {
+ Node c1 = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
+ Node c2 = new Node(BytesUtil.subarray(value, j, nn), true);
+ node.reset(BytesUtil.subarray(node.part, 0, i), false);
+ if (comp < 0) {
+ node.children.add(c1);
+ node.children.add(c2);
+ } else {
+ node.children.add(c2);
+ node.children.add(c1);
+ }
+ return;
+ }
+
+ // out matched the current, binary search the next byte for a child node to continue
+ byte lookfor = value[j];
+ int lo = 0;
+ int hi = node.children.size() - 1;
+ int mid = 0;
+ boolean found = false;
+ comp = 0;
+ while (!found && lo <= hi) {
+ mid = lo + (hi - lo) / 2;
+ comp = BytesUtil.compareByteUnsigned(lookfor, node.children.get(mid).part[0]);
+ if (comp < 0)
+ hi = mid - 1;
+ else if (comp > 0)
+ lo = mid + 1;
+ else
+ found = true;
+ }
+ if (found) {
+ // found a child node matching the first byte, continue in that child
+ addValueR(node.children.get(mid), value, j);
+ } else {
+ // otherwise, make the value a new child
+ Node c = new Node(BytesUtil.subarray(value, j, nn), true);
+ node.children.add(comp <= 0 ? mid : mid + 1, c);
+ }
+ }
+
+ public void traverse(Visitor visitor) {
+ traverseR(root, visitor, 0);
+ }
+
+ private void traverseR(Node node, Visitor visitor, int level) {
+ visitor.visit(node, level);
+ for (Node c : node.children)
+ traverseR(c, visitor, level + 1);
+ }
+
+ public void traversePostOrder(Visitor visitor) {
+ traversePostOrderR(root, visitor, 0);
+ }
+
+ private void traversePostOrderR(Node node, Visitor visitor, int level) {
+ for (Node c : node.children)
+ traversePostOrderR(c, visitor, level + 1);
+ visitor.visit(node, level);
+ }
+
+ public static class Stats {
+ public int nValues; // number of values in total
+ public int nValueBytesPlain; // number of bytes for all values
+ // uncompressed
+ public int nValueBytesCompressed; // number of values bytes in Trie
+ // (compressed)
+ public int maxValueLength; // size of longest value in bytes
+
+ // the trie is multi-byte-per-node
+ public int mbpn_nNodes; // number of nodes in trie
+ public int mbpn_trieDepth; // depth of trie
+ public int mbpn_maxFanOut; // the maximum no. children
+ public long mbpn_nChildLookups; // number of child lookups during lookup every value once
+ public long mbpn_nTotalFanOut; // the sum of fan outs during lookup every value once
+ public int mbpn_sizeValueTotal; // the sum of value space in all nodes
+ public int mbpn_sizeNoValueBytes; // size of field noValueBytes
+ public int mbpn_sizeNoValueBeneath; // size of field noValuesBeneath, depends on cardinality
+ public int mbpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array
+ public long mbpn_footprint; // MBPN footprint in bytes
+
+ // stats for one-byte-per-node as well, so there's comparison
+ public int obpn_sizeValue; // size of value per node, always 1
+ public int obpn_sizeNoValuesBeneath; // size of field noValuesBeneath, depends on cardinality
+ public int obpn_sizeChildCount; // size of field childCount, enables binary search among children
+ public int obpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array
+ public int obpn_nNodes; // no. nodes in OBPN trie
+ public long obpn_footprint; // OBPN footprint in bytes
+
+ public void print() {
+ PrintStream out = System.out;
+ out.println("============================================================================");
+ out.println("No. values: " + nValues);
+ out.println("No. bytes raw: " + nValueBytesPlain);
+ out.println("No. bytes in trie: " + nValueBytesCompressed);
+ out.println("Longest value length: " + maxValueLength);
+
+ // flatten trie footprint calculation, case of One-Byte-Per-Node
+ out.println("----------------------------------------------------------------------------");
+ out.println("OBPN node size: " + (obpn_sizeValue + obpn_sizeNoValuesBeneath + obpn_sizeChildCount + obpn_sizeChildOffset) + " = " + obpn_sizeValue + " + " + obpn_sizeNoValuesBeneath + " + " + obpn_sizeChildCount + " + " + obpn_sizeChildOffset);
+ out.println("OBPN no. nodes: " + obpn_nNodes);
+ out.println("OBPN trie depth: " + maxValueLength);
+ out.println("OBPN footprint: " + obpn_footprint + " in bytes");
+
+ // flatten trie footprint calculation, case of Multi-Byte-Per-Node
+ out.println("----------------------------------------------------------------------------");
+ out.println("MBPN max fan out: " + mbpn_maxFanOut);
+ out.println("MBPN no. child lookups: " + mbpn_nChildLookups);
+ out.println("MBPN total fan out: " + mbpn_nTotalFanOut);
+ out.println("MBPN average fan out: " + (double) mbpn_nTotalFanOut / mbpn_nChildLookups);
+ out.println("MBPN values size total: " + mbpn_sizeValueTotal);
+ out.println("MBPN node size: " + (mbpn_sizeNoValueBytes + mbpn_sizeNoValueBeneath + mbpn_sizeChildOffset) + " = " + mbpn_sizeNoValueBytes + " + " + mbpn_sizeNoValueBeneath + " + " + mbpn_sizeChildOffset);
+ out.println("MBPN no. nodes: " + mbpn_nNodes);
+ out.println("MBPN trie depth: " + mbpn_trieDepth);
+ out.println("MBPN footprint: " + mbpn_footprint + " in bytes");
+ }
+ }
+
+ /** out print some statistics of the trie and the dictionary built from it */
+ public Stats stats() {
+ // calculate nEndValueBeneath
+ traversePostOrder(new Visitor() {
+ @Override
+ public void visit(Node n, int level) {
+ n.nValuesBeneath = n.isEndOfValue ? 1 : 0;
+ for (Node c : n.children)
+ n.nValuesBeneath += c.nValuesBeneath;
+ }
+ });
+
+ // run stats
+ final Stats s = new Stats();
+ final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>();
+ traverse(new Visitor() {
+ @Override
+ public void visit(Node n, int level) {
+ if (n.isEndOfValue)
+ s.nValues++;
+ s.nValueBytesPlain += n.part.length * n.nValuesBeneath;
+ s.nValueBytesCompressed += n.part.length;
+ s.mbpn_nNodes++;
+ if (s.mbpn_trieDepth < level + 1)
+ s.mbpn_trieDepth = level + 1;
+ if (n.children.size() > 0) {
+ if (s.mbpn_maxFanOut < n.children.size())
+ s.mbpn_maxFanOut = n.children.size();
+ int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0);
+ s.mbpn_nChildLookups += childLookups;
+ s.mbpn_nTotalFanOut += childLookups * n.children.size();
+ }
+
+ if (level < lenAtLvl.size())
+ lenAtLvl.set(level, n.part.length);
+ else
+ lenAtLvl.add(n.part.length);
+ int lenSoFar = 0;
+ for (int i = 0; i <= level; i++)
+ lenSoFar += lenAtLvl.get(i);
+ if (lenSoFar > s.maxValueLength)
+ s.maxValueLength = lenSoFar;
+ }
+ });
+
+ // flatten trie footprint calculation, case of One-Byte-Per-Node
+ s.obpn_sizeValue = 1;
+ s.obpn_sizeNoValuesBeneath = BytesUtil.sizeForValue(s.nValues);
+ s.obpn_sizeChildCount = 1;
+ s.obpn_sizeChildOffset = 5; // MSB used as isEndOfValue flag
+ s.obpn_nNodes = s.nValueBytesCompressed; // no. nodes is the total number of compressed bytes in OBPN
+ s.obpn_footprint = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset);
+ while (true) { // minimize the offset size to match the footprint
+ long t = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset - 1);
+ if (BytesUtil.sizeForValue(t * 2) <= s.obpn_sizeChildOffset - 1) { // *2 because MSB of offset is used for isEndOfValue flag
+ s.obpn_sizeChildOffset--;
+ s.obpn_footprint = t;
+ } else
+ break;
+ }
+
+ // flatten trie footprint calculation, case of Multi-Byte-Per-Node
+ s.mbpn_sizeValueTotal = s.nValueBytesCompressed;
+ s.mbpn_sizeNoValueBytes = 1;
+ s.mbpn_sizeNoValueBeneath = BytesUtil.sizeForValue(s.nValues);
+ s.mbpn_sizeChildOffset = 5;
+ s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset);
+ while (true) { // minimize the offset size to match the footprint
+ long t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset - 1);
+ if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) { // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
+ s.mbpn_sizeChildOffset--;
+ s.mbpn_footprint = t;
+ } else
+ break;
+ }
+
+ return s;
+ }
+
+ /** out print trie for debug */
+ public void print() {
+ print(System.out);
+ }
+
+ public void print(final PrintStream out) {
+ traverse(new Visitor() {
+ @Override
+ public void visit(Node n, int level) {
+ try {
+ for (int i = 0; i < level; i++)
+ out.print(" ");
+ out.print(new String(n.part, "UTF-8"));
+ out.print(" - ");
+ if (n.nValuesBeneath > 0)
+ out.print(n.nValuesBeneath);
+ if (n.isEndOfValue)
+ out.print("*");
+ out.print("\n");
+ } catch (UnsupportedEncodingException e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+
+ private CompleteParts completeParts = new CompleteParts();
+
+ private class CompleteParts {
+ byte[] data = new byte[4096];
+ int current = 0;
+
+ public void append(byte[] part) {
+ while (current + part.length > data.length)
+ expand();
+
+ System.arraycopy(part, 0, data, current, part.length);
+ current += part.length;
+ }
+
+ public void withdraw(int size) {
+ current -= size;
+ }
+
+ public byte[] retrieve() {
+ return Arrays.copyOf(data, current);
+ }
+
+ private void expand() {
+ byte[] temp = new byte[2 * data.length];
+ System.arraycopy(data, 0, temp, 0, data.length);
+ data = temp;
+ }
+ }
+
+ // there is a 255 limitation of length for each node's part.
+ // we interpolate nodes to satisfy this when a node's part becomes
+ // too long(overflow)
+ private void checkOverflowParts(Node node) {
+ LinkedList<Node> childrenCopy = new LinkedList<Node>(node.children);
+ for (Node child : childrenCopy) {
+ if (child.part.length > 255) {
+ byte[] first255 = Arrays.copyOf(child.part, 255);
+
+ completeParts.append(node.part);
+ completeParts.append(first255);
+ byte[] visited = completeParts.retrieve();
+ this.addValue(visited);
+ completeParts.withdraw(255);
+ completeParts.withdraw(node.part.length);
+ }
+ }
+
+ completeParts.append(node.part); // by here the node.children may have been changed
+ for (Node child : node.children) {
+ checkOverflowParts(child);
+ }
+ completeParts.withdraw(node.part.length);
+ }
+
+ /**
+ * Flatten the trie into a byte array for a minimized memory footprint.
+ * Lookup remains fast. Cost is inflexibility to modify (becomes immutable).
+ *
+ * Flattened node structure is HEAD + NODEs, for each node:
+ * - o byte, offset to child node, o = stats.mbpn_sizeChildOffset
+ * - 1 bit, isLastChild flag, the 1st MSB of o
+ * - 1 bit, isEndOfValue flag, the 2nd MSB of o
+ * - c byte, number of values beneath, c = stats.mbpn_sizeNoValueBeneath
+ * - 1 byte, number of value bytes
+ * - n byte, value bytes
+ */
+ public TrieDictionary<T> build(int baseId) {
+ byte[] trieBytes = buildTrieBytes(baseId);
+ TrieDictionary<T> r = new TrieDictionary<T>(trieBytes);
+ return r;
+ }
+
+ protected byte[] buildTrieBytes(int baseId) {
+ checkOverflowParts(this.root);
+
+ Stats stats = stats();
+ int sizeNoValuesBeneath = stats.mbpn_sizeNoValueBeneath;
+ int sizeChildOffset = stats.mbpn_sizeChildOffset;
+
if (stats.mbpn_footprint <= 0) // must never happen, but let us be cautious
throw new IllegalStateException("Too big dictionary, dictionary cannot be bigger than 2GB");
- if (stats.mbpn_footprint > _2GB)
- throw new RuntimeException("Too big dictionary, dictionary cannot be bigger than 2GB");
-
- // write head
- byte[] head;
- try {
- ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
- DataOutputStream headOut = new DataOutputStream(byteBuf);
- headOut.write(TrieDictionary.MAGIC);
- headOut.writeShort(0); // head size, will back fill
- headOut.writeInt((int) stats.mbpn_footprint); // body size
- headOut.write(sizeChildOffset);
- headOut.write(sizeNoValuesBeneath);
- headOut.writeShort(baseId);
- headOut.writeShort(stats.maxValueLength);
- headOut.writeUTF(bytesConverter == null ? "" : bytesConverter.getClass().getName());
- headOut.close();
- head = byteBuf.toByteArray();
- BytesUtil.writeUnsigned(head.length, head, TrieDictionary.MAGIC_SIZE_I, 2);
- } catch (IOException e) {
- throw new RuntimeException(e); // shall not happen, as we are writing in memory
- }
-
- byte[] trieBytes = new byte[(int) stats.mbpn_footprint + head.length];
- System.arraycopy(head, 0, trieBytes, 0, head.length);
-
- LinkedList<Node> open = new LinkedList<Node>();
- IdentityHashMap<Node, Integer> offsetMap = new IdentityHashMap<Node, Integer>();
-
- // write body
- int o = head.length;
- offsetMap.put(root, o);
- o = build_writeNode(root, o, true, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
- if (root.children.isEmpty() == false)
- open.addLast(root);
-
- while (open.isEmpty() == false) {
- Node parent = open.removeFirst();
- build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
- for (int i = 0; i < parent.children.size(); i++) {
- Node c = parent.children.get(i);
- boolean isLastChild = (i == parent.children.size() - 1);
- offsetMap.put(c, o);
- o = build_writeNode(c, o, isLastChild, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
- if (c.children.isEmpty() == false)
- open.addLast(c);
- }
- }
-
- if (o != trieBytes.length)
- throw new RuntimeException();
- return trieBytes;
- }
-
- private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
- int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
- BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
- trieBytes[parentOffset] |= flags;
- }
-
- private int build_writeNode(Node n, int offset, boolean isLastChild, int sizeNoValuesBeneath, int sizeChildOffset, byte[] trieBytes) {
- int o = offset;
- if (o > _2GB)
- throw new IllegalStateException();
-
- // childOffset
- if (isLastChild)
- trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
- if (n.isEndOfValue)
- trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
- o += sizeChildOffset;
-
- // nValuesBeneath
- BytesUtil.writeUnsigned(n.nValuesBeneath, trieBytes, o, sizeNoValuesBeneath);
- o += sizeNoValuesBeneath;
-
- // nValueBytes
- if (n.part.length > 255)
- throw new RuntimeException();
- BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
- o++;
-
- // valueBytes
- System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
- o += n.part.length;
-
- return o;
- }
-
-}
+ if (stats.mbpn_footprint > _2GB)
+ throw new RuntimeException("Too big dictionary, dictionary cannot be bigger than 2GB");
+
+ // write head
+ byte[] head;
+ try {
+ ByteArrayOutputStream byteBuf = new ByteArrayOutputStream();
+ DataOutputStream headOut = new DataOutputStream(byteBuf);
+ headOut.write(TrieDictionary.MAGIC);
+ headOut.writeShort(0); // head size, will back fill
+ headOut.writeInt((int) stats.mbpn_footprint); // body size
+ headOut.write(sizeChildOffset);
+ headOut.write(sizeNoValuesBeneath);
+ headOut.writeShort(baseId);
+ headOut.writeShort(stats.maxValueLength);
+ headOut.writeUTF(bytesConverter == null ? "" : bytesConverter.getClass().getName());
+ headOut.close();
+ head = byteBuf.toByteArray();
+ BytesUtil.writeUnsigned(head.length, head, TrieDictionary.MAGIC_SIZE_I, 2);
+ } catch (IOException e) {
+ throw new RuntimeException(e); // shall not happen, as we are writing in memory
+ }
+
+ byte[] trieBytes = new byte[(int) stats.mbpn_footprint + head.length];
+ System.arraycopy(head, 0, trieBytes, 0, head.length);
+
+ LinkedList<Node> open = new LinkedList<Node>();
+ IdentityHashMap<Node, Integer> offsetMap = new IdentityHashMap<Node, Integer>();
+
+ // write body
+ int o = head.length;
+ offsetMap.put(root, o);
+ o = build_writeNode(root, o, true, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
+ if (root.children.isEmpty() == false)
+ open.addLast(root);
+
+ while (open.isEmpty() == false) {
+ Node parent = open.removeFirst();
+ build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes);
+ for (int i = 0; i < parent.children.size(); i++) {
+ Node c = parent.children.get(i);
+ boolean isLastChild = (i == parent.children.size() - 1);
+ offsetMap.put(c, o);
+ o = build_writeNode(c, o, isLastChild, sizeNoValuesBeneath, sizeChildOffset, trieBytes);
+ if (c.children.isEmpty() == false)
+ open.addLast(c);
+ }
+ }
+
+ if (o != trieBytes.length)
+ throw new RuntimeException();
+ return trieBytes;
+ }
+
+ private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) {
+ int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE);
+ BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset);
+ trieBytes[parentOffset] |= flags;
+ }
+
+ private int build_writeNode(Node n, int offset, boolean isLastChild, int sizeNoValuesBeneath, int sizeChildOffset, byte[] trieBytes) {
+ int o = offset;
+ if (o > _2GB)
+ throw new IllegalStateException();
+
+ // childOffset
+ if (isLastChild)
+ trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD;
+ if (n.isEndOfValue)
+ trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE;
+ o += sizeChildOffset;
+
+ // nValuesBeneath
+ BytesUtil.writeUnsigned(n.nValuesBeneath, trieBytes, o, sizeNoValuesBeneath);
+ o += sizeNoValuesBeneath;
+
+ // nValueBytes
+ if (n.part.length > 255)
+ throw new RuntimeException();
+ BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
+ o++;
+
+ // valueBytes
+ System.arraycopy(n.part, 0, trieBytes, o, n.part.length);
+ o += n.part.length;
+
+ return o;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
index 56c1c98..4b96622 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java
@@ -1,112 +1,112 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.IOException;
-import java.util.Comparator;
-
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-
-/**
- * @author yangli9
- *
- */
-public class LookupStringTable extends LookupTable<String> {
-
- private static final Comparator<String> dateStrComparator = new Comparator<String>() {
- @Override
- public int compare(String o1, String o2) {
- long l1 = Long.parseLong(o1);
- long l2 = Long.parseLong(o2);
- return Long.compare(l1, l2);
- }
- };
-
- private static final Comparator<String> numStrComparator = new Comparator<String>() {
- @Override
- public int compare(String o1, String o2) {
- double d1 = Double.parseDouble(o1);
- double d2 = Double.parseDouble(o2);
- return Double.compare(d1, d2);
- }
- };
-
- private static final Comparator<String> defaultStrComparator = new Comparator<String>() {
- @Override
- public int compare(String o1, String o2) {
- return o1.compareTo(o2);
- }
- };
-
- boolean[] colIsDateTime;
- boolean[] colIsNumber;
-
- public LookupStringTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
- super(tableDesc, keyColumns, table);
- }
-
- @Override
- protected void init() throws IOException {
- ColumnDesc[] cols = tableDesc.getColumns();
- colIsDateTime = new boolean[cols.length];
- colIsNumber = new boolean[cols.length];
- for (int i = 0; i < cols.length; i++) {
- DataType t = cols[i].getType();
- colIsDateTime[i] = t.isDateTimeFamily();
- colIsNumber[i] = t.isNumberFamily();
- }
-
- super.init();
- }
-
- @Override
- protected String[] convertRow(String[] cols) {
- for (int i = 0; i < cols.length; i++) {
- if (colIsDateTime[i]) {
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+
+/**
+ * @author yangli9
+ *
+ */
+public class LookupStringTable extends LookupTable<String> {
+
+ private static final Comparator<String> dateStrComparator = new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ long l1 = Long.parseLong(o1);
+ long l2 = Long.parseLong(o2);
+ return Long.compare(l1, l2);
+ }
+ };
+
+ private static final Comparator<String> numStrComparator = new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ double d1 = Double.parseDouble(o1);
+ double d2 = Double.parseDouble(o2);
+ return Double.compare(d1, d2);
+ }
+ };
+
+ private static final Comparator<String> defaultStrComparator = new Comparator<String>() {
+ @Override
+ public int compare(String o1, String o2) {
+ return o1.compareTo(o2);
+ }
+ };
+
+ boolean[] colIsDateTime;
+ boolean[] colIsNumber;
+
+ public LookupStringTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
+ super(tableDesc, keyColumns, table);
+ }
+
+ @Override
+ protected void init() throws IOException {
+ ColumnDesc[] cols = tableDesc.getColumns();
+ colIsDateTime = new boolean[cols.length];
+ colIsNumber = new boolean[cols.length];
+ for (int i = 0; i < cols.length; i++) {
+ DataType t = cols[i].getType();
+ colIsDateTime[i] = t.isDateTimeFamily();
+ colIsNumber[i] = t.isNumberFamily();
+ }
+
+ super.init();
+ }
+
+ @Override
+ protected String[] convertRow(String[] cols) {
+ for (int i = 0; i < cols.length; i++) {
+ if (colIsDateTime[i]) {
if (cols[i] != null)
cols[i] = String.valueOf(DateFormat.stringToMillis(cols[i]));
- }
- }
- return cols;
- }
-
- @Override
- protected Comparator<String> getComparator(int idx) {
- if (colIsDateTime[idx])
- return dateStrComparator;
- else if (colIsNumber[idx])
- return numStrComparator;
- else
- return defaultStrComparator;
- }
-
- @Override
- protected String toString(String cell) {
- return cell;
- }
-
- public Class<?> getType() {
- return String.class;
- }
-
-}
+ }
+ }
+ return cols;
+ }
+
+ @Override
+ protected Comparator<String> getComparator(int idx) {
+ if (colIsDateTime[idx])
+ return dateStrComparator;
+ else if (colIsNumber[idx])
+ return numStrComparator;
+ else
+ return defaultStrComparator;
+ }
+
+ @Override
+ protected String toString(String cell) {
+ return cell;
+ }
+
+ public Class<?> getType() {
+ return String.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
index 9aae755..cd700e9 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java
@@ -1,180 +1,180 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.commons.io.IOUtils;
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableReader;
-
-import com.google.common.collect.Sets;
-
-/**
- * An in-memory lookup table, in which each cell is an object of type T. The
- * table is indexed by specified PK for fast lookup.
- *
- * @author yangli9
- */
-abstract public class LookupTable<T> {
-
- protected TableDesc tableDesc;
- protected String[] keyColumns;
- protected ReadableTable table;
- protected ConcurrentHashMap<Array<T>, T[]> data;
-
- public LookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
- this.tableDesc = tableDesc;
- this.keyColumns = keyColumns;
- this.table = table;
- this.data = new ConcurrentHashMap<Array<T>, T[]>();
- init();
- }
-
- protected void init() throws IOException {
- int[] keyIndex = new int[keyColumns.length];
- for (int i = 0; i < keyColumns.length; i++) {
- keyIndex[i] = tableDesc.findColumnByName(keyColumns[i]).getZeroBasedIndex();
- }
-
- TableReader reader = table.getReader();
- try {
- while (reader.next()) {
- initRow(reader.getRow(), keyIndex);
- }
- } finally {
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableReader;
+
+import com.google.common.collect.Sets;
+
+/**
+ * An in-memory lookup table, in which each cell is an object of type T. The
+ * table is indexed by specified PK for fast lookup.
+ *
+ * @author yangli9
+ */
+abstract public class LookupTable<T> {
+
+ protected TableDesc tableDesc;
+ protected String[] keyColumns;
+ protected ReadableTable table;
+ protected ConcurrentHashMap<Array<T>, T[]> data;
+
+ public LookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException {
+ this.tableDesc = tableDesc;
+ this.keyColumns = keyColumns;
+ this.table = table;
+ this.data = new ConcurrentHashMap<Array<T>, T[]>();
+ init();
+ }
+
+ protected void init() throws IOException {
+ int[] keyIndex = new int[keyColumns.length];
+ for (int i = 0; i < keyColumns.length; i++) {
+ keyIndex[i] = tableDesc.findColumnByName(keyColumns[i]).getZeroBasedIndex();
+ }
+
+ TableReader reader = table.getReader();
+ try {
+ while (reader.next()) {
+ initRow(reader.getRow(), keyIndex);
+ }
+ } finally {
IOUtils.closeQuietly(reader);
- }
- }
-
- @SuppressWarnings("unchecked")
- private void initRow(String[] cols, int[] keyIndex) {
- T[] value = convertRow(cols);
- T[] keyCols = (T[]) java.lang.reflect.Array.newInstance(getType(), keyIndex.length);
- for (int i = 0; i < keyCols.length; i++)
- keyCols[i] = value[keyIndex[i]];
-
- Array<T> key = new Array<T>(keyCols);
-
- if (data.containsKey(key))
- throw new IllegalStateException("Dup key found, key=" + toString(keyCols) + ", value1=" + toString(data.get(key)) + ", value2=" + toString(value));
-
- data.put(key, value);
- }
-
- abstract protected T[] convertRow(String[] cols);
-
- public T[] getRow(Array<T> key) {
- return data.get(key);
- }
-
- public Collection<T[]> getAllRows() {
- return data.values();
- }
-
- public List<T> scan(String col, List<T> values, String returnCol) {
- ArrayList<T> result = new ArrayList<T>();
- int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
- int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
- for (T[] row : data.values()) {
- if (values.contains(row[colIdx]))
- result.add(row[returnIdx]);
- }
- return result;
- }
-
- public Pair<T, T> mapRange(String col, T beginValue, T endValue, String returnCol) {
- int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
- int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
- Comparator<T> colComp = getComparator(colIdx);
- Comparator<T> returnComp = getComparator(returnIdx);
-
- T returnBegin = null;
- T returnEnd = null;
- for (T[] row : data.values()) {
- if (between(beginValue, row[colIdx], endValue, colComp)) {
- T returnValue = row[returnIdx];
- if (returnBegin == null || returnComp.compare(returnValue, returnBegin) < 0) {
- returnBegin = returnValue;
- }
- if (returnEnd == null || returnComp.compare(returnValue, returnEnd) > 0) {
- returnEnd = returnValue;
- }
- }
- }
- if (returnBegin == null && returnEnd == null)
- return null;
- else
- return Pair.newPair(returnBegin, returnEnd);
- }
-
- public Set<T> mapValues(String col, Set<T> values, String returnCol) {
- int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
- int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
- Set<T> result = Sets.newHashSetWithExpectedSize(values.size());
- for (T[] row : data.values()) {
- if (values.contains(row[colIdx])) {
- result.add(row[returnIdx]);
- }
- }
- return result;
- }
-
- private boolean between(T beginValue, T v, T endValue, Comparator<T> comp) {
- return (beginValue == null || comp.compare(beginValue, v) <= 0) && (endValue == null || comp.compare(v, endValue) <= 0);
- }
-
- abstract protected Comparator<T> getComparator(int colIdx);
-
- public String toString() {
- return "LookupTable [path=" + table + "]";
- }
-
- protected String toString(T[] cols) {
- StringBuilder b = new StringBuilder();
- b.append("[");
- for (int i = 0; i < cols.length; i++) {
- if (i > 0)
- b.append(",");
- b.append(toString(cols[i]));
- }
- b.append("]");
- return b.toString();
- }
-
- abstract protected String toString(T cell);
-
- abstract public Class<?> getType();
-
- public void dump() {
- for (Array<T> key : data.keySet()) {
- System.out.println(toString(key.data) + " => " + toString(data.get(key)));
- }
- }
-
-}
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void initRow(String[] cols, int[] keyIndex) {
+ T[] value = convertRow(cols);
+ T[] keyCols = (T[]) java.lang.reflect.Array.newInstance(getType(), keyIndex.length);
+ for (int i = 0; i < keyCols.length; i++)
+ keyCols[i] = value[keyIndex[i]];
+
+ Array<T> key = new Array<T>(keyCols);
+
+ if (data.containsKey(key))
+ throw new IllegalStateException("Dup key found, key=" + toString(keyCols) + ", value1=" + toString(data.get(key)) + ", value2=" + toString(value));
+
+ data.put(key, value);
+ }
+
+ abstract protected T[] convertRow(String[] cols);
+
+ public T[] getRow(Array<T> key) {
+ return data.get(key);
+ }
+
+ public Collection<T[]> getAllRows() {
+ return data.values();
+ }
+
+ public List<T> scan(String col, List<T> values, String returnCol) {
+ ArrayList<T> result = new ArrayList<T>();
+ int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
+ int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
+ for (T[] row : data.values()) {
+ if (values.contains(row[colIdx]))
+ result.add(row[returnIdx]);
+ }
+ return result;
+ }
+
+ public Pair<T, T> mapRange(String col, T beginValue, T endValue, String returnCol) {
+ int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
+ int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
+ Comparator<T> colComp = getComparator(colIdx);
+ Comparator<T> returnComp = getComparator(returnIdx);
+
+ T returnBegin = null;
+ T returnEnd = null;
+ for (T[] row : data.values()) {
+ if (between(beginValue, row[colIdx], endValue, colComp)) {
+ T returnValue = row[returnIdx];
+ if (returnBegin == null || returnComp.compare(returnValue, returnBegin) < 0) {
+ returnBegin = returnValue;
+ }
+ if (returnEnd == null || returnComp.compare(returnValue, returnEnd) > 0) {
+ returnEnd = returnValue;
+ }
+ }
+ }
+ if (returnBegin == null && returnEnd == null)
+ return null;
+ else
+ return Pair.newPair(returnBegin, returnEnd);
+ }
+
+ public Set<T> mapValues(String col, Set<T> values, String returnCol) {
+ int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex();
+ int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex();
+ Set<T> result = Sets.newHashSetWithExpectedSize(values.size());
+ for (T[] row : data.values()) {
+ if (values.contains(row[colIdx])) {
+ result.add(row[returnIdx]);
+ }
+ }
+ return result;
+ }
+
+ private boolean between(T beginValue, T v, T endValue, Comparator<T> comp) {
+ return (beginValue == null || comp.compare(beginValue, v) <= 0) && (endValue == null || comp.compare(v, endValue) <= 0);
+ }
+
+ abstract protected Comparator<T> getComparator(int colIdx);
+
+ public String toString() {
+ return "LookupTable [path=" + table + "]";
+ }
+
+ protected String toString(T[] cols) {
+ StringBuilder b = new StringBuilder();
+ b.append("[");
+ for (int i = 0; i < cols.length; i++) {
+ if (i > 0)
+ b.append(",");
+ b.append(toString(cols[i]));
+ }
+ b.append("]");
+ return b.toString();
+ }
+
+ abstract protected String toString(T cell);
+
+ abstract public Class<?> getType();
+
+ public void dump() {
+ for (Array<T> key : data.keySet()) {
+ System.out.println(toString(key.data) + " => " + toString(data.get(key)));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
index 0d3848c..085158a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java
@@ -1,81 +1,81 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict.lookup;
-
-import java.io.IOException;
-import java.util.NavigableSet;
-import java.util.concurrent.ConcurrentHashMap;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict.lookup;
+
+import java.io.IOException;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.ReadableTable;
-import org.apache.kylin.source.ReadableTable.TableSignature;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.ReadableTable.TableSignature;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
-/**
- * @author yangli9
- */
-public class SnapshotManager {
-
- private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
-
- // static cached instances
- private static final ConcurrentHashMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>();
-
- public static SnapshotManager getInstance(KylinConfig config) {
- SnapshotManager r = SERVICE_CACHE.get(config);
- if (r == null) {
- synchronized (SnapshotManager.class) {
- r = SERVICE_CACHE.get(config);
- if (r == null) {
- r = new SnapshotManager(config);
- SERVICE_CACHE.put(config, r);
- if (SERVICE_CACHE.size() > 1) {
- logger.warn("More than one singleton exist");
- }
- }
- }
- }
- return r;
- }
-
- // ============================================================================
-
- private KylinConfig config;
+/**
+ * @author yangli9
+ */
+public class SnapshotManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class);
+
+ // static cached instances
+ private static final ConcurrentHashMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>();
+
+ public static SnapshotManager getInstance(KylinConfig config) {
+ SnapshotManager r = SERVICE_CACHE.get(config);
+ if (r == null) {
+ synchronized (SnapshotManager.class) {
+ r = SERVICE_CACHE.get(config);
+ if (r == null) {
+ r = new SnapshotManager(config);
+ SERVICE_CACHE.put(config, r);
+ if (SERVICE_CACHE.size() > 1) {
+ logger.warn("More than one singleton exist");
+ }
+ }
+ }
+ }
+ return r;
+ }
+
+ // ============================================================================
+
+ private KylinConfig config;
private LoadingCache<String, SnapshotTable> snapshotCache; // resource
-
- // path ==>
- // SnapshotTable
-
- private SnapshotManager(KylinConfig config) {
- this.config = config;
+
+ // path ==>
+ // SnapshotTable
+
+ private SnapshotManager(KylinConfig config) {
+ this.config = config;
this.snapshotCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, SnapshotTable>() {
@Override
public void onRemoval(RemovalNotification<String, SnapshotTable> notification) {
@@ -89,13 +89,13 @@ public class SnapshotManager {
return snapshotTable;
}
});
- }
-
- public void wipeoutCache() {
+ }
+
+ public void wipeoutCache() {
snapshotCache.invalidateAll();
- }
-
- public SnapshotTable getSnapshotTable(String resourcePath) throws IOException {
+ }
+
+ public SnapshotTable getSnapshotTable(String resourcePath) throws IOException {
try {
SnapshotTable r = snapshotCache.get(resourcePath);
if (r == null) {
@@ -105,114 +105,114 @@ public class SnapshotManager {
return r;
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
- }
- }
-
- public void removeSnapshot(String resourcePath) throws IOException {
- ResourceStore store = MetadataManager.getInstance(this.config).getStore();
- store.deleteResource(resourcePath);
+ }
+ }
+
+ public void removeSnapshot(String resourcePath) throws IOException {
+ ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+ store.deleteResource(resourcePath);
snapshotCache.invalidate(resourcePath);
- }
-
- public SnapshotTable buildSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException {
- SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity());
- snapshot.updateRandomUuid();
-
- String dup = checkDupByInfo(snapshot);
- if (dup != null) {
- logger.info("Identical input " + table.getSignature() + ", reuse existing snapshot at " + dup);
- return getSnapshotTable(dup);
- }
-
- if (snapshot.getSignature().getSize() / 1024 / 1024 > config.getTableSnapshotMaxMB()) {
- throw new IllegalStateException("Table snapshot should be no greater than " + config.getTableSnapshotMaxMB() //
- + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
- }
-
- snapshot.takeSnapshot(table, tableDesc);
-
- return trySaveNewSnapshot(snapshot);
- }
-
- public SnapshotTable rebuildSnapshot(ReadableTable table, TableDesc tableDesc, String overwriteUUID) throws IOException {
- SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity());
- snapshot.setUuid(overwriteUUID);
-
- snapshot.takeSnapshot(table, tableDesc);
-
- SnapshotTable existing = getSnapshotTable(snapshot.getResourcePath());
- snapshot.setLastModified(existing.getLastModified());
-
- save(snapshot);
- snapshotCache.put(snapshot.getResourcePath(), snapshot);
-
- return snapshot;
- }
-
- public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable) throws IOException {
-
- String dupTable = checkDupByContent(snapshotTable);
- if (dupTable != null) {
- logger.info("Identical snapshot content " + snapshotTable + ", reuse existing snapshot at " + dupTable);
- return getSnapshotTable(dupTable);
- }
-
- save(snapshotTable);
- snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable);
-
- return snapshotTable;
- }
-
- private String checkDupByInfo(SnapshotTable snapshot) throws IOException {
- ResourceStore store = MetadataManager.getInstance(this.config).getStore();
- String resourceDir = snapshot.getResourceDir();
- NavigableSet<String> existings = store.listResources(resourceDir);
- if (existings == null)
- return null;
-
- TableSignature sig = snapshot.getSignature();
- for (String existing : existings) {
- SnapshotTable existingTable = load(existing, false); // skip cache,
- // direct load from store
- if (existingTable != null && sig.equals(existingTable.getSignature()))
- return existing;
- }
-
- return null;
- }
-
- private String checkDupByContent(SnapshotTable snapshot) throws IOException {
- ResourceStore store = MetadataManager.getInstance(this.config).getStore();
- String resourceDir = snapshot.getResourceDir();
- NavigableSet<String> existings = store.listResources(resourceDir);
- if (existings == null)
- return null;
-
- for (String existing : existings) {
- SnapshotTable existingTable = load(existing, true); // skip cache, direct load from store
- if (existingTable != null && existingTable.equals(snapshot))
- return existing;
- }
-
- return null;
- }
-
- private void save(SnapshotTable snapshot) throws IOException {
- ResourceStore store = MetadataManager.getInstance(this.config).getStore();
- String path = snapshot.getResourcePath();
- store.putResource(path, snapshot, SnapshotTableSerializer.FULL_SERIALIZER);
- }
-
- private SnapshotTable load(String resourcePath, boolean loadData) throws IOException {
- logger.info("Loading snapshotTable from " + resourcePath + ", with loadData: " + loadData);
- ResourceStore store = MetadataManager.getInstance(this.config).getStore();
-
- SnapshotTable table = store.getResource(resourcePath, SnapshotTable.class, loadData ? SnapshotTableSerializer.FULL_SERIALIZER : SnapshotTableSerializer.INFO_SERIALIZER);
-
- if (loadData)
- logger.debug("Loaded snapshot at " + resourcePath);
-
- return table;
- }
-
-}
+ }
+
+ public SnapshotTable buildSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException {
+ SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity());
+ snapshot.updateRandomUuid();
+
+ String dup = checkDupByInfo(snapshot);
+ if (dup != null) {
+ logger.info("Identical input " + table.getSignature() + ", reuse existing snapshot at " + dup);
+ return getSnapshotTable(dup);
+ }
+
+ if (snapshot.getSignature().getSize() / 1024 / 1024 > config.getTableSnapshotMaxMB()) {
+ throw new IllegalStateException("Table snapshot should be no greater than " + config.getTableSnapshotMaxMB() //
+ + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize());
+ }
+
+ snapshot.takeSnapshot(table, tableDesc);
+
+ return trySaveNewSnapshot(snapshot);
+ }
+
+ public SnapshotTable rebuildSnapshot(ReadableTable table, TableDesc tableDesc, String overwriteUUID) throws IOException {
+ SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity());
+ snapshot.setUuid(overwriteUUID);
+
+ snapshot.takeSnapshot(table, tableDesc);
+
+ SnapshotTable existing = getSnapshotTable(snapshot.getResourcePath());
+ snapshot.setLastModified(existing.getLastModified());
+
+ save(snapshot);
+ snapshotCache.put(snapshot.getResourcePath(), snapshot);
+
+ return snapshot;
+ }
+
+ public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable) throws IOException {
+
+ String dupTable = checkDupByContent(snapshotTable);
+ if (dupTable != null) {
+ logger.info("Identical snapshot content " + snapshotTable + ", reuse existing snapshot at " + dupTable);
+ return getSnapshotTable(dupTable);
+ }
+
+ save(snapshotTable);
+ snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable);
+
+ return snapshotTable;
+ }
+
+ private String checkDupByInfo(SnapshotTable snapshot) throws IOException {
+ ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+ String resourceDir = snapshot.getResourceDir();
+ NavigableSet<String> existings = store.listResources(resourceDir);
+ if (existings == null)
+ return null;
+
+ TableSignature sig = snapshot.getSignature();
+ for (String existing : existings) {
+ SnapshotTable existingTable = load(existing, false); // skip cache,
+ // direct load from store
+ if (existingTable != null && sig.equals(existingTable.getSignature()))
+ return existing;
+ }
+
+ return null;
+ }
+
+ private String checkDupByContent(SnapshotTable snapshot) throws IOException {
+ ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+ String resourceDir = snapshot.getResourceDir();
+ NavigableSet<String> existings = store.listResources(resourceDir);
+ if (existings == null)
+ return null;
+
+ for (String existing : existings) {
+ SnapshotTable existingTable = load(existing, true); // skip cache, direct load from store
+ if (existingTable != null && existingTable.equals(snapshot))
+ return existing;
+ }
+
+ return null;
+ }
+
+ private void save(SnapshotTable snapshot) throws IOException {
+ ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+ String path = snapshot.getResourcePath();
+ store.putResource(path, snapshot, SnapshotTableSerializer.FULL_SERIALIZER);
+ }
+
+ private SnapshotTable load(String resourcePath, boolean loadData) throws IOException {
+ logger.info("Loading snapshotTable from " + resourcePath + ", with loadData: " + loadData);
+ ResourceStore store = MetadataManager.getInstance(this.config).getStore();
+
+ SnapshotTable table = store.getResource(resourcePath, SnapshotTable.class, loadData ? SnapshotTableSerializer.FULL_SERIALIZER : SnapshotTableSerializer.INFO_SERIALIZER);
+
+ if (loadData)
+ logger.debug("Loaded snapshot at " + resourcePath);
+
+ return table;
+ }
+
+}