You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by su...@apache.org on 2016/06/04 06:41:37 UTC
[2/6] kylin git commit: KYLIN-1705 Global (and more scalable) dictionary
http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
new file mode 100644
index 0000000..10bbb77
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -0,0 +1,229 @@
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Created by sunyerui on 16/4/28.
+ */
+public class AppendTrieDictionaryTest {
+
+ @BeforeClass
+ public static void setUp() {
+ KylinConfig.destroyInstance();
+ System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta");
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ config.setAppendDictEntrySize(50000);
+ config.setAppendDictCacheSize(3);
+ config.setProperty("kylin.hdfs.working.dir", "/tmp");
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ String workingDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
+ try {
+ FileSystem.get(new Path(workingDir).toUri(), new Configuration()).delete(new Path(workingDir), true);
+ } catch (IOException e) {}
+ }
+
+ public static final String[] words = new String[]{
+ "paint", "par", "part", "parts", "partition", "partitions",
+ "party", "partie", "parties", "patient",
+ "taste", "tar", "trie", "try", "tries",
+ "\u5b57\u5178", "\u5b57\u5178\u6811", "\u5b57\u6bcd", // non-ascii characters
+ "", // empty
+ "paint", "tar", "try", // some dup
+ };
+
+ @Test
+ public void testStringRepeatly() throws IOException {
+ ArrayList<String> list = new ArrayList<>();
+ Collections.addAll(list, words);
+ ArrayList<String> notfound = new ArrayList<>();
+ notfound.add("pa");
+ notfound.add("pars");
+ notfound.add("tri");
+ notfound.add("\u5b57");
+ for (int i = 0; i < 100; i++) {
+ testStringDictAppend(list, notfound, true);
+ }
+ }
+
+ @Test
+ public void englishWordsTest() throws Exception {
+ InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
+ ArrayList<String> str = loadStrings(is);
+ testStringDictAppend(str, null, false);
+ }
+
+ @Test
+ public void categoryNamesTest() throws Exception {
+ InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat");
+ ArrayList<String> str = loadStrings(is);
+ testStringDictAppend(str, null, true);
+ }
+
+ private static ArrayList<String> loadStrings(InputStream is) throws Exception {
+ ArrayList<String> r = new ArrayList<String>();
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+ try {
+ String word;
+ while ((word = reader.readLine()) != null) {
+ word = word.trim();
+ if (word.isEmpty() == false)
+ r.add(word);
+ }
+ } finally {
+ reader.close();
+ is.close();
+ }
+ return r;
+ }
+
+ @Ignore("need huge key set")
+ @Test
+ public void testHugeKeySet() throws IOException {
+ BytesConverter converter = new StringBytesConverter();
+ AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create("/tmp");
+ AppendTrieDictionary<String> dict = null;
+
+ InputStream is = new FileInputStream("src/test/resources/dict/huge_key");
+ BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+ try {
+ String word;
+ while ((word = reader.readLine()) != null) {
+ word = word.trim();
+ if (!word.isEmpty());
+ b.addValue(word);
+ }
+ } finally {
+ reader.close();
+ is.close();
+ }
+ dict = b.build(0);
+ dict.dump(System.out);
+ }
+
+ private static void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException {
+ Random rnd = new Random(System.currentTimeMillis());
+ ArrayList<String> strList = new ArrayList<String>();
+ strList.addAll(list);
+ if (shuffleList) {
+ Collections.shuffle(strList, rnd);
+ }
+ BytesConverter converter = new StringBytesConverter();
+
+ AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create("/tmp");
+ AppendTrieDictionary<String> dict = null;
+ TreeMap<Integer, String> checkMap = new TreeMap<>();
+ int firstAppend = rnd.nextInt(strList.size()/2);
+ int secondAppend = firstAppend + rnd.nextInt((strList.size()-firstAppend)/2);
+ int appendIndex = 0;
+ int checkIndex = 0;
+
+ for (; appendIndex < firstAppend; appendIndex++) {
+ b.addValue(strList.get(appendIndex));
+ }
+ dict = b.build(0);
+ dict.dump(System.out);
+ for (;checkIndex < firstAppend; checkIndex++) {
+ String str = strList.get(checkIndex);
+ byte[] bytes = converter.convertToBytes(str);
+ int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+ assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)),
+ checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
+ checkMap.put(id, str);
+ }
+
+ // reopen dict and append
+ b = AppendTrieDictionary.Builder.create(dict);
+ for (; appendIndex < secondAppend; appendIndex++) {
+ b.addValue(strList.get(appendIndex));
+ }
+ AppendTrieDictionary newDict = b.build(0);
+ assert newDict == dict;
+ dict = newDict;
+ dict.dump(System.out);
+ checkIndex = 0;
+ for (;checkIndex < secondAppend; checkIndex++) {
+ String str = strList.get(checkIndex);
+ byte[] bytes = converter.convertToBytes(str);
+ int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+ if (checkIndex < firstAppend) {
+ assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
+ } else {
+ // check second append str, should be new id
+ assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)),
+ checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
+ checkMap.put(id, str);
+ }
+ }
+
+ // reopen dict and append rest str
+ b = AppendTrieDictionary.Builder.create(dict);
+ for (; appendIndex < strList.size(); appendIndex++) {
+ b.addValue(strList.get(appendIndex));
+ }
+ newDict = b.build(0);
+ assert newDict == dict;
+ dict = newDict;
+ dict.dump(System.out);
+ checkIndex = 0;
+ for (; checkIndex < strList.size(); checkIndex++) {
+ String str = strList.get(checkIndex);
+ byte[] bytes = converter.convertToBytes(str);
+ int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+ if (checkIndex < secondAppend) {
+ assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
+ } else {
+ // check third append str, should be new id
+ assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)),
+ checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
+ checkMap.put(id, str);
+ }
+ }
+ if (notfound != null) {
+ for (String s : notfound) {
+ byte[] bytes = converter.convertToBytes(s);
+ int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+ assertEquals(-1, id);
+ }
+ }
+
+ dict = testSerialize(dict, converter);
+ for (String str : strList) {
+ byte[] bytes = converter.convertToBytes(str);
+ int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+ assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
+ }
+ }
+
+ private static AppendTrieDictionary<String> testSerialize(AppendTrieDictionary<String> dict, BytesConverter converter) {
+ try {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ DataOutputStream dataout = new DataOutputStream(bout);
+ dict.write(dataout);
+ dataout.close();
+ ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+ DataInputStream datain = new DataInputStream(bin);
+ AppendTrieDictionary<String> r = new AppendTrieDictionary<String>();
+ r.readFields(datain);
+ datain.close();
+ return r;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 3402a0c..f74df35 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -26,6 +26,7 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.collect.Maps;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.kylin.common.KylinConfig;
@@ -120,17 +121,24 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
outputValue = new Text();
dictMeasures = Lists.newArrayList();
+ oldDicts = Maps.newHashMap();
+ newDicts = Maps.newHashMap();
for (int i = 0; i < measureDescs.size(); i++) {
MeasureDesc measureDesc = measureDescs.get(i);
MeasureType measureType = measureDesc.getFunction().getMeasureType();
- if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) {
+ List<TblColRef> columns = measureType.getColumnsNeedDictionary(measureDesc.getFunction());
+ boolean needReEncode = false;
+ for (TblColRef col : columns) {
+ if (!sourceCubeSegment.getDictionary(col).equals(mergedCubeSegment.getDictionary(col))) {
+ oldDicts.put(col, sourceCubeSegment.getDictionary(col));
+ newDicts.put(col, mergedCubeSegment.getDictionary(col));
+ needReEncode = true;
+ }
+ }
+ if (needReEncode) {
dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
}
}
- if (dictMeasures.size() > 0) {
- oldDicts = sourceCubeSegment.buildDictionaryMap();
- newDicts = mergedCubeSegment.buildDictionaryMap();
- }
}
private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");