You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@kylin.apache.org by ma...@apache.org on 2015/07/03 08:57:08 UTC

[2/3] incubator-kylin git commit: KYLIN-871 growing dict

KYLIN-871 growing dict

KYLIN-871 growing dict

KYLIN-871


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/96b9aac7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/96b9aac7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/96b9aac7

Branch: refs/heads/0.8
Commit: 96b9aac70363d4246b225bfb1196696255db13f0
Parents: 230fe40
Author: honma <ho...@ebay.com>
Authored: Fri Jul 3 11:19:53 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Jul 3 14:56:19 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/dict/DateStrDictionary.java    | 15 +++---
 .../java/org/apache/kylin/dict/Dictionary.java  | 31 +++++++++--
 .../apache/kylin/dict/DictionaryManager.java    | 55 +++++++++++++-------
 .../apache/kylin/dict/TimeStrDictionary.java    |  5 ++
 .../org/apache/kylin/dict/TrieDictionary.java   | 34 +++++++-----
 .../apache/kylin/dict/TrieDictionaryTest.java   | 47 +++++++++++------
 .../apache/kylin/streaming/StreamingUtil.java   |  2 +-
 7 files changed, 128 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
index acd7404..0ee1d06 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
@@ -18,16 +18,12 @@
 
 package org.apache.kylin.dict;
 
-import static org.apache.kylin.common.util.DateFormat.*;
+import org.apache.commons.lang.StringUtils;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.io.UnsupportedEncodingException;
+import java.io.*;
 import java.util.Date;
 
-import org.apache.commons.lang.StringUtils;
+import static org.apache.kylin.common.util.DateFormat.*;
 
 /**
  * A dictionary for date string (date only, no time).
@@ -178,6 +174,11 @@ public class DateStrDictionary extends Dictionary<String> {
     }
 
     @Override
+    public boolean containedBy(Dictionary other) {
+        return this.equals(other);
+    }
+
+    @Override
     public void dump(PrintStream out) {
         out.println(this.toString());
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
index be2429c..1df950d 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
@@ -65,6 +65,11 @@ abstract public class Dictionary<T> implements Writable {
     abstract public int getSizeOfValue();
 
     /**
+     * @return true if each entry of this dict is contained by the dict in param
+     */
+    abstract public boolean containedBy(Dictionary another);
+
+    /**
      * Convenient form of <code>getIdFromValue(value, 0)</code>
      */
     final public int getIdFromValue(T value) throws IllegalArgumentException {
@@ -91,6 +96,20 @@ abstract public class Dictionary<T> implements Writable {
             return getIdFromValueImpl(value, roundingFlag);
     }
 
+    final public boolean containsValue(T value) throws IllegalArgumentException {
+        if (isNullObjectForm(value)) {
+            return true;
+        } else {
+            try {
+                //if no key found, it will throw exception
+                getIdFromValueImpl(value, 0);
+            } catch (IllegalArgumentException e) {
+                return false;
+            }
+            return true;
+        }
+    }
+
     protected boolean isNullObjectForm(T value) {
         return value == null;
     }
@@ -135,8 +154,12 @@ abstract public class Dictionary<T> implements Writable {
     final public int getIdFromValueBytes(byte[] value, int offset, int len, int roundingFlag) throws IllegalArgumentException {
         if (isNullByteForm(value, offset, len))
             return nullId();
-        else
-            return getIdFromValueBytesImpl(value, offset, len, roundingFlag);
+        else {
+            int id = getIdFromValueBytesImpl(value, offset, len, roundingFlag);
+            if (id < 0)
+                throw new IllegalArgumentException("Value not exists!");
+            return id;
+        }
     }
 
     protected boolean isNullByteForm(byte[] value, int offset, int len) {
@@ -151,9 +174,9 @@ abstract public class Dictionary<T> implements Writable {
         else
             return getValueBytesFromIdImpl(id);
     }
-    
+
     abstract protected byte[] getValueBytesFromIdImpl(int id);
-    
+
     /**
      * A lower level API, get byte values from ID, return the number of bytes
      * written. Bypassing the cache layer, this could be significantly slower

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 0c6dd57..d74f1c8 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -18,7 +18,7 @@
 
 package org.apache.kylin.dict;
 
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.commons.compress.utils.IOUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -102,15 +102,30 @@ public class DictionaryManager {
 
     public DictionaryInfo trySaveNewDict(Dictionary<?> newDict, DictionaryInfo newDictInfo) throws IOException {
 
-        String dupDict = checkDupByContent(newDictInfo, newDict);
-        if (dupDict != null) {
-            logger.info("Identical dictionary content " + newDict + ", reuse existing dictionary at " + dupDict);
-            return getDictionaryInfo(dupDict);
-        }
-
         newDictInfo.setDictionaryObject(newDict);
         newDictInfo.setDictionaryClass(newDict.getClass().getName());
 
+        DictionaryInfo largestDict = findLargestDict(newDictInfo);
+        if (largestDict != null) {
+            if (newDict.containedBy(largestDict.getDictionaryObject())) {
+                logger.info("dictionary content " + newDict + ", is contained by  dictionary at " + largestDict.getResourcePath());
+                return largestDict;
+            } else if (largestDict.getDictionaryObject().containedBy(newDict)) {
+                logger.info("dictionary content " + newDict + " is by far the largest, save it");
+                return saveNewDict(newDict, newDictInfo);
+            } else {
+                logger.info("merge dict and save...");
+                return mergeDictionary(Lists.newArrayList(newDictInfo, largestDict));
+            }
+        } else {
+            logger.info("first dict of this column, save it directly");
+            return saveNewDict(newDict, newDictInfo);
+        }
+    }
+
+    private DictionaryInfo saveNewDict(Dictionary<?> newDict, DictionaryInfo newDictInfo) throws IOException {
+
+
         save(newDictInfo);
         dictCache.put(newDictInfo.getResourcePath(), newDictInfo);
 
@@ -307,10 +322,7 @@ public class DictionaryManager {
         }
         Collections.sort(existings);
 
-        final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(existings.get(0),
-                existings.get(existings.size() - 1),
-                DictionaryInfo.class,
-                DictionaryInfoSerializer.INFO_SERIALIZER);
+        final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(existings.get(0), existings.get(existings.size() - 1), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
 
         TableSignature input = dictInfo.getInput();
 
@@ -322,25 +334,28 @@ public class DictionaryManager {
         return null;
     }
 
-    private String checkDupByContent(DictionaryInfo dictInfo, Dictionary<?> dict) throws IOException {
+    private DictionaryInfo findLargestDict(DictionaryInfo dictInfo) throws IOException {
         ResourceStore store = MetadataManager.getInstance(config).getStore();
         ArrayList<String> dictInfos = store.listResources(dictInfo.getResourceDir());
         if (dictInfos == null || dictInfos.isEmpty()) {
             return null;
         }
-        Collections.sort(dictInfos);
+        //Collections.sort(dictInfos);
 
-        final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(dictInfos.get(0),
-                dictInfos.get(dictInfos.size() - 1),
-                DictionaryInfo.class,
-                DictionaryInfoSerializer.FULL_SERIALIZER);
+        final List<DictionaryInfo> allResources = MetadataManager.getInstance(config).getStore().getAllResources(dictInfos.get(0), dictInfos.get(dictInfos.size() - 1), DictionaryInfo.class, DictionaryInfoSerializer.FULL_SERIALIZER);
 
+        DictionaryInfo largestDict = null;
         for (DictionaryInfo dictionaryInfo : allResources) {
-            if (dict.equals(dictionaryInfo.getDictionaryObject())) {
-                return dictionaryInfo.getResourcePath();
+            if (largestDict == null) {
+                largestDict = dictionaryInfo;
+                continue;
+            }
+
+            if (largestDict.getDictionaryObject().getSize() < dictionaryInfo.getDictionaryObject().getSize()) {
+                largestDict = dictionaryInfo;
             }
         }
-        return null;
+        return largestDict;
     }
 
     public void removeDictionary(String resourcePath) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
index 2c576e7..92f3a3c 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
@@ -106,6 +106,11 @@ public class TimeStrDictionary extends Dictionary<String> {
     }
 
     @Override
+    public boolean containedBy(Dictionary other) {
+        return this.equals(other);
+    }
+
+    @Override
     public void write(DataOutput out) throws IOException {
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
index 8b8a815..ab27b4c 100644
--- a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
@@ -18,22 +18,17 @@
 
 package org.apache.kylin.dict;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.lang.ref.SoftReference;
-import java.util.Arrays;
-import java.util.HashMap;
-
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ClassUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.*;
+import java.lang.ref.SoftReference;
+import java.util.Arrays;
+import java.util.HashMap;
+
 /**
  * A dictionary based on Trie data structure that maps enumerations of byte[] to
  * int IDs.
@@ -173,7 +168,7 @@ public class TrieDictionary<T> extends Dictionary<T> {
         int seq = lookupSeqNoFromValue(headSize, value, offset, offset + len, roundingFlag);
         int id = calcIdFromSeqNo(seq);
         if (id < 0)
-            throw new IllegalArgumentException("Not a valid value: " + bytesConvert.convertFromBytes(value, offset, len));
+            logger.error("Not a valid value: " + bytesConvert.convertFromBytes(value, offset, len));
         return id;
     }
 
@@ -478,13 +473,28 @@ public class TrieDictionary<T> extends Dictionary<T> {
     @Override
     public boolean equals(Object o) {
         if ((o instanceof TrieDictionary) == false) {
-            logger.info("Equals return false because o is not TrieDictionary");
+            logger.info("Equals return false because it's not TrieDictionary");
             return false;
         }
         TrieDictionary that = (TrieDictionary) o;
         return Arrays.equals(this.trieBytes, that.trieBytes);
     }
 
+    @Override
+    public boolean containedBy(Dictionary other) {
+        if (this.getSize() > other.getSize()) {
+            return false;
+        }
+
+        for (int i = getMinId(); i <= getMaxId(); ++i) {
+            T v = this.getValueFromId(i);
+            if (!other.containsValue(v)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     public static void main(String[] args) throws Exception {
         TrieDictionaryBuilder<String> b = new TrieDictionaryBuilder<String>(new StringBytesConverter());
         b.addValue("part");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java
----------------------------------------------------------------------
diff --git a/dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java b/dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java
index 4ad89d5..f6031e8 100644
--- a/dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java
+++ b/dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryTest.java
@@ -18,25 +18,12 @@
 
 package org.apache.kylin.dict;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Random;
-import java.util.TreeSet;
+import java.io.*;
+import java.util.*;
 
-import org.junit.Test;
+import static org.junit.Assert.*;
 
 public class TrieDictionaryTest {
 
@@ -132,6 +119,32 @@ public class TrieDictionaryTest {
     }
 
     @Test
+    public void dictionaryContainTest()
+    {
+        ArrayList<String> str = new ArrayList<String>();
+        str.add("part");
+        str.add("part"); // meant to be dup
+        str.add("par");
+        str.add("partition");
+        str.add("party");
+        str.add("parties");
+        str.add("paint");
+
+        TrieDictionaryBuilder<String> b = newDictBuilder(str);
+        int baseId = new Random().nextInt(100);
+        TrieDictionary<String> dict = b.build(baseId);
+
+        str.add("py");
+        b = newDictBuilder(str);
+        baseId = new Random().nextInt(100);
+        TrieDictionary<String> dict2 = b.build(baseId);
+
+        assertEquals(true,dict.containedBy(dict2));
+        assertEquals(false,dict2.containedBy(dict));
+    }
+
+
+    @Test
     public void englishWordsTest() throws Exception {
         InputStream is = new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt");
         ArrayList<String> str = loadStrings(is);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/96b9aac7/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 961c725..49ef227 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -63,7 +63,7 @@ public final class StreamingUtil {
     }
 
     public static long findClosestOffsetWithDataTimestamp(KafkaClusterConfig kafkaClusterConfig, int partitionId, long timestamp, StreamParser streamParser) {
-        Pair<Long,Long> firstAndLast = getFirstAndLastOffset(kafkaClusterConfig, partitionId);
+        Pair<Long, Long> firstAndLast = getFirstAndLastOffset(kafkaClusterConfig, partitionId);
         final String topic = kafkaClusterConfig.getTopic();
 
         logger.info(String.format("topic: %s, partitionId: %d, try to find closest offset with timestamp: %d between offset {%d, %d}", topic, partitionId, timestamp, firstAndLast.getFirst(), firstAndLast.getSecond()));