Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/07 13:52:45 UTC

[kylin] 01/02: KYLIN-3471 Merge dictionary using Mapreduce

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 9a92f4922ad17e40c1914f34db659aaad3c98605
Author: chao long <wa...@qq.com>
AuthorDate: Tue Aug 7 20:08:20 2018 +0800

    KYLIN-3471 Merge dictionary using Mapreduce
---
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/engine/mr/BatchMergeJobBuilder2.java     |  29 ++-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  36 +++-
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |  41 ++++
 .../kylin/engine/mr/common/BatchConstants.java     |   1 +
 .../kylin/engine/mr/steps/MergeDictionaryJob.java  | 234 +++++++++++++++++++++
 .../engine/mr/steps/MergeDictionaryMapper.java     | 207 ++++++++++++++++++
 .../engine/mr/steps/MergeDictionaryReducer.java    |  45 ++++
 .../engine/mr/steps/UpdateDictionaryStep.java      | 151 +++++++++++++
 9 files changed, 739 insertions(+), 6 deletions(-)
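
This change replaces the old in-process dictionary merge with a MapReduce job (MergeDictionaryJob) followed by a new "Update Dictionary Data" step (UpdateDictionaryStep). The job fans out N+1 map tasks, where N is the number of columns that need a dictionary: each of the first N tasks merges one column's per-segment dictionaries, and the single extra task merges the cuboid statistics. A minimal sketch of the dispatch convention implemented in MergeDictionaryMapper.doMap below:

    // illustrative only; mirrors the index convention used by the mapper
    int numDictCols = cubeDesc.getAllColumnsNeedDictionaryBuilt().size();
    if (index < numDictCols) {
        // map task i merges the dictionaries of column i across the merging segments
    } else {
        // the one extra task (index == numDictCols) merges the cuboid statistics
    }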

diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index f6ad0ed..c805f8a 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -54,6 +54,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_COPY_DICTIONARY = "Copy dictionary from Old Segment";
     public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
     public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
+    public static final String STEP_NAME_MERGE_UPDATE_DICTIONARY = "Update Dictionary Data";
     public static final String STEP_NAME_MERGE_STATISTICS_WITH_OLD = "Merge Cuboid Statistics with Old for Optimization";
     public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index d443f52..d902708 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -20,7 +20,12 @@ package org.apache.kylin.engine.mr;
 
 import java.util.List;
 
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.MergeDictionaryJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,8 +60,8 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
 
         // Phase 1: Merge Dictionary
         inputSide.addStepPhase1_MergeDictionary(result);
-        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-        result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
+        result.addTask(createMergeDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
+        result.addTask(createUpdateDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
         outputSide.addStepPhase1_MergeDictionary(result);
 
         // Phase 2: Merge Cube Files
@@ -69,4 +74,24 @@
         return result;
     }
 
+    public MapReduceExecutable createMergeDictionaryStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
+        MapReduceExecutable mergeDictionaryStep = new MapReduceExecutable();
+        mergeDictionaryStep.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobID));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt(), StringUtil.join(mergingSegmentIds, ","));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Dictionary_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeDictionaryStep.setMapReduceParams(cmd.toString());
+        mergeDictionaryStep.setMapReduceJobClass(MergeDictionaryJob.class);
+
+        return mergeDictionaryStep;
+    }
+
 }
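
With the options above (plus the usual -conf arguments from appendMapReduceParameters), the step submits a command line roughly like this — cube name, ids, and paths are illustrative:

    -cubename sample_cube -segmentId <new-seg-uuid>
    -metadataUrl kylin_metadata@hdfs,path=.../kylin-<jobId>/sample_cube/metadata
    -segmentIds <merging-seg-uuid-1>,<merging-seg-uuid-2>
    -dictOutputPath .../kylin-<jobId>/sample_cube/dict_info
    -statOutputPath .../kylin-<jobId>/sample_cube/statistics
    -jobname Kylin_Merge_Dictionary_sample_cube_Step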
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 42e0f42..649b4c3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -19,10 +19,14 @@
 package org.apache.kylin.engine.mr;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -39,11 +43,12 @@ import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
 import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
+import org.apache.kylin.engine.mr.steps.UpdateDictionaryStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
-import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  * Hold reusable steps for builders.
@@ -98,6 +103,22 @@ public class JobBuilderSupport {
         return result;
     }
 
+    public UpdateDictionaryStep createUpdateDictionaryStep(CubeSegment seg, String jobId, List<String> mergingSegmentIds) {
+        UpdateDictionaryStep result = new UpdateDictionaryStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_UPDATE_DICTIONARY);
+
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
+
+        // merged dict info path
+        result.getParams().put(BatchConstants.ARG_DICT_PATH, getDictInfoPath(jobId));
+        // metadata url
+        result.getParams().put(BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobId));
+
+        return result;
+    }
+
     public MapReduceExecutable createBuildUHCDictStep(String jobId) {
         MapReduceExecutable result = new MapReduceExecutable();
         result.setName(ExecutableConstants.STEP_NAME_BUILD_UHC_DICTIONARY);
@@ -192,7 +213,6 @@ public class JobBuilderSupport {
         return result;
     }
 
-
     public boolean isEnableUHCDictStep() {
         if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
             return false;
@@ -220,7 +240,6 @@ public class JobBuilderSupport {
         return lookupMaterializeContext;
     }
 
-
     public SaveStatisticsStep createSaveStatisticsStep(String jobId) {
         SaveStatisticsStep result = new SaveStatisticsStep();
         result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
@@ -276,6 +295,10 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/dict";
     }
 
+    public String getDictInfoPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/dict_info";
+    }
+
     public String getOptimizationRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/optimize";
     }
@@ -327,7 +350,6 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/metadata";
     }
 
-
     public static String extractJobIDFromPath(String path) {
         Matcher matcher = JOB_NAME_PATTERN.matcher(path);
         // check the first occurrence
@@ -337,4 +359,10 @@ public class JobBuilderSupport {
             throw new IllegalStateException("Can not extract job ID from file path : " + path);
         }
     }
+
+    public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
+        Map<String, String> param = new HashMap<>();
+        param.put("path", getDumpMetadataPath(jobId));
+        return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
+    }
 }
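
getSegmentMetadataUrl keeps the identifier of the current metadata URL but swaps the scheme to hdfs and points the path parameter at the job's metadata dump directory (getDumpMetadataPath above). Assuming the default kylin_metadata identifier, the serialized StorageURL looks like:

    kylin_metadata@hdfs,path=<working-dir>/kylin-<jobId>/<realization>/metadata

This is the URL later fed to AbstractHadoopJob.loadKylinConfigFromHdfs, so the map tasks read metadata from HDFS instead of the job server's store.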
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 716eafe..180b56a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -27,6 +27,7 @@ import static org.apache.hadoop.util.StringUtils.formatTime;
 import static org.apache.kylin.engine.mr.common.JobRelatedMetaUtil.collectCubeMetadata;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
@@ -60,7 +61,9 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig;
+import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.persistence.ResourceTool;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
@@ -126,6 +129,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
     protected static final Option OPTION_LOOKUP_SNAPSHOT_ID = OptionBuilder.withArgName(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID).hasArg()
             .isRequired(true).withDescription("Lookup table snapshotID")
             .create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
+    protected static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL)
+            .hasArg().isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
+
 
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
@@ -548,6 +554,41 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         dumpKylinPropsAndMetadata(cube.getProject(), dumpList, cube.getConfig(), conf);
     }
 
+    protected void attachSegmentsMetadataWithDict(List<CubeSegment> segments, String metaUrl) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segments.get(0).getCubeInstance()));
+        for (CubeSegment segment : segments) {
+            dumpList.addAll(segment.getDictionaryPaths());
+            dumpList.add(segment.getStatisticsResourcePath());
+        }
+        dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig(), metaUrl);
+    }
+
+    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig, String metadataUrl)
+            throws IOException {
+        File tmp = File.createTempFile("kylin_job_meta", "");
+        FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
+
+        File metaDir = new File(tmp, "meta");
+        metaDir.mkdirs();
+
+        // dump metadata
+        JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);
+
+        // write kylin.properties
+        Properties props = kylinConfig.exportToProperties();
+        props.setProperty("kylin.metadata.url", metadataUrl);
+
+        File kylinPropsFile = new File(metaDir, "kylin.properties");
+        try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) {
+            props.store(os, kylinPropsFile.getAbsolutePath());
+        }
+
+        KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
+        //upload metadata
+        ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
+    }
+
     protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
         attachSegmentMetadata(segment, conf, true, false);
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index d38f7a4..a4a52ad 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -103,6 +103,7 @@ public interface BatchConstants {
     String ARG_TABLE_NAME = "tableName";
     String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
     String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
+    String ARG_META_URL = "metadataUrl";
 
     /**
      * logger and counter
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java
new file mode 100644
index 0000000..32e8e0c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.Segments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class MergeDictionaryJob extends AbstractHadoopJob {
+    private static final Logger logger = LoggerFactory.getLogger(MergeDictionaryJob.class);
+
+    public static final Option OPTION_MERGE_SEGMENT_IDS = OptionBuilder.withArgName("segmentIds").hasArg()
+            .isRequired(true).withDescription("Merging Cube Segment Ids").create("segmentIds");
+    public static final Option OPTION_OUTPUT_PATH_DICT = OptionBuilder.withArgName("dictOutputPath").hasArg()
+            .isRequired(true).withDescription("merged dictionary resource path").create("dictOutputPath");
+    public static final Option OPTION_OUTPUT_PATH_STAT = OptionBuilder.withArgName("statOutputPath").hasArg()
+            .isRequired(true).withDescription("merged statistics resource path").create("statOutputPath");
+
+    @Override
+    public int run(String[] args) throws Exception {
+        try {
+            Options options = new Options();
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_META_URL);
+            options.addOption(OPTION_MERGE_SEGMENT_IDS);
+            options.addOption(OPTION_OUTPUT_PATH_DICT);
+            options.addOption(OPTION_OUTPUT_PATH_STAT);
+            parseOptions(options, args);
+
+            final String segmentId = getOptionValue(OPTION_SEGMENT_ID);
+            final String segmentIds = getOptionValue(OPTION_MERGE_SEGMENT_IDS);
+            final String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            final String metaUrl = getOptionValue(OPTION_META_URL);
+            final String dictOutputPath = getOptionValue(OPTION_OUTPUT_PATH_DICT);
+            final String statOutputPath = getOptionValue(OPTION_OUTPUT_PATH_STAT);
+
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeDesc cubeDesc = cube.getDescriptor();
+            CubeSegment segment = cube.getSegmentById(segmentId);
+            Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            job.getConfiguration().set(BatchConstants.ARG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(OPTION_META_URL.getOpt(), metaUrl);
+            job.getConfiguration().set(OPTION_SEGMENT_ID.getOpt(), segmentId);
+            job.getConfiguration().set(OPTION_MERGE_SEGMENT_IDS.getOpt(), segmentIds);
+            job.getConfiguration().set(OPTION_OUTPUT_PATH_STAT.getOpt(), statOutputPath);
+            job.getConfiguration().set("num.map.tasks", String.valueOf(cubeDesc.getAllColumnsNeedDictionaryBuilt().size() + 1));
+            job.setNumReduceTasks(1);
+
+            setJobClasspath(job, cube.getConfig());
+
+            // dump metadata to HDFS
+            attachSegmentsMetadataWithDict(mergingSeg, metaUrl);
+
+            // clean output dir
+            HadoopUtil.deletePath(job.getConfiguration(), new Path(dictOutputPath));
+
+            job.setMapperClass(MergeDictionaryMapper.class);
+            job.setReducerClass(MergeDictionaryReducer.class);
+
+            job.setMapOutputKeyClass(IntWritable.class);
+            job.setMapOutputValueClass(Text.class);
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            job.setInputFormatClass(IndexArrInputFormat.class);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+            SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.NONE);
+            SequenceFileOutputFormat.setOutputPath(job, new Path(dictOutputPath));
+
+            logger.info("Starting: " + job.getJobName());
+
+            return waitForCompletion(job);
+
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    static class IndexArrInputFormat extends InputFormat<IntWritable, NullWritable> {
+
+        @Override
+        public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
+            int numMapTasks = jobContext.getConfiguration().getInt("num.map.tasks", 0);
+            List<InputSplit> inputSplits = Lists.newArrayListWithCapacity(numMapTasks);
+
+            for (int i = 0; i < numMapTasks; i++) {
+                inputSplits.add(new IntInputSplit(i));
+            }
+
+            return inputSplits;
+        }
+
+        @Override
+        public RecordReader<IntWritable, NullWritable> createRecordReader(InputSplit inputSplit,
+                TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+
+            return new RecordReader<IntWritable, NullWritable>() {
+                private int index;
+                private IntWritable key;
+                private NullWritable value;
+
+                @Override
+                public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+                        throws IOException, InterruptedException {
+                    IntInputSplit intInputSplit = (IntInputSplit) inputSplit;
+                    index = intInputSplit.getIndex();
+                }
+
+                @Override
+                public boolean nextKeyValue() throws IOException, InterruptedException {
+
+                    if (key == null) {
+                        key = new IntWritable(index);
+                        value = NullWritable.get();
+                        return true;
+                    }
+
+                    return false;
+                }
+
+                @Override
+                public IntWritable getCurrentKey() throws IOException, InterruptedException {
+                    return key;
+                }
+
+                @Override
+                public NullWritable getCurrentValue() throws IOException, InterruptedException {
+                    return value;
+                }
+
+                @Override
+                public float getProgress() throws IOException, InterruptedException {
+                    return 1;
+                }
+
+                @Override
+                public void close() throws IOException {
+
+                }
+            };
+        }
+    }
+
+    static class IntInputSplit extends InputSplit implements Writable {
+        private int index;
+
+        public IntInputSplit() {
+
+        }
+
+        public IntInputSplit(int index) {
+            this.index = index;
+        }
+
+        @Override
+        public void write(DataOutput dataOutput) throws IOException {
+            dataOutput.writeInt(index);
+        }
+
+        @Override
+        public void readFields(DataInput dataInput) throws IOException {
+            this.index = dataInput.readInt();
+        }
+
+        @Override
+        public long getLength() throws IOException, InterruptedException {
+            return 1L;
+        }
+
+        @Override
+        public String[] getLocations() throws IOException, InterruptedException {
+            return new String[0];
+        }
+
+        public int getIndex() {
+            return index;
+        }
+    }
+}
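
IndexArrInputFormat exists only to fan out mappers: it fabricates num.map.tasks splits (set by the job to dictionary-column count + 1), each carrying nothing but an int index. A split reports length 1 and no preferred locations, which is fine since the mappers read from the resource store rather than from HDFS blocks, and the record reader hands each mapper exactly one (index, null) record: nextKeyValue() returns true once, when it lazily creates the key, and false on every later call.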
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
new file mode 100644
index 0000000..522c06a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable, IntWritable, Text> {
+    private static final Logger logger = LoggerFactory.getLogger(MergeDictionaryMapper.class);
+
+    List<CubeSegment> mergingSegments;
+    TblColRef[] tblColRefs;
+    DictionaryManager dictMgr;
+
+    @Override
+    protected void doSetup(Context context) throws IOException, InterruptedException {
+        super.doSetup(context);
+
+        final SerializableConfiguration sConf = new SerializableConfiguration(context.getConfiguration());
+        final String metaUrl = context.getConfiguration().get(BatchConstants.ARG_META_URL);
+        final String cubeName = context.getConfiguration().get(BatchConstants.ARG_CUBE_NAME);
+        final String segmentIds = context.getConfiguration().get(MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt());
+
+        final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeInstance.getDescName());
+
+        mergingSegments = getMergingSegments(cubeInstance, segmentIds.split(","));
+        tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+        dictMgr = DictionaryManager.getInstance(kylinConfig);
+    }
+
+    @Override
+    protected void doMap(IntWritable key, NullWritable value, Context context)
+            throws IOException, InterruptedException {
+
+        int index = key.get();
+
+        if (index < tblColRefs.length) {
+            // merge dictionary
+            TblColRef col = tblColRefs[index];
+            List<DictionaryInfo> dictInfos = Lists.newArrayList();
+            for (CubeSegment segment : mergingSegments) {
+                if (segment.getDictResPath(col) != null) {
+                    DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+                    if (dictInfo != null && !dictInfos.contains(dictInfo)) {
+                        dictInfos.add(dictInfo);
+                    }
+                }
+            }
+
+            DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
+            String tblCol = col.getTableAlias() + ":" + col.getName();
+            String dictInfoPath = mergedDictInfo == null ? "" : mergedDictInfo.getResourcePath();
+
+            context.write(new IntWritable(-1), new Text(tblCol + "=" + dictInfoPath));
+
+        } else {
+            // merge statistics
+            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(context.getConfiguration()), context.getConfiguration().get(BatchConstants.ARG_META_URL));
+
+            final String cubeName = context.getConfiguration().get(BatchConstants.ARG_CUBE_NAME);
+            final String segmentId = context.getConfiguration().get(BatchConstants.ARG_SEGMENT_ID);
+            final String statOutputPath = context.getConfiguration().get(MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt());
+            CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+
+            logger.info("Statistics output path: {}", statOutputPath);
+
+            CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
+            ResourceStore rs = ResourceStore.getStore(kylinConfig);
+
+            Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+            Configuration conf = null;
+            int averageSamplingPercentage = 0;
+
+            for (CubeSegment cubeSegment : mergingSegments) {
+                String filePath = cubeSegment.getStatisticsResourcePath();
+                InputStream is = rs.getResource(filePath).inputStream;
+                File tempFile;
+                FileOutputStream tempFileStream = null;
+
+                try {
+                    tempFile = File.createTempFile(segmentId, ".seq");
+                    tempFileStream = new FileOutputStream(tempFile);
+                    org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                } finally {
+                    IOUtils.closeStream(is);
+                    IOUtils.closeStream(tempFileStream);
+                }
+
+                FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+                SequenceFile.Reader reader = null;
+                try {
+                    conf = HadoopUtil.getCurrentConfiguration();
+                    //noinspection deprecation
+                    reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+                    LongWritable keyW = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    BytesWritable valueW = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+                    while (reader.next(keyW, valueW)) {
+                        if (keyW.get() == 0L) {
+                            // key 0 stores the segment's sampling percentage
+                            averageSamplingPercentage += Bytes.toInt(valueW.getBytes());
+                        } else if (keyW.get() > 0) {
+                            HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                            ByteArray byteArray = new ByteArray(valueW.getBytes());
+                            hll.readRegisters(byteArray.asBuffer());
+
+                            if (cuboidHLLMap.get(keyW.get()) != null) {
+                                cuboidHLLMap.get(keyW.get()).merge(hll);
+                            } else {
+                                cuboidHLLMap.put(keyW.get(), hll);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.error("error reading segment statistics", e);
+                    throw e;
+                } finally {
+                    IOUtils.closeStream(reader);
+                }
+            }
+
+            averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+            CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
+            Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+
+            FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+            FSDataInputStream fis = fs.open(statisticsFilePath);
+
+            try {
+                // put the statistics to metadata store
+                String statisticsFileName = newSegment.getStatisticsResourcePath();
+                rs.putResource(statisticsFileName, fis, System.currentTimeMillis());
+            } finally {
+                IOUtils.closeStream(fis);
+            }
+
+            context.write(new IntWritable(-1), new Text(""));
+        }
+    }
+
+    private List<CubeSegment> getMergingSegments(CubeInstance cube, String[] segmentIds) {
+        List<CubeSegment> result = Lists.newArrayListWithCapacity(segmentIds.length);
+        for (String id : segmentIds) {
+            result.add(cube.getSegmentById(id));
+        }
+        return result;
+    }
+}
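
On the statistics side, each segment's statistics SequenceFile uses row key 0 for the sampling percentage and positive keys for per-cuboid HLL registers. The merged sampling percentage is the arithmetic mean, e.g. two segments sampled at 100 and 50 percent merge to (100 + 50) / 2 = 75; cuboid counters are combined with HLLCounter.merge, which keeps the register-wise maximum so the result estimates the cardinality of the union of the segments' rows. The merged file is written with CubeStatsWriter and immediately copied into the new segment's statistics resource path.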
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java
new file mode 100644
index 0000000..1eb3c07
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MergeDictionaryReducer extends KylinReducer<IntWritable, Text, Text, Text> {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeDictionaryReducer.class);
+
+    @Override
+    protected void doReduce(IntWritable key, Iterable<Text> values, Context context)
+            throws IOException, InterruptedException {
+        for (Text text : values) {
+            String value = text.toString();
+            String[] parts = value.split("=");
+            if (parts.length == 2) {
+                logger.info("Dictionary for col {}, saved at {}", parts[0], parts[1]);
+                context.write(new Text(parts[0]), new Text(parts[1]));
+            }
+        }
+    }
+}
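
All map outputs share the single key -1, so one reducer sees every record: dictionary tasks emit "tableAlias:column=resourcePath" strings and the statistics task emits an empty string, which fails the length == 2 check and is silently skipped. The reducer's output SequenceFile therefore contains one (column, dictionary path) pair per merged dictionary, roughly like this (illustrative table and column names):

    DEFAULT.TEST_FACT:BUYER_ID    /dict/DEFAULT.TEST_FACT/BUYER_ID/<uuid>.dict

UpdateDictionaryStep reads these part files back in the next step.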
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
new file mode 100644
index 0000000..bcdb29f
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateDictionaryStep extends AbstractExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(UpdateDictionaryStep.class);
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeMgr = CubeManager.getInstance(context.getConfig());
+        final DictionaryManager dictMgrHdfs;
+        final DictionaryManager dictMgrHbase;
+        final CubeInstance cube = cubeMgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+        final List<CubeSegment> mergingSegments = getMergingSegments(cube);
+        final String dictInfoPath = this.getParams().get(BatchConstants.ARG_DICT_PATH);
+        final String metadataUrl = this.getParams().get(BatchConstants.ARG_META_URL);
+
+        final KylinConfig kylinConfHbase = cube.getConfig();
+        final KylinConfig kylinConfHdfs = KylinConfig.createInstanceFromUri(metadataUrl);
+
+        Collections.sort(mergingSegments);
+
+        try {
+            Configuration conf = HadoopUtil.getCurrentConfiguration();
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            ResourceStore hbaseRS = ResourceStore.getStore(kylinConfHbase);
+            ResourceStore hdfsRS = ResourceStore.getStore(kylinConfHdfs);
+            dictMgrHdfs = DictionaryManager.getInstance(kylinConfHdfs);
+            dictMgrHbase = DictionaryManager.getInstance(kylinConfHbase);
+
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = cube.latestCopyForWrite();
+            CubeSegment newSegCopy = cubeCopy.getSegmentById(newSegment.getUuid());
+
+            // update cube segment dictionary
+
+            FileStatus[] fileStatuses = fs.listStatus(new Path(dictInfoPath), new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    return path.getName().startsWith("part");
+                }
+            });
+
+            for (FileStatus fileStatus : fileStatuses) {
+                Path filePath = fileStatus.getPath();
+
+                SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+                Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+                while (reader.next(key, value)) {
+                    String tblCol = key.toString();
+                    String dictInfoResource = value.toString();
+
+                    if (StringUtils.isNotEmpty(dictInfoResource)) {
+                        logger.info(dictInfoResource);
+                        // put dictionary file to metadata store
+                        DictionaryInfo dictInfoHdfs = dictMgrHdfs.getDictionaryInfo(dictInfoResource);
+                        DictionaryInfo dictInfoHbase = dictMgrHbase.trySaveNewDict(dictInfoHdfs.getDictionaryObject(), dictInfoHdfs);
+
+                        if (dictInfoHbase != null) {
+                            TblColRef tblColRef = cube.getDescriptor().findColumnRef(tblCol.split(":")[0], tblCol.split(":")[1]);
+                            newSegCopy.putDictResPath(tblColRef, dictInfoHbase.getResourcePath());
+                        }
+                    }
+                }
+
+                IOUtils.closeStream(reader);
+            }
+
+            CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
+            for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
+                newSegCopy.putSnapshotResPath(entry.getKey(), entry.getValue());
+            }
+
+            // update statistics
+            // put the statistics to metadata store
+            String statisticsFileName = newSegment.getStatisticsResourcePath();
+            hbaseRS.putResource(statisticsFileName, hdfsRS.getResource(statisticsFileName).inputStream, System.currentTimeMillis());
+
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToUpdateSegs(newSegCopy);
+            cubeMgr.updateCube(update);
+
+            return ExecuteResult.createSucceed();
+        } catch (IOException e) {
+            logger.error("fail to merge dictionary", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+    private List<CubeSegment> getMergingSegments(CubeInstance cube) {
+        List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams());
+        List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
+        for (String id : mergingSegmentIds) {
+            result.add(cube.getSegmentById(id));
+        }
+        return result;
+    }
+}
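
UpdateDictionaryStep runs on the job server and closes the loop between the two stores: dictMgrHdfs reads each merged DictionaryInfo from the job's HDFS metadata dump, dictMgrHbase.trySaveNewDict persists it into the cluster's primary store (reusing an identical dictionary if one already exists there), and the new segment is pointed at the resulting resource paths. Snapshot paths are carried over from the last merging segment, the merged statistics resource is copied across stores, and a single CubeUpdate persists the modified segment, keeping the metadata change atomic from the cube's point of view.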