You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/07/03 08:57:08 UTC
[2/3] incubator-kylin git commit: KYLIN-871 growing dict
KYLIN-871 growing dict
KYLIN-871 growing dict
KYLIN-871
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/96b9aac7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/96b9aac7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/96b9aac7
Branch: refs/heads/0.8
Commit: 96b9aac70363d4246b225bfb1196696255db13f0
Parents: 230fe40
Author: honma <ho...@ebay.com>
Authored: Fri Jul 3 11:19:53 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Jul 3 14:56:19 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/dict/DateStrDictionary.java | 15 +++---
.../java/org/apache/kylin/dict/Dictionary.java | 31 +++++++++--
.../apache/kylin/dict/DictionaryManager.java | 55 +++++++++++++-------
.../apache/kylin/dict/TimeStrDictionary.java | 5 ++
.../org/apache/kylin/dict/TrieDictionary.java | 34 +++++++-----
.../apache/kylin/dict/TrieDictionaryTest.java | 47 +++++++++++------
.../apache/kylin/streaming/StreamingUtil.java | 2 +-
7 files changed, 128 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
index acd7404..0ee1d06 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
@@ -18,16 +18,12 @@
package org.apache.kylin.dict;
-import static org.apache.kylin.common.util.DateFormat.*;
+import org.apache.commons.lang.StringUtils;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
import java.util.Date;
-import org.apache.commons.lang.StringUtils;
+import static org.apache.kylin.common.util.DateFormat.*;
/**
* A dictionary for date string (date only, no time).
@@ -178,6 +174,11 @@ public class DateStrDictionary extends Dictionary<String> {
}
@Override
+ public boolean containedBy(Dictionary other) {
+ return this.equals(other);
+ }
+
+ @Override
public void dump(PrintStream out) {
out.println(this.toString());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
index be2429c..1df950d 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
@@ -65,6 +65,11 @@ abstract public class Dictionary<T> implements Writable {
abstract public int getSizeOfValue();
/**
+ * @return true if each entry of this dict is contained by the dict in param
+ */
+ abstract public boolean containedBy(Dictionary another);
+
+ /**
* Convenient form of <code>getIdFromValue(value, 0)</code>
*/
final public int getIdFromValue(T value) throws IllegalArgumentException {
@@ -91,6 +96,20 @@ abstract public class Dictionary<T> implements Writable {
return getIdFromValueImpl(value, roundingFlag);
}
+ final public boolean containsValue(T value) throws IllegalArgumentException {
+ if (isNullObjectForm(value)) {
+ return true;
+ } else {
+ try {
+ //if no key found, it will throw exception
+ getIdFromValueImpl(value, 0);
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ return true;
+ }
+ }
+
protected boolean isNullObjectForm(T value) {
return value == null;
}
@@ -135,8 +154,12 @@ abstract public class Dictionary<T> implements Writable {
final public int getIdFromValueBytes(byte[] value, int offset, int len, int roundingFlag) throws IllegalArgumentException {
if (isNullByteForm(value, offset, len))
return nullId();
- else
- return getIdFromValueBytesImpl(value, offset, len, roundingFlag);
+ else {
+ int id = getIdFromValueBytesImpl(value, offset, len, roundingFlag);
+ if (id < 0)
+ throw new IllegalArgumentException("Value not exists!");
+ return id;
+ }
}
protected boolean isNullByteForm(byte[] value, int offset, int len) {
@@ -151,9 +174,9 @@ abstract public class Dictionary<T> implements Writable {
else
return getValueBytesFromIdImpl(id);
}
-
+
abstract protected byte[] getValueBytesFromIdImpl(int id);
-
+
/**
* A lower level API, get byte values from ID, return the number of bytes
* written. Bypassing the cache layer, this could be significantly slower
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 0c6dd57..d74f1c8 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -18,7 +18,7 @@
package org.apache.kylin.dict;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -102,15 +102,30 @@ public class DictionaryManager {
public DictionaryInfo trySaveNewDict(Dictionary<?> newDict, DictionaryInfo newDictInfo) throws IOException {
- String dupDict = checkDupByContent(newDictInfo, newDict);
- if (dupDict != null) {
- logger.info("Identical dictionary content " + newDict + ", reuse existing dictionary at " + dupDict);
- return getDictionaryInfo(dupDict);
- }
-
newDictInfo.setDictionaryObject(newDict);
newDictInfo.setDictionaryClass(newDict.getClass().getName());
+ DictionaryInfo largestDict = findLargestDict(newDictInfo);
+ if (largestDict != null) {
+ if (newDict.containedBy(largestDict.getDictionaryObject())) {
+ logger.info("dictionary content " + newDict + ", is contained by dictionary at " + largestDict.getResourcePath());
+ return largestDict;
+ } else if (largestDict.getDictionaryObject().containedBy(newDict)) {
+ logger.info("dictionary content " + newDict + " is by far the largest, save it");
+ return saveNewDict(newDict, newDictInfo);
+ } else {
+ logger.info("merge dict and save...");
+ return mergeDictionary(Lists.newArrayList(newDictInfo, largestDict));
+ }
+ } else {
+ logger.info("first dict of this column, save it directly");
+ return saveNewDict(newDict, newDictInfo);
+ }
+ }
+
+ private DictionaryInfo saveNewDict(Dictionary<?> newDict, DictionaryInfo newDictInfo) throws IOException {
+
+
save(newDictInfo);
dictCache.put(newDictInfo.getResourcePath(), newDictInfo);
@@ -307,10 +322,7 @@ public class DictionaryManager {
}
Collections.sort(existings);
- final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(existings.get(0),
- existings.get(existings.size() - 1),
- DictionaryInfo.class,
- DictionaryInfoSerializer.INFO_SERIALIZER);
+ final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(existings.get(0), existings.get(existings.size() - 1), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
TableSignature input = dictInfo.getInput();
@@ -322,25 +334,28 @@ public class DictionaryManager {
return null;
}
- private String checkDupByContent(DictionaryInfo dictInfo, Dictionary<?> dict) throws IOException {
+ private DictionaryInfo findLargestDict(DictionaryInfo dictInfo) throws IOException {
ResourceStore store = MetadataManager.getInstance(config).getStore();
ArrayList<String> dictInfos = store.listResources(dictInfo.getResourceDir());
if (dictInfos == null || dictInfos.isEmpty()) {
return null;
}
- Collections.sort(dictInfos);
+ //Collections.sort(dictInfos);
- final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(dictInfos.get(0),
- dictInfos.get(dictInfos.size() - 1),
- DictionaryInfo.class,
- DictionaryInfoSerializer.FULL_SERIALIZER);
+ final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(dictInfos.get(0), dictInfos.get(dictInfos.size() - 1), DictionaryInfo.class, DictionaryInfoSerializer.FULL_SERIALIZER);
+ DictionaryInfo largestDict = null;
for (DictionaryInfo dictionaryInfo : allResources) {
- if (dict.equals(dictionaryInfo.getDictionaryObject())) {
- return dictionaryInfo.getResourcePath();
+ if (largestDict == null) {
+ largestDict = dictionaryInfo;
+ continue;
+ }
+
+ if (largestDict.getDictionaryObject().getSize() < dictionaryInfo.getDictionaryObject().getSize()) {
+ largestDict = dictionaryInfo;
}
}
- return null;
+ return largestDict;
}
public void removeDictionary(String resourcePath) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
index 2c576e7..92f3a3c 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
@@ -106,6 +106,11 @@ public class TimeStrDictionary extends Dictionary<String> {
}
@Override
+ public boolean containedBy(Dictionary other) {
+ return this.equals(other);
+ }
+
+ @Override
public void write(DataOutput out) throws IOException {
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
index 8b8a815..ab27b4c 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
@@ -18,22 +18,17 @@
package org.apache.kylin.dict;
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.lang.ref.SoftReference;
-import java.util.Arrays;
-import java.util.HashMap;
-
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ClassUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.*;
+import java.lang.ref.SoftReference;
+import java.util.Arrays;
+import java.util.HashMap;
+
/**
* A dictionary based on Trie data structure that maps enumerations of byte[] to
* int IDs.
@@ -173,7 +168,7 @@ public class TrieDictionary<T> extends Dictionary<T> {
int seq = lookupSeqNoFromValue(headSize, value, offset, offset + len, roundingFlag);
int id = calcIdFromSeqNo(seq);
if (id < 0)
- throw new IllegalArgumentException("Not a valid value: " + bytesConvert.convertFromBytes(value, offset, len));
+ logger.error("Not a valid value: " + bytesConvert.convertFromBytes(value, offset, len));
return id;
}
@@ -478,13 +473,28 @@ public class TrieDictionary<T> extends Dictionary<T> {
@Override
public boolean equals(Object o) {
if ((o instanceof TrieDictionary) == false) {
- logger.info("Equals return false because o is not TrieDictionary");
+ logger.info("Equals return false because it's not TrieDictionary");
return false;
}
TrieDictionary that = (TrieDictionary) o;
return Arrays.equals(this.trieBytes, that.trieBytes);
}
+ @Override
+ public boolean containedBy(Dictionary other) {
+ if (this.getSize() > other.getSize()) {
+ return false;
+ }
+
+ for (int i = getMinId(); i <= getMaxId(); ++i) {
+ T v = this.getValueFromId(i);
+ if (!other.containsValue(v)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
public static void main(String[] args) throws Exception {
TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter());
b.addValue("part");
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java b/dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java
index 4ad89d5..f6031e8 100644
--- a/dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java
+++ b/dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java
@@ -18,25 +18,12 @@
package org.apache.kylin.dict;
-import static org.junit.Assert.*;
+import org.junit.Test;
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Random;
-import java.util.TreeSet;
+import java.io.*;
+import java.util.*;
-import org.junit.Test;
+import static org.junit.Assert.*;
public class TrieDictionaryTest {
@@ -132,6 +119,32 @@ public class TrieDictionaryTest {
}
@Test
+ public void dictionaryContainTest()
+ {
+ ArrayList<String> str = new ArrayList<String>();
+ str.add("part");
+ str.add("part"); // meant to be dup
+ str.add("par");
+ str.add("partition");
+ str.add("party");
+ str.add("parties");
+ str.add("paint");
+
+ TrieDictionaryBuilder<String> b = newDictBuilder(str);
+ int baseId = new Random().nextInt(100);
+ TrieDictionary<String> dict = b.build(baseId);
+
+ str.add("py");
+ b = newDictBuilder(str);
+ baseId = new Random().nextInt(100);
+ TrieDictionary<String> dict2 = b.build(baseId);
+
+ assertEquals(true,dict.containedBy(dict2));
+ assertEquals(false,dict2.containedBy(dict));
+ }
+
+
+ @Test
public void englishWordsTest() throws Exception {
InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
ArrayList<String> str = loadStrings(is);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 961c725..49ef227 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -63,7 +63,7 @@ public final class StreamingUtil {
}
public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamParser streamParser) {
- Pair<Long,Long> firstAndLast = getFirstAndLastOffset(kafkaClusterConfig, partitionId);
+ Pair<Long, Long> firstAndLast = getFirstAndLastOffset(kafkaClusterConfig, partitionId);
final String topic = kafkaClusterConfig.getTopic();
logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, firstAndLast.getFirst(), firstAndLast.getSecond()));