You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/25 00:00:35 UTC

[kylin] branch master updated (a334ee4 -> 80c60ee)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from a334ee4  KYLIN-3442 Fact distinct columns in Spark
     new 637f45d  KYLIN-3491 add a shrunken global dictionary step to improve the encoding process
     new 80c60ee  KYLIN-3491 enable shrunken gd in ci test

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../java/org/apache/kylin/cube/model/CubeDesc.java |   4 +
 .../org/apache/kylin/dict/ShrunkenDictionary.java  | 159 +++++++++++++++++++++
 ...Builder.java => ShrunkenDictionaryBuilder.java} |  39 +++--
 .../apache/kylin/dict/ShrunkenDictionaryTest.java  |  77 ++++++++++
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  10 ++
 .../java/org/apache/kylin/engine/mr/IMRInput.java  |   4 +
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  22 +++
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |   3 +
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |   6 +-
 .../kylin/engine/mr/common/BatchConstants.java     |   1 +
 .../engine/mr/common/DictionaryGetterUtil.java     |  76 ++++++++++
 .../engine/mr/steps/BaseCuboidMapperBase.java      |  10 +-
 .../apache/kylin/engine/mr/steps/CuboidJob.java    |   5 +
 ...ob.java => ExtractDictionaryFromGlobalJob.java} |  72 +++++-----
 .../steps/ExtractDictionaryFromGlobalMapper.java   | 150 +++++++++++++++++++
 .../kylin/engine/mr/steps/InMemCuboidJob.java      |   5 +
 .../engine/mr/steps/InMemCuboidMapperBase.java     |  15 +-
 .../template/cube/kylin_sales_cube.json            |   8 +-
 .../template/cube/kylin_streaming_cube.json        |   2 +-
 .../localmeta/cube_desc/ci_left_join_cube.json     |   3 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |   9 ++
 .../apache/kylin/source/kafka/KafkaMRInput.java    |   7 +
 .../kylin/storage/hbase/steps/HBaseJobSteps.java   |   1 +
 25 files changed, 620 insertions(+), 73 deletions(-)
 create mode 100644 core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java
 copy core-dictionary/src/main/java/org/apache/kylin/dict/{IDictionaryBuilder.java => ShrunkenDictionaryBuilder.java} (52%)
 create mode 100644 core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java
 create mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java
 copy engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/{UpdateOldCuboidShardJob.java => ExtractDictionaryFromGlobalJob.java} (65%)
 create mode 100644 engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java


[kylin] 02/02: KYLIN-3491 enable shrunken gd in ci test

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 80c60ee3142a1de56fc70b1d1a2f1cf57442fd21
Author: shaofengshi <sh...@apache.org>
AuthorDate: Fri Aug 24 08:38:03 2018 +0800

    KYLIN-3491 enable shrunken gd in ci test
---
 .../steps/ExtractDictionaryFromGlobalMapper.java   | 35 ++++++++++++++--------
 .../template/cube/kylin_sales_cube.json            |  8 ++---
 .../template/cube/kylin_streaming_cube.json        |  2 +-
 .../localmeta/cube_desc/ci_left_join_cube.json     |  3 +-
 4 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java
index 34a5ec7..be2549e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java
@@ -18,13 +18,8 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,8 +43,13 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.DictionaryGetterUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 public class ExtractDictionaryFromGlobalMapper<KEYIN, Object> extends KylinMapper<KEYIN, Object, Text, Text> {
     private String cubeName;
@@ -66,12 +66,13 @@ public class ExtractDictionaryFromGlobalMapper<KEYIN, Object> extends KylinMappe
     private List<Dictionary<String>> globalDicts;
 
     private String splitKey;
+    private KylinConfig config;
 
     @Override
     protected void doSetup(Context context) throws IOException {
         Configuration conf = context.getConfiguration();
         bindCurrentConfiguration(conf);
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 
         cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
         cube = CubeManager.getInstance(config).getCube(cubeName);
@@ -84,14 +85,12 @@ public class ExtractDictionaryFromGlobalMapper<KEYIN, Object> extends KylinMappe
         globalColumns = cubeDesc.getAllGlobalDictColumns();
         globalColumnIndex = new int[globalColumns.size()];
         globalColumnValues = Lists.newArrayListWithExpectedSize(globalColumns.size());
-        globalDicts = Lists.newArrayListWithExpectedSize(globalColumns.size());
+
         for (int i = 0; i < globalColumns.size(); i++) {
             TblColRef colRef = globalColumns.get(i);
             int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
             globalColumnIndex[i] = columnIndexOnFlatTbl;
-
             globalColumnValues.add(Sets.<String> newHashSet());
-            globalDicts.add(cubeSeg.getDictionary(colRef));
         }
 
         splitKey = DictionaryGetterUtil.getInputSplitSignature(cubeSeg, context.getInputSplit());
@@ -117,9 +116,19 @@ public class ExtractDictionaryFromGlobalMapper<KEYIN, Object> extends KylinMappe
         FileSystem fs = FileSystem.get(context.getConfiguration());
         Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR));
 
+        globalDicts = Lists.newArrayListWithExpectedSize(globalColumns.size());
+        Map<TblColRef, Dictionary<String>> dictionaryMap = cubeSeg.buildDictionaryMap();
+        for (int i = 0; i < globalColumns.size(); i++) {
+            TblColRef colRef = globalColumns.get(i);
+            globalDicts.add(dictionaryMap.get(colRef));
+        }
+
         ShrunkenDictionary.StringValueSerializer strValueSerializer = new ShrunkenDictionary.StringValueSerializer();
         for (int i = 0; i < globalColumns.size(); i++) {
             List<String> colDistinctValues = Lists.newArrayList(globalColumnValues.get(i));
+            if (colDistinctValues.size() == 0) {
+                continue;
+            }
             // sort values to accelerate the encoding process by reducing the swapping of global dictionary slices
             Collections.sort(colDistinctValues);
 
diff --git a/examples/sample_cube/template/cube/kylin_sales_cube.json b/examples/sample_cube/template/cube/kylin_sales_cube.json
index be07ffb..361a5f9 100644
--- a/examples/sample_cube/template/cube/kylin_sales_cube.json
+++ b/examples/sample_cube/template/cube/kylin_sales_cube.json
@@ -3,10 +3,10 @@
  
   "last_modified" : 0,
   "name" : "kylin_sales_cube",
-  "owner" : null,
-  "descriptor" : "kylin_sales_cube",
-  "display_name" : "kylin_sales_cube",
+  "owner" : "ADMIN",
+  "descriptor" : "kylin_sales_cube",
+  "display_name" : "kylin_sales_cube",
   "status" : "DISABLED",
   "segments" : [ ],
-  "create_time_utc" : 0
+  "create_time_utc" : 0
 }
\ No newline at end of file
diff --git a/examples/sample_cube/template/cube/kylin_streaming_cube.json b/examples/sample_cube/template/cube/kylin_streaming_cube.json
index e5286c3..ea7460d 100644
--- a/examples/sample_cube/template/cube/kylin_streaming_cube.json
+++ b/examples/sample_cube/template/cube/kylin_streaming_cube.json
@@ -2,7 +2,7 @@
   "uuid": "40a27d9d-c5f3-45c4-9b8b-513552219193",
  
   "name": "kylin_streaming_cube",
-  "owner": null,
+  "owner": "ADMIN",
   "status": "DISABLED",
   "segments": [],
   "last_modified": 0,
diff --git a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
index e42c522..363c680 100644
--- a/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
+++ b/examples/test_case_data/localmeta/cube_desc/ci_left_join_cube.json
@@ -580,7 +580,8 @@
   "engine_type": 2,
   "storage_type": 2,
   "override_kylin_properties": {
-    "kylin.cube.algorithm": "INMEM"
+    "kylin.cube.algorithm": "INMEM",
+    "kylin.dictionary.shrunken-from-global-enabled": "true"
   },
   "snapshot_table_desc_list": [
     {


[kylin] 01/02: KYLIN-3491 add a shrunken global dictionary step to improve the encoding process

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 637f45d8444c7b52713780c1701d33d6656fffc0
Author: Zhong <nj...@apache.org>
AuthorDate: Wed Aug 15 17:40:54 2018 +0800

    KYLIN-3491 add a shrunken global dictionary step to improve the encoding process
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../java/org/apache/kylin/cube/model/CubeDesc.java |   4 +
 .../org/apache/kylin/dict/ShrunkenDictionary.java  | 159 +++++++++++++++++++++
 .../kylin/dict/ShrunkenDictionaryBuilder.java      |  49 +++++++
 .../apache/kylin/dict/ShrunkenDictionaryTest.java  |  77 ++++++++++
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  10 ++
 .../java/org/apache/kylin/engine/mr/IMRInput.java  |   4 +
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  22 +++
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |   3 +
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |   6 +-
 .../kylin/engine/mr/common/BatchConstants.java     |   1 +
 .../engine/mr/common/DictionaryGetterUtil.java     |  76 ++++++++++
 .../engine/mr/steps/BaseCuboidMapperBase.java      |  10 +-
 .../apache/kylin/engine/mr/steps/CuboidJob.java    |   5 +
 ...ob.java => ExtractDictionaryFromGlobalJob.java} | 108 ++++++--------
 .../steps/ExtractDictionaryFromGlobalMapper.java   | 141 ++++++++++++++++++
 .../kylin/engine/mr/steps/InMemCuboidJob.java      |   5 +
 .../engine/mr/steps/InMemCuboidMapperBase.java     |  15 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |   9 ++
 .../apache/kylin/source/kafka/KafkaMRInput.java    |   7 +
 .../kylin/storage/hbase/steps/HBaseJobSteps.java   |   1 +
 22 files changed, 633 insertions(+), 84 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 925488f..58d9caa 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -430,6 +430,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
     }
 
+    public boolean isShrunkenDictFromGlobalEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.dictionary.shrunken-from-global-enabled", "false"));
+    }
+
     // ============================================================================
     // CUBE
     // ============================================================================
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 95c8b40..15d67ea 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -1480,6 +1480,10 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
         }
         return globalDictCols;
     }
+
+    public boolean isShrunkenDictFromGlobalEnabled() {
+        return config.isShrunkenDictFromGlobalEnabled() && !getAllGlobalDictColumns().isEmpty();
+    }
     
     // UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
     public List<TblColRef> getAllUHCColumns() {
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java
new file mode 100644
index 0000000..35c995e
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+public class ShrunkenDictionary<T> extends Dictionary<T> {
+
+    private ImmutableMap<T, Integer> valueToIdMap;
+    private ImmutableMap<Integer, T> idToValueMap;
+
+    private int minId;
+    private int maxId;
+    private int sizeOfId;
+    private int sizeOfValue;
+
+    private ValueSerializer<T> valueSerializer;
+
+    public ShrunkenDictionary(ValueSerializer<T> valueSerializer) { // default constructor for Writable interface
+        this.valueSerializer = valueSerializer;
+    }
+
+    public ShrunkenDictionary(ValueSerializer<T> valueSerializer, int minId, int maxId, int sizeOfId, int sizeOfValue,
+            Map<T, Integer> valueToIdMap) {
+        this.valueSerializer = valueSerializer;
+
+        this.minId = minId;
+        this.maxId = maxId;
+        this.sizeOfId = sizeOfId;
+        this.sizeOfValue = sizeOfValue;
+
+        Preconditions.checkNotNull(valueToIdMap);
+        this.valueToIdMap = ImmutableMap.<T, Integer> builder().putAll(valueToIdMap).build();
+    }
+
+    @Override
+    public int getMinId() {
+        return minId;
+    }
+
+    @Override
+    public int getMaxId() {
+        return maxId;
+    }
+
+    @Override
+    public int getSizeOfId() {
+        return sizeOfId;
+    }
+
+    @Override
+    public int getSizeOfValue() {
+        return sizeOfValue;
+    }
+
+    @Override
+    public boolean contains(Dictionary<?> another) {
+        return false;
+    }
+
+    protected int getIdFromValueImpl(T value, int roundingFlag) {
+        Integer id = valueToIdMap.get(value);
+        if (id == null) {
+            return -1;
+        }
+        return id;
+    }
+
+    protected T getValueFromIdImpl(int id) {
+        if (idToValueMap == null) {
+            idToValueMap = buildIdToValueMap();
+        }
+        return idToValueMap.get(id);
+    }
+
+    private ImmutableMap<Integer, T> buildIdToValueMap() {
+        ImmutableMap.Builder<Integer, T> idToValueMapBuilder = ImmutableMap.builder();
+        for (T value : valueToIdMap.keySet()) {
+            idToValueMapBuilder.put(valueToIdMap.get(value), value);
+        }
+        return idToValueMapBuilder.build();
+    }
+
+    public void dump(PrintStream out) {
+        out.println(String.format("Total %d values for ShrunkenDictionary", valueToIdMap.size()));
+    }
+
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(minId);
+        out.writeInt(maxId);
+        out.writeInt(sizeOfId);
+        out.writeInt(sizeOfValue);
+
+        out.writeInt(valueToIdMap.size());
+        for (T value : valueToIdMap.keySet()) {
+            valueSerializer.writeValue(out, value);
+            out.writeInt(valueToIdMap.get(value));
+        }
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        this.minId = in.readInt();
+        this.maxId = in.readInt();
+        this.sizeOfId = in.readInt();
+        this.sizeOfValue = in.readInt();
+
+        int sizeValueMap = in.readInt();
+        ImmutableMap.Builder<T, Integer> valueToIdMapBuilder = ImmutableMap.builder();
+        for (int i = 0; i < sizeValueMap; i++) {
+            T value = valueSerializer.readValue(in);
+            int id = in.readInt();
+            valueToIdMapBuilder.put(value, id);
+        }
+        this.valueToIdMap = valueToIdMapBuilder.build();
+    }
+
+    public interface ValueSerializer<T> {
+        void writeValue(DataOutput out, T value) throws IOException;
+
+        T readValue(DataInput in) throws IOException;
+    }
+
+    public static class StringValueSerializer implements ValueSerializer<String> {
+        @Override
+        public void writeValue(DataOutput out, String value) throws IOException {
+            out.writeUTF(value);
+        }
+
+        @Override
+        public String readValue(DataInput in) throws IOException {
+            return in.readUTF();
+        }
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java
new file mode 100644
index 0000000..ab3df5e
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.dict.ShrunkenDictionary.ValueSerializer;
+
+import com.google.common.collect.Maps;
+
+public class ShrunkenDictionaryBuilder<T> {
+
+    private Map<T, Integer> valueToIdMap;
+
+    private Dictionary<T> fullDict;
+
+    public ShrunkenDictionaryBuilder(Dictionary<T> fullDict) {
+        this.fullDict = fullDict;
+
+        this.valueToIdMap = Maps.newHashMap();
+    }
+
+    public void addValue(T value) {
+        int id = fullDict.getIdFromValue(value);
+        valueToIdMap.put(value, id);
+    }
+
+    public ShrunkenDictionary<T> build(ValueSerializer<T> valueSerializer) {
+        return new ShrunkenDictionary<>(valueSerializer, fullDict.getMinId(), fullDict.getMaxId(),
+                fullDict.getSizeOfId(), fullDict.getSizeOfValue(), valueToIdMap);
+    }
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java
new file mode 100644
index 0000000..7a86e5f
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ShrunkenDictionaryTest {
+
+    @Test
+    public void testStringDictionary() {
+        ArrayList<String> strList = new ArrayList<String>();
+        strList.add("");
+        strList.add("part");
+        strList.add("par");
+        strList.add("partition");
+        strList.add("party");
+        strList.add("parties");
+        strList.add("paint");
+
+        TrieDictionaryBuilder<String> dictBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+        for (String str : strList) {
+            dictBuilder.addValue(str);
+        }
+        Dictionary<String> dict = dictBuilder.build(0);
+
+        ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer();
+        ShrunkenDictionaryBuilder<String> shrunkenDictBuilder = new ShrunkenDictionaryBuilder<>(dict);
+        for (int i = 0; i < strList.size(); i += 2) {
+            shrunkenDictBuilder.addValue(strList.get(i));
+        }
+        Dictionary<String> shrunkenDict = shrunkenDictBuilder.build(valueSerializer);
+
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+
+            shrunkenDict.write(dos);
+
+            ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+            DataInputStream dis = new DataInputStream(bis);
+
+            Dictionary<String> dShrunkenDict = new ShrunkenDictionary<>(valueSerializer);
+            dShrunkenDict.readFields(dis);
+
+            for (int i = 0; i < strList.size(); i += 2) {
+                String value = strList.get(i);
+                Assert.assertEquals(dict.getIdFromValue(value), dShrunkenDict.getIdFromValue(value));
+            }
+        } catch (IOException e) {
+        }
+    }
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index c805f8a..560293c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -34,6 +34,7 @@ public final class ExecutableConstants {
     public static final String SOURCE_RECORDS_COUNT = "source_records_count";
     public static final String SOURCE_RECORDS_SIZE = "source_records_size";
 
+    public static final String STEP_NAME_EXTRACT_DICTIONARY_FROM_GLOBAL = "Extract Dictionary from Global Dictionary";
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
     public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 5498365..1695a22 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -73,6 +73,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
 
         outputSide.addStepPhase2_BuildDictionary(result);
 
+        if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
+            result.addTask(createExtractDictionaryFromGlobalJob(jobId));
+        }
+
         // Phase 3: Build Cube
         addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
         addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute
@@ -124,6 +128,9 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidRootPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Cube_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+        if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
+            appendExecCmdParameters(cmd, BatchConstants.ARG_SHRUNKEN_DICT_PATH, getShrunkenDictionaryPath(jobId));
+        }
 
         cubeStep.setMapReduceParams(cmd.toString());
         cubeStep.setMapReduceJobClass(getInMemCuboidJob());
@@ -150,6 +157,9 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+        if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
+            appendExecCmdParameters(cmd, BatchConstants.ARG_SHRUNKEN_DICT_PATH, getShrunkenDictionaryPath(jobId));
+        }
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index aca9853..f650321 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr;
 
 import java.util.Collection;
 
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -50,6 +51,9 @@ public interface IMRInput {
 
         /** Parse a mapper input object into column values. */
         public Collection<String[]> parseMapperInput(Object mapperInput);
+
+        /** Get the signature for the input split. */
+        public String getInputSplitSignature(InputSplit inputSplit);
     }
 
     /**
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index c6abf16..02e9fe5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -36,6 +36,7 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.mr.steps.ExtractDictionaryFromGlobalJob;
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
 import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
 import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
@@ -175,6 +176,23 @@ public class JobBuilderSupport {
         return buildDictionaryStep;
     }
 
+    public MapReduceExecutable createExtractDictionaryFromGlobalJob(String jobId) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_EXTRACT_DICTIONARY_FROM_GLOBAL);
+        result.setMapReduceJobClass(ExtractDictionaryFromGlobalJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_Extract_Dictionary_from_Global_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getShrunkenDictionaryPath(jobId));
+
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
     public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId, LookupMaterializeContext lookupMaterializeContext) {
         final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep();
         result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
@@ -291,6 +309,10 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + BatchConstants.CFG_OUTPUT_STATISTICS;
     }
 
+    public String getShrunkenDictionaryPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/dictionary_shrunken";
+    }
+
     public String getDictRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/dict";
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 2976080..8873f30 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -112,6 +112,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             .hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION);
     protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME)
             .hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME);
+    protected static final Option OPTION_DICTIONARY_SHRUNKEN_PATH = OptionBuilder
+            .withArgName(BatchConstants.ARG_SHRUNKEN_DICT_PATH).hasArg().isRequired(false)
+            .withDescription("Dictionary shrunken path").create(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
 
     protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT)
             .hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 5dd55b2..13bc688 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -74,18 +74,18 @@ public class BaseCuboidBuilder implements java.io.Serializable {
         measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
     }
 
-    public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc) {
+    public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment,
+            CubeJoinedFlatTableEnrich intermediateTableDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
         this.kylinConfig = kylinConfig;
         this.cubeDesc = cubeDesc;
         this.cubeSegment = cubeSegment;
         this.intermediateTableDesc = intermediateTableDesc;
+        this.dictionaryMap = dictionaryMap;
 
         init();
         rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
         measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
         aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
-        dictionaryMap = cubeSegment.buildDictionaryMap();
-
     }
 
     private void init() {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 6fe55e2..8c2ba7f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -105,6 +105,7 @@ public interface BatchConstants {
     String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
     String ARG_META_URL = "metadataUrl";
     String ARG_HBASE_CONF_PATH = "hbaseConfPath";
+    String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
     /**
      * logger and counter
      */
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java
new file mode 100644
index 0000000..0895244
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.ShrunkenDictionary;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DictionaryGetterUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(DictionaryGetterUtil.class);
+
+    /** Returns the signature that the segment's flat table input format computes for the given input split. */
+    public static String getInputSplitSignature(CubeSegment cubeSegment, InputSplit inputSplit) {
+        return MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat().getInputSplitSignature(inputSplit);
+    }
+
+    /** Returns the segment's dictionary map, with global dict columns switched to this split's shrunken dict if present. */
+    public static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeSegment cubeSegment, InputSplit inputSplit,
+                                                                      Configuration configuration) throws IOException {
+        Map<TblColRef, Dictionary<String>> dictionaryMap = cubeSegment.buildDictionaryMap();
+
+        String shrunkenDictPath = configuration.get(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
+        if (shrunkenDictPath == null) {
+            return dictionaryMap;
+        }
+
+        // replace global dictionary with shrunken dictionary if possible; file: [path]/[column identity]/[split signature]
+        String inputSplitSignature = getInputSplitSignature(cubeSegment, inputSplit);
+        // resolve the file system from the path itself, so a path on a non-default file system is handled too
+        FileSystem fs = new Path(shrunkenDictPath).getFileSystem(configuration);
+        ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer();
+        for (TblColRef colRef : cubeSegment.getCubeDesc().getAllGlobalDictColumns()) {
+            Path colShrunkenDictPath = new Path(new Path(shrunkenDictPath, colRef.getIdentity()), inputSplitSignature);
+            if (!fs.exists(colShrunkenDictPath)) {
+                logger.warn("Shrunken dictionary for column {} in split {} does not exist!!!", colRef.getIdentity(),
+                        inputSplitSignature);
+                continue;
+            }
+            try (DataInputStream dis = fs.open(colShrunkenDictPath)) {
+                Dictionary<String> shrunkenDict = new ShrunkenDictionary(valueSerializer);
+                shrunkenDict.readFields(dis);
+                dictionaryMap.put(colRef, shrunkenDict);
+            }
+        }
+        return dictionaryMap;
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 091f9a2..b5dc961 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -21,10 +21,12 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Map;
 
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -35,6 +37,8 @@ import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.DictionaryGetterUtil;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,8 +70,12 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
         cubeDesc = cube.getDescriptor();
         cubeSegment = cube.getSegmentById(segmentID);
         CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
-        baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc);
 
+        Map<TblColRef, Dictionary<String>> dictionaryMap = DictionaryGetterUtil.getDictionaryMap(cubeSegment,
+                context.getInputSplit(), context.getConfiguration());
+
+        baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc,
+                dictionaryMap);
     }
 
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index b49b639..d7da2c2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -93,6 +93,7 @@ public class CuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_NCUBOID_LEVEL);
             options.addOption(OPTION_CUBING_JOB_ID);
             options.addOption(OPTION_CUBOID_MODE);
+            options.addOption(OPTION_DICTIONARY_SHRUNKEN_PATH);
             parseOptions(options, args);
 
             String output = getOptionValue(OPTION_OUTPUT_PATH);
@@ -118,6 +119,10 @@ public class CuboidJob extends AbstractHadoopJob {
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
+            String shrunkenDictPath = getOptionValue(OPTION_DICTIONARY_SHRUNKEN_PATH);
+            if (shrunkenDictPath != null) {
+                job.getConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkenDictPath);
+            }
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java
similarity index 54%
copy from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java
index b0ea7b7..df61ca9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java
@@ -6,59 +6,44 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr.steps;
 
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.job.execution.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ * Encoding values directly with the global dictionary causes heavy memory swapping of its slices, which makes
+ * the encoding process very slow. This job changes the encoding of the raw column values into two passes:
+ * 1. For each data block, a mapper collects the distinct values, sorts them and extracts a shrunken dictionary from global
+ * 2. For each data block, a second scan encodes the raw values with the shrunken dictionary rather than the global one
  */
-public class InMemCuboidJob extends AbstractHadoopJob {
-
-    protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class);
-
-    private boolean skipped = false;
-
-    @Override
-    public boolean isSkipped() {
-        return skipped;
-    }
-
-    private boolean checkSkip(String cubingJobId) {
-        if (cubingJobId == null)
-            return false;
-
-        ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-        CubingJob cubingJob = (CubingJob) execMgr.getJob(cubingJobId);
-        skipped = cubingJob.isInMemCubing() == false;
-        return skipped;
-    }
+public class ExtractDictionaryFromGlobalJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(ExtractDictionaryFromGlobalJob.class);
 
     @Override
     public int run(String[] args) throws Exception {
@@ -66,68 +51,57 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
         try {
             options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_CUBE_NAME);
             options.addOption(OPTION_SEGMENT_ID);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_CUBING_JOB_ID);
             parseOptions(options, args);
 
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String job_id = getOptionValue(OPTION_CUBING_JOB_ID);
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id);
+
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
             String segmentID = getOptionValue(OPTION_SEGMENT_ID);
-            String output = getOptionValue(OPTION_OUTPUT_PATH);
 
+            // ----------------------------------------------------------------------------
+            // add metadata to distributed cache
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
             CubeSegment segment = cube.getSegmentById(segmentID);
-            String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
 
-            if (checkSkip(cubingJobId)) {
-                logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segment);
-                return 0;
-            }
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
             logger.info("Starting: " + job.getJobName());
 
+            job.getConfiguration().set("mapreduce.map.speculative", "false");
             setJobClasspath(job, cube.getConfig());
 
-            // add metadata to distributed cache
-            attachSegmentMetadataWithAll(segment, job.getConfiguration());
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            // Mapper
+            job.setMapperClass(ExtractDictionaryFromGlobalMapper.class);
 
-            // set mapper
-            job.setMapperClass(InMemCuboidMapper.class);
-            job.setMapOutputKeyClass(ByteArrayWritable.class);
-            job.setMapOutputValueClass(ByteArrayWritable.class);
+            // Reducer
+            job.setNumReduceTasks(0);
 
-            // set reducer
-            // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
-            job.setReducerClass(InMemCuboidReducer.class);
+            // Input
+            IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment)
+                    .getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
+            // Output
+            //// avoid creating zero-sized default output files
+            LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            FileOutputFormat.setOutputPath(job, output);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
+            deletePath(job.getConfiguration(), output);
 
-            // set input
-            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
-            flatTableInputFormat.configureJob(job);
-
-            // set output
-            IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat();
-            outputFormat.configureJobOutput(job, output, segment, segment.getCuboidScheduler(), 0);
-
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
             return waitForCompletion(job);
         } finally {
             if (job != null)
                 cleanupTempConfFile(job.getConfiguration());
         }
     }
-
-    public static void main(String[] args) throws Exception {
-        InMemCuboidJob job = new InMemCuboidJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java
new file mode 100644
index 0000000..34a5ec7
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.ShrunkenDictionary;
+import org.apache.kylin.dict.ShrunkenDictionaryBuilder;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.DictionaryGetterUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+/**
+ * Collects, for the input split it processes, the distinct non-null values of every global dictionary column and,
+ * on cleanup, writes one shrunken dictionary file per column: [output dir]/[column identity]/[split signature].
+ */
+public class ExtractDictionaryFromGlobalMapper<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
+    private String cubeName;
+    private CubeDesc cubeDesc;
+    private CubeInstance cube;
+    private CubeSegment cubeSeg;
+
+    private IMRInput.IMRTableInputFormat flatTableInputFormat;
+    private CubeJoinedFlatTableEnrich intermediateTableDesc;
+
+    // parallel structures: entry i of each one belongs to globalColumns.get(i)
+    private List<TblColRef> globalColumns;
+    private int[] globalColumnIndex;
+    private List<Set<String>> globalColumnValues;
+    private List<Dictionary<String>> globalDicts;
+
+    // signature of this mapper's input split, used as the name of the dictionary files it writes
+    private String splitKey;
+
+    /** Loads the cube metadata and prepares, per global dictionary column, its flat table index, value set and dict. */
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        bindCurrentConfiguration(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
+
+        globalColumns = cubeDesc.getAllGlobalDictColumns();
+        globalColumnIndex = new int[globalColumns.size()];
+        globalColumnValues = Lists.newArrayListWithExpectedSize(globalColumns.size());
+        globalDicts = Lists.newArrayListWithExpectedSize(globalColumns.size());
+        for (int i = 0; i < globalColumns.size(); i++) {
+            TblColRef colRef = globalColumns.get(i);
+            globalColumnIndex[i] = intermediateTableDesc.getColumnIndex(colRef);
+            globalColumnValues.add(Sets.<String> newHashSet());
+            globalDicts.add(cubeSeg.getDictionary(colRef));
+        }
+
+        splitKey = DictionaryGetterUtil.getInputSplitSignature(cubeSeg, context.getInputSplit());
+    }
+
+    /** Records every non-null global dictionary column value of the parsed row(s); writes nothing to the context. */
+    @Override
+    public void doMap(KEYIN key, VALUEIN record, Context context) throws IOException, InterruptedException {
+        Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
+        for (String[] row : rowCollection) {
+            for (int i = 0; i < globalColumnIndex.length; i++) {
+                String fieldValue = row[globalColumnIndex[i]];
+                if (fieldValue != null) {
+                    globalColumnValues.get(i).add(fieldValue);
+                }
+            }
+        }
+    }
+
+    /** Builds a shrunken dictionary from each column's sorted distinct values and writes it under the output dir. */
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR));
+        // resolve the file system from the output path itself, so an output dir on a non-default file system works too
+        FileSystem fs = outputDirBase.getFileSystem(context.getConfiguration());
+        ShrunkenDictionary.StringValueSerializer strValueSerializer = new ShrunkenDictionary.StringValueSerializer();
+        for (int i = 0; i < globalColumns.size(); i++) {
+            List<String> colDistinctValues = Lists.newArrayList(globalColumnValues.get(i));
+            // sort values to accelerate the encoding process by reducing the swapping of global dictionary slices
+            Collections.sort(colDistinctValues);
+            ShrunkenDictionaryBuilder<String> dictBuilder = new ShrunkenDictionaryBuilder<>(globalDicts.get(i));
+            for (String colValue : colDistinctValues) {
+                dictBuilder.addValue(colValue);
+            }
+            Dictionary<String> shrunkenDict = dictBuilder.build(strValueSerializer);
+
+            Path colDictDir = new Path(outputDirBase, globalColumns.get(i).getIdentity());
+            fs.mkdirs(colDictDir); // an already existing directory is not an error for mkdirs
+            try (DataOutputStream dos = fs.create(new Path(colDictDir, splitKey))) {
+                shrunkenDict.write(dos);
+            }
+        }
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index b0ea7b7..f8874fe 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -70,6 +70,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_SEGMENT_ID);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_DICTIONARY_SHRUNKEN_PATH);
             parseOptions(options, args);
 
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
@@ -88,6 +89,10 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
+            String shrunkenDictPath = getOptionValue(OPTION_DICTIONARY_SHRUNKEN_PATH);
+            if (shrunkenDictPath != null) {
+                job.getConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkenDictPath);
+            }
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
index 73af138..e95ce8a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
@@ -42,13 +42,12 @@ import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
+import org.apache.kylin.engine.mr.common.DictionaryGetterUtil;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
 /**
  */
 public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T> extends KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
@@ -94,17 +93,7 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
         cubeSegment = cube.getSegmentById(segmentID);
         flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
-        dictionaryMap = Maps.newHashMap();
-
-        // dictionary
-        for (TblColRef col : cubeDesc.getAllColumnsHaveDictionary()) {
-            Dictionary<?> dict = cubeSegment.getDictionary(col);
-            if (dict == null) {
-                logger.warn("Dictionary for " + col + " was not found.");
-            }
-
-            dictionaryMap.put(col, cubeSegment.getDictionary(col));
-        }
+        dictionaryMap = DictionaryGetterUtil.getDictionaryMap(cubeSegment, context.getInputSplit(), conf);
 
         // check memory more often if a single row is big
         if (cubeDesc.hasMemoryHungryMeasures()) {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 2e39285..33b1059 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -22,9 +22,12 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hive.hcatalog.mapreduce.HCatSplit;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringUtil;
@@ -100,6 +103,12 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
             return Collections.singletonList(HiveTableReader.getRowAsStringArray((HCatRecord) mapperInput));
         }
 
+        @Override
+        public String getInputSplitSignature(InputSplit inputSplit) {
+            // signature: file name (of the intermediate table) + start pos + length, joined by '_'
+            final FileSplit fileSplit = (FileSplit) ((HCatSplit) inputSplit).getBaseSplit();
+            return fileSplit.getPath().getName() + '_' + fileSplit.getStart() + '_' + fileSplit.getLength();
+        }
     }
 
     public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index a45cc63..2c95c1c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -24,6 +24,8 @@ import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -100,6 +102,11 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
             return Collections.singletonList(columns);
         }
 
+        @Override
+        public String getInputSplitSignature(InputSplit inputSplit) { // FQN cast: new-API splits are lib.input.FileSplit
+            org.apache.hadoop.mapreduce.lib.input.FileSplit baseSplit = (org.apache.hadoop.mapreduce.lib.input.FileSplit) inputSplit;
+            return baseSplit.getPath().getName() + "_" + baseSplit.getStart() + "_" + baseSplit.getLength();
+        }
     }
 
     public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index e48090d..4d61d9b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -250,6 +250,7 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         List<String> toDeletePaths = new ArrayList<>();
         toDeletePaths.add(getFactDistinctColumnsPath(jobId));
         toDeletePaths.add(getHFilePath(jobId));
+        toDeletePaths.add(getShrunkenDictionaryPath(jobId));
 
         HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
         step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);