You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by su...@apache.org on 2016/06/04 06:41:37 UTC

[2/6] kylin git commit: KYLIN-1705 Global (and more scalable) dictionary

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
new file mode 100644
index 0000000..10bbb77
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/AppendTrieDictionaryTest.java
@@ -0,0 +1,229 @@
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+
+/**
+ * Created by sunyerui on 16/4/28.
+ */
+public class AppendTrieDictionaryTest {
+
+    @BeforeClass
+    public static void setUp() {
+        KylinConfig.destroyInstance();
+        System.setProperty(KylinConfig.KYLIN_CONF, "../examples/test_case_data/localmeta");
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setAppendDictEntrySize(50000);
+        config.setAppendDictCacheSize(3);
+        config.setProperty("kylin.hdfs.working.dir", "/tmp");
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        String workingDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
+        try {
+            FileSystem.get(new Path(workingDir).toUri(), new Configuration()).delete(new Path(workingDir), true);
+        } catch (IOException e) {}
+    }
+
+    public static final String[] words = new String[]{
+        "paint", "par", "part", "parts", "partition", "partitions",
+        "party", "partie", "parties", "patient",
+        "taste", "tar", "trie", "try", "tries",
+        "\u5b57\u5178", "\u5b57\u5178\u6811", "\u5b57\u6bcd", // non-ascii characters
+        "",  // empty
+        "paint", "tar", "try", // some dup
+    };
+
+    @Test
+    public void testStringRepeatly() throws IOException {
+        ArrayList<String> list = new ArrayList<>();
+        Collections.addAll(list, words);
+        ArrayList<String> notfound = new ArrayList<>();
+        notfound.add("pa");
+        notfound.add("pars");
+        notfound.add("tri");
+        notfound.add("\u5b57");
+        for (int i = 0; i < 100; i++) {
+            testStringDictAppend(list, notfound, true);
+        }
+    }
+
+    @Test
+    public void englishWordsTest() throws Exception {
+        InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
+        ArrayList<String> str = loadStrings(is);
+        testStringDictAppend(str, null, false);
+    }
+
+    @Test
+    public void categoryNamesTest() throws Exception {
+        InputStream is = new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat");
+        ArrayList<String> str = loadStrings(is);
+        testStringDictAppend(str, null, true);
+    }
+
+    private static ArrayList<String> loadStrings(InputStream is) throws Exception {
+        ArrayList<String> r = new ArrayList<String>();
+        BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+        try {
+            String word;
+            while ((word = reader.readLine()) != null) {
+                word = word.trim();
+                if (word.isEmpty() == false)
+                    r.add(word);
+            }
+        } finally {
+            reader.close();
+            is.close();
+        }
+        return r;
+    }
+
+    @Ignore("need huge key set")
+    @Test
+    public void testHugeKeySet() throws IOException {
+        BytesConverter converter = new StringBytesConverter();
+        AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create("/tmp");
+        AppendTrieDictionary<String> dict = null;
+
+        InputStream is = new FileInputStream("src/test/resources/dict/huge_key");
+        BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+        try {
+            String word;
+            while ((word = reader.readLine()) != null) {
+                word = word.trim();
+                if (!word.isEmpty());
+                    b.addValue(word);
+            }
+        } finally {
+            reader.close();
+            is.close();
+        }
+        dict = b.build(0);
+        dict.dump(System.out);
+    }
+
+    private static void testStringDictAppend(ArrayList<String> list, ArrayList<String> notfound, boolean shuffleList) throws IOException {
+        Random rnd = new Random(System.currentTimeMillis());
+        ArrayList<String> strList = new ArrayList<String>();
+        strList.addAll(list);
+        if (shuffleList) {
+            Collections.shuffle(strList, rnd);
+        }
+        BytesConverter converter = new StringBytesConverter();
+
+        AppendTrieDictionary.Builder<String> b = AppendTrieDictionary.Builder.create("/tmp");
+        AppendTrieDictionary<String> dict = null;
+        TreeMap<Integer, String> checkMap = new TreeMap<>();
+        int firstAppend = rnd.nextInt(strList.size()/2);
+        int secondAppend = firstAppend + rnd.nextInt((strList.size()-firstAppend)/2);
+        int appendIndex = 0;
+        int checkIndex = 0;
+
+        for (; appendIndex < firstAppend; appendIndex++) {
+            b.addValue(strList.get(appendIndex));
+        }
+        dict = b.build(0);
+        dict.dump(System.out);
+        for (;checkIndex < firstAppend; checkIndex++) {
+            String str = strList.get(checkIndex);
+            byte[] bytes = converter.convertToBytes(str);
+            int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+            assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)),
+                    checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
+            checkMap.put(id, str);
+        }
+
+        // reopen dict and append
+        b = AppendTrieDictionary.Builder.create(dict);
+        for (; appendIndex < secondAppend; appendIndex++) {
+            b.addValue(strList.get(appendIndex));
+        }
+        AppendTrieDictionary newDict = b.build(0);
+        assert newDict == dict;
+        dict = newDict;
+        dict.dump(System.out);
+        checkIndex = 0;
+        for (;checkIndex < secondAppend; checkIndex++) {
+            String str = strList.get(checkIndex);
+            byte[] bytes = converter.convertToBytes(str);
+            int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+            if (checkIndex < firstAppend) {
+                assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
+            } else {
+                // check second append str, should be new id
+                assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)),
+                    checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
+                checkMap.put(id, str);
+            }
+        }
+
+        // reopen dict and append rest str
+        b = AppendTrieDictionary.Builder.create(dict);
+        for (; appendIndex < strList.size(); appendIndex++) {
+            b.addValue(strList.get(appendIndex));
+        }
+        newDict = b.build(0);
+        assert newDict == dict;
+        dict = newDict;
+        dict.dump(System.out);
+        checkIndex = 0;
+        for (; checkIndex < strList.size(); checkIndex++) {
+            String str = strList.get(checkIndex);
+            byte[] bytes = converter.convertToBytes(str);
+            int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+            if (checkIndex < secondAppend) {
+                assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
+            } else {
+                // check third append str, should be new id
+                assertFalse(String.format("Id %d for %s should be empty, but is %s", id, str, checkMap.get(id)),
+                    checkMap.containsKey(id) && !str.equals(checkMap.get(id)));
+                checkMap.put(id, str);
+            }
+        }
+        if (notfound != null) {
+            for (String s : notfound) {
+                byte[] bytes = converter.convertToBytes(s);
+                int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+                assertEquals(-1, id);
+            }
+        }
+
+        dict = testSerialize(dict, converter);
+        for (String str : strList) {
+            byte[] bytes = converter.convertToBytes(str);
+            int id = dict.getIdFromValueBytesImpl(bytes, 0, bytes.length, 0);
+            assertEquals("Except id " + id + " for " + str + " but " + checkMap.get(id), str, checkMap.get(id));
+        }
+    }
+
+    private static AppendTrieDictionary<String> testSerialize(AppendTrieDictionary<String> dict, BytesConverter converter) {
+        try {
+            ByteArrayOutputStream bout = new ByteArrayOutputStream();
+            DataOutputStream dataout = new DataOutputStream(bout);
+            dict.write(dataout);
+            dataout.close();
+            ByteArrayInputStream bin = new ByteArrayInputStream(bout.toByteArray());
+            DataInputStream datain = new DataInputStream(bin);
+            AppendTrieDictionary<String> r = new AppendTrieDictionary<String>();
+            r.readFields(datain);
+            datain.close();
+            return r;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/d1a9bab6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 3402a0c..f74df35 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
@@ -120,17 +121,24 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         outputValue = new Text();
         
         dictMeasures = Lists.newArrayList();
+        oldDicts = Maps.newHashMap();
+        newDicts = Maps.newHashMap();
         for (int i = 0; i < measureDescs.size(); i++) {
             MeasureDesc measureDesc = measureDescs.get(i);
             MeasureType measureType = measureDesc.getFunction().getMeasureType();
-            if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) {
+            List<TblColRef> columns = measureType.getColumnsNeedDictionary(measureDesc.getFunction());
+            boolean needReEncode = false;
+            for (TblColRef col : columns) {
+                if (!sourceCubeSegment.getDictionary(col).equals(mergedCubeSegment.getDictionary(col))) {
+                    oldDicts.put(col, sourceCubeSegment.getDictionary(col));
+                    newDicts.put(col, mergedCubeSegment.getDictionary(col));
+                    needReEncode = true;
+                }
+            }
+            if (needReEncode) {
                 dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
             }
         }
-        if (dictMeasures.size() > 0) {
-            oldDicts = sourceCubeSegment.buildDictionaryMap();
-            newDicts = mergedCubeSegment.buildDictionaryMap();
-        }
     }
 
     private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");