Posted to dev@kylin.apache.org by GitBox <gi...@apache.org> on 2018/08/07 13:52:44 UTC

[GitHub] shaofengshi closed pull request #181: Kylin 3471

shaofengshi closed pull request #181: Kylin 3471
URL: https://github.com/apache/kylin/pull/181
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index f6ad0ed8ef..c805f8a69a 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -54,6 +54,7 @@ private ExecutableConstants() {
     public static final String STEP_NAME_COPY_DICTIONARY = "Copy dictionary from Old Segment";
     public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
     public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
+    public static final String STEP_NAME_MERGE_UPDATE_DICTIONARY = "Update Dictionary Data";
     public static final String STEP_NAME_MERGE_STATISTICS_WITH_OLD = "Merge Cuboid Statistics with Old for Optimization";
     public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index d443f523b4..d9027082b6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -20,7 +20,12 @@
 
 import java.util.List;
 
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.MergeDictionaryJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,8 +60,8 @@ public CubingJob build() {
 
         // Phase 1: Merge Dictionary
         inputSide.addStepPhase1_MergeDictionary(result);
-        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-        result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
+        result.addTask(createMergeDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
+        result.addTask(createUpdateDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
         outputSide.addStepPhase1_MergeDictionary(result);
 
         // Phase 2: Merge Cube Files
@@ -69,4 +74,25 @@ public CubingJob build() {
         return result;
     }
 
+    public MapReduceExecutable createMergeDictionaryStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
+        MapReduceExecutable mergeDictionaryStep = new MapReduceExecutable();
+        mergeDictionaryStep.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobID));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt(), StringUtil.join(mergingSegmentIds, ","));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Dictionary_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeDictionaryStep.setMapReduceParams(cmd.toString());
+        mergeDictionaryStep.setMapReduceJobClass(MergeDictionaryJob.class);
+
+        return mergeDictionaryStep;
+    }
+
 }
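
A minimal sketch (not part of the patch) of the rough argument string assembled by createMergeDictionaryStep for MergeDictionaryJob; the option names follow the constants used in this patch, and every value is a hypothetical placeholder:

public class MergeDictionaryStepArgsSketch {
    public static void main(String[] args) {
        // Hypothetical values; the real ones come from the CubeSegment and the job id.
        String cmd = " -cubename sample_cube"
                + " -segmentid 0a1b2c3d-merged-segment-uuid"
                + " -metadataUrl kylin_metadata@hdfs,path=/kylin/.../metadata"
                + " -segmentIds seg-id-1,seg-id-2"
                + " -dictOutputPath /kylin/.../dict_info"
                + " -statOutputPath /kylin/.../statistics"
                + " -jobname Kylin_Merge_Dictionary_sample_cube_Step";
        System.out.println(cmd);
    }
}
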
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 42e0f42893..649b4c3507 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -19,10 +19,14 @@
 package org.apache.kylin.engine.mr;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -39,11 +43,12 @@
 import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
+import org.apache.kylin.engine.mr.steps.UpdateDictionaryStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
-import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  * Hold reusable steps for builders.
@@ -98,6 +103,22 @@ public MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<Strin
         return result;
     }
 
+    public UpdateDictionaryStep createUpdateDictionaryStep(CubeSegment seg, String jobId, List<String> mergingSegmentIds) {
+        UpdateDictionaryStep result = new UpdateDictionaryStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_UPDATE_DICTIONARY);
+
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, result.getParams());
+
+        // merged dict info path
+        result.getParams().put(BatchConstants.ARG_DICT_PATH, getDictInfoPath(jobId));
+        // metadata url
+        result.getParams().put(BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobId));
+
+        return result;
+    }
+
     public MapReduceExecutable createBuildUHCDictStep(String jobId) {
         MapReduceExecutable result = new MapReduceExecutable();
         result.setName(ExecutableConstants.STEP_NAME_BUILD_UHC_DICTIONARY);
@@ -192,7 +213,6 @@ public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<Stri
         return result;
     }
 
-
     public boolean isEnableUHCDictStep() {
         if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
             return false;
@@ -220,7 +240,6 @@ public LookupMaterializeContext addMaterializeLookupTableSteps(final CubingJob r
         return lookupMaterializeContext;
     }
 
-
     public SaveStatisticsStep createSaveStatisticsStep(String jobId) {
         SaveStatisticsStep result = new SaveStatisticsStep();
         result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
@@ -276,6 +295,10 @@ public String getDictRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/dict";
     }
 
+    public String getDictInfoPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/dict_info";
+    }
+
     public String getOptimizationRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/optimize";
     }
@@ -327,7 +350,6 @@ public String getDumpMetadataPath(String jobId) {
         return getRealizationRootPath(jobId) + "/metadata";
     }
 
-
     public static String extractJobIDFromPath(String path) {
         Matcher matcher = JOB_NAME_PATTERN.matcher(path);
         // check the first occurrence
@@ -337,4 +359,10 @@ public static String extractJobIDFromPath(String path) {
             throw new IllegalStateException("Can not extract job ID from file path : " + path);
         }
     }
+
+    public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
+        Map<String, String> param = new HashMap<>();
+        param.put("path", getDumpMetadataPath(jobId));
+        return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
+    }
 }
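
A minimal sketch (not part of the patch) showing the metadata URL shape that getSegmentMetadataUrl produces; the identifier and dump path below are hypothetical stand-ins for the kylin.metadata.url identifier and getDumpMetadataPath(jobId):

import java.util.HashMap;
import java.util.Map;

import org.apache.kylin.common.StorageURL;

public class SegmentMetadataUrlSketch {
    public static void main(String[] args) {
        Map<String, String> param = new HashMap<>();
        param.put("path", "/kylin/kylin_metadata/kylin-job-0001/sample_cube/metadata"); // hypothetical dump path
        StorageURL url = new StorageURL("kylin_metadata", "hdfs", param);
        // Prints something like:
        //   kylin_metadata@hdfs,path=/kylin/kylin_metadata/kylin-job-0001/sample_cube/metadata
        System.out.println(url);
    }
}
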
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 716eafe08f..180b56a485 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -27,6 +27,7 @@
 import static org.apache.kylin.engine.mr.common.JobRelatedMetaUtil.collectCubeMetadata;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
@@ -60,7 +61,9 @@
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig;
+import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.persistence.ResourceTool;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
@@ -126,6 +129,9 @@
     protected static final Option OPTION_LOOKUP_SNAPSHOT_ID = OptionBuilder.withArgName(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID).hasArg()
             .isRequired(true).withDescription("Lookup table snapshotID")
             .create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
+    protected static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL)
+            .hasArg().isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
+
 
     private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
 
@@ -548,6 +554,41 @@ protected void attachSegmentsMetadataWithDict(List<CubeSegment> segments, Config
         dumpKylinPropsAndMetadata(cube.getProject(), dumpList, cube.getConfig(), conf);
     }
 
+    protected void attachSegmentsMetadataWithDict(List<CubeSegment> segments, String metaUrl) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segments.get(0).getCubeInstance()));
+        for (CubeSegment segment : segments) {
+            dumpList.addAll(segment.getDictionaryPaths());
+            dumpList.add(segment.getStatisticsResourcePath());
+        }
+        dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig(), metaUrl);
+    }
+
+    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig, String metadataUrl)
+            throws IOException {
+        File tmp = File.createTempFile("kylin_job_meta", "");
+        FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
+
+        File metaDir = new File(tmp, "meta");
+        metaDir.mkdirs();
+
+        // dump metadata
+        JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);
+
+        // write kylin.properties
+        Properties props = kylinConfig.exportToProperties();
+        props.setProperty("kylin.metadata.url", metadataUrl);
+
+        File kylinPropsFile = new File(metaDir, "kylin.properties");
+        try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) {
+            props.store(os, kylinPropsFile.getAbsolutePath());
+        }
+
+        KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
+        //upload metadata
+        ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
+    }
+
     protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
         attachSegmentMetadata(segment, conf, true, false);
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index d38f7a4168..a4a52ad0c4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -103,6 +103,7 @@
     String ARG_TABLE_NAME = "tableName";
     String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
     String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
+    String ARG_META_URL = "metadataUrl";
 
     /**
      * logger and counter
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java
new file mode 100644
index 0000000000..32e8e0c651
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.Segments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class MergeDictionaryJob extends AbstractHadoopJob {
+    private static final Logger logger = LoggerFactory.getLogger(MergeDictionaryJob.class);
+
+    public static final Option OPTION_MERGE_SEGMENT_IDS = OptionBuilder.withArgName("segmentIds").hasArg()
+            .isRequired(true).withDescription("Merging Cube Segment Ids").create("segmentIds");
+    public static final Option OPTION_OUTPUT_PATH_DICT = OptionBuilder.withArgName("dictOutputPath").hasArg()
+            .isRequired(true).withDescription("merged dictionary resource path").create("dictOutputPath");
+    public static final Option OPTION_OUTPUT_PATH_STAT = OptionBuilder.withArgName("statOutputPath").hasArg()
+            .isRequired(true).withDescription("merged statistics resource path").create("statOutputPath");
+
+    @Override
+    public int run(String[] args) throws Exception {
+        try {
+            Options options = new Options();
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_META_URL);
+            options.addOption(OPTION_MERGE_SEGMENT_IDS);
+            options.addOption(OPTION_OUTPUT_PATH_DICT);
+            options.addOption(OPTION_OUTPUT_PATH_STAT);
+            parseOptions(options, args);
+
+            final String segmentId = getOptionValue(OPTION_SEGMENT_ID);
+            final String segmentIds = getOptionValue(OPTION_MERGE_SEGMENT_IDS);
+            final String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            final String metaUrl = getOptionValue(OPTION_META_URL);
+            final String dictOutputPath = getOptionValue(OPTION_OUTPUT_PATH_DICT);
+            final String statOutputPath = getOptionValue(OPTION_OUTPUT_PATH_STAT);
+
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeDesc cubeDesc = cube.getDescriptor();
+            CubeSegment segment = cube.getSegmentById(segmentId);
+            Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            job.getConfiguration().set(BatchConstants.ARG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(OPTION_META_URL.getOpt(), metaUrl);
+            job.getConfiguration().set(OPTION_SEGMENT_ID.getOpt(), segmentId);
+            job.getConfiguration().set(OPTION_MERGE_SEGMENT_IDS.getOpt(), segmentIds);
+            job.getConfiguration().set(OPTION_OUTPUT_PATH_STAT.getOpt(), statOutputPath);
+            job.getConfiguration().set("num.map.tasks", String.valueOf(cubeDesc.getAllColumnsNeedDictionaryBuilt().size() + 1));
+            job.setNumReduceTasks(1);
+
+            setJobClasspath(job, cube.getConfig());
+
+            // dump metadata to HDFS
+            attachSegmentsMetadataWithDict(mergingSeg, metaUrl);
+
+            // clean output dir
+            HadoopUtil.deletePath(job.getConfiguration(), new Path(dictOutputPath));
+
+            job.setMapperClass(MergeDictionaryMapper.class);
+            job.setReducerClass(MergeDictionaryReducer.class);
+
+            job.setMapOutputKeyClass(IntWritable.class);
+            job.setMapOutputValueClass(Text.class);
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            job.setInputFormatClass(IndexArrInputFormat.class);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+            SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.NONE);
+            SequenceFileOutputFormat.setOutputPath(job, new Path(dictOutputPath));
+
+            logger.info("Starting: " + job.getJobName());
+
+            return waitForCompletion(job);
+
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    static class IndexArrInputFormat extends InputFormat<IntWritable, NullWritable> {
+
+        @Override
+        public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
+            int numMapTasks = jobContext.getConfiguration().getInt("num.map.tasks", 0);
+            List<InputSplit> inputSplits = Lists.newArrayListWithCapacity(numMapTasks);
+
+            for (int i = 0; i < numMapTasks; i++) {
+                inputSplits.add(new IntInputSplit(i));
+            }
+
+            return inputSplits;
+        }
+
+        @Override
+        public RecordReader<IntWritable, NullWritable> createRecordReader(InputSplit inputSplit,
+                TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+
+            return new RecordReader<IntWritable, NullWritable>() {
+                private int index;
+                private IntWritable key;
+                private NullWritable value;
+
+                @Override
+                public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+                        throws IOException, InterruptedException {
+                    IntInputSplit intInputSplit = (IntInputSplit) inputSplit;
+                    index = intInputSplit.getIndex();
+                }
+
+                @Override
+                public boolean nextKeyValue() throws IOException, InterruptedException {
+
+                    if (key == null) {
+                        key = new IntWritable(index);
+                        value = NullWritable.get();
+                        return true;
+                    }
+
+                    return false;
+                }
+
+                @Override
+                public IntWritable getCurrentKey() throws IOException, InterruptedException {
+                    return key;
+                }
+
+                @Override
+                public NullWritable getCurrentValue() throws IOException, InterruptedException {
+                    return value;
+                }
+
+                @Override
+                public float getProgress() throws IOException, InterruptedException {
+                    return 1;
+                }
+
+                @Override
+                public void close() throws IOException {
+
+                }
+            };
+        }
+    }
+
+    static class IntInputSplit extends InputSplit implements Writable {
+        private int index;
+
+        public IntInputSplit() {
+
+        }
+
+        public IntInputSplit(int index) {
+            this.index = index;
+        }
+
+        @Override
+        public void write(DataOutput dataOutput) throws IOException {
+            dataOutput.writeInt(index);
+        }
+
+        @Override
+        public void readFields(DataInput dataInput) throws IOException {
+            this.index = dataInput.readInt();
+        }
+
+        @Override
+        public long getLength() throws IOException, InterruptedException {
+            return 1L;
+        }
+
+        @Override
+        public String[] getLocations() throws IOException, InterruptedException {
+            return new String[0];
+        }
+
+        public int getIndex() {
+            return index;
+        }
+    }
+}
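
A minimal standalone sketch (not part of the patch) of how the synthetic splits above map onto mapper work, assuming a hypothetical cube with three dictionary-encoded columns:

public class MergeDictSplitSketch {
    public static void main(String[] args) {
        int dictColumnCount = 3;                 // hypothetical column count
        int numMapTasks = dictColumnCount + 1;   // what the job writes into "num.map.tasks"
        for (int index = 0; index < numMapTasks; index++) {
            if (index < dictColumnCount) {
                System.out.println("split " + index + " -> merge dictionary for column #" + index);
            } else {
                System.out.println("split " + index + " -> merge cuboid statistics");
            }
        }
    }
}
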
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
new file mode 100644
index 0000000000..522c06a491
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable, IntWritable, Text> {
+    private static final Logger logger = LoggerFactory.getLogger(MergeDictionaryMapper.class);
+
+    List<CubeSegment> mergingSegments;
+    TblColRef[] tblColRefs;
+    DictionaryManager dictMgr;
+
+    @Override
+    protected void doSetup(Context context) throws IOException, InterruptedException {
+        super.doSetup(context);
+
+        final SerializableConfiguration sConf = new SerializableConfiguration(context.getConfiguration());
+        final String metaUrl = context.getConfiguration().get(BatchConstants.ARG_META_URL);
+        final String cubeName = context.getConfiguration().get(BatchConstants.ARG_CUBE_NAME);
+        final String segmentIds = context.getConfiguration().get(MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt());
+
+        final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeInstance.getDescName());
+
+        mergingSegments = getMergingSegments(cubeInstance, segmentIds.split(","));
+        tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+        dictMgr = DictionaryManager.getInstance(kylinConfig);
+    }
+
+    @Override
+    protected void doMap(IntWritable key, NullWritable value, Context context)
+            throws IOException, InterruptedException {
+
+        int index = key.get();
+
+        if (index < tblColRefs.length) {
+            // merge dictionary
+            TblColRef col = tblColRefs[index];
+            List<DictionaryInfo> dictInfos = Lists.newArrayList();
+            for (CubeSegment segment : mergingSegments) {
+                if (segment.getDictResPath(col) != null) {
+                    DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+                    if (dictInfo != null && !dictInfos.contains(dictInfo)) {
+                        dictInfos.add(dictInfo);
+                    }
+                }
+            }
+
+            DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
+            String tblCol = col.getTableAlias() + ":" + col.getName();
+            String dictInfoPath = mergedDictInfo == null ? "" : mergedDictInfo.getResourcePath();
+
+            context.write(new IntWritable(-1), new Text(tblCol + "=" + dictInfoPath));
+
+        } else {
+            // merge statistics
+            KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(context.getConfiguration()), context.getConfiguration().get(BatchConstants.ARG_META_URL));
+
+            final String cubeName = context.getConfiguration().get(BatchConstants.ARG_CUBE_NAME);
+            final String segmentId = context.getConfiguration().get(BatchConstants.ARG_SEGMENT_ID);
+            final String statOutputPath = context.getConfiguration().get(MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt());
+            CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+
+            logger.info("Statistics output path: {}", statOutputPath);
+
+            CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
+            ResourceStore rs = ResourceStore.getStore(kylinConfig);
+
+            Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+            Configuration conf = null;
+            int averageSamplingPercentage = 0;
+
+            for (CubeSegment cubeSegment : mergingSegments) {
+                String filePath = cubeSegment.getStatisticsResourcePath();
+                InputStream is = rs.getResource(filePath).inputStream;
+                File tempFile;
+                FileOutputStream tempFileStream = null;
+
+                try {
+                    tempFile = File.createTempFile(segmentId, ".seq");
+                    tempFileStream = new FileOutputStream(tempFile);
+                    org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                } finally {
+                    IOUtils.closeStream(is);
+                    IOUtils.closeStream(tempFileStream);
+                }
+
+                FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+                SequenceFile.Reader reader = null;
+                try {
+                    conf = HadoopUtil.getCurrentConfiguration();
+                    //noinspection deprecation
+                    reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+                    LongWritable keyW = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    BytesWritable valueW = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+                    while (reader.next(keyW, valueW)) {
+                        if (keyW.get() == 0L) {
+                            // sampling percentage;
+                            averageSamplingPercentage += Bytes.toInt(valueW.getBytes());
+                        } else if (keyW.get() > 0) {
+                            HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                            ByteArray byteArray = new ByteArray(valueW.getBytes());
+                            hll.readRegisters(byteArray.asBuffer());
+
+                            if (cuboidHLLMap.get(keyW.get()) != null) {
+                                cuboidHLLMap.get(keyW.get()).merge(hll);
+                            } else {
+                                cuboidHLLMap.put(keyW.get(), hll);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to read cuboid statistics", e);
+                    throw e;
+                } finally {
+                    IOUtils.closeStream(reader);
+                }
+            }
+
+            averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+            CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
+            Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+
+            FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+            FSDataInputStream fis = fs.open(statisticsFilePath);
+
+            try {
+                // put the statistics to metadata store
+                String statisticsFileName = newSegment.getStatisticsResourcePath();
+                rs.putResource(statisticsFileName, fis, System.currentTimeMillis());
+            } finally {
+                IOUtils.closeStream(fis);
+            }
+
+            context.write(new IntWritable(-1), new Text(""));
+        }
+    }
+
+    private List<CubeSegment> getMergingSegments(CubeInstance cube, String[] segmentIds) {
+        List<CubeSegment> result = Lists.newArrayListWithCapacity(segmentIds.length);
+        for (String id : segmentIds) {
+            result.add(cube.getSegmentById(id));
+        }
+        return result;
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java
new file mode 100644
index 0000000000..1eb3c07f20
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MergeDictionaryReducer extends KylinReducer<IntWritable, Text, Text, Text> {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeDictionaryReducer.class);
+
+    @Override
+    protected void doReduce(IntWritable key, Iterable<Text> values, Context context)
+            throws IOException, InterruptedException {
+        for (Text text : values) {
+            String[] parts = text.toString().split("=");
+            if (parts.length == 2) {
+                logger.info("Dictionary for col {}, save at {}", parts[0], parts[1]);
+                context.write(new Text(parts[0]), new Text(parts[1]));
+            }
+        }
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
new file mode 100644
index 0000000000..bcdb29f52c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateDictionaryStep extends AbstractExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(UpdateDictionaryStep.class);
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeMgr = CubeManager.getInstance(context.getConfig());
+        final DictionaryManager dictMgrHdfs;
+        final DictionaryManager dictMgrHbase;
+        final CubeInstance cube = cubeMgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+        final List<CubeSegment> mergingSegments = getMergingSegments(cube);
+        final String dictInfoPath = this.getParams().get(BatchConstants.ARG_DICT_PATH);
+        final String metadataUrl = this.getParams().get(BatchConstants.ARG_META_URL);
+
+        final KylinConfig kylinConfHbase = cube.getConfig();
+        final KylinConfig kylinConfHdfs = KylinConfig.createInstanceFromUri(metadataUrl);
+
+        Collections.sort(mergingSegments);
+
+        try {
+            Configuration conf = HadoopUtil.getCurrentConfiguration();
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            ResourceStore hbaseRS = ResourceStore.getStore(kylinConfHbase);
+            ResourceStore hdfsRS = ResourceStore.getStore(kylinConfHdfs);
+            dictMgrHdfs = DictionaryManager.getInstance(kylinConfHdfs);
+            dictMgrHbase = DictionaryManager.getInstance(kylinConfHbase);
+
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = cube.latestCopyForWrite();
+            CubeSegment newSegCopy = cubeCopy.getSegmentById(newSegment.getUuid());
+
+            // update cube segment dictionary
+
+            FileStatus[] fileStatuss = fs.listStatus(new Path(dictInfoPath), new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    return path.getName().startsWith("part");
+                }
+            });
+
+            for (FileStatus fileStatus : fileStatuss) {
+                Path filePath = fileStatus.getPath();
+
+                SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
+                Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+                while (reader.next(key, value)) {
+                    String tblCol = key.toString();
+                    String dictInfoResource = value.toString();
+
+                    if (StringUtils.isNotEmpty(dictInfoResource)) {
+                        logger.info(dictInfoResource);
+                        // put dictionary file to metadata store
+                        DictionaryInfo dictInfoHdfs = dictMgrHdfs.getDictionaryInfo(dictInfoResource);
+                        DictionaryInfo dicInfoHbase = dictMgrHbase.trySaveNewDict(dictInfoHdfs.getDictionaryObject(), dictInfoHdfs);
+
+                        if (dicInfoHbase != null){
+                            TblColRef tblColRef = cube.getDescriptor().findColumnRef(tblCol.split(":")[0], tblCol.split(":")[1]);
+                            newSegCopy.putDictResPath(tblColRef, dicInfoHbase.getResourcePath());
+                        }
+                    }
+                }
+
+                IOUtils.closeStream(reader);
+            }
+
+            CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
+            for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
+                newSegCopy.putSnapshotResPath(entry.getKey(), entry.getValue());
+            }
+
+            // update statistics
+            // put the statistics to metadata store
+            String statisticsFileName = newSegment.getStatisticsResourcePath();
+            hbaseRS.putResource(statisticsFileName, hdfsRS.getResource(newSegment.getStatisticsResourcePath()).inputStream, System.currentTimeMillis());
+
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToUpdateSegs(newSegCopy);
+            cubeMgr.updateCube(update);
+
+            return ExecuteResult.createSucceed();
+        } catch (IOException e) {
+            logger.error("fail to merge dictionary", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+    private List<CubeSegment> getMergingSegments(CubeInstance cube) {
+        List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams());
+        List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
+        for (String id : mergingSegmentIds) {
+            result.add(cube.getSegmentById(id));
+        }
+        return result;
+    }
+}
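
A minimal sketch (not part of the patch) of one record UpdateDictionaryStep reads back from the dict_info output; the "table:column" key and the dictionary resource path are hypothetical examples of what MergeDictionaryReducer writes:

public class DictInfoRecordSketch {
    public static void main(String[] args) {
        String key = "KYLIN_SALES:LSTG_FORMAT_NAME";                           // hypothetical "<tableAlias>:<columnName>"
        String value = "/dict/DEFAULT.KYLIN_SALES/LSTG_FORMAT_NAME/1234.dict"; // hypothetical merged dict resource path
        String tableAlias = key.split(":")[0];
        String columnName = key.split(":")[1];
        // In the step itself, findColumnRef(tableAlias, columnName) resolves the column
        // and putDictResPath(col, value) records the merged dictionary on the new segment.
        System.out.println(tableAlias + "." + columnName + " -> " + value);
    }
}
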
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index 97861a3623..4487610885 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -18,12 +18,8 @@
 
 package org.apache.kylin.engine.spark;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
@@ -63,8 +59,8 @@ public CubingJob build() {
 
         // Phase 1: Merge Dictionary
         inputSide.addStepPhase1_MergeDictionary(result);
-        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-        result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
+        result.addTask(createMergeDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
+        result.addTask(createUpdateDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
         outputSide.addStepPhase1_MergeDictionary(result);
 
         // merge cube
@@ -80,6 +76,28 @@ public CubingJob build() {
         return result;
     }
 
+    public SparkExecutable createMergeDictionaryStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
+        final SparkExecutable sparkExecutable = new SparkExecutable();
+        sparkExecutable.setClassName(SparkMergingDictionary.class.getName());
+
+        sparkExecutable.setParam(SparkMergingDictionary.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+        sparkExecutable.setParam(SparkMergingDictionary.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+        sparkExecutable.setParam(SparkMergingDictionary.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobID));
+        sparkExecutable.setParam(SparkMergingDictionary.OPTION_MERGE_SEGMENT_IDS.getOpt(), StringUtil.join(mergingSegmentIds, ","));
+        sparkExecutable.setParam(SparkMergingDictionary.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
+        sparkExecutable.setParam(SparkMergingDictionary.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
+
+        sparkExecutable.setJobId(jobID);
+        sparkExecutable.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+        sparkExecutable.setJars(jars.toString());
+
+        return sparkExecutable;
+    }
+
     public SparkExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID) {
 
         final List<String> mergingCuboidPaths = Lists.newArrayList();
@@ -94,8 +112,7 @@ public SparkExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegme
         sparkExecutable.setParam(SparkCubingMerge.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingMerge.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
         sparkExecutable.setParam(SparkCubingMerge.OPTION_INPUT_PATH.getOpt(), formattedPath);
-        sparkExecutable.setParam(SparkCubingMerge.OPTION_META_URL.getOpt(),
-                getSegmentMetadataUrl(seg.getConfig(), jobID));
+        sparkExecutable.setParam(SparkCubingMerge.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), jobID));
         sparkExecutable.setParam(SparkCubingMerge.OPTION_OUTPUT_PATH.getOpt(), outputPath);
 
         sparkExecutable.setJobId(jobID);
@@ -108,10 +125,4 @@ public SparkExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegme
 
         return sparkExecutable;
     }
-
-    public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
-        Map<String, String> param = new HashMap<>();
-        param.put("path", getDumpMetadataPath(jobId));
-        return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
-    }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 4635fad255..d8eba713b3 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.util.Shell;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.ResourceTool;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.Pair;
@@ -386,9 +387,13 @@ private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOExcepti
     private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segments.get(0).getCubeInstance()));
+        ResourceStore rs = ResourceStore.getStore(segments.get(0).getConfig());
         for (CubeSegment segment : segments) {
             dumpList.addAll(segment.getDictionaryPaths());
-            dumpList.add(segment.getStatisticsResourcePath());
+            if (rs.exists(segment.getStatisticsResourcePath())) {
+                // cube statistics is not available for new segment
+                dumpList.add(segment.getStatisticsResourcePath());
+            }
         }
         dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig());
     }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
new file mode 100644
index 0000000000..deb7968812
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import scala.Tuple2;
+
+/**
+ * merge dictionary
+ */
+public class SparkMergingDictionary extends AbstractApplication implements Serializable {
+    protected static final Logger logger = LoggerFactory.getLogger(SparkMergingDictionary.class);
+
+    public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_MERGE_SEGMENT_IDS = OptionBuilder.withArgName("segmentIds").hasArg()
+            .isRequired(true).withDescription("Merging Cube Segment Ids").create("segmentIds");
+    public static final Option OPTION_OUTPUT_PATH_DICT = OptionBuilder.withArgName("dictOutputPath").hasArg()
+            .isRequired(true).withDescription("merged dictionary resource path").create("dictOutputPath");
+    public static final Option OPTION_OUTPUT_PATH_STAT = OptionBuilder.withArgName("statOutputPath").hasArg()
+            .isRequired(true).withDescription("merged statistics resource path").create("statOutputPath");
+
+    private Options options;
+
+    public SparkMergingDictionary() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_MERGE_SEGMENT_IDS);
+        options.addOption(OPTION_OUTPUT_PATH_DICT);
+        options.addOption(OPTION_OUTPUT_PATH_STAT);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        final String segmentIds = optionsHelper.getOptionValue(OPTION_MERGE_SEGMENT_IDS);
+        final String dictOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_DICT);
+        final String statOutputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_STAT);
+
+        Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"),
+                Class.forName("scala.collection.mutable.WrappedArray$ofRef") };
+
+        SparkConf conf = new SparkConf().setAppName("Merge dictionary for cube:" + cubeName + ", segment " + segmentId);
+        //serialization conf
+        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        sc.sc().addSparkListener(jobListener);
+
+        HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(dictOutputPath));
+
+        final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+        final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+
+        logger.info("Dictionary output path: {}", dictOutputPath);
+        logger.info("Statistics output path: {}", statOutputPath);
+
+        final TblColRef[] tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+        final int columnLength = tblColRefs.length;
+
+        // One index per dictionary column, plus one extra index that triggers the statistics merge.
+        List<Integer> indices = Lists.newArrayListWithCapacity(columnLength + 1);
+
+        for (int i = 0; i <= columnLength; i++) {
+            indices.add(i);
+        }
+
+        JavaRDD<Integer> indexRDD = sc.parallelize(indices, columnLength + 1);
+
+        JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new MergeDictAndStatsFunction(cubeName, metaUrl,
+                segmentId, segmentIds.split(","), statOutputPath, tblColRefs, sConf));
+
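+        // Coalesce to a single partition so the column-to-dictionary-path mapping
+        // is written out as one sequence file.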
+        colToDictPathRDD.coalesce(1, false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class, SequenceFileOutputFormat.class);
+    }
+
+    public static class MergeDictAndStatsFunction implements PairFunction<Integer, Text, Text> {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String metaUrl;
+        private String segmentId;
+        private String[] segmentIds;
+        private String statOutputPath;
+        private TblColRef[] tblColRefs;
+        private SerializableConfiguration conf;
+        private DictionaryManager dictMgr;
+        private KylinConfig kylinConfig;
+        private List<CubeSegment> mergingSegments;
+
+        public MergeDictAndStatsFunction(String cubeName, String metaUrl, String segmentId, String[] segmentIds,
+                String statOutputPath, TblColRef[] tblColRefs, SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.segmentId = segmentId;
+            this.segmentIds = segmentIds;
+            this.statOutputPath = statOutputPath;
+            this.tblColRefs = tblColRefs;
+            this.conf = conf;
+        }
+
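+        // Executor-side initialization: load the Kylin config from the HDFS metadata url,
+        // then look up the cube, the dictionary manager, and the segments being merged.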
+        private void init() {
+            kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
+            CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+            dictMgr = DictionaryManager.getInstance(kylinConfig);
+            mergingSegments = getMergingSegments(cubeInstance, segmentIds);
+        }
+
+        @Override
+        public Tuple2<Text, Text> call(Integer index) throws Exception {
+            // Lazy, one-time initialization per executor JVM (double-checked locking).
+            if (!initialized) {
+                synchronized (SparkMergingDictionary.class) {
+                    if (!initialized) {
+                        init();
+                        initialized = true;
+                    }
+                }
+            }
+
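+            // Indices below the column count each merge one column's dictionaries;
+            // the extra, last index merges the cuboid statistics.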
+            if (index < tblColRefs.length) {
+                // merge dictionary
+                TblColRef col = tblColRefs[index];
+                List<DictionaryInfo> dictInfos = Lists.newArrayList();
+                for (CubeSegment segment : mergingSegments) {
+                    if (segment.getDictResPath(col) != null) {
+                        DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+                        if (dictInfo != null && !dictInfos.contains(dictInfo)) {
+                            dictInfos.add(dictInfo);
+                        }
+                    }
+                }
+
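+                // Merge this column's per-segment dictionaries into a single dictionary.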
+                DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
+                String tblCol = col.getTableAlias() + ":" + col.getName();
+                String dictInfoPath = mergedDictInfo == null ? "" : mergedDictInfo.getResourcePath();
+
+                return new Tuple2<>(new Text(tblCol), new Text(dictInfoPath));
+            } else {
+                // merge statistics
+                CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+                CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
+                ResourceStore rs = ResourceStore.getStore(kylinConfig);
+
+                Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+                Configuration conf = null;
+                int averageSamplingPercentage = 0;
+
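+                // For every merging segment, copy its statistics from the resource store to a
+                // local temp file, then read the cuboid HLL counters back with a SequenceFile reader.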
+                for (CubeSegment cubeSegment : mergingSegments) {
+                    String filePath = cubeSegment.getStatisticsResourcePath();
+                    InputStream is = rs.getResource(filePath).inputStream;
+                    File tempFile;
+                    FileOutputStream tempFileStream = null;
+
+                    try {
+                        tempFile = File.createTempFile(segmentId, ".seq");
+                        tempFileStream = new FileOutputStream(tempFile);
+                        org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                    } finally {
+                        IOUtils.closeStream(is);
+                        IOUtils.closeStream(tempFileStream);
+                    }
+
+                    FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+                    SequenceFile.Reader reader = null;
+
+                    try {
+                        conf = HadoopUtil.getCurrentConfiguration();
+                        //noinspection deprecation
+                        reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+                        LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                        BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+                        while (reader.next(key, value)) {
+                            if (key.get() == 0L) {
+                                // sampling percentage;
+                                averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                            } else if (key.get() > 0) {
+                                HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                                ByteArray byteArray = new ByteArray(value.getBytes());
+                                hll.readRegisters(byteArray.asBuffer());
+
+                                if (cuboidHLLMap.get(key.get()) != null) {
+                                    cuboidHLLMap.get(key.get()).merge(hll);
+                                } else {
+                                    cuboidHLLMap.put(key.get(), hll);
+                                }
+                            }
+                        }
+                    } finally {
+                        IOUtils.closeStream(reader);
+                    }
+                }
+
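+                // Average the sampling percentage across segments and persist the merged
+                // cuboid statistics to the statistics output path.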
+                averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+                CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
+                Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+
+                FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+                FSDataInputStream fis = fs.open(statisticsFilePath);
+
+                try {
+                    // put the statistics to metadata store
+                    String statisticsFileName = newSegment.getStatisticsResourcePath();
+                    rs.putResource(statisticsFileName, fis, System.currentTimeMillis());
+                } finally {
+                    IOUtils.closeStream(fis);
+                }
+
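+                // The statistics task contributes no column-to-dictionary mapping, so emit an empty pair.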
+                return new Tuple2<>(new Text(""), new Text(""));
+            }
+
+        }
+
+        private List<CubeSegment> getMergingSegments(CubeInstance cube, String[] segmentIds) {
+            List<CubeSegment> result = Lists.newArrayListWithCapacity(segmentIds.length);
+            for (String id : segmentIds) {
+                result.add(cube.getSegmentById(id));
+            }
+            return result;
+        }
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services