You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/02 13:19:10 UTC
[17/27] kylin git commit: KYLIN-2192 More Robust Global Dictionary
KYLIN-2192 More Robust Global Dictionary
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4a0ee798
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4a0ee798
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4a0ee798
Branch: refs/heads/KYLIN-1875
Commit: 4a0ee7989d5f8272592b980fce3f5716ca40d4c1
Parents: e562aaf
Author: sunyerui <su...@gmail.com>
Authored: Mon Nov 21 21:26:34 2016 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Fri Dec 2 13:33:59 2016 +0800
----------------------------------------------------------------------
.../kylin/job/dataGen/FactTableGenerator.java | 12 +-
.../apache/kylin/common/KylinConfigBase.java | 8 +
.../apache/kylin/common/util/Dictionary.java | 2 +-
.../model/validation/rule/DictionaryRule.java | 78 +-
.../validation/rule/DictionaryRuleTest.java | 28 +-
.../apache/kylin/dict/AppendTrieDictionary.java | 285 +++++--
.../kylin/dict/AppendTrieDictionaryChecker.java | 102 +++
.../org/apache/kylin/dict/CachedTreeMap.java | 260 +++---
.../kylin/dict/GlobalDictionaryBuilder.java | 36 +-
.../kylin/dict/AppendTrieDictionaryTest.java | 150 +++-
.../apache/kylin/dict/CachedTreeMapTest.java | 320 +++++---
.../kylin/measure/bitmap/BitmapCounterTest.java | 6 +-
...t_kylin_cube_without_slr_left_join_desc.json | 16 +-
.../localmeta/data/DEFAULT.TEST_KYLIN_FACT.csv | 804 +++++++++----------
.../flatten_data_for_without_slr_left_join.csv | 804 +++++++++----------
.../test_kylin_inner_join_model_desc.json | 3 +-
.../test_kylin_inner_join_view_model_desc.json | 3 +-
.../test_kylin_left_join_model_desc.json | 3 +-
.../test_kylin_left_join_view_model_desc.json | 3 +-
.../table/DEFAULT.TEST_KYLIN_FACT.json | 8 +-
.../source/hive/ITHiveTableReaderTest.java | 2 +-
.../query/sql_distinct_precisely/query00.sql | 2 +-
.../query/sql_distinct_precisely/query01.sql | 2 +-
.../query/sql_distinct_precisely/query02.sql | 2 +-
.../query/sql_distinct_precisely/query03.sql | 3 +-
.../query/sql_distinct_precisely/query04.sql | 3 +-
.../query/sql_distinct_precisely/query05.sql | 25 -
.../query/sql_distinct_precisely/query06.sql | 26 -
.../query/sql_distinct_precisely/query07.sql | 24 -
29 files changed, 1737 insertions(+), 1283 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index 8068fd1..677b713 100644
--- a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -403,13 +403,13 @@ public class FactTableGenerator {
}
private String createRandomCell(ColumnDesc cDesc) {
- String type = cDesc.getTypeName();
- String s = type.toLowerCase();
- if (s.equals("string") || s.equals("char") || s.equals("varchar")) {
+ DataType type =cDesc.getType();
+ String s = type.getName();
+ if (s.equals("char") || s.equals("varchar")) {
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < 2; i++) {
- sb.append((char) ('a' + r.nextInt(10)));// there are 10*10
- // possible strings
+ int len = Math.min(type.getPrecision(), 3);
+ for (int i = 0; i < len; i++) {
+ sb.append((char) ('a' + r.nextInt(10))); // cardinality at most 10x10x10
}
return sb.toString();
} else if (s.equals("bigint") || s.equals("int") || s.equals("tinyint") || s.equals("smallint")) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3c10826..f46c185 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -232,6 +232,14 @@ abstract public class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.dictionary.append-entry-size", "10000000"));
}
+ public int getAppendDictMaxVersions() {
+ return Integer.parseInt(getOptional("kylin.dictionary.append-max-versions", "3"));
+ }
+
+ public int getAppendDictVersionTTL() {
+ return Integer.parseInt(getOptional("kylin.dictionary.append-version-ttl", "259200000"));
+ }
+
// for test
public void setAppendDictEntrySize(int entrySize) {
setProperty("kylin.dictionary.append-entry-size", String.valueOf(entrySize));
http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
index 0fb299c..1e172bc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
@@ -158,7 +158,7 @@ abstract public class Dictionary<T> implements Serializable {
return nullId();
else {
int id = getIdFromValueBytesImpl(value, offset, len, roundingFlag);
- if (id < 0)
+ if (id == -1)
throw new IllegalArgumentException("Value '" + Bytes.toString(value, offset, len) + "' (" + Bytes.toStringBinary(value, offset, len) + ") not exists!");
return id;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
index d06c816..37889c2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/DictionaryRule.java
@@ -18,9 +18,12 @@
package org.apache.kylin.cube.model.validation.rule;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.DictionaryDesc;
import org.apache.kylin.cube.model.validation.IValidatorRule;
@@ -29,9 +32,19 @@ import org.apache.kylin.cube.model.validation.ValidateContext;
import org.apache.kylin.metadata.model.TblColRef;
/**
- * Created by sunyerui on 16/6/1.
+ * Validate Dictionary Settings:
+ *
+ * <ul>
+ * <li> no duplicated dictionary for one column
+ * <li> dictionary can't set both `reuse` and `builder`
+ * <li> transitive `reuse` like "a <- b <- c" is not allowed, force "a <- b, a <- c"
+ * </ul>
*/
public class DictionaryRule implements IValidatorRule<CubeDesc> {
+ static final String ERROR_DUPLICATE_DICTIONARY_COLUMN = "Duplicated dictionary specification for column: ";
+ static final String ERROR_REUSE_BUILDER_BOTH_SET = "REUSE and BUILDER both set on dictionary for column: ";
+ static final String ERROR_REUSE_BUILDER_BOTH_EMPTY = "REUSE and BUILDER both empty on dictionary for column: ";
+ static final String ERROR_TRANSITIVE_REUSE = "Transitive REUSE is not allowed for dictionary: ";
@Override
public void validate(CubeDesc cubeDesc, ValidateContext context) {
@@ -40,40 +53,43 @@ public class DictionaryRule implements IValidatorRule<CubeDesc> {
return;
}
- HashMap<TblColRef, String> colToBuilderMap = new HashMap<>();
- HashMap<TblColRef, TblColRef> colToReuseColMap = new HashMap<>();
+ Set<TblColRef> allDictCols = new HashSet<>();
+ Set<TblColRef> baseCols = new HashSet<>(); // col with builder
+ List<DictionaryDesc> reuseDictionaries = new ArrayList<>();
+
+ // first pass
for (DictionaryDesc dictDesc : dictDescs) {
TblColRef dictCol = dictDesc.getColumnRef();
- if (dictCol == null) {
- context.addResult(ResultLevel.ERROR, "Some column in dictionaries not found");
+ TblColRef reuseCol = dictDesc.getResuseColumnRef();
+ String builderClass = dictDesc.getBuilderClass();
+
+ if (!allDictCols.add(dictCol)) {
+ context.addResult(ResultLevel.ERROR, ERROR_DUPLICATE_DICTIONARY_COLUMN + dictCol);
return;
}
- String builder = dictDesc.getBuilderClass();
- TblColRef reuseCol = dictDesc.getResuseColumnRef();
- if (reuseCol == null) {
- if (builder == null || builder.isEmpty()) {
- context.addResult(ResultLevel.ERROR, "Column " + dictCol + " cannot have builder and reuse column both empty");
- return;
- }
-
- // Make sure the same column associate with same builder class
- String oldBuilder = colToBuilderMap.put(dictCol, builder);
- if (oldBuilder != null && !oldBuilder.equals(builder)) {
- context.addResult(ResultLevel.ERROR, "Column " + dictCol + " has inconsistent builders " + builder + " and " + oldBuilder);
- return;
- }
+
+ if (reuseCol != null && StringUtils.isNotEmpty(builderClass)) {
+ context.addResult(ResultLevel.ERROR, ERROR_REUSE_BUILDER_BOTH_SET + dictCol);
+ return;
+ }
+
+ if (reuseCol == null && StringUtils.isEmpty(builderClass)) {
+ context.addResult(ResultLevel.ERROR, ERROR_REUSE_BUILDER_BOTH_EMPTY + dictCol);
+ return;
+ }
+
+ if (reuseCol != null) {
+ reuseDictionaries.add(dictDesc);
} else {
- if (builder != null && !builder.isEmpty()) {
- context.addResult(ResultLevel.ERROR, "Column " + dictCol + " cannot have builder and reuse column both");
- return;
- }
-
- // Make sure one column only reuse another one column
- TblColRef oldReuseCol = colToReuseColMap.put(dictCol, reuseCol);
- if (oldReuseCol != null && !reuseCol.equals(oldReuseCol)) {
- context.addResult(ResultLevel.ERROR, "Column " + dictCol + " reuse inconsistent column " + reuseCol + " and " + oldReuseCol);
- return;
- }
+ baseCols.add(dictCol);
+ }
+ }
+
+ // second pass: check no transitive reuse
+ for (DictionaryDesc dictDesc : reuseDictionaries) {
+ if (!baseCols.contains(dictDesc.getResuseColumnRef())) {
+ context.addResult(ResultLevel.ERROR, ERROR_TRANSITIVE_REUSE + dictDesc.getColumnRef());
+ return;
}
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
index 9b37507..b6e0bcb 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/DictionaryRuleTest.java
@@ -18,6 +18,10 @@
package org.apache.kylin.cube.model.validation.rule;
+import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_DUPLICATE_DICTIONARY_COLUMN;
+import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_EMPTY;
+import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_REUSE_BUILDER_BOTH_SET;
+import static org.apache.kylin.cube.model.validation.rule.DictionaryRule.ERROR_TRANSITIVE_REUSE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -36,9 +40,6 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-/**
- * Created by sunyerui on 16/6/1.
- */
public class DictionaryRuleTest extends LocalFileMetadataTestCase {
private static KylinConfig config;
@@ -65,24 +66,31 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
desc.init(config);
ValidateContext vContext = new ValidateContext();
rule.validate(desc, vContext);
- vContext.print(System.out);
assertTrue(vContext.getResults().length == 0);
}
}
@Test
public void testBadDesc() throws IOException {
- testDictionaryDesc("Column EDW.TEST_SITES.SITE_NAME has inconsistent builders " + "FakeBuilderClass and org.apache.kylin.dict.GlobalDictionaryBuilder", DictionaryDesc.create("SITE_NAME", null, "FakeBuilderClass"));
+ testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("USER_ID", null, "FakeBuilderClass"));
+ testDictionaryDesc(ERROR_DUPLICATE_DICTIONARY_COLUMN, DictionaryDesc.create("USER_ID", null, GlobalDictionaryBuilder.class.getName()));
}
@Test
public void testBadDesc2() throws IOException {
- testDictionaryDesc("Column DEFAULT.TEST_KYLIN_FACT.LSTG_SITE_ID cannot have builder and reuse column both", DictionaryDesc.create("lstg_site_id", "SITE_NAME", "FakeBuilderClass"));
+ testDictionaryDesc(ERROR_REUSE_BUILDER_BOTH_SET, DictionaryDesc.create("lstg_site_id", "SITE_NAME", "FakeBuilderClass"));
}
@Test
public void testBadDesc3() throws IOException {
- testDictionaryDesc("Column DEFAULT.TEST_KYLIN_FACT.LSTG_SITE_ID cannot have builder and reuse column both empty", DictionaryDesc.create("lstg_site_id", null, null));
+ testDictionaryDesc(ERROR_REUSE_BUILDER_BOTH_EMPTY, DictionaryDesc.create("lstg_site_id", null, null));
+ }
+
+ @Test
+ public void testBadDesc4() throws IOException {
+ testDictionaryDesc(ERROR_TRANSITIVE_REUSE,
+ DictionaryDesc.create("lstg_site_id", "USER_ID", null),
+ DictionaryDesc.create("price", "lstg_site_id", null));
}
@Test
@@ -102,13 +110,13 @@ public class DictionaryRuleTest extends LocalFileMetadataTestCase {
desc.init(config);
ValidateContext vContext = new ValidateContext();
rule.validate(desc, vContext);
- vContext.print(System.out);
if (expectMessage == null) {
assertTrue(vContext.getResults().length == 0);
} else {
- assertTrue(vContext.getResults().length >= 1);
- assertEquals(expectMessage, vContext.getResults()[0].getMessage());
+ assertTrue(vContext.getResults().length == 1);
+ String actualMessage = vContext.getResults()[0].getMessage();
+ assertTrue(actualMessage.startsWith(expectMessage));
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
index 14980bf..84060a7 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionary.java
@@ -31,10 +31,13 @@ import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.NavigableSet;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -49,6 +52,8 @@ import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metadata.MetadataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,16 +62,16 @@ import org.slf4j.LoggerFactory;
* int IDs, used for global dictionary.
*
* Trie data is split into sub trees, called {@link DictSlice}, and stored in a {@link CachedTreeMap} with a configurable cache size.
- *
+ *
* With Trie the memory footprint of the mapping is kinda minimized at the cost
* CPU, if compared to HashMap of ID Arrays. Performance test shows Trie is
* roughly 10 times slower, so there's a cache layer overlays on top of Trie and
* gracefully fall back to Trie using a weak reference.
- *
+ *
* The implementation is NOT thread-safe for now.
*
* TODO making it thread-safe
- *
+ *
* @author sunyerui
*/
@SuppressWarnings({ "rawtypes", "unchecked", "serial" })
@@ -87,7 +92,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
transient private int nValues;
transient private BytesConverter<T> bytesConverter;
- private TreeMap<DictSliceKey, DictSlice> dictSliceMap;
+ volatile private TreeMap<DictSliceKey, DictSlice> dictSliceMap;
transient private boolean enableValueCache = true;
transient private SoftReference<HashMap> valueToIdCache;
@@ -99,17 +104,23 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
}
}
- public void update(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter, CachedTreeMap dictMap) throws IOException {
+ public void initParams(String baseDir, int baseId, int maxId, int maxValueLength, int nValues, BytesConverter bytesConverter) throws IOException {
this.baseDir = baseDir;
this.baseId = baseId;
this.maxId = maxId;
this.maxValueLength = maxValueLength;
this.nValues = nValues;
this.bytesConverter = bytesConverter;
+ }
+ public void initDictSliceMap(CachedTreeMap dictMap) throws IOException {
int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize();
- dictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
- ((CachedTreeMap)dictSliceMap).loadEntry(dictMap);
+ int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
+ long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
+ CachedTreeMap newDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir)
+ .immutable(true).maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
+ newDictSliceMap.loadEntry(dictMap);
+ this.dictSliceMap = newDictSliceMap;
}
public byte[] writeDictMap() throws IOException {
@@ -123,6 +134,13 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
return dictMapBytes;
}
+ // The dict id runs from 1 to 2147483647, then wraps to -2147483648 and continues up to -2; 0 and -1 are reserved for the uninitialized state
+ public static void checkValidId(int id) {
+ if (id == 0 || id == -1) {
+ throw new IllegalArgumentException("AppendTrieDictionary Id Overflow Unsigned Integer Size 4294967294");
+ }
+ }
+
public static class DictSliceKey implements WritableComparable {
byte[] key;
@@ -181,7 +199,8 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
transient private int nValues;
transient private int sizeOfId;
- transient private int childOffsetMask;
+ // mask MUST be long, since childOffset maybe 5 bytes at most
+ transient private long childOffsetMask;
transient private int firstByteOffset;
private void init(byte[] trieBytes) {
@@ -197,7 +216,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
this.sizeChildOffset = headIn.read();
this.sizeOfId = headIn.read();
- this.childOffsetMask = ~((BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE) << ((sizeChildOffset - 1) * 8));
+ this.childOffsetMask = ~(((long)(BIT_IS_LAST_CHILD | BIT_IS_END_OF_VALUE)) << ((sizeChildOffset - 1) * 8));
this.firstByteOffset = sizeChildOffset + 1; // the offset from begin of node to its first value byte
} catch (Exception e) {
if (e instanceof RuntimeException)
@@ -216,7 +235,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
if (checkFlag(nodeOffset, BIT_IS_END_OF_VALUE)) {
break;
}
- nodeOffset = headSize + (BytesUtil.readUnsigned(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask);
+ nodeOffset = headSize + (int)(BytesUtil.readLong(trieBytes, nodeOffset, sizeChildOffset) & childOffsetMask);
if (nodeOffset == headSize) {
break;
}
@@ -258,7 +277,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
}
// find a child to continue
- int c = headSize + (BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask);
+ int c = headSize + (int)(BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
if (c == headSize) // has no children
return -1;
byte inpByte = inp[o];
@@ -297,7 +316,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
DictNode root = null;
while (true) {
int p = n + firstByteOffset;
- int childOffset = BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask;
+ int childOffset = (int)(BytesUtil.readLong(trieBytes, n, sizeChildOffset) & childOffsetMask);
int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
@@ -329,6 +348,53 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
return root;
}
+ public boolean doCheck() {
+ int offset = headSize;
+ HashSet<Integer> parentSet = new HashSet<>();
+ boolean lastChild = false;
+
+ while (offset < trieBytes.length) {
+ if (lastChild) {
+ boolean contained = parentSet.remove(offset - headSize);
+ // Can't find parent, the data is corrupted
+ if (!contained) {
+ return false;
+ }
+ lastChild = false;
+ }
+ int p = offset + firstByteOffset;
+ int childOffset = (int)(BytesUtil.readLong(trieBytes, offset, sizeChildOffset) & childOffsetMask);
+ int parLen = BytesUtil.readUnsigned(trieBytes, p - 1, 1);
+ boolean isEndOfValue = checkFlag(offset, BIT_IS_END_OF_VALUE);
+
+ // Copy value overflow, the data is corrupted
+ if (trieBytes.length < p + parLen) {
+ return false;
+ }
+
+ // Check id is fine
+ if (isEndOfValue) {
+ BytesUtil.readUnsigned(trieBytes, p + parLen, sizeOfId);
+ }
+
+ // Record it if has children
+ if (childOffset != 0) {
+ parentSet.add(childOffset);
+ }
+
+ // brothers done, move to next parent
+ if (checkFlag(offset, BIT_IS_LAST_CHILD)) {
+ lastChild = true;
+ }
+
+ // move to next node
+ offset += firstByteOffset + parLen + (isEndOfValue ? sizeOfId : 0);
+ }
+
+ // parentSet is empty, meaning all nodes have a parent, so the data is correct
+ return parentSet.isEmpty();
+ }
+
public void write(DataOutput out) throws IOException {
out.write(trieBytes);
}
@@ -341,7 +407,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
throw new IllegalArgumentException("Wrong file type (magic does not match)");
DataInputStream headIn = new DataInputStream(//
- new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I));
+ new ByteArrayInputStream(headPartial, HEAD_SIZE_I, headPartial.length - HEAD_SIZE_I));
int headSize = headIn.readShort();
int bodyLen = headIn.readInt();
headIn.close();
@@ -398,6 +464,9 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
this.id = o.id;
this.isEndOfValue = o.isEndOfValue;
this.children = o.children;
+ for (DictNode child : o.children) {
+ child.parent = this;
+ }
this.nValuesBeneath = o.nValuesBeneath;
this.parent = o.parent;
this.childrenCount = o.childrenCount;
@@ -602,7 +671,8 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
// nValueBytes
if (n.part.length > 255)
- throw new RuntimeException();
+ throw new RuntimeException("Value length is " + n.part.length
+ + " and larger than 255: " + Bytes.toStringBinary(n.part));
BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1);
o++;
@@ -611,7 +681,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
o += n.part.length;
if (n.isEndOfValue) {
- assert n.id > 0;
+ checkValidId(n.id);
BytesUtil.writeUnsigned(n.id, trieBytes, o, sizeId);
o += sizeId;
}
@@ -715,12 +785,13 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
s.mbpn_sizeId = 4;
s.mbpn_sizeValueTotal = s.nValueBytesCompressed + s.nValues * s.mbpn_sizeId;
s.mbpn_sizeNoValueBytes = 1;
- s.mbpn_sizeChildOffset = 4;
+ s.mbpn_sizeChildOffset = 5;
s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset);
while (true) { // minimize the offset size to match the footprint
int t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (s.mbpn_sizeNoValueBytes + s.mbpn_sizeChildOffset - 1);
// *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag
- if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) {
+ // widen t to long before *4, to avoid exceeding Integer.MAX_VALUE
+ if (BytesUtil.sizeForValue((long)t * 4) <= s.mbpn_sizeChildOffset - 1) {
s.mbpn_sizeChildOffset--;
s.mbpn_footprint = t;
} else
@@ -760,31 +831,97 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
}
public static class Builder<T> {
- private String baseDir;
+ private static ConcurrentHashMap<String, Pair<Integer, Builder>> builderInstanceAndCountMap = new ConcurrentHashMap();
+
+ public static Builder getInstance(String resourcePath) throws IOException {
+ return getInstance(resourcePath, null);
+ }
+
+ public synchronized static Builder getInstance(String resourcePath, AppendTrieDictionary dict) throws IOException {
+ Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath);
+ if (entry == null) {
+ entry = new Pair<>(0, createNewBuilder(resourcePath, dict));
+ builderInstanceAndCountMap.put(resourcePath, entry);
+ }
+ entry.setFirst(entry.getFirst() + 1);
+ return entry.getSecond();
+ }
+
+ // return true if entry still in map
+ private synchronized static boolean releaseInstance(String resourcePath) {
+ Pair<Integer, Builder> entry = builderInstanceAndCountMap.get(resourcePath);
+ if (entry != null) {
+ entry.setFirst(entry.getFirst() - 1);
+ if (entry.getFirst() <= 0) {
+ builderInstanceAndCountMap.remove(resourcePath);
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ public static Builder createNewBuilder(String resourcePath, AppendTrieDictionary existDict) throws IOException {
+ String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + resourcePath + "/";
+
+ AppendTrieDictionary dictToUse = existDict;
+ if (dictToUse == null) {
+ // Try to load the existing dict from cache, making sure there is only one such object in memory
+ NavigableSet<String> dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(resourcePath);
+ ArrayList<String> appendDicts = new ArrayList<>();
+ if (dicts != null && !dicts.isEmpty()) {
+ for (String dict : dicts) {
+ DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
+ if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) {
+ appendDicts.add(dict);
+ }
+ }
+ }
+ if (appendDicts.isEmpty()) {
+ dictToUse = null;
+ } else if (appendDicts.size() == 1) {
+ dictToUse = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0));
+ } else {
+ throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", resourcePath, appendDicts.size()));
+ }
+ }
+
+ AppendTrieDictionary.Builder<String> builder;
+ if (dictToUse == null) {
+ logger.info("GlobalDict {} is empty, create new one", resourcePath);
+ builder = new Builder<>(resourcePath, null, dictDir, 0, 0, 0, new StringBytesConverter(), null);
+ } else {
+ logger.info("GlobalDict {} exist, append value", dictToUse);
+ builder = new Builder<>(resourcePath, dictToUse, dictToUse.baseDir, dictToUse.maxId, dictToUse.maxValueLength,
+ dictToUse.nValues, dictToUse.bytesConverter, dictToUse.writeDictMap());
+ }
+
+ return builder;
+ }
+
+ private final String resourcePath;
+ private final String baseDir;
private int maxId;
private int maxValueLength;
private int nValues;
- private BytesConverter<T> bytesConverter;
+ private final BytesConverter<T> bytesConverter;
- private AppendTrieDictionary dict;
+ private final AppendTrieDictionary dict;
- private TreeMap<DictSliceKey, DictNode> mutableDictSliceMap;
- private static int MAX_ENTRY_IN_SLICE = 10_000_000;
+ private final TreeMap<DictSliceKey, DictNode> mutableDictSliceMap;
+ private int MAX_ENTRY_IN_SLICE = 10_000_000;
private static final double MAX_ENTRY_OVERHEAD_FACTOR = 1.0;
private int processedCount = 0;
- public static Builder create(String baseDir) throws IOException {
- return new Builder<>(null, baseDir, 0, 0, 0, new StringBytesConverter(), null);
- }
-
- public static Builder create(AppendTrieDictionary dict) throws IOException {
- return new Builder<>(dict, dict.baseDir, dict.maxId, dict.maxValueLength, dict.nValues, dict.bytesConverter, dict.writeDictMap());
- }
-
// Constructor for a new Dict
- private Builder(AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter<T> bytesConverter, byte[] dictMapBytes) throws IOException {
- this.dict = dict;
+ private Builder(String resourcePath, AppendTrieDictionary dict, String baseDir, int maxId, int maxValueLength, int nValues, BytesConverter<T> bytesConverter, byte[] dictMapBytes) throws IOException {
+ this.resourcePath = resourcePath;
+ if (dict == null) {
+ this.dict = new AppendTrieDictionary<T>();
+ } else {
+ this.dict = dict;
+ }
this.baseDir = baseDir;
this.maxId = maxId;
this.maxValueLength = maxValueLength;
@@ -793,8 +930,11 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
MAX_ENTRY_IN_SLICE = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
int cacheSize = KylinConfig.getInstanceFromEnv().getAppendDictCacheSize();
+ int maxVersions = KylinConfig.getInstanceFromEnv().getAppendDictMaxVersions();
+ long versionTTL = KylinConfig.getInstanceFromEnv().getAppendDictVersionTTL();
// create a new cached map with baseDir
- mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).persistent(true).immutable(false).build();
+ mutableDictSliceMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder().maxSize(cacheSize).baseDir(baseDir)
+ .maxVersions(maxVersions).versionTTL(versionTTL).keyClazz(DictSliceKey.class).valueClazz(DictNode.class).immutable(false).build();
if (dictMapBytes != null) {
((Writable) mutableDictSliceMap).readFields(new DataInputStream(new ByteArrayInputStream(dictMapBytes)));
}
@@ -804,7 +944,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
addValue(bytesConverter.convertToBytes(value));
}
- public void addValue(byte[] value) {
+ private synchronized void addValue(byte[] value) {
if (++processedCount % 1_000_000 == 0) {
logger.debug("add value count " + processedCount);
}
@@ -859,15 +999,41 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
private int createNextId() {
int id = ++maxId;
- if (maxId < 0) {
- throw new IllegalArgumentException("AppendTrieDictionary Id overflow Integer.MAX_VALUE");
- }
+ checkValidId(id);
nValues++;
return id;
}
+ // Only used for test
+ public void setMaxId(int id) {
+ this.maxId = id;
+ }
+
+ // When adding a new node, the value part may be over 255 bytes; in that case it needs to be split into a sub tree
+ private DictNode addNodeMaybeOverflow(byte[] value, int start, int end) {
+ DictNode head = null;
+ DictNode current = null;
+ for (; start + 255 < end; start += 255) {
+ DictNode c = new DictNode(BytesUtil.subarray(value, start, start + 255), false);
+ if (head == null) {
+ head = c;
+ current = c;
+ } else {
+ current.addChild(c);
+ current = c;
+ }
+ }
+ DictNode last = new DictNode(BytesUtil.subarray(value, start, end), true);
+ last.id = createNextId();
+ if (head == null) {
+ head = last;
+ } else {
+ current.addChild(last);
+ }
+ return head;
+ }
+
private void addValueR(DictNode node, byte[] value, int start) {
- assert value.length - start <= 255 : "value bytes overflow than 255";
// match the value part of current node
int i = 0, j = start;
int n = node.part.length, nn = value.length;
@@ -903,8 +1069,7 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
if (i < n) {
DictNode c1 = new DictNode(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children);
c1.id = node.id;
- DictNode c2 = new DictNode(BytesUtil.subarray(value, j, nn), true);
- c2.id = createNextId();
+ DictNode c2 = addNodeMaybeOverflow(value, j, nn);
node.reset(BytesUtil.subarray(node.part, 0, i), false);
if (comp < 0) {
node.addChild(c1);
@@ -940,18 +1105,17 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
addValueR(node.children.get(mid), value, j);
} else {
// otherwise, make the value a new child
- DictNode c = new DictNode(BytesUtil.subarray(value, j, nn), true);
- c.id = createNextId();
+ DictNode c = addNodeMaybeOverflow(value, j, nn);
node.addChild(comp <= 0 ? mid : mid + 1, c);
}
}
- public AppendTrieDictionary<T> build(int baseId) throws IOException {
- if (dict == null) {
- dict = new AppendTrieDictionary<T>();
- }
- dict.update(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter, (CachedTreeMap)mutableDictSliceMap);
- dict.flushIndex((CachedTreeMap) mutableDictSliceMap);
+ public synchronized AppendTrieDictionary<T> build(int baseId) throws IOException {
+ boolean keepAppend = releaseInstance(resourcePath);
+ CachedTreeMap dictSliceMap = (CachedTreeMap)mutableDictSliceMap;
+ dict.initParams(baseDir, baseId, maxId, maxValueLength, nValues, bytesConverter);
+ dict.flushIndex(dictSliceMap, keepAppend);
+ dict.initDictSliceMap(dictSliceMap);
return dict;
}
@@ -970,8 +1134,6 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
}
DictSlice slice = dictSliceMap.get(sliceKey);
int id = slice.getIdFromValueBytesImpl(value, offset, len, roundingFlag);
- if (id < 0)
- logger.error("Not a valid value: " + bytesConverter.convertFromBytes(value, offset, len));
return id;
}
@@ -1031,25 +1193,24 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
throw new UnsupportedOperationException("AppendTrieDictionary can't retrive value from id");
}
- public void flushIndex(CachedTreeMap dictSliceMap) throws IOException {
- Path filePath = new Path(dictSliceMap.getCurrentDir() + "/.index");
- Configuration conf = new Configuration();
- try (FSDataOutputStream indexOut = (FileSystem.get(filePath.toUri(), conf)).create(filePath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8)) {
+ public void flushIndex(CachedTreeMap dictSliceMap, boolean keepAppend) throws IOException {
+ try (FSDataOutputStream indexOut = dictSliceMap.openIndexOutput()) {
indexOut.writeInt(baseId);
indexOut.writeInt(maxId);
indexOut.writeInt(maxValueLength);
indexOut.writeInt(nValues);
indexOut.writeUTF(bytesConverter.getClass().getName());
dictSliceMap.write(indexOut);
+ dictSliceMap.commit(keepAppend);
}
- dictSliceMap.commit(false);
}
@Override
public AppendTrieDictionary copyToAnotherMeta(KylinConfig srcConfig, KylinConfig dstConfig) throws IOException {
Configuration conf = new Configuration();
AppendTrieDictionary newDict = new AppendTrieDictionary();
- newDict.update(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConverter, (CachedTreeMap)dictSliceMap);
+ newDict.initParams(baseDir.replaceFirst(srcConfig.getHdfsWorkingDirectory(), dstConfig.getHdfsWorkingDirectory()), baseId, maxId, maxValueLength, nValues, bytesConverter);
+ newDict.initDictSliceMap((CachedTreeMap)dictSliceMap);
logger.info("Copy AppendDict from {} to {}", this.baseDir, newDict.baseDir);
Path srcPath = new Path(this.baseDir);
Path dstPath = new Path(newDict.baseDir);
@@ -1071,9 +1232,8 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
@Override
public void readFields(DataInput in) throws IOException {
String baseDir = in.readUTF();
- Path filePath = new Path(baseDir + "/.index");
Configuration conf = new Configuration();
- try (FSDataInputStream input = (FileSystem.get(filePath.toUri(), conf)).open(filePath, 8 * 1024 * 1024)) {
+ try (FSDataInputStream input = CachedTreeMap.openLatestIndexInput(conf, baseDir)) {
int baseId = input.readInt();
int maxId = input.readInt();
int maxValueLength = input.readInt();
@@ -1087,10 +1247,13 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
throw new IOException(e);
}
}
+ initParams(baseDir, baseId, maxId, maxValueLength, nValues, converter);
+
+ // Create a map instance to deserialize the data into, then set it as the slice map of this dict
CachedTreeMap dictMap = CachedTreeMap.CachedTreeMapBuilder.newBuilder()
- .baseDir(baseDir).persistent(true).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
+ .baseDir(baseDir).immutable(true).keyClazz(DictSliceKey.class).valueClazz(DictSlice.class).build();
dictMap.readFields(input);
- update(baseDir, baseId, maxId, maxValueLength, nValues, converter, dictMap);
+ initDictSliceMap(dictMap);
}
}
@@ -1120,4 +1283,6 @@ public class AppendTrieDictionary<T> extends Dictionary<T> {
public boolean contains(Dictionary other) {
return false;
}
+
}
+
http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
new file mode 100644
index 0000000..f231275
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/AppendTrieDictionaryChecker.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Created by sunyerui on 16/11/15.
+ */
+public class AppendTrieDictionaryChecker {
+
+ public boolean runChecker(String baseDir) throws IOException {
+ Configuration conf = new Configuration();
+ Path basePath = new Path(baseDir);
+ FileSystem fs = FileSystem.get(basePath.toUri(), conf);
+ List<Path> sliceList = new ArrayList<>();
+ List<Path> corruptedSliceList = new ArrayList<>();
+ listDictSlicePath(fs, fs.getFileStatus(basePath), sliceList);
+
+ for (Path path : sliceList) {
+ if (!doCheck(fs, path)) {
+ System.out.println("AppendDict Slice " + path + " corrupted");
+ corruptedSliceList.add(path);
+ } else {
+ System.out.println("AppendDict Slice " + path + " is right");
+ }
+ }
+
+ if (corruptedSliceList.isEmpty()) {
+ System.out.println("ALL AppendDict Slices is right");
+ return true;
+ } else {
+ System.out.println("Some AppendDict Slice(s) corrupted: ");
+ for (Path path : corruptedSliceList) {
+ System.out.println(path.toString());
+ }
+ return false;
+ }
+ }
+
+ public void listDictSlicePath(FileSystem fs, FileStatus path, List<Path> list) throws IOException {
+ if (path.isDirectory()) {
+ for (FileStatus status : fs.listStatus(path.getPath())) {
+ listDictSlicePath(fs, status, list);
+ }
+ } else {
+ if (path.getPath().getName().startsWith(CachedTreeMap.CACHED_PREFIX)) {
+ list.add(path.getPath());
+ }
+ }
+ }
+
+ public boolean doCheck(FileSystem fs, Path filePath) {
+ try (FSDataInputStream input = fs.open(filePath, CachedTreeMap.BUFFER_SIZE)) {
+ AppendTrieDictionary.DictSlice slice = new AppendTrieDictionary.DictSlice();
+ slice.readFields(input);
+ return slice.doCheck();
+ } catch (Exception e) {
+ return false;
+ } catch (Error e) {
+ return false;
+ }
+ }
+
+ public static void main(String[] args) throws IOException {
+ String path = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict/";
+ if (args.length > 0) {
+ path = args[0];
+ }
+ System.out.println("Recursive Check AppendTrieDictionary Slices in path " + path);
+ AppendTrieDictionaryChecker checker = new AppendTrieDictionaryChecker();
+ if (checker.runChecker(path)) {
+ System.exit(0);
+ } else {
+ System.exit(-1);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
index 1ea3c1c..6acf764 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/CachedTreeMap.java
@@ -31,9 +31,11 @@ import java.util.concurrent.ExecutionException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.slf4j.Logger;
@@ -56,25 +58,29 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
private final Class<V> valueClazz;
transient volatile Collection<V> values;
private final LoadingCache<K, V> valueCache;
- private final TreeSet<String> fileList;
private final Configuration conf;
- private final String baseDir;
- private final String tmpDir;
+ private final Path baseDir;
+ private final Path versionDir;
+ private final Path workingDir;
private final FileSystem fs;
- private final boolean persistent;
private final boolean immutable;
- private long writeValueTime = 0;
- private long readValueTime = 0;
+ private final int maxVersions;
+ private final long versionTTL;
+ private boolean keepAppend;
- private static final int BUFFER_SIZE = 8 * 1024 * 1024;
+ public static final int BUFFER_SIZE = 8 * 1024 * 1024;
+
+ public static final String CACHED_PREFIX = "cached_";
+ public static final String VERSION_PREFIX = "version_";
public static class CachedTreeMapBuilder<K, V> {
private Class<K> keyClazz;
private Class<V> valueClazz;
private int maxCount = 8;
private String baseDir;
- private boolean persistent;
private boolean immutable;
+ private int maxVersions;
+ private long versionTTL;
public static CachedTreeMapBuilder newBuilder() {
return new CachedTreeMapBuilder();
@@ -103,13 +109,18 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
return this;
}
- public CachedTreeMapBuilder<K, V> persistent(boolean persistent) {
- this.persistent = persistent;
+ public CachedTreeMapBuilder<K, V> immutable(boolean immutable) {
+ this.immutable = immutable;
return this;
}
- public CachedTreeMapBuilder<K, V> immutable(boolean immutable) {
- this.immutable = immutable;
+ public CachedTreeMapBuilder<K, V> maxVersions(int maxVersions) {
+ this.maxVersions = maxVersions;
+ return this;
+ }
+
+ public CachedTreeMapBuilder<K, V> versionTTL(long versionTTL) {
+ this.versionTTL = versionTTL;
return this;
}
@@ -120,26 +131,38 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
if (keyClazz == null || valueClazz == null) {
throw new RuntimeException("CachedTreeMap need key and value clazz to serialize data");
}
- CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, persistent, immutable);
+ CachedTreeMap map = new CachedTreeMap(maxCount, keyClazz, valueClazz, baseDir, immutable, maxVersions, versionTTL);
return map;
}
}
- private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String baseDir, boolean persistent, boolean immutable) throws IOException {
+ private CachedTreeMap(int maxCount, Class<K> keyClazz, Class<V> valueClazz, String basePath,
+ boolean immutable, int maxVersions, long versionTTL) throws IOException {
super();
this.keyClazz = keyClazz;
this.valueClazz = valueClazz;
- this.fileList = new TreeSet<>();
+ this.immutable = immutable;
+ this.keepAppend = true;
+ this.maxVersions = maxVersions;
+ this.versionTTL = versionTTL;
this.conf = new Configuration();
- if (baseDir.endsWith("/")) {
- this.baseDir = baseDir.substring(0, baseDir.length()-1);
- } else {
- this.baseDir = baseDir;
+ if (basePath.endsWith("/")) {
+ basePath = basePath.substring(0, basePath.length()-1);
+ }
+ this.baseDir = new Path(basePath);
+ this.fs = FileSystem.get(baseDir.toUri(), conf);
+ if (!fs.exists(baseDir)) {
+ fs.mkdirs(baseDir);
+ }
+ this.versionDir = getLatestVersion(conf, fs, baseDir);
+ this.workingDir = new Path(baseDir, "working");
+ if (!this.immutable) {
+ // For a mutable map, copy all data into the working dir and work on that copy, so that a sudden server crash does not corrupt the data
+ if (fs.exists(workingDir)) {
+ fs.delete(workingDir, true);
+ }
+ FileUtil.copy(fs, versionDir, fs, workingDir, false, true, conf);
}
- this.tmpDir = this.baseDir + ".tmp";
- this.fs = FileSystem.get(new Path(baseDir).toUri(), conf);
- this.persistent = persistent;
- this.immutable = immutable;
CacheBuilder builder = CacheBuilder.newBuilder().removalListener(new RemovalListener<K, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
@@ -152,24 +175,14 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
deleteValue(notification.getKey());
break;
default:
- throw new RuntimeException("unexpected evict reason " + notification.getCause());
}
}
});
- // For immutable values, load all values as much as possible, and evict by soft reference to free memory when gc
if (this.immutable) {
+ // For immutable values, load as many values as possible, and let them be evicted via soft references to free memory on GC
builder.softValues();
} else {
builder.maximumSize(maxCount);
- // For mutable map, copy all data into tmp and modify on tmp data, avoiding suddenly server crash made data corrupt
- if (fs.exists(new Path(tmpDir))) {
- fs.delete(new Path(tmpDir), true);
- }
- if (fs.exists(new Path(this.baseDir))) {
- FileUtil.copy(fs, new Path(this.baseDir), fs, new Path(tmpDir), false, true, conf);
- } else {
- fs.mkdirs(new Path(this.baseDir));
- }
}
this.valueCache = builder.build(new CacheLoader<K, V>() {
@Override
@@ -182,38 +195,108 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
}
private String generateFileName(K key) {
- String file = (immutable ? baseDir : tmpDir) + "/cached_" + key.toString();
+ String file = getCurrentDir() + "/" + CACHED_PREFIX + key.toString();
return file;
}
- public String getCurrentDir() {
- return immutable ? baseDir : tmpDir;
+ private String getCurrentDir() {
+ return immutable ? versionDir.toString() : workingDir.toString();
}
- public void commit(boolean stillMutable) throws IOException {
- assert !immutable : "Only support commit method with immutable false";
+ private static String[] listAllVersions(FileSystem fs, Path baseDir) throws IOException {
+ FileStatus[] fileStatus = fs.listStatus(baseDir, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ if (path.getName().startsWith(VERSION_PREFIX)) {
+ return true;
+ }
+ return false;
+ }
+ });
+ TreeSet<String> versions = new TreeSet<>();
+ for (FileStatus status : fileStatus) {
+ versions.add(status.getPath().toString());
+ }
+ return versions.toArray(new String[versions.size()]);
+ }
- Path basePath = new Path(baseDir);
- Path backupPath = new Path(baseDir+".bak");
- Path tmpPath = new Path(tmpDir);
- try {
- fs.rename(basePath, backupPath);
- } catch (IOException e) {
- logger.info("CachedTreeMap commit backup basedir failed, " + e, e);
- throw e;
+ // only for test
+ public String getLatestVersion() throws IOException {
+ return getLatestVersion(conf, fs, baseDir).toUri().getPath();
+ }
+
+ private static Path getLatestVersion(Configuration conf, FileSystem fs, Path baseDir) throws IOException {
+ String[] versions = listAllVersions(fs, baseDir);
+ if (versions.length > 0) {
+ return new Path(versions[versions.length - 1]);
+ } else {
+ // Old format: the data lives directly in the base dir; convert it to the new versioned format
+ Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis());
+ Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis());
+ Path indexFile = new Path(baseDir, ".index");
+ FileStatus[] cachedFiles;
+ try {
+ cachedFiles = fs.listStatus(baseDir, new PathFilter() {
+ @Override
+ public boolean accept(Path path) {
+ if (path.getName().startsWith(CACHED_PREFIX)) {
+ return true;
+ }
+ return false;
+ }
+ });
+ fs.mkdirs(tmpNewVersionDir);
+ if (fs.exists(indexFile) && cachedFiles.length > 0) {
+ FileUtil.copy(fs, indexFile, fs, tmpNewVersionDir, false, true, conf);
+ for (FileStatus file : cachedFiles) {
+ FileUtil.copy(fs, file.getPath(), fs, tmpNewVersionDir, false, true, conf);
+ }
+ }
+ fs.rename(tmpNewVersionDir, newVersionDir);
+ if (fs.exists(indexFile) && cachedFiles.length > 0) {
+ fs.delete(indexFile, true);
+ for (FileStatus file : cachedFiles) {
+ fs.delete(file.getPath(), true);
+ }
+ }
+ } finally {
+ if (fs.exists(tmpNewVersionDir)) {
+ fs.delete(tmpNewVersionDir, true);
+ }
+ }
+ return newVersionDir;
}
+ }
- try {
- if (stillMutable) {
- FileUtil.copy(fs, tmpPath, fs, basePath, false, true, conf);
- } else {
- fs.rename(tmpPath, basePath);
+ public void commit(boolean keepAppend) throws IOException {
+ assert this.keepAppend & !immutable : "Only support commit method with immutable false and keepAppend true";
+
+ Path newVersionDir = new Path(baseDir, VERSION_PREFIX + System.currentTimeMillis());
+ if (keepAppend) {
+ // Copy to a tmp dir, then rename it to the new version, to make sure the version is complete when it becomes visible
+ Path tmpNewVersionDir = new Path(baseDir, "tmp_" + VERSION_PREFIX + System.currentTimeMillis());
+ try {
+ FileUtil.copy(fs, workingDir, fs, tmpNewVersionDir, false, true, conf);
+ fs.rename(tmpNewVersionDir, newVersionDir);
+ } finally {
+ if (fs.exists(tmpNewVersionDir)) {
+ fs.delete(tmpNewVersionDir, true);
+ }
+ }
+ } else {
+ fs.rename(workingDir, newVersionDir);
+ }
+ this.keepAppend = keepAppend;
+
+ // Check versions count, delete expired versions
+ String[] versions = listAllVersions(fs, baseDir);
+ long timestamp = System.currentTimeMillis();
+ for (int i = 0; i < versions.length - maxVersions; i++) {
+ String versionString = versions[i].substring(versions[i].lastIndexOf(VERSION_PREFIX) + VERSION_PREFIX.length());
+ long version = Long.parseLong(versionString);
+ if (version + versionTTL < timestamp) {
+ fs.delete(new Path(versions[i]), true);
}
- fs.delete(backupPath, true);
- } catch (IOException e) {
- fs.rename(backupPath, basePath);
- logger.info("CachedTreeMap commit move/copy tmpdir failed, " + e, e);
- throw e;
}
}
@@ -227,25 +310,17 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
if (immutable) {
return;
}
- long t0 = System.currentTimeMillis();
String fileName = generateFileName(key);
Path filePath = new Path(fileName);
try (FSDataOutputStream out = fs.create(filePath, true, BUFFER_SIZE, (short) 5, BUFFER_SIZE * 8)) {
value.write(out);
- if (!persistent) {
- fs.deleteOnExit(filePath);
- }
} catch (Exception e) {
logger.error(String.format("write value into %s exception: %s", fileName, e), e);
throw new RuntimeException(e.getCause());
- } finally {
- fileList.add(fileName);
- writeValueTime += System.currentTimeMillis() - t0;
}
}
private V readValue(K key) throws Exception {
- long t0 = System.currentTimeMillis();
String fileName = generateFileName(key);
Path filePath = new Path(fileName);
try (FSDataInputStream input = fs.open(filePath, BUFFER_SIZE)) {
@@ -255,13 +330,11 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
} catch (Exception e) {
logger.error(String.format("read value from %s exception: %s", fileName, e), e);
return null;
- } finally {
- readValueTime += System.currentTimeMillis() - t0;
}
}
private void deleteValue(K key) {
- if (persistent && immutable) {
+ if (immutable) {
return;
}
String fileName = generateFileName(key);
@@ -272,14 +345,12 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
}
} catch (Exception e) {
logger.error(String.format("delete value file %s exception: %s", fileName, e), e);
- } finally {
- fileList.remove(fileName);
}
}
@Override
public V put(K key, V value) {
- assert !immutable : "Only support put method with immutable false";
+ assert keepAppend & !immutable : "Only support put method with immutable false and keepAppend true";
super.put(key, null);
valueCache.put(key, value);
return null;
@@ -301,7 +372,7 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
@Override
public V remove(Object key) {
- assert !immutable : "Only support remove method with immutable false";
+ assert keepAppend & !immutable : "Only support remove method with immutable false keepAppend true";
super.remove(key);
valueCache.invalidate(key);
return null;
@@ -357,15 +428,32 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
@Override
public void remove() {
- assert !immutable : "Only support remove method with immutable false";
+ assert keepAppend & !immutable : "Only support remove method with immutable false and keepAppend true";
keyIterator.remove();
valueCache.invalidate(currentKey);
}
}
+ public FSDataOutputStream openIndexOutput() throws IOException {
+ assert keepAppend & !immutable : "Only support write method with immutable false and keepAppend true";
+ Path indexPath = new Path(getCurrentDir(), ".index");
+ return fs.create(indexPath, true, 8 * 1024 * 1024, (short) 5, 8 * 1024 * 1024 * 8);
+ }
+
+ public FSDataInputStream openIndexInput() throws IOException {
+ Path indexPath = new Path(getCurrentDir(), ".index");
+ return fs.open(indexPath, 8 * 1024 * 1024);
+ }
+
+ public static FSDataInputStream openLatestIndexInput(Configuration conf, String baseDir) throws IOException {
+ Path basePath = new Path(baseDir);
+ FileSystem fs = FileSystem.get(basePath.toUri(), conf);
+ Path indexPath = new Path(getLatestVersion(conf, fs, basePath), ".index");
+ return fs.open(indexPath, 8 * 1024 * 1024);
+ }
+
@Override
public void write(DataOutput out) throws IOException {
- assert persistent : "Only support serialize with persistent true";
out.writeInt(size());
for (K key : keySet()) {
key.write(out);
@@ -378,7 +466,6 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
@Override
public void readFields(DataInput in) throws IOException {
- assert persistent : "Only support deserialize with persistent true";
int size = in.readInt();
try {
for (int i = 0; i < size; i++) {
@@ -390,27 +477,4 @@ public class CachedTreeMap<K extends WritableComparable, V extends Writable> ext
throw new IOException(e);
}
}
-
- // clean up all tmp files
- @Override
- public void finalize() throws Throwable {
- if (persistent) {
- return;
- }
- try {
- this.clear();
- for (String file : fileList) {
- try {
- Path filePath = new Path(file);
- fs.delete(filePath, true);
- } catch (Throwable t) {
- //do nothing?
- }
- }
- } catch (Throwable t) {
- //do nothing
- } finally {
- super.finalize();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index b2a3664..cda3c2b 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -19,14 +19,8 @@
package org.apache.kylin.dict;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.NavigableSet;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.metadata.MetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* GlobalDictinary based on whole cube, to ensure one value has same dict id in different segments.
@@ -34,41 +28,15 @@ import org.slf4j.LoggerFactory;
* Created by sunyerui on 16/5/24.
*/
public class GlobalDictionaryBuilder implements IDictionaryBuilder {
- private static final Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class);
-
AppendTrieDictionary.Builder<String> builder;
int baseId;
-
+
@Override
public void init(DictionaryInfo dictInfo, int baseId) throws IOException {
if (dictInfo == null) {
throw new IllegalArgumentException("GlobalDictinaryBuilder must used with an existing DictionaryInfo");
}
- String dictDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + dictInfo.getResourceDir() + "/";
-
- // Try to load the existing dict from cache, making sure there's only the same one object in memory
- NavigableSet<String> dicts = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().listResources(dictInfo.getResourceDir());
- ArrayList<String> appendDicts = new ArrayList<>();
- if (dicts != null && !dicts.isEmpty()) {
- for (String dict : dicts) {
- DictionaryInfo info = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getStore().getResource(dict, DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
- if (info.getDictionaryClass().equals(AppendTrieDictionary.class.getName())) {
- appendDicts.add(dict);
- }
- }
- }
-
- if (appendDicts.isEmpty()) {
- logger.info("GlobalDict {} is empty, create new one", dictInfo.getResourceDir());
- this.builder = AppendTrieDictionary.Builder.create(dictDir);
- } else if (appendDicts.size() == 1) {
- logger.info("GlobalDict {} exist, append value", appendDicts.get(0));
- AppendTrieDictionary dict = (AppendTrieDictionary) DictionaryManager.getInstance(KylinConfig.getInstanceFromEnv()).getDictionary(appendDicts.get(0));
- this.builder = AppendTrieDictionary.Builder.create(dict);
- } else {
- throw new IllegalStateException(String.format("GlobalDict %s should have 0 or 1 append dict but %d", dictInfo.getResourceDir(), appendDicts.size()));
- }
-
+ this.builder = AppendTrieDictionary.Builder.getInstance(dictInfo.getResourceDir());
this.baseId = baseId;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4a0ee798/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
index 5e1705a..a7e8152 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -20,13 +20,15 @@ package org.apache.kylin.dict;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
-import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -35,6 +37,8 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Random;
import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -51,6 +55,7 @@ import org.junit.Test;
public class AppendTrieDictionaryTest {
public static final String BASE_DIR = "/tmp/kylin_append_dict";
+ public static final String RESOURCE_DIR = "/dict/append_dict_test";
@BeforeClass
public static void setUp() {
@@ -64,22 +69,28 @@ public class AppendTrieDictionaryTest {
@AfterClass
public static void tearDown() {
- String workingDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
+ cleanup();
+ }
+
+// @After
+ public void afterTest() {
+ cleanup();
+ }
+
+ public static void cleanup() {
+ Configuration conf = new Configuration();
+ Path basePath = new Path(BASE_DIR);
try {
- FileSystem.get(new Path(workingDir).toUri(), new Configuration()).delete(new Path(workingDir), true);
- } catch (IOException e) {
- }
- File tmpLocalDir = new File(BASE_DIR);
- if (tmpLocalDir.exists()) {
- for (File f : tmpLocalDir.listFiles()) {
- f.delete();
- }
- tmpLocalDir.delete();
- }
+ FileSystem.get(basePath.toUri(), conf).delete(basePath, true);
+ } catch (IOException e) {}
}
public static final String[] words = new String[] { "paint", "par", "part", "parts", "partition", "partitions", "party", "partie", "parties", "patient", "taste", "tar", "trie", "try", "tries", "\u5b57\u5178", "\u5b57\u5178\u6811", "\u5b57\u6bcd", // non-ascii characters
"", // empty
+ "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
+ "paiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiipaiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiiii",
+ "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+ + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk",
"paint", "tar", "try", // some dup
};
@@ -131,8 +142,7 @@ public class AppendTrieDictionaryTest {
@Ignore("need huge key set")
@Test
public void testHugeKeySet() throws IOException {
- BytesConverter converter = new StringBytesConverter();
- AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create(BASE_DIR);
+ AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
AppendTrieDictionary<String> dict = null;
InputStream is = new FileInputStream("src/test/resources/dict/huge_key");
@@ -162,7 +172,7 @@ public class AppendTrieDictionaryTest {
}
BytesConverter converter = new StringBytesConverter();
- AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create(BASE_DIR);
+ AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
AppendTrieDictionary<String> dict = null;
TreeMap<Integer, String> checkMap = new TreeMap<>();
int firstAppend = rnd.nextInt(strList.size() / 2);
@@ -179,12 +189,14 @@ public class AppendTrieDictionaryTest {
String str = strList.get(checkIndex);
byte[] bytes = converter.convertToBytes(str);
int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+ assertNotEquals(String.format("Value %s not exist", str), -1, id);
assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)), checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
checkMap.put(id, str);
}
// reopen dict and append
- b = AppendTrieDictionary.Builder.create(dict);
+// b = AppendTrieDictionary.Builder.create(dict);
+ b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
for (; appendIndex < secondAppend; appendIndex++) {
b.addValue(strList.get(appendIndex));
}
@@ -197,6 +209,7 @@ public class AppendTrieDictionaryTest {
String str = strList.get(checkIndex);
byte[] bytes = converter.convertToBytes(str);
int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+ assertNotEquals(String.format("Value %s not exist", str), -1, id);
if (checkIndex < firstAppend) {
assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
} else {
@@ -207,7 +220,7 @@ public class AppendTrieDictionaryTest {
}
// reopen dict and append rest str
- b = AppendTrieDictionary.Builder.create(dict);
+ b = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR, dict);
for (; appendIndex < strList.size(); appendIndex++) {
b.addValue(strList.get(appendIndex));
}
@@ -220,6 +233,7 @@ public class AppendTrieDictionaryTest {
String str = strList.get(checkIndex);
byte[] bytes = converter.convertToBytes(str);
int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+ assertNotEquals(String.format("Value %s not exist", str), -1, id);
if (checkIndex < secondAppend) {
assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
} else {
@@ -240,6 +254,7 @@ public class AppendTrieDictionaryTest {
for (String str : strList) {
byte[] bytes = converter.convertToBytes(str);
int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+ assertNotEquals(String.format("Value %s not exist", str), -1, id);
assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
}
}
@@ -260,4 +275,103 @@ public class AppendTrieDictionaryTest {
throw new RuntimeException(e);
}
}
-}
\ No newline at end of file
+
+    /**
+     * Starts the builder two ids below {@link Integer#MAX_VALUE}, adds four values and checks
+     * that the resulting ids run up to {@code Integer.MAX_VALUE} and then continue from
+     * {@link Integer#MIN_VALUE}.
+     */
+    @Test
+    public void testMaxInteger() throws IOException {
+        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+        builder.setMaxId(Integer.MAX_VALUE - 2);
+        builder.addValue("a");
+        builder.addValue("ab");
+        builder.addValue("acd");
+        builder.addValue("ac");
+        AppendTrieDictionary<String> dict = builder.build(0);
+        assertEquals(Integer.MAX_VALUE - 1, dict.getIdFromValueImpl("a", 0));
+        assertEquals(Integer.MAX_VALUE, dict.getIdFromValueImpl("ab", 0));
+        // The expected ids follow the order of the addValue() calls above: "acd" was added
+        // before "ac", so "acd" gets the first id past the int limit.
+        assertEquals(Integer.MIN_VALUE + 1, dict.getIdFromValueImpl("ac", 0));
+        assertEquals(Integer.MIN_VALUE, dict.getIdFromValueImpl("acd", 0));
+    }
+
+    /**
+     * Adds 10000 values of steadily growing length ("aa", "aaa", ...) to a builder and then
+     * builds the dictionary. Disabled by default; see the {@code @Ignore} reason.
+     */
+    @Ignore("Only occurred when value is very long (>8000 bytes)")
+    @Test
+    public void testSuperLongValue() throws IOException {
+        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(RESOURCE_DIR);
+        StringBuilder grown = new StringBuilder("a");
+        int round = 0;
+        while (round < 10000) {
+            grown.append("a");
+            try {
+                builder.addValue(grown.toString());
+            } catch (StackOverflowError overflow) {
+                // Print the round at which the stack overflowed, then let the error fail the test.
+                System.out.println("\nstack overflow " + round);
+                throw overflow;
+            }
+            round++;
+        }
+        // Build and touch the resulting dictionary once; the returned max id itself is not checked.
+        builder.build(0).getMaxId();
+    }
+
+    /**
+     * Worker used by {@code testSharedBuilder()}: obtains a builder through
+     * {@code AppendTrieDictionary.Builder.getInstance(resourcePath)}, adds {@code count}
+     * values named {@code prefix + i} and finally calls {@code build(0)}.
+     * <p>
+     * {@code startLatch} is counted down once the builder lookup has finished (successfully
+     * or not); {@code finishLatch} is counted down only after a successful build.
+     */
+    private static class SharedBuilderThread extends Thread {
+        private final CountDownLatch startLatch;
+        private final CountDownLatch finishLatch;
+        private final String resourcePath;
+        private final String prefix;
+        private final int count;
+
+        SharedBuilderThread(CountDownLatch startLatch, CountDownLatch finishLatch, String resourcePath, String prefix, int count) {
+            this.startLatch = startLatch;
+            this.finishLatch = finishLatch;
+            this.resourcePath = resourcePath;
+            this.prefix = prefix;
+            this.count = count;
+        }
+
+        @Override
+        public void run() {
+            try {
+                AppendTrieDictionary.Builder<String> builder;
+                try {
+                    builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
+                } finally {
+                    // Always release the start latch: if getInstance() failed without this,
+                    // the test thread would block forever in startLatch.await().
+                    startLatch.countDown();
+                }
+                for (int i = 0; i < count; i++) {
+                    builder.addValue(prefix + i);
+                }
+                builder.build(0);
+                // Reached only on success, so a failed worker makes the test's bounded
+                // finishLatch.await(...) time out instead of passing silently.
+                finishLatch.countDown();
+            } catch (IOException e) {
+                // Do not swallow the failure: rethrow so the cause shows up in the test output.
+                throw new RuntimeException("Shared builder thread '" + prefix + "' failed", e);
+            }
+        }
+    }
+
+    /**
+     * Three worker threads and the test thread each obtain a builder for the same resource
+     * path. The test checks that the built dictionary's max id equals the number of values
+     * added by the workers, that {@code addValue()} on the already-built builder throws, and
+     * that a builder reopened with {@code getInstance(resourcePath, dict)} can append one more
+     * value while every worker value is still resolvable.
+     */
+    @Test
+    public void testSharedBuilder() throws IOException, InterruptedException {
+        final String resourcePath = "shared_builder";
+        // Values added per worker; the expected ids below are derived from these counts.
+        final int t1Count = 10000;
+        final int t2Count = 10;
+        final int t3Count = 100000;
+        final int totalCount = t1Count + t2Count + t3Count;
+        final CountDownLatch startLatch = new CountDownLatch(3);
+        final CountDownLatch finishLatch = new CountDownLatch(3);
+
+        AppendTrieDictionary.Builder<String> builder = AppendTrieDictionary.Builder.getInstance(resourcePath);
+        Thread t1 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t1_", t1Count);
+        Thread t2 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t2_", t2Count);
+        Thread t3 = new SharedBuilderThread(startLatch, finishLatch, resourcePath, "t3_", t3Count);
+        t1.start();
+        t2.start();
+        t3.start();
+        // Wait until every worker has obtained its builder before this thread calls build().
+        startLatch.await();
+        AppendTrieDictionary<String> dict = builder.build(0);
+        assertTrue("AppendDictBuilder Thread too slow", finishLatch.await(3000, TimeUnit.MILLISECONDS));
+        assertEquals(totalCount, dict.getMaxId());
+        try {
+            builder.addValue("fail");
+            fail("Builder should be closed");
+        } catch (Exception expected) {
+            // Expected: addValue() after build() must throw. fail() raises an AssertionError,
+            // which is not an Exception, so a missing throw still fails the test.
+        }
+
+        builder = AppendTrieDictionary.Builder.getInstance(resourcePath, dict);
+        builder.addValue("success");
+        dict = builder.build(0);
+        for (int i = 0; i < t1Count; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t1_" + i));
+        }
+        for (int i = 0; i < t2Count; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t2_" + i));
+        }
+        for (int i = 0; i < t3Count; i++) {
+            assertNotEquals(-1, dict.getIdFromValue("t3_" + i));
+        }
+        // The value appended after reopening is expected to get the next id after the worker values.
+        assertEquals(totalCount + 1, dict.getIdFromValue("success"));
+    }
+}