Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/05 02:54:19 UTC

[05/31] incubator-kylin git commit: KYLIN-1094 use Kryo Serializer for SparkCubing

KYLIN-1094 use Kryo Serializer for SparkCubing


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/730739f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/730739f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/730739f7

Branch: refs/heads/KYLIN-1112
Commit: 730739f70797f296cb9cb0a1cb94c8d7649f769e
Parents: 6ab480c
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Oct 29 11:30:06 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Thu Nov 5 09:53:15 2015 +0800

----------------------------------------------------------------------
 commit_SHA1                                     |  16 --
 .../cube/inmemcubing/InMemCubeBuilder.java      | 159 ++-----------------
 .../InMemCubeBuilderInputConverter.java         | 146 +++++++++++++++++
 .../cube/inmemcubing/InMemCubeBuilderUtils.java |  85 ++++++++++
 .../apache/kylin/engine/spark/SparkCubing.java  |  86 ++++++++--
 5 files changed, 316 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
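
Editor's note: this commit has two parts. First, the row-to-GTRecord conversion logic is pulled out of InMemCubeBuilder into the new InMemCubeBuilderInputConverter and InMemCubeBuilderUtils classes so it can be shared. Second, SparkCubing switches Spark from Java serialization to Kryo with mandatory class registration. A minimal standalone sketch of the Kryo settings applied in the SparkCubing diff below (the wrapper class and the two registered class names are illustrative, not part of the commit):

    import org.apache.spark.SparkConf;

    public class KryoConfSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("KryoConfSketch");
            // Replace the default Java serializer with Kryo.
            conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            // Fail fast if any class is serialized without being registered.
            conf.set("spark.kryo.registrationRequired", "true");
            // Comma-separated fully qualified names of classes to register.
            conf.set("spark.kryo.classesToRegister",
                    "java.util.ArrayList,java.util.HashMap");
        }
    }

With registrationRequired enabled, serializing any unregistered class throws at runtime, which is why the commit builds a large class list via getKyroClasses() rather than registering classes one by one.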


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/730739f7/commit_SHA1
----------------------------------------------------------------------
diff --git a/commit_SHA1 b/commit_SHA1
deleted file mode 100644
index d2f3970..0000000
--- a/commit_SHA1
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-b66c25803a2f976cca067148278dbe7d7b0d79ef

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/730739f7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 7d943a3..84abd47 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -17,14 +17,7 @@
 package org.apache.kylin.cube.inmemcubing;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -32,11 +25,12 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
-import org.apache.kylin.common.util.*;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dict.Dictionary;
@@ -50,14 +44,12 @@ import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.measure.DoubleMutable;
 import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 
 /**
  * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.
@@ -87,7 +79,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private CuboidResult baseResult;
     private Object[] totalSumForSanityCheck;
     private ICuboidCollector resultCollector;
-    private Map<Integer, Dictionary<String>> topNDisplayColDictMap;
 
     public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
         super(cubeDesc, dictionaryMap);
@@ -100,35 +91,16 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         this.measureCount = cubeDesc.getMeasures().size();
         this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
 
-        Map<String, Integer> measureIndexMap = Maps.newHashMap();
         List<String> metricsAggrFuncsList = Lists.newArrayList();
 
         for (int i = 0; i < measureCount; i++) {
             MeasureDesc measureDesc = measureDescs[i];
             metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
-            measureIndexMap.put(measureDesc.getName(), i);
         }
         this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
-
-        initTopNDisplayColDictionaryMap();
     }
 
-    private void initTopNDisplayColDictionaryMap() {
-        topNDisplayColDictMap = Maps.newHashMap();
-        for (int measureIdx = 0; measureIdx < cubeDesc.getMeasures().size(); measureIdx++) {
-            MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
-            FunctionDesc func = measureDesc.getFunction();
-            if (func.isTopN()) {
-                int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-                int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
-                TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
-                @SuppressWarnings("unchecked")
-                Dictionary<String> dictionary = (Dictionary<String>) dictionaryMap.get(displayCol);
-                assert dictionary != null;
-                topNDisplayColDictMap.put(displayColIdx, dictionary);
-            }
-        }
-    }
+    
 
     private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
         GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
@@ -142,14 +114,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         return gridTable;
     }
 
-    private Pair<ImmutableBitSet, ImmutableBitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
-        BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
-        BitSet dimension = new BitSet();
-        dimension.set(0, bitSet.cardinality());
-        BitSet metrics = new BitSet();
-        metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
-        return new Pair<ImmutableBitSet, ImmutableBitSet>(new ImmutableBitSet(dimension), new ImmutableBitSet(metrics));
-    }
 
     @Override
     public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
@@ -376,7 +340,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         int mbBefore = getSystemAvailMB();
         int mbAfter = 0;
 
-        Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
+        Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
         GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
         GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
 
@@ -442,27 +406,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     private CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException {
-        Pair<ImmutableBitSet, ImmutableBitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parent.cuboidId);
-        ImmutableBitSet parentDimensions = columnBitSets.getFirst();
-        ImmutableBitSet measureColumns = columnBitSets.getSecond();
-        ImmutableBitSet childDimensions = parentDimensions;
-
-        long mask = Long.highestOneBit(parent.cuboidId);
-        long childCuboidId = cuboidId;
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parent.cuboidId);
-        int index = 0;
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            if ((mask & parent.cuboidId) > 0) {
-                if ((mask & childCuboidId) == 0) {
-                    // this dim will be aggregated
-                    childDimensions = childDimensions.set(index, false);
-                }
-                index++;
-            }
-            mask = mask >> 1;
-        }
-
-        return scanAndAggregateGridTable(parent.table, parent.cuboidId, cuboidId, childDimensions, measureColumns);
+        final Pair<ImmutableBitSet, ImmutableBitSet> allNeededColumns = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(parent.cuboidId, cuboidId, measureCount);
+        return scanAndAggregateGridTable(parent.table, parent.cuboidId, cuboidId, allNeededColumns.getFirst(), allNeededColumns.getSecond());
     }
 
     private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
@@ -557,12 +502,15 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         GTInfo info;
         GTRecord record;
         BlockingQueue<List<String>> input;
-        ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+        final InMemCubeBuilderInputConverter inMemCubeBuilderInputConverter;
 
         public InputConverter(GTInfo info, BlockingQueue<List<String>> input) {
             this.info = info;
             this.input = input;
             this.record = new GTRecord(info);
+            this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, 
+                    InMemCubeBuilderUtils.createTopNDisplayColDictionaryMap(cubeDesc, intermediateTableDesc, dictionaryMap), 
+                    info);
         }
 
         @Override
@@ -586,7 +534,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
                     if (currentObject.size() == 0)
                         throw new IllegalStateException();
 
-                    buildGTRecord(currentObject, record);
+                    inMemCubeBuilderInputConverter.convert(currentObject, record);
                     return record;
                 }
 
@@ -611,86 +559,5 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             return 0;
         }
 
-        private void buildGTRecord(List<String> row, GTRecord record) {
-            Object[] dimensions = buildKey(row);
-            Object[] metricsValues = buildValue(row);
-            Object[] recordValues = new Object[dimensions.length + metricsValues.length];
-            System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
-            System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
-            record.setValues(recordValues);
-        }
-
-        private Object[] buildKey(List<String> row) {
-            int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
-            Object[] key = new Object[keySize];
-
-            for (int i = 0; i < keySize; i++) {
-                key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
-            }
-
-            return key;
-        }
-
-        private Object[] buildValue(List<String> row) {
-
-            Object[] values = new Object[measureCount];
-            MeasureDesc measureDesc = null;
-
-            for (int i = 0; i < measureCount; i++) {
-                measureDesc = measureDescs[i];
-
-                Object value = null;
-                int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
-                FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction();
-                if (flatTableIdx == null) {
-                    value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
-                } else if (function.isCount() || function.isHolisticCountDistinct()) {
-                    // note for holistic count distinct, this value will be ignored
-                    value = ONE;
-                } else if (function.isTopN()) {
-                    // encode the key column with dict, and get the counter column;
-                    int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
-                    Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex);
-                    int keyColEncoded = displayColDict.getIdFromValue(row.get(keyColIndex));
-                    valueBuf.clear();
-                    valueBuf.putInt(displayColDict.getSizeOfId());
-                    valueBuf.putInt(keyColEncoded);
-                    if (flatTableIdx.length == 1) {
-                        // only displayCol, use 1.0 as counter
-                        valueBuf.putDouble(1.0);
-                    } else {
-                        // get the counter column value
-                        valueBuf.putDouble(Double.valueOf(row.get(flatTableIdx[0])));
-                    }
-
-                    value = measureCodec.getSerializer(i).valueOf(valueBuf.array());
-
-                } else if (flatTableIdx.length == 1) {
-                    value = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
-                } else {
-
-                    byte[] result = null;
-                    for (int x = 0; x < flatTableIdx.length; x++) {
-                        byte[] split = toBytes(row.get(flatTableIdx[x]));
-                        if (result == null) {
-                            result = Arrays.copyOf(split, split.length);
-                        } else {
-                            byte[] newResult = new byte[result.length + split.length];
-                            System.arraycopy(result, 0, newResult, 0, result.length);
-                            System.arraycopy(split, 0, newResult, result.length, split.length);
-                            result = newResult;
-                        }
-                    }
-                    value = measureCodec.getSerializer(i).valueOf(result);
-                }
-                values[i] = value;
-            }
-            return values;
-        }
-
-        private byte[] toBytes(String v) {
-            return v == null ? null : Bytes.toBytes(v);
-        }
-
     }
 }
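
Editor's note: the net effect of this diff is that buildGTRecord(), buildKey(), buildValue(), and the TopN dictionary map move out of the builder, and InputConverter now delegates to the new InMemCubeBuilderInputConverter. A sketch of how a caller wires the extracted converter (the names come from the diff; the surrounding variables are assumed to be in scope):

    // Illustrative wiring, not verbatim commit code:
    InMemCubeBuilderInputConverter converter = new InMemCubeBuilderInputConverter(
            cubeDesc,
            InMemCubeBuilderUtils.createTopNDisplayColDictionaryMap(
                    cubeDesc, intermediateTableDesc, dictionaryMap),
            gtInfo);
    GTRecord record = converter.convert(row); // row is one List<String> flat-table row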

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/730739f7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
new file mode 100644
index 0000000..d9099ce
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.cube.inmemcubing;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class InMemCubeBuilderInputConverter {
+
+    private static final LongMutable ONE = new LongMutable(1l);
+    
+    private final CubeDesc cubeDesc;
+    private final CubeJoinedFlatTableDesc intermediateTableDesc;
+    private final MeasureDesc[] measureDescs;
+    private final MeasureCodec measureCodec;
+    private final int measureCount;
+    private final ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private final Map<Integer, Dictionary<String>> topNDisplayColDictMap;
+    private final GTInfo gtInfo;
+    
+
+    public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<Integer, Dictionary<String>> topNDisplayColDictMap, GTInfo gtInfo) {
+        this.cubeDesc = cubeDesc;
+        this.gtInfo = gtInfo;
+        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        this.measureCount = cubeDesc.getMeasures().size();
+        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
+        this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+        this.topNDisplayColDictMap = Preconditions.checkNotNull(topNDisplayColDictMap, "topNDisplayColDictMap cannot be null");
+    }
+    
+    public final GTRecord convert(List<String> row) {
+        final GTRecord record = new GTRecord(gtInfo);
+        convert(row, record);
+        return record;
+    }
+
+    public final void convert(List<String> row, GTRecord record) {
+        Object[] dimensions = buildKey(row);
+        Object[] metricsValues = buildValue(row);
+        Object[] recordValues = new Object[dimensions.length + metricsValues.length];
+        System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
+        System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
+        record.setValues(recordValues);
+    }
+
+    private Object[] buildKey(List<String> row) {
+        int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
+        Object[] key = new Object[keySize];
+
+        for (int i = 0; i < keySize; i++) {
+            key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
+        }
+
+        return key;
+    }
+
+    private Object[] buildValue(List<String> row) {
+
+        Object[] values = new Object[measureCount];
+        for (int i = 0; i < measureCount; i++) {
+            MeasureDesc measureDesc = measureDescs[i];
+            int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
+            FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction();
+            if (flatTableIdx == null) {
+                values[i] = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
+            } else if (function.isCount() || function.isHolisticCountDistinct()) {
+                // note for holistic count distinct, this value will be ignored
+                values[i] = ONE;
+            } else if (function.isTopN()) {
+                // encode the key column with dict, and get the counter column;
+                int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
+                Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex);
+                int keyColEncoded = displayColDict.getIdFromValue(row.get(keyColIndex));
+                valueBuf.clear();
+                valueBuf.putInt(displayColDict.getSizeOfId());
+                valueBuf.putInt(keyColEncoded);
+                if (flatTableIdx.length == 1) {
+                    // only displayCol, use 1.0 as counter
+                    valueBuf.putDouble(1.0);
+                } else {
+                    // get the counter column value
+                    valueBuf.putDouble(Double.valueOf(row.get(flatTableIdx[0])));
+                }
+
+                values[i] = measureCodec.getSerializer(i).valueOf(valueBuf.array());
+
+            } else if (flatTableIdx.length == 1) {
+                values[i] = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
+            } else {
+
+                byte[] result = null;
+                for (int x = 0; x < flatTableIdx.length; x++) {
+                    byte[] split = toBytes(row.get(flatTableIdx[x]));
+                    if (result == null) {
+                        result = Arrays.copyOf(split, split.length);
+                    } else {
+                        byte[] newResult = new byte[result.length + split.length];
+                        System.arraycopy(result, 0, newResult, 0, result.length);
+                        System.arraycopy(split, 0, newResult, result.length, split.length);
+                        result = newResult;
+                    }
+                }
+                values[i] = measureCodec.getSerializer(i).valueOf(result);
+            }
+        }
+        return values;
+    }
+
+    private byte[] toBytes(String v) {
+        return v == null ? null : Bytes.toBytes(v);
+    }
+
+}
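
Editor's note: for reference, the TopN branch of buildValue() above serializes each measure value as a fixed header plus counter: an int holding the dictionary id width, an int holding the dictionary-encoded display value, and a double counter (1.0 when only the display column is present). An illustrative buffer with assumed example values:

    import java.nio.ByteBuffer;

    ByteBuffer buf = ByteBuffer.allocate(16);
    buf.putInt(2);      // sizeOfId: dictionary ids for this column fit in 2 bytes
    buf.putInt(42);     // keyColEncoded: dictionary id of the display value
    buf.putDouble(1.0); // counter: defaults to 1.0 when no counter column exists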

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/730739f7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
new file mode 100644
index 0000000..3b68d47
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.cube.inmemcubing;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ */
+public final class InMemCubeBuilderUtils {
+    
+    public static final HashMap<Integer, Dictionary<String>> createTopNDisplayColDictionaryMap(CubeDesc cubeDesc, CubeJoinedFlatTableDesc intermediateTableDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+        HashMap<Integer, Dictionary<String>> result = Maps.newHashMap();
+        for (int measureIdx = 0; measureIdx < cubeDesc.getMeasures().size(); measureIdx++) {
+            MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
+            FunctionDesc func = measureDesc.getFunction();
+            if (func.isTopN()) {
+                int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
+                int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
+                TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
+                @SuppressWarnings("unchecked")
+                Dictionary<String> dictionary = (Dictionary<String>) dictionaryMap.get(displayCol);
+                result.put(displayColIdx, Preconditions.checkNotNull(dictionary));
+            }
+        }
+        return result;
+    }
+
+    public static final Pair<ImmutableBitSet, ImmutableBitSet> getDimensionAndMetricColumnBitSet(final long cuboidId, final int measureCount) {
+        BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
+        BitSet dimension = new BitSet();
+        dimension.set(0, bitSet.cardinality());
+        BitSet metrics = new BitSet();
+        metrics.set(bitSet.cardinality(), bitSet.cardinality() + measureCount);
+        return Pair.newPair(new ImmutableBitSet(dimension), new ImmutableBitSet(metrics));
+    }
+    
+    public static final Pair<ImmutableBitSet, ImmutableBitSet> getDimensionAndMetricColumnBitSet(final long baseCuboidId, final long childCuboidId, final int measureCount) {
+        final Pair<ImmutableBitSet, ImmutableBitSet> parentDimensionAndMetricColumnBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
+        ImmutableBitSet parentDimensions = parentDimensionAndMetricColumnBitSet.getFirst();
+        ImmutableBitSet measureColumns = parentDimensionAndMetricColumnBitSet.getSecond();
+        ImmutableBitSet childDimensions = parentDimensions;
+        long mask = Long.highestOneBit(baseCuboidId);
+        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
+        int index = 0;
+        for (int i = 0; i < parentCuboidIdActualLength; i++) {
+            if ((mask & baseCuboidId) > 0) {
+                if ((mask & childCuboidId) == 0) {
+                    // this dim will be aggregated
+                    childDimensions = childDimensions.set(index, false);
+                }
+                index++;
+            }
+            mask = mask >> 1;
+        }
+        return Pair.newPair(childDimensions, measureColumns);
+    }
+}
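
Editor's note: a worked example of the two helpers above, with values chosen for illustration. Cuboid id 0b10110 has three bits set, so with measureCount = 2 the grid-table columns split into dimensions {0, 1, 2} and metrics {3, 4}; for a child cuboid 0b10010 of that parent, the middle parent dimension is cleared, so the child keeps dimension columns {0, 2}:

    Pair<ImmutableBitSet, ImmutableBitSet> parentCols =
            InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(0b10110L, 2);
    // parentCols.getFirst()  -> {0, 1, 2}, parentCols.getSecond() -> {3, 4}

    Pair<ImmutableBitSet, ImmutableBitSet> childCols =
            InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(0b10110L, 0b10010L, 2);
    // childCols.getFirst()   -> {0, 2}   (dimension at index 1 is aggregated away)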

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/730739f7/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 0e5081e..e4fcc16 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -17,9 +17,8 @@
 */
 package org.apache.kylin.engine.spark;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.base.*;
+import com.google.common.collect.*;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
@@ -28,6 +27,7 @@ import com.google.common.primitives.UnsignedBytes;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -54,6 +54,7 @@ import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.*;
 import org.apache.kylin.cube.util.CubingUtils;
+import org.apache.kylin.dict.*;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.engine.mr.HadoopUtil;
@@ -78,15 +79,18 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.*;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
-
+import org.reflections.Reflections;
 import scala.Tuple2;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Executors;
@@ -143,12 +147,12 @@ public class SparkCubing extends AbstractApplication {
         final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         final String[] columns = intermediateTable.columns();
-        long start = System.currentTimeMillis();
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
         final CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
         final List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
-        RowKeyDesc rowKey = cubeDesc.getRowkey();
+        final long start = System.currentTimeMillis();
+        final RowKeyDesc rowKey = cubeDesc.getRowkey();
         for (int i = 0; i < baseCuboidColumn.size(); i++) {
             TblColRef col = baseCuboidColumn.get(i);
             if (!rowKey.isUseDictionary(col)) {
@@ -217,12 +221,13 @@ public class SparkCubing extends AbstractApplication {
         }
 
         CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-        
+
         final int[] rowKeyColumnIndexes = flatTableDesc.getRowKeyColumnIndexes();
         final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
         final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        
         final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
+        final ByteArray[] row_hashcodes = new ByteArray[nRowKey];
+
         for (Long cuboidId : allCuboidIds) {
             BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
             Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
@@ -238,16 +243,15 @@ public class SparkCubing extends AbstractApplication {
             }
             allCuboidsBitSet.put(cuboidId, cuboidBitSet);
         }
-        final ByteArray[] row_hashcodes = new ByteArray[nRowKey];
         for (int i = 0; i < nRowKey; ++i) {
             row_hashcodes[i] = new ByteArray();
         }
-        
+
         final HashMap<Long, HyperLogLogPlusCounter> samplingResult = rowJavaRDD.aggregate(zeroValue,
                 new Function2<HashMap<Long, HyperLogLogPlusCounter>,
                         List<String>,
                         HashMap<Long, HyperLogLogPlusCounter>>() {
-                    
+
                     final HashFunction hashFunction = Hashing.murmur3_32();
 
                     @Override
@@ -319,7 +323,7 @@ public class SparkCubing extends AbstractApplication {
                 }
             }
         }
-        
+
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
             if (measureDesc.getFunction().isTopN()) {
                 List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
@@ -328,7 +332,7 @@ public class SparkCubing extends AbstractApplication {
             }
         }
 
-        
+
         final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
 
             @Override
@@ -375,7 +379,7 @@ public class SparkCubing extends AbstractApplication {
         writeToHFile2(javaPairRDD, dataTypes, measureSize, aggs, splitKeys, conf, url);
         return url;
     }
-    
+
     private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) {
         javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
             @Override
@@ -505,13 +509,67 @@ public class SparkCubing extends AbstractApplication {
         }
     }
 
+    private Collection<String> getKyroClasses() {
+        Set<Class> kyroClasses = Sets.newHashSet();
+        kyroClasses.addAll(new Reflections("org.apache.kylin").getSubTypesOf(Serializable.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.cube.model").getSubTypesOf(Object.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.model").getSubTypesOf(Object.class));
+        kyroClasses.addAll(new Reflections("org.apache.kylin.metadata.measure").getSubTypesOf(Object.class));
+        kyroClasses.add(HashMap.class);
+        kyroClasses.add(org.apache.spark.sql.Row[].class);
+        kyroClasses.add(org.apache.spark.sql.Row.class);
+        kyroClasses.add(org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema.class);
+        kyroClasses.add(org.apache.spark.sql.types.StructType.class);
+        kyroClasses.add(org.apache.spark.sql.types.StructField[].class);
+        kyroClasses.add(org.apache.spark.sql.types.StructField.class);
+        kyroClasses.add(org.apache.spark.sql.types.DateType$.class);
+        kyroClasses.add(org.apache.spark.sql.types.Metadata.class);
+        kyroClasses.add(Object[].class);
+        kyroClasses.add(org.apache.spark.sql.types.StringType$.class);
+        kyroClasses.add(Hashing.murmur3_128().getClass());
+        kyroClasses.add(org.apache.spark.sql.columnar.CachedBatch.class);
+        kyroClasses.add(byte[][].class);
+        kyroClasses.add(org.apache.spark.sql.types.Decimal.class);
+        kyroClasses.add(scala.math.BigDecimal.class);
+        kyroClasses.add(java.math.BigDecimal.class);
+        kyroClasses.add(java.math.MathContext.class);
+        kyroClasses.add(java.math.RoundingMode.class);
+        kyroClasses.add(java.util.ArrayList.class);
+        kyroClasses.add(java.util.LinkedList.class);
+        
+        
+        ArrayList<String> result = Lists.newArrayList();
+        for (Class kyroClass : kyroClasses) {
+            result.add(kyroClass.getName());
+        }
+        result.add("scala.collection.immutable.Map$EmptyMap$");
+        result.add("org.apache.spark.sql.catalyst.expressions.GenericInternalRow");
+        result.add("org.apache.spark.unsafe.types.UTF8String");
+        return result;
+    }
+
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
         SparkConf conf = new SparkConf().setAppName("Simple Application");
+        //memory conf
         conf.set("spark.executor.memory", "6g");
         conf.set("spark.storage.memoryFraction", "0.3");
 
+        //serialization conf
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrationRequired", "true");
+        final Iterable<String> allClasses = Iterables.filter(
+                Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister", "").split(",")), getKyroClasses()),
+                new Predicate<String>() {
+                    @Override
+                    public boolean apply(@Nullable String input) {
+                        return input != null && input.trim().length() > 0;
+                    }
+                });
+        System.out.println("kyro classes:" + allClasses.toString());
+        conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, ","));
+
         JavaSparkContext sc = new JavaSparkContext(conf);
         HiveContext sqlContext = new HiveContext(sc.sc());
         final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable);