You are viewing a plain-text version of this content. The canonical link for it (originally a "here" hyperlink) is not preserved in plain text.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/04/11 00:48:04 UTC

[27/34] incubator-kylin git commit: fix ci

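fix ci

Summary of the changes:
- Extract dictionary building from SliceBuilder into the new IIDictionaryBuilder utility.
- Make TableRecordInfo(IIDesc) pass an empty dictionary array.
- Move EndpointAggregators metric-info construction into a generateMetricInfo helper and make its constructor private.
- Rewrite EndpointAggregationTest to aggregate mocked byte rows instead of TableRecord instances.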


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7eebef63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7eebef63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7eebef63

Branch: refs/heads/streaming-localdict
Commit: 7eebef63e346dffcd2c3c83b502a9b735789a557
Parents: 6406540
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Apr 9 11:13:41 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Apr 9 17:04:29 2015 +0800

----------------------------------------------------------------------
 .../invertedindex/index/TableRecordInfo.java    |   2 +-
 .../invertedindex/util/IIDictionaryBuilder.java |  82 +++++++++
 .../endpoint/EndpointAggregators.java           |  55 ++++---
 .../endpoint/EndpointAggregationTest.java       | 165 ++++++++-----------
 .../streaming/invertedindex/SliceBuilder.java   |  46 +-----
 5 files changed, 183 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7eebef63/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index 0955a5f..606e00e 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -48,7 +48,7 @@ public class TableRecordInfo {
     }
 
     public TableRecordInfo(IIDesc desc) {
-        this(desc, new Dictionary<?>[desc.listAllColumns().size()]);
+        this(desc, new Dictionary<?>[0]);
     }
 
     public TableRecordInfo(IIDesc desc, Dictionary<?>[] dictionaryMap) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7eebef63/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java
new file mode 100644
index 0000000..45e8058
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/util/IIDictionaryBuilder.java
@@ -0,0 +1,82 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.invertedindex.util;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Created by qianzhou on 4/9/15.
+ */
+public final class IIDictionaryBuilder {
+
+    private IIDictionaryBuilder(){}
+
+    public static Dictionary<?>[] buildDictionary(List<List<String>> table, IIDesc desc) {
+        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
+        final List<TblColRef> allColumns = desc.listAllColumns();
+        for (List<String> row : table) {
+            for (int i = 0; i < row.size(); i++) {
+                String cell = row.get(i);
+                if (!desc.isMetricsCol(i)) {
+                    valueMap.put(allColumns.get(i), cell);
+                }
+            }
+        }
+
+        Dictionary<?>[] result = new Dictionary<?>[allColumns.size()];
+        for (TblColRef tblColRef : valueMap.keySet()) {
+            final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
+                @Nullable
+                @Override
+                public byte[] apply(String input) {
+                    return input == null ? null : input.getBytes();
+                }
+            });
+            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
+            result[desc.findColumn(tblColRef)] = dict;
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7eebef63/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
index 5af5f44..19d8fd1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
@@ -21,6 +21,7 @@ package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 import java.nio.ByteBuffer;
 import java.util.List;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
@@ -47,7 +48,7 @@ public class EndpointAggregators {
         Count, DimensionAsMetric, DistinctCount, Normal
     }
 
-    private static class MetricInfo {
+    private final static class MetricInfo {
         private MetricType type;
         private int refIndex = -1;
         private int precision = -1;
@@ -69,34 +70,36 @@ public class EndpointAggregators {
 
     }
 
+    private static MetricInfo generateMetricInfo(int index, FunctionDesc functionDesc) {
+        if (functionDesc.isCount()) {
+            return new MetricInfo(MetricType.Count);
+        } else if (functionDesc.isDimensionAsMetric()) {
+            return new MetricInfo(MetricType.DimensionAsMetric);
+        } else {
+            Preconditions.checkState(index >= 0, "Column " + functionDesc.getParameter().getValue() + " is not found in II");
+            if (functionDesc.isCountDistinct()) {
+                return new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
+            } else {
+                return new MetricInfo(MetricType.Normal, index);
+            }
+        }
+    }
+
+
     public static EndpointAggregators fromFunctions(TableRecordInfo tableInfo, List<FunctionDesc> metrics) {
-        String[] funcNames = new String[metrics.size()];
-        String[] dataTypes = new String[metrics.size()];
-        MetricInfo[] metricInfos = new MetricInfo[metrics.size()];
+        final int metricSize = metrics.size();
+        String[] funcNames = new String[metricSize];
+        String[] dataTypes = new String[metricSize];
+        MetricInfo[] metricInfos = new MetricInfo[metricSize];
 
-        for (int i = 0; i < metrics.size(); i++) {
+        for (int i = 0; i < metricSize; i++) {
             FunctionDesc functionDesc = metrics.get(i);
 
             //TODO: what if funcionDesc's type is different from tablDesc? cause scale difference
             funcNames[i] = functionDesc.getExpression();
             dataTypes[i] = functionDesc.getReturnType();
-
-            if (functionDesc.isCount()) {
-                metricInfos[i] = new MetricInfo(MetricType.Count);
-            } else if (functionDesc.isDimensionAsMetric()) {
-                metricInfos[i] = new MetricInfo(MetricType.DimensionAsMetric);
-            } else {
-                int index = tableInfo.findFactTableColumn(functionDesc.getParameter().getValue());
-                if (index < 0) {
-                    throw new IllegalStateException("Column " + functionDesc.getParameter().getValue() + " is not found in II");
-                }
-
-                if (functionDesc.isCountDistinct()) {
-                    metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
-                } else {
-                    metricInfos[i] = new MetricInfo(MetricType.Normal, index);
-                }
-            }
+            int index = tableInfo.findFactTableColumn(functionDesc.getParameter().getValue());
+            metricInfos[i] = generateMetricInfo(index, functionDesc);
         }
 
         return new EndpointAggregators(funcNames, dataTypes, metricInfos, tableInfo.getDigest());
@@ -115,7 +118,7 @@ public class EndpointAggregators {
 
     final LongWritable ONE = new LongWritable(1);
 
-    public EndpointAggregators(String[] funcNames, String[] dataTypes, MetricInfo[] metricInfos, TableRecordInfoDigest tableInfo) {
+    private EndpointAggregators(String[] funcNames, String[] dataTypes, MetricInfo[] metricInfos, TableRecordInfoDigest tableInfo) {
         this.funcNames = funcNames;
         this.dataTypes = dataTypes;
         this.metricInfos = metricInfos;
@@ -161,16 +164,16 @@ public class EndpointAggregators {
         rawTableRecord.setBytes(row, 0, row.length);
 
         for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) {
-            if (metricInfos[metricIndex].type == MetricType.Count) {
+            final MetricInfo metricInfo = metricInfos[metricIndex];
+            if (metricInfo.type == MetricType.Count) {
                 measureAggrs[metricIndex].aggregate(ONE);
                 continue;
             }
 
-            if (metricInfos[metricIndex].type == MetricType.DimensionAsMetric) {
+            if (metricInfo.type == MetricType.DimensionAsMetric) {
                 continue;
             }
 
-            MetricInfo metricInfo = metricInfos[metricIndex];
             MeasureAggregator aggregator = measureAggrs[metricIndex];
             FixedLenMeasureCodec measureSerializer = measureSerializers[metricIndex];
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7eebef63/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
index e90f5b5..7cb0d09 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
@@ -18,24 +18,18 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 
-import static org.junit.Assert.assertEquals;
-
+import com.google.common.collect.Lists;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.ConstantTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.*;
-import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
+import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.storage.hbase.coprocessor.FilterDecorator;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -44,6 +38,9 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.*;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
 /**
  * Created by Hongbin Ma(Binmahone) on 11/27/14.
  *
@@ -51,39 +48,10 @@ import java.util.*;
  */
 @Ignore("need to mock up TableRecordInfo")
 public class EndpointAggregationTest extends LocalFileMetadataTestCase {
-    IIInstance ii;
-    TableRecordInfo tableRecordInfo;
-
-    CoprocessorProjector projector;
-    EndpointAggregators aggregators;
-    CoprocessorFilter filter;
-
-    EndpointAggregationCache aggCache;
-    List<TableRecord> tableData;
-
-    TableDesc factTableDesc;
 
     @Before
     public void setup() throws IOException {
         this.createTestMetadata();
-        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
-        this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
-        factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT");
-        TblColRef formatName = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_FORMAT_NAME");
-        TblColRef siteId = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_SITE_ID");
-
-        Collection<TblColRef> dims = new HashSet<>();
-        dims.add(formatName);
-        projector = CoprocessorProjector.makeForEndpoint(tableRecordInfo, dims);
-        aggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
-
-        CompareTupleFilter rawFilter = new CompareTupleFilter(TupleFilter.FilterOperatorEnum.EQ);
-        rawFilter.addChild(new ColumnTupleFilter(siteId));
-        rawFilter.addChild(new ConstantTupleFilter("0"));
-        filter = CoprocessorFilter.fromFilter(this.ii.getFirstSegment(), rawFilter, FilterDecorator.FilterConstantsTreatment.AS_IT_IS);
-
-        aggCache = new EndpointAggregationCache(aggregators);
-        tableData = mockTable();
     }
 
     @After
@@ -91,54 +59,6 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
         cleanupTestMetadata();
     }
 
-    private List<TableRecord> mockTable() {
-
-        ColumnDesc[] factTableColumns = factTableDesc.getColumns();
-        int[] factTableColumnIndex = new int[factTableColumns.length];
-        for (int i = 0; i < factTableColumnIndex.length; ++i) {
-            factTableColumnIndex[i] = tableRecordInfo.findColumn(new TblColRef(factTableColumns[i]));
-        }
-
-        TableRecord temp1 = tableRecordInfo.createTableRecord();
-        temp1.setValueString(factTableColumnIndex[0], "10000000239");
-        temp1.setValueString(factTableColumnIndex[1], "2012-03-22");
-        temp1.setValueString(factTableColumnIndex[2], "Auction");
-        temp1.setValueString(factTableColumnIndex[3], "80135");
-        temp1.setValueString(factTableColumnIndex[4], "0");
-        temp1.setValueString(factTableColumnIndex[5], "14");
-        temp1.setValueString(factTableColumnIndex[6], "199.99");
-        temp1.setValueString(factTableColumnIndex[7], "1");
-        temp1.setValueString(factTableColumnIndex[8], "10000005");
-
-        TableRecord temp2 = tableRecordInfo.createTableRecord();
-        temp2.setValueString(factTableColumnIndex[0], "10000000244");
-        temp2.setValueString(factTableColumnIndex[1], "2012-11-11");
-        temp2.setValueString(factTableColumnIndex[2], "Auction");
-        temp2.setValueString(factTableColumnIndex[3], "16509");
-        temp2.setValueString(factTableColumnIndex[4], "101");
-        temp2.setValueString(factTableColumnIndex[5], "12");
-        temp2.setValueString(factTableColumnIndex[6], "2.09");
-        temp2.setValueString(factTableColumnIndex[7], "1");
-        temp2.setValueString(factTableColumnIndex[8], "10000004");
-
-        TableRecord temp3 = tableRecordInfo.createTableRecord();
-        temp3.setValueString(factTableColumnIndex[0], "10000000259");
-        temp3.setValueString(factTableColumnIndex[1], "2012-07-12");
-        temp3.setValueString(factTableColumnIndex[2], "Others");
-        temp3.setValueString(factTableColumnIndex[3], "15687");
-        temp3.setValueString(factTableColumnIndex[4], "0");
-        temp3.setValueString(factTableColumnIndex[5], "14");
-        temp3.setValueString(factTableColumnIndex[6], "100");
-        temp3.setValueString(factTableColumnIndex[7], "1");
-        temp3.setValueString(factTableColumnIndex[8], "10000020");
-
-        List<TableRecord> ret = new ArrayList<TableRecord>();
-        ret.add(temp1);
-        ret.add(temp2);
-        ret.add(temp3);
-        return ret;
-    }
-
     private List<FunctionDesc> buildAggregations() {
         List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
 
@@ -164,26 +84,72 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
     }
 
     @Test
-    public void testSerializeAggreagtor() {
-        EndpointAggregators endpointAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
+    public void testSerializeAggregator() {
+        final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
+        final EndpointAggregators endpointAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
         byte[] x = EndpointAggregators.serialize(endpointAggregators);
-        EndpointAggregators.deserialize(x);
+        final EndpointAggregators result = EndpointAggregators.deserialize(x);
+        assertArrayEquals(endpointAggregators.dataTypes, result.dataTypes);
+        assertArrayEquals(endpointAggregators.funcNames, result.funcNames);
+        assertArrayEquals(endpointAggregators.metricValues, result.metricValues);
+        assertEquals(endpointAggregators.rawTableRecord.getBytes().length, result.rawTableRecord.getBytes().length);
+    }
+
+    private byte[] randomBytes(final int length) {
+        byte[] result = new byte[length];
+        Random random = new Random();
+        for (int i = 0; i < length; i++) {
+            random.nextBytes(result);
+        }
+        return result;
+    }
+
+    private List<byte[]> mockData(TableRecordInfo tableRecordInfo) {
+        ArrayList<byte[]> result = Lists.newArrayList();
+        final int priceColumnIndex = 23;
+        final int groupByColumnIndex = 0;
+        TblColRef column = tableRecordInfo.getDescriptor().listAllColumns().get(priceColumnIndex);
+        FixedLenMeasureCodec codec = FixedLenMeasureCodec.get(column.getType());
+
+        byte[] data = randomBytes(tableRecordInfo.getDigest().getByteFormLen());
+        byte[] groupOne = randomBytes(tableRecordInfo.getDigest().length(groupByColumnIndex));
+        codec.write(codec.valueOf("199.99"), data, tableRecordInfo.getDigest().offset(priceColumnIndex));
+        System.arraycopy(groupOne, 0, data, tableRecordInfo.getDigest().offset(groupByColumnIndex), groupOne.length);
+        result.add(data);
+
+        data = randomBytes(tableRecordInfo.getDigest().getByteFormLen());;
+        codec.write(codec.valueOf("2.09"), data, tableRecordInfo.getDigest().offset(priceColumnIndex));
+        System.arraycopy(groupOne, 0, data, tableRecordInfo.getDigest().offset(groupByColumnIndex), groupOne.length);
+        result.add(data);
+
+        byte[] groupTwo = randomBytes(tableRecordInfo.getDigest().length(groupByColumnIndex));
+        data = randomBytes(tableRecordInfo.getDigest().getByteFormLen());
+        System.arraycopy(groupTwo, 0, data, tableRecordInfo.getDigest().offset(groupByColumnIndex), groupTwo.length);
+        codec.write(codec.valueOf("100"), data, tableRecordInfo.getDigest().offset(priceColumnIndex));
+        result.add(data);
+
+        return result;
     }
 
     @Test
-    @SuppressWarnings("rawtypes")
     public void basicTest() {
-
-        for (int i = 0; i < tableData.size(); ++i) {
-            byte[] data = tableData.get(i).getBytes();
+        final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
+        final EndpointAggregators aggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
+        final EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators);
+        final Collection<TblColRef> dims = new HashSet<>();
+        final TblColRef groupByColumn = ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_FORMAT_NAME");
+        dims.add(groupByColumn);
+        CoprocessorProjector projector = CoprocessorProjector.makeForEndpoint(tableRecordInfo, dims);
+        List<byte[]> rawData = mockData(tableRecordInfo);
+        for (int i = 0; i < rawData.size(); ++i) {
+            byte[] data = rawData.get(i);
             CoprocessorProjector.AggrKey aggKey = projector.getAggrKey(data);
             MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
             aggregators.aggregate(bufs, data);
             aggCache.checkMemoryUsage();
         }
-
-        assertEquals(aggCache.getAllEntries().size(), 2);
-
         long sumTotal = 0;
         long minTotal = 0;
         for (Map.Entry<CoprocessorProjector.AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
@@ -193,7 +159,6 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
         }
         assertEquals(3020800, sumTotal);
         assertEquals(1020900, minTotal);
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7eebef63/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
index 22bfa42..6246215 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/SliceBuilder.java
@@ -34,28 +34,22 @@
 
 package org.apache.kylin.streaming.invertedindex;
 
-import java.util.Collection;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.invertedindex.index.BatchSliceMaker;
 import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
 import org.apache.kylin.streaming.Stream;
 import org.apache.kylin.streaming.StreamParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
+import java.util.List;
 
 /**
  * Created by qianzhou on 3/27/15.
@@ -81,39 +75,11 @@ public final class SliceBuilder {
                 return streamParser.parse(input);
             }
         });
-        final Dictionary<?>[] dictionaryMap = buildDictionary(table, iiDesc);
+        final Dictionary<?>[] dictionaryMap = IIDictionaryBuilder.buildDictionary(table, iiDesc);
         TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaryMap);
         return build(table,  tableRecordInfo, dictionaryMap);
     }
 
-    private Dictionary<?>[] buildDictionary(List<List<String>> table, IIDesc desc) {
-        HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
-        final List<TblColRef> allColumns = desc.listAllColumns();
-        for (List<String> row : table) {
-            for (int i = 0; i < row.size(); i++) {
-                String cell = row.get(i);
-                if (!desc.isMetricsCol(i)) {
-                    valueMap.put(allColumns.get(i), cell);
-                }
-            }
-        }
-
-        Dictionary<?>[] result = new Dictionary<?>[allColumns.size()];
-        for (TblColRef tblColRef : valueMap.keySet()) {
-            final Collection<byte[]> bytes = Collections2.transform(valueMap.get(tblColRef), new Function<String, byte[]>() {
-                @Nullable
-                @Override
-                public byte[] apply(String input) {
-                    return input == null ? null : input.getBytes();
-                }
-            });
-            logger.info("build dictionary for column " + tblColRef);
-            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueList(tblColRef.getType(), bytes);
-            result[desc.findColumn(tblColRef)] = dict;
-        }
-        return result;
-    }
-
     private Slice build(List<List<String>> table,  final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
         final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
             @Nullable