You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/11 03:32:45 UTC

incubator-kylin git commit: KYLIN-1099 Support dictionary of cardinality over 10 million

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging a3397d044 -> 64e774e83


KYLIN-1099 Support dictionary of cardinality over 10 million

Signed-off-by: Li, Yang <ya...@ebay.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/64e774e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/64e774e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/64e774e8

Branch: refs/heads/2.x-staging
Commit: 64e774e83310e7a9b3ec51684684083c08c160f2
Parents: a3397d0
Author: lidongsjtu <do...@ebay.com>
Authored: Fri Oct 30 11:28:13 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Wed Nov 11 10:29:47 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/job/BuildIIWithStreamTest.java |   2 +-
 .../org/apache/kylin/cube/util/CubingUtils.java |   7 +-
 .../cube/inmemcubing/InMemCubeBuilderTest.java  |   5 +-
 .../apache/kylin/dict/DictionaryGenerator.java  | 102 +++++--------------
 .../kylin/dict/IDictionaryValueEnumerator.java  |  32 ++++++
 .../dict/IterableDictionaryValueEnumerator.java |  47 +++++++++
 .../dict/MultipleDictionaryValueEnumerator.java |  77 ++++++++++++++
 .../kylin/dict/TableColumnValueEnumerator.java  |  75 ++++++++++++++
 .../apache/kylin/dict/NumberDictionaryTest.java |   8 +-
 .../engine/mr/steps/MergeCuboidMapperTest.java  |  10 +-
 .../apache/kylin/engine/spark/SparkCubing.java  |   6 +-
 .../kylin/invertedindex/index/SliceBuilder.java |   3 +-
 .../invertedindex/util/IIDictionaryBuilder.java |   6 +-
 .../invertedindex/InvertedIndexLocalTest.java   |   8 +-
 14 files changed, 286 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index ddfd399..d4245cd 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -224,7 +224,7 @@ public class BuildIIWithStreamTest {
         }
     }
     
-    private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) {
+    private void build(SliceBuilder sliceBuilder, StreamingBatch batch, HTableInterface htable) throws IOException {
         final Slice slice = sliceBuilder.buildSlice(batch);
         try {
             loadToHBase(htable, slice, new IIKeyValueCodec(slice.getInfo()));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index cd84a22..0cfd020 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -59,10 +59,7 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.dict.*;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.ReadableTable;
 import org.slf4j.Logger;
@@ -176,7 +173,7 @@ public class CubingUtils {
                     return input == null ? null : input.getBytes();
                 }
             });
-            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
+            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(bytes));
             result.put(tblColRef, dict);
         }
         return result;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
index 0604d32..e3fb30e 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
@@ -41,6 +41,7 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -177,7 +178,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
             if (desc.getRowkey().isUseDictionary(col)) {
                 logger.info("Building dictionary for " + col);
                 List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
-                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), valueList);
+                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
                 result.put(col, dict);
             }
         }
@@ -191,7 +192,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
                 TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
                 logger.info("Building dictionary for " + displayCol);
                 List<byte[]> valueList = readValueList(flatTable, nColumns, displayColIdx);
-                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(displayCol.getType(), valueList);
+                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(displayCol.getType(), new IterableDictionaryValueEnumerator(valueList));
 
                 result.put(displayCol, dict);
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index db885f9..8027f41 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -46,7 +46,7 @@ public class DictionaryGenerator {
 
     private static final Logger logger = LoggerFactory.getLogger(DictionaryGenerator.class);
 
-    private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd" };
+    private static final String[] DATE_PATTERNS = new String[] { "yyyy-MM-dd", "yyyyMMdd" };
 
     private static int getDictionaryMaxCardinality() {
         try {
@@ -56,7 +56,7 @@ public class DictionaryGenerator {
         }
     }
 
-    public static Dictionary<?> buildDictionaryFromValueList(DataType dataType, Iterable<byte[]> values) {
+    public static Dictionary<?> buildDictionaryFromValueEnumerator(DataType dataType, IDictionaryValueEnumerator valueEnumerator) throws IOException {
         Preconditions.checkNotNull(dataType, "dataType cannot be null");
         Dictionary dict;
         int baseId = 0; // always 0 for now
@@ -66,13 +66,13 @@ public class DictionaryGenerator {
         // build dict, case by data type
         if (dataType.isDateTimeFamily()) {
             if (dataType.isDate())
-                dict = buildDateDict(values, baseId, nSamples, samples);
+                dict = buildDateDict(valueEnumerator, baseId, nSamples, samples);
             else
                 dict = new TimeStrDictionary(); // base ID is always 0
         } else if (dataType.isNumberFamily()) {
-            dict = buildNumberDict(values, baseId, nSamples, samples);
+            dict = buildNumberDict(valueEnumerator, baseId, nSamples, samples);
         } else {
-            dict = buildStringDict(values, baseId, nSamples, samples);
+            dict = buildStringDict(valueEnumerator, baseId, nSamples, samples);
         }
 
         // log a few samples
@@ -91,26 +91,8 @@ public class DictionaryGenerator {
         return dict;
     }
 
-    public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) {
-
-        HashSet<byte[]> dedup = new HashSet<byte[]>();
-
-        for (DictionaryInfo info : sourceDicts) {
-            Dictionary<?> dict = info.getDictionaryObject();
-            int minkey = dict.getMinId();
-            int maxkey = dict.getMaxId();
-            byte[] buffer = new byte[dict.getSizeOfValue()];
-            for (int i = minkey; i <= maxkey; ++i) {
-                int size = dict.getValueBytesFromId(i, buffer, 0);
-                dedup.add(Bytes.copy(buffer, 0, size));
-            }
-        }
-
-        List<byte[]> valueList = new ArrayList<byte[]>();
-        valueList.addAll(dedup);
-
-        Dictionary<?> dict = buildDictionaryFromValueList(dataType, valueList);
-        return dict;
+    public static Dictionary mergeDictionaries(DataType dataType, List<DictionaryInfo> sourceDicts) throws IOException {
+        return buildDictionaryFromValueEnumerator(dataType, new MultipleDictionaryValueEnumerator(sourceDicts));
     }
 
     public static Dictionary<?> buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException {
@@ -118,24 +100,29 @@ public class DictionaryGenerator {
         // currently all data types are casted to string to build dictionary
         // String dataType = info.getDataType();
 
-        logger.debug("Building dictionary object " + JsonUtil.writeValueAsString(info));
+        IDictionaryValueEnumerator columnValueEnumerator = null;
+        try {
+            logger.debug("Building dictionary object " + JsonUtil.writeValueAsString(info));
 
-        ArrayList<byte[]> values = loadColumnValues(inpTable, info.getSourceColumnIndex());
-
-        Dictionary<?> dict = buildDictionaryFromValueList(DataType.getInstance(info.getDataType()), values);
-
-        return dict;
+            columnValueEnumerator = new TableColumnValueEnumerator(inpTable.getReader(), info.getSourceColumnIndex());
+            return buildDictionaryFromValueEnumerator(DataType.getInstance(info.getDataType()), columnValueEnumerator);
+        } finally {
+            if (columnValueEnumerator != null)
+                columnValueEnumerator.close();
+        }
     }
 
-    private static Dictionary buildDateDict(Iterable<byte[]> values, int baseId, int nSamples, ArrayList samples) {
+    private static Dictionary buildDateDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
         final int BAD_THRESHOLD = 0;
         String matchPattern = null;
+        byte[] value;
 
         for (String ptn : DATE_PATTERNS) {
             matchPattern = ptn; // be optimistic
             int badCount = 0;
             SimpleDateFormat sdf = new SimpleDateFormat(ptn);
-            for (byte[] value : values) {
+            while (valueEnumerator.moveNext()) {
+                value = valueEnumerator.current();
                 if (value == null || value.length == 0)
                     continue;
 
@@ -161,9 +148,11 @@ public class DictionaryGenerator {
         throw new IllegalStateException("Unrecognized datetime value");
     }
 
-    private static Dictionary buildStringDict(Iterable<byte[]> values, int baseId, int nSamples, ArrayList samples) {
+    private static Dictionary buildStringDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
         TrieDictionaryBuilder builder = new TrieDictionaryBuilder(new StringBytesConverter());
-        for (byte[] value : values) {
+        byte[] value;
+        while (valueEnumerator.moveNext()) {
+            value = valueEnumerator.current();
             if (value == null)
                 continue;
             String v = Bytes.toString(value);
@@ -174,9 +163,11 @@ public class DictionaryGenerator {
         return builder.build(baseId);
     }
 
-    private static Dictionary buildNumberDict(Iterable<byte[]> values, int baseId, int nSamples, ArrayList samples) {
+    private static Dictionary buildNumberDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
         NumberDictionaryBuilder builder = new NumberDictionaryBuilder(new StringBytesConverter());
-        for (byte[] value : values) {
+        byte[] value;
+        while (valueEnumerator.moveNext()) {
+            value = valueEnumerator.current();
             if (value == null)
                 continue;
             String v = Bytes.toString(value);
@@ -189,41 +180,4 @@ public class DictionaryGenerator {
         }
         return builder.build(baseId);
     }
-
-    static ArrayList<byte[]> loadColumnValues(ReadableTable inpTable, int colIndex) throws IOException {
-
-        TableReader reader = inpTable.getReader();
-
-        try {
-            ArrayList<byte[]> result = Lists.newArrayList();
-            HashSet<String> dedup = new HashSet<String>();
-
-            while (reader.next()) {
-                String[] split = reader.getRow();
-
-                String colValue;
-                // special single column file, e.g. common_indicator.txt
-                if (split.length == 1) {
-                    colValue = split[0];
-                }
-                // normal case
-                else {
-                    if (split.length <= colIndex) {
-                        throw new ArrayIndexOutOfBoundsException("Column no. " + colIndex + " not found, line split is " + Arrays.asList(split));
-                    }
-                    colValue = split[colIndex];
-                }
-
-                if (dedup.contains(colValue) == false) {
-                    dedup.add(colValue);
-                    result.add(Bytes.toBytes(colValue));
-                }
-            }
-            return result;
-
-        } finally {
-            reader.close();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryValueEnumerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryValueEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryValueEnumerator.java
new file mode 100644
index 0000000..ecf980a
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryValueEnumerator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.io.IOException;
+
+/**
+ * Created by dongli on 10/28/15.
+ */
+public interface IDictionaryValueEnumerator {
+    byte[] current() throws IOException;
+
+    boolean moveNext() throws IOException;
+
+    void close() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/core-dictionary/src/main/java/org/apache/kylin/dict/IterableDictionaryValueEnumerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IterableDictionaryValueEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IterableDictionaryValueEnumerator.java
new file mode 100644
index 0000000..2f15eba
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IterableDictionaryValueEnumerator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Created by dongli on 10/28/15.
+ */
+public class IterableDictionaryValueEnumerator implements IDictionaryValueEnumerator {
+    Iterator<byte[]> iterator;
+
+    public IterableDictionaryValueEnumerator(Iterable<byte[]> list) {
+        iterator = list.iterator();
+    }
+
+    @Override
+    public byte[] current() throws IOException {
+        return iterator.next();
+    }
+
+    @Override
+    public boolean moveNext() throws IOException {
+        return iterator.hasNext();
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
new file mode 100644
index 0000000..13f7394
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.util.Bytes;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Created by dongli on 10/28/15.
+ */
+@SuppressWarnings("rawtypes")
+public class MultipleDictionaryValueEnumerator implements IDictionaryValueEnumerator {
+    private int curDictIndex = 0;
+    private Dictionary curDict;
+    private int curKey;
+    private byte[] curValue = null;
+    private List<Dictionary> dictionaryList;
+
+    public MultipleDictionaryValueEnumerator(List<DictionaryInfo> dictionaryInfoList) {
+        dictionaryList = Lists.newArrayListWithCapacity(dictionaryInfoList.size());
+        for (DictionaryInfo dictInfo : dictionaryInfoList) {
+            dictionaryList.add(dictInfo.getDictionaryObject());
+        }
+        if (!dictionaryList.isEmpty()) {
+            curDict = dictionaryList.get(0);
+            curKey = curDict.getMinId();
+        }
+    }
+
+    @Override
+    public byte[] current() throws IOException {
+        return curValue;
+    }
+
+    @Override
+    public boolean moveNext() throws IOException {
+        if (curDictIndex < dictionaryList.size() && curKey <= curDict.getMaxId()) {
+            byte[] buffer = new byte[curDict.getSizeOfValue()];
+            int size = curDict.getValueBytesFromId(curKey, buffer, 0);
+            curValue = Bytes.copy(buffer, 0, size);
+
+            if (++curKey > curDict.getMaxId()) {
+                if (++curDictIndex < dictionaryList.size()) {
+                    curDict = dictionaryList.get(curDictIndex);
+                    curKey = curDict.getMinId();
+                }
+            }
+
+            return true;
+        }
+        curValue = null;
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java
new file mode 100644
index 0000000..ab9a6ff
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TableColumnValueEnumerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.source.ReadableTable;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Created by dongli on 10/29/15.
+ */
+public class TableColumnValueEnumerator implements IDictionaryValueEnumerator {
+
+    private ReadableTable.TableReader reader;
+    private int colIndex;
+    private byte[] colValue;
+
+    public TableColumnValueEnumerator(ReadableTable.TableReader reader, int colIndex) {
+        this.reader = reader;
+        this.colIndex = colIndex;
+    }
+
+    @Override
+    public boolean moveNext() throws IOException {
+        if (reader.next()) {
+            String colStrValue;
+            String[] split = reader.getRow();
+            if (split.length == 1) {
+                colStrValue = split[0];
+            } else {
+                // normal case
+                if (split.length <= colIndex) {
+                    throw new ArrayIndexOutOfBoundsException("Column no. " + colIndex + " not found, line split is " + Arrays.asList(split));
+                }
+                colStrValue = split[colIndex];
+            }
+
+            colValue = Bytes.toBytes(colStrValue);
+            return true;
+
+        } else {
+            colValue = null;
+            return false;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (reader != null)
+            reader.close();
+    }
+
+    @Override
+    public byte[] current() {
+        return colValue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
index ddfa01e..e51153c 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
@@ -21,8 +21,8 @@ package org.apache.kylin.dict;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.math.BigDecimal;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -46,15 +46,15 @@ public class NumberDictionaryTest {
     Random rand = new Random();
 
     @Test
-    public void testEmptyInput() {
+    public void testEmptyInput() throws IOException{
         String[] ints = new String[] { "", "0", "5", "100", "13" };
-        Collection<byte[]> intBytes = new ArrayList<byte[]>();
+        Collection<byte[]> intBytes = Lists.newArrayListWithCapacity(ints.length);
         for (String s : ints) {
             intBytes.add((s == null) ? null : Bytes.toBytes(s));
         }
 
         // check "" is treated as NULL, not a code of dictionary
-        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance("integer"), intBytes);
+        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getInstance("integer"), new IterableDictionaryValueEnumerator(intBytes));
         assertEquals(4, dict.getSize());
 
         final int id = ((NumberDictionary<String>) dict).getIdFromValue("");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
index e060e9e..2acc751 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapperTest.java
@@ -34,11 +34,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.TrieDictionary;
+import org.apache.kylin.dict.*;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -75,7 +71,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
         List<byte[]> values = new ArrayList<byte[]>();
         values.add(new byte[] { 101, 101, 101 });
         values.add(new byte[] { 102, 102, 102 });
-        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance(newDictInfo.getDataType()), values);
+        Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getInstance(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values));
         dictionaryManager.trySaveNewDict(dict, newDictInfo);
         ((TrieDictionary) dict).dump(System.out);
 
@@ -127,7 +123,7 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
                 values.add(new byte[] { 99, 99, 99 });
             else
                 values.add(new byte[] { 98, 98, 98 });
-            Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(DataType.getInstance(newDictInfo.getDataType()), values);
+            Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(DataType.getInstance(newDictInfo.getDataType()), new IterableDictionaryValueEnumerator(values));
             dictionaryManager.trySaveNewDict(dict, newDictInfo);
             ((TrieDictionary) dict).dump(System.out);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index e4fcc16..01d97fd 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -57,7 +57,7 @@ import org.apache.kylin.cube.util.CubingUtils;
 import org.apache.kylin.dict.*;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
 import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
 import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
 import org.apache.kylin.engine.spark.util.IteratorUtils;
@@ -169,7 +169,7 @@ public class SparkCubing extends AbstractApplication {
             final DataFrame frame = intermediateTable.select(column).distinct();
 
             final Row[] rows = frame.collect();
-            dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), new Iterable<byte[]>() {
+            dictionaryMap.put(tblColRef, DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(new Iterable<byte[]>() {
                 @Override
                 public Iterator<byte[]> iterator() {
                     return new Iterator<byte[]>() {
@@ -197,7 +197,7 @@ public class SparkCubing extends AbstractApplication {
                         }
                     };
                 }
-            }));
+            })));
         }
         final long end = System.currentTimeMillis();
         CubingUtils.writeDictionary(cubeInstance.getSegmentById(segmentId), dictionaryMap, start, end);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
index ef63b0f..4b9d7f5 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
@@ -28,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import java.io.IOException;
 import java.util.List;
 
 /**
@@ -44,7 +45,7 @@ public final class SliceBuilder {
         this.sliceMaker = new BatchSliceMaker(desc, shard);
     }
 
-    public Slice buildSlice(StreamingBatch microStreamBatch) {
+    public Slice buildSlice(StreamingBatch microStreamBatch) throws IOException{
         final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
             @Nullable
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java
index 7fcbe53..2474750 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java
@@ -34,6 +34,7 @@
 
 package org.apache.kylin.invertedindex.util;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 
@@ -41,6 +42,7 @@ import javax.annotation.Nullable;
 
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -55,7 +57,7 @@ public final class IIDictionaryBuilder {
     private IIDictionaryBuilder() {
     }
 
-    public static Dictionary<?>[] buildDictionary(List<List<String>> table, IIDesc desc) {
+    public static Dictionary<?>[] buildDictionary(List<List<String>> table, IIDesc desc) throws IOException{
         HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
         final List<TblColRef> allColumns = desc.listAllColumns();
         for (List<String> row : table) {
@@ -76,7 +78,7 @@ public final class IIDictionaryBuilder {
                     return input == null ? null : input.getBytes();
                 }
             });
-            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
+            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(bytes));
             result[desc.findColumn(tblColRef)] = dict;
         }
         return result;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/64e774e8/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
index 9ebb650..6e05759 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/InvertedIndexLocalTest.java
@@ -37,6 +37,7 @@ import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.index.CompressedValueContainer;
@@ -149,7 +150,7 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
         dump(recordsCopy);
     }
 
-    private Dictionary<?>[] buildDictionary(List<List<String>> table, IIDesc desc) {
+    private Dictionary<?>[] buildDictionary(List<List<String>> table, IIDesc desc) throws IOException{
         SetMultimap<TblColRef, String> valueMap = HashMultimap.create();
         Set<TblColRef> dimensionColumns = Sets.newHashSet();
         for (int i = 0; i < desc.listAllColumns().size(); i++) {
@@ -165,13 +166,14 @@ public class InvertedIndexLocalTest extends LocalFileMetadataTestCase {
         }
         Dictionary<?>[] result = new Dictionary<?>[desc.listAllColumns().size()];
         for (TblColRef tblColRef : valueMap.keys()) {
-            result[desc.findColumn(tblColRef)] = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
+            result[desc.findColumn(tblColRef)] = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(),
+                    new IterableDictionaryValueEnumerator(Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
                 @Nullable
                 @Override
                 public byte[] apply(String input) {
                     return input.getBytes();
                 }
-            }));
+            })));
         }
         return result;
     }