You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/08/07 13:52:45 UTC
[kylin] 01/02: KYLIN-3471 Merge dictionary using Mapreduce
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 9a92f4922ad17e40c1914f34db659aaad3c98605
Author: chao long <wa...@qq.com>
AuthorDate: Tue Aug 7 20:08:20 2018 +0800
KYLIN-3471 Merge dictionary using Mapreduce
---
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 30 ++-
.../apache/kylin/engine/mr/JobBuilderSupport.java | 36 +++-
.../kylin/engine/mr/common/AbstractHadoopJob.java | 41 ++++
.../kylin/engine/mr/common/BatchConstants.java | 1 +
.../kylin/engine/mr/steps/MergeDictionaryJob.java | 234 +++++++++++++++++++++
.../engine/mr/steps/MergeDictionaryMapper.java | 207 ++++++++++++++++++
.../engine/mr/steps/MergeDictionaryReducer.java | 45 ++++
.../engine/mr/steps/UpdateDictionaryStep.java | 151 +++++++++++++
9 files changed, 740 insertions(+), 6 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index f6ad0ed..c805f8a 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -54,6 +54,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_COPY_DICTIONARY = "Copy dictionary from Old Segment";
public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
+ // Display name of the step created by JobBuilderSupport#createUpdateDictionaryStep.
+ public static final String STEP_NAME_MERGE_UPDATE_DICTIONARY = "Update Dictionary Data";
public static final String STEP_NAME_MERGE_STATISTICS_WITH_OLD = "Merge Cuboid Statistics with Old for Optimization";
public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index d443f52..d902708 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -20,7 +20,12 @@ package org.apache.kylin.engine.mr;
import java.util.List;
+import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.MergeDictionaryJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,8 +60,8 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
// Phase 1: Merge Dictionary
inputSide.addStepPhase1_MergeDictionary(result);
- result.addTask(createMergeDictionaryStep(mergingSegmentIds));
- result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
+ result.addTask(createMergeDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
+ result.addTask(createUpdateDictionaryStep(cubeSegment, jobId, mergingSegmentIds));
outputSide.addStepPhase1_MergeDictionary(result);
// Phase 2: Merge Cube Files
@@ -69,4 +74,25 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
return result;
}
+    /**
+     * Builds the MapReduce step that runs {@link MergeDictionaryJob} for a merge job.
+     *
+     * @param seg the new (merged) segment; its cube name, uuid and config are passed to the job
+     * @param jobID id of the enclosing job, used to derive the metadata url and the output paths
+     * @param mergingSegmentIds ids of the segments being merged, handed over as a comma-separated list
+     * @return the configured step
+     */
+    public MapReduceExecutable createMergeDictionaryStep(CubeSegment seg, String jobID, List<String> mergingSegmentIds) {
+        MapReduceExecutable mergeDictionaryStep = new MapReduceExecutable();
+        mergeDictionaryStep.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        // the metadata url is a single-valued option, so it is appended exactly once
+        appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobID));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt(), StringUtil.join(mergingSegmentIds, ","));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
+        appendExecCmdParameters(cmd, MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Merge_Dictionary_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeDictionaryStep.setMapReduceParams(cmd.toString());
+        mergeDictionaryStep.setMapReduceJobClass(MergeDictionaryJob.class);
+
+        return mergeDictionaryStep;
+    }
+
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 42e0f42..649b4c3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -19,10 +19,14 @@
package org.apache.kylin.engine.mr;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.cube.model.CubeDesc;
@@ -39,11 +43,12 @@ import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
+import org.apache.kylin.engine.mr.steps.UpdateDictionaryStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.base.Preconditions;
-import org.apache.kylin.metadata.model.TblColRef;
/**
* Hold reusable steps for builders.
@@ -98,6 +103,22 @@ public class JobBuilderSupport {
return result;
}
+    /**
+     * Creates the {@link UpdateDictionaryStep} of a merge job and fills in its parameters.
+     *
+     * @param seg the new (merged) segment
+     * @param jobId id of the enclosing job, used to derive the dict info path and the metadata url
+     * @param mergingSegmentIds ids of the segments being merged
+     * @return the configured step
+     */
+    public UpdateDictionaryStep createUpdateDictionaryStep(CubeSegment seg, String jobId, List<String> mergingSegmentIds) {
+        final UpdateDictionaryStep step = new UpdateDictionaryStep();
+        step.setName(ExecutableConstants.STEP_NAME_MERGE_UPDATE_DICTIONARY);
+
+        // which cube / target segment / source segments the step works on
+        final String cubeName = seg.getRealization().getName();
+        CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+        final String segmentId = seg.getUuid();
+        CubingExecutableUtil.setSegmentId(segmentId, step.getParams());
+        CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, step.getParams());
+
+        // where the merged dict info was written, and where the job metadata lives
+        step.getParams().put(BatchConstants.ARG_DICT_PATH, getDictInfoPath(jobId));
+        step.getParams().put(BatchConstants.ARG_META_URL, getSegmentMetadataUrl(seg.getConfig(), jobId));
+
+        return step;
+    }
+
public MapReduceExecutable createBuildUHCDictStep(String jobId) {
MapReduceExecutable result = new MapReduceExecutable();
result.setName(ExecutableConstants.STEP_NAME_BUILD_UHC_DICTIONARY);
@@ -192,7 +213,6 @@ public class JobBuilderSupport {
return result;
}
-
public boolean isEnableUHCDictStep() {
if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
return false;
@@ -220,7 +240,6 @@ public class JobBuilderSupport {
return lookupMaterializeContext;
}
-
public SaveStatisticsStep createSaveStatisticsStep(String jobId) {
SaveStatisticsStep result = new SaveStatisticsStep();
result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
@@ -276,6 +295,10 @@ public class JobBuilderSupport {
return getRealizationRootPath(jobId) + "/dict";
}
+    /** Returns the "dict_info" path below the realization root path of the given job. */
+    public String getDictInfoPath(String jobId) {
+        final String realizationRoot = getRealizationRootPath(jobId);
+        return realizationRoot + "/dict_info";
+    }
+
public String getOptimizationRootPath(String jobId) {
return getRealizationRootPath(jobId) + "/optimize";
}
@@ -327,7 +350,6 @@ public class JobBuilderSupport {
return getRealizationRootPath(jobId) + "/metadata";
}
-
public static String extractJobIDFromPath(String path) {
Matcher matcher = JOB_NAME_PATTERN.matcher(path);
// check the first occurrence
@@ -337,4 +359,10 @@ public class JobBuilderSupport {
throw new IllegalStateException("Can not extract job ID from file path : " + path);
}
}
+
+    /**
+     * Builds the string form of a {@link StorageURL} that reuses the identifier of the given config's
+     * metadata url, uses "hdfs" as the second constructor argument (presumably the scheme) and carries
+     * the job's dump-metadata path as its "path" parameter.
+     *
+     * @param kylinConfig config whose metadata url identifier is reused
+     * @param jobId id of the job whose dump-metadata path is referenced
+     * @return the storage url as a string
+     */
+    public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
+        final Map<String, String> urlParams = new HashMap<>();
+        urlParams.put("path", getDumpMetadataPath(jobId));
+
+        final String identifier = kylinConfig.getMetadataUrl().getIdentifier();
+        final StorageURL segmentMetaUrl = new StorageURL(identifier, "hdfs", urlParams);
+        return segmentMetaUrl.toString();
+    }
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 716eafe..180b56a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -27,6 +27,7 @@ import static org.apache.hadoop.util.StringUtils.formatTime;
import static org.apache.kylin.engine.mr.common.JobRelatedMetaUtil.collectCubeMetadata;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
@@ -60,7 +61,9 @@ import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig;
+import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.persistence.ResourceTool;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
@@ -126,6 +129,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Option OPTION_LOOKUP_SNAPSHOT_ID = OptionBuilder.withArgName(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID).hasArg()
.isRequired(true).withDescription("Lookup table snapshotID")
.create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
+ // Required command line option (one argument) carrying the HDFS metadata url; named by BatchConstants.ARG_META_URL.
+ protected static final Option OPTION_META_URL = OptionBuilder.withArgName(BatchConstants.ARG_META_URL)
+ .hasArg().isRequired(true).withDescription("HDFS metadata url").create(BatchConstants.ARG_META_URL);
+
private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
@@ -548,6 +554,41 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
dumpKylinPropsAndMetadata(cube.getProject(), dumpList, cube.getConfig(), conf);
}
+ protected void attachSegmentsMetadataWithDict(List<CubeSegment> segments, String metaUrl) throws IOException {
+ Set<String> dumpList = new LinkedHashSet<>();
+ dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segments.get(0).getCubeInstance()));
+ for (CubeSegment segment : segments) {
+ dumpList.addAll(segment.getDictionaryPaths());
+ dumpList.add(segment.getStatisticsResourcePath());
+ }
+ dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig(), metaUrl);
+ }
+
+    /**
+     * Dumps the given resources and a kylin.properties file (with {@code kylin.metadata.url} set to
+     * {@code metadataUrl}) into a local staging directory, copies the staging directory to the store
+     * described by those properties, and finally removes the staging directory.
+     *
+     * @param dumpList    resource paths to dump
+     * @param kylinConfig config the resources and the exported properties are taken from
+     * @param metadataUrl metadata url written into the exported properties and used as copy target
+     * @throws IOException if the staging directory cannot be created, or dumping / copying fails
+     */
+    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig, String metadataUrl)
+            throws IOException {
+        File tmp = File.createTempFile("kylin_job_meta", "");
+        FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
+
+        try {
+            File metaDir = new File(tmp, "meta");
+            if (!metaDir.mkdirs() && !metaDir.isDirectory()) {
+                throw new IOException("Failed to create local metadata directory " + metaDir.getAbsolutePath());
+            }
+
+            // dump metadata
+            JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);
+
+            // write kylin.properties
+            Properties props = kylinConfig.exportToProperties();
+            props.setProperty("kylin.metadata.url", metadataUrl);
+
+            File kylinPropsFile = new File(metaDir, "kylin.properties");
+            try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) {
+                props.store(os, kylinPropsFile.getAbsolutePath());
+            }
+
+            KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
+            //upload metadata
+            ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), dstConfig);
+        } finally {
+            // the local dump is only a staging area for the upload; do not leave it behind
+            FileUtils.deleteQuietly(tmp);
+        }
+    }
+
protected void attachSegmentMetadataWithDict(CubeSegment segment, Configuration conf) throws IOException {
attachSegmentMetadata(segment, conf, true, false);
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index d38f7a4..a4a52ad 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -103,6 +103,7 @@ public interface BatchConstants {
String ARG_TABLE_NAME = "tableName";
String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
+ // Name of the command line argument / configuration key that carries the metadata url.
+ String ARG_META_URL = "metadataUrl";
/**
* logger and counter
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java
new file mode 100644
index 0000000..32e8e0c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.Segments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Hadoop job that runs {@link MergeDictionaryMapper} and {@link MergeDictionaryReducer} for a merge.
+ * It uses a synthetic input format that creates one split per index; the number of splits is the
+ * number of columns returned by CubeDesc#getAllColumnsNeedDictionaryBuilt() plus one, and a single
+ * reducer is configured.
+ */
+public class MergeDictionaryJob extends AbstractHadoopJob {
+ private static final Logger logger = LoggerFactory.getLogger(MergeDictionaryJob.class);
+
+ // Required option: ids of the segments being merged (the job builder passes them comma-separated).
+ public static final Option OPTION_MERGE_SEGMENT_IDS = OptionBuilder.withArgName("segmentIds").hasArg()
+ .isRequired(true).withDescription("Merging Cube Segment Ids").create("segmentIds");
+ // Required option: output path of the job (merged dictionary info).
+ public static final Option OPTION_OUTPUT_PATH_DICT = OptionBuilder.withArgName("dictOutputPath").hasArg()
+ .isRequired(true).withDescription("merged dictionary resource path").create("dictOutputPath");
+ // Required option: output path for the merged statistics; forwarded to the mapper via the job configuration.
+ public static final Option OPTION_OUTPUT_PATH_STAT = OptionBuilder.withArgName("statOutputPath").hasArg()
+ .isRequired(true).withDescription("merged statistics resource path").create("statOutputPath");
+
+ /**
+ * Parses the command line, configures the Hadoop job and waits for it to complete.
+ *
+ * @param args command line arguments matching the options registered at the top of the method
+ * @return the value returned by waitForCompletion(job)
+ * @throws Exception whatever option parsing, job setup or job execution throws
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ try {
+ Options options = new Options();
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_MERGE_SEGMENT_IDS);
+ options.addOption(OPTION_OUTPUT_PATH_DICT);
+ options.addOption(OPTION_OUTPUT_PATH_STAT);
+ parseOptions(options, args);
+
+ final String segmentId = getOptionValue(OPTION_SEGMENT_ID);
+ final String segmentIds = getOptionValue(OPTION_MERGE_SEGMENT_IDS);
+ final String cubeName = getOptionValue(OPTION_CUBE_NAME);
+ final String metaUrl = getOptionValue(OPTION_META_URL);
+ final String dictOutputPath = getOptionValue(OPTION_OUTPUT_PATH_DICT);
+ final String statOutputPath = getOptionValue(OPTION_OUTPUT_PATH_STAT);
+
+ // resolve the cube, the new segment and the segments merged into it from the environment config
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ CubeDesc cubeDesc = cube.getDescriptor();
+ CubeSegment segment = cube.getSegmentById(segmentId);
+ Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
+
+ // values below are read back by the mapper from the job configuration
+ job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ job.getConfiguration().set(BatchConstants.ARG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(OPTION_META_URL.getOpt(), metaUrl);
+ job.getConfiguration().set(OPTION_SEGMENT_ID.getOpt(), segmentId);
+ job.getConfiguration().set(OPTION_MERGE_SEGMENT_IDS.getOpt(), segmentIds);
+ job.getConfiguration().set(OPTION_OUTPUT_PATH_STAT.getOpt(), statOutputPath);
+ // one split per dictionary column plus one extra index; the mapper merges statistics for the extra index
+ job.getConfiguration().set("num.map.tasks", String.valueOf(cubeDesc.getAllColumnsNeedDictionaryBuilt().size() + 1));
+ job.setNumReduceTasks(1);
+
+ setJobClasspath(job, cube.getConfig());
+
+ // dump metadata to HDFS
+ attachSegmentsMetadataWithDict(mergingSeg, metaUrl);
+
+ // clean output dir
+ HadoopUtil.deletePath(job.getConfiguration(), new Path(dictOutputPath));
+
+ job.setMapperClass(MergeDictionaryMapper.class);
+ job.setReducerClass(MergeDictionaryReducer.class);
+
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ job.setInputFormatClass(IndexArrInputFormat.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.NONE);
+ SequenceFileOutputFormat.setOutputPath(job, new Path(dictOutputPath));
+
+ logger.info("Starting: " + job.getJobName());
+
+ return waitForCompletion(job);
+
+ } finally {
+ // runs on success and on failure, as long as the job object was created
+ if (job != null)
+ cleanupTempConfFile(job.getConfiguration());
+ }
+ }
+
+ /**
+ * Synthetic input format that reads no files: it creates one {@link IntInputSplit} for every index
+ * in [0, n), where n is the "num.map.tasks" configuration value (0 when unset), and each split
+ * yields exactly one (index, NullWritable) record.
+ */
+ static class IndexArrInputFormat extends InputFormat<IntWritable, NullWritable> {
+
+ /** Returns one split per index below the configured "num.map.tasks" value. */
+ @Override
+ public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
+ int numMapTasks = jobContext.getConfiguration().getInt("num.map.tasks", 0);
+ List<InputSplit> inputSplits = Lists.newArrayListWithCapacity(numMapTasks);
+
+ for (int i = 0; i < numMapTasks; i++) {
+ inputSplits.add(new IntInputSplit(i));
+ }
+
+ return inputSplits;
+ }
+
+ /** Returns a reader that emits the split's index once as key, with a NullWritable value. */
+ @Override
+ public RecordReader<IntWritable, NullWritable> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+
+ return new RecordReader<IntWritable, NullWritable>() {
+ private int index;
+ private IntWritable key;
+ private NullWritable value;
+
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException, InterruptedException {
+ // the split is expected to be an IntInputSplit; anything else fails with a ClassCastException
+ IntInputSplit intInputSplit = (IntInputSplit) inputSplit;
+ index = intInputSplit.getIndex();
+ }
+
+ @Override
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+
+ // key == null marks "record not yet emitted": the first call produces it, later calls report the end
+ if (key == null) {
+ key = new IntWritable(index);
+ value = NullWritable.get();
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public IntWritable getCurrentKey() throws IOException, InterruptedException {
+ return key;
+ }
+
+ @Override
+ public NullWritable getCurrentValue() throws IOException, InterruptedException {
+ return value;
+ }
+
+ @Override
+ public float getProgress() throws IOException, InterruptedException {
+ // constant: reports 1 regardless of whether the record has been consumed
+ return 1;
+ }
+
+ @Override
+ public void close() throws IOException {
+
+ }
+ };
+ }
+ }
+
+ /**
+ * Input split that carries only an integer index. It is serialized as a single int, reports a
+ * length of 1 and no preferred locations.
+ */
+ static class IntInputSplit extends InputSplit implements Writable {
+ private int index;
+
+ // no-arg constructor; the index is then populated through readFields
+ public IntInputSplit() {
+
+ }
+
+ public IntInputSplit(int index) {
+ this.index = index;
+ }
+
+ @Override
+ public void write(DataOutput dataOutput) throws IOException {
+ dataOutput.writeInt(index);
+ }
+
+ @Override
+ public void readFields(DataInput dataInput) throws IOException {
+ this.index = dataInput.readInt();
+ }
+
+ @Override
+ public long getLength() throws IOException, InterruptedException {
+ return 1L;
+ }
+
+ @Override
+ public String[] getLocations() throws IOException, InterruptedException {
+ return new String[0];
+ }
+
+ public int getIndex() {
+ return index;
+ }
+ }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
new file mode 100644
index 0000000..522c06a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Mapper of the merge-dictionary job. Every input record is a single index:
+ * <ul>
+ * <li>an index below the number of dictionary columns merges the dictionaries of that column across
+ * the merging segments and emits "tableAlias:column=mergedDictPath";</li>
+ * <li>any other index merges the cuboid statistics of the merging segments, writes them to the
+ * statistics output path and stores them as the new segment's statistics resource.</li>
+ * </ul>
+ * All records are emitted under the key -1.
+ */
+public class MergeDictionaryMapper extends KylinMapper<IntWritable, NullWritable, IntWritable, Text> {
+    private static final Logger logger = LoggerFactory.getLogger(MergeDictionaryMapper.class);
+
+    List<CubeSegment> mergingSegments;
+    TblColRef[] tblColRefs;
+    DictionaryManager dictMgr;
+
+    /**
+     * Loads the Kylin config from the metadata url found in the job configuration and resolves the
+     * merging segments, the dictionary columns and the dictionary manager from it.
+     */
+    @Override
+    protected void doSetup(Context context) throws IOException, InterruptedException {
+        super.doSetup(context);
+
+        final SerializableConfiguration sConf = new SerializableConfiguration(context.getConfiguration());
+        final String metaUrl = context.getConfiguration().get(BatchConstants.ARG_META_URL);
+        final String cubeName = context.getConfiguration().get(BatchConstants.ARG_CUBE_NAME);
+        final String segmentIds = context.getConfiguration().get(MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt());
+
+        final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeInstance.getDescName());
+
+        mergingSegments = getMergingSegments(cubeInstance, segmentIds.split(","));
+        tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+        dictMgr = DictionaryManager.getInstance(kylinConfig);
+    }
+
+    /**
+     * Dispatches on the index carried by the key: column indices merge a dictionary, every other
+     * index merges the statistics.
+     */
+    @Override
+    protected void doMap(IntWritable key, NullWritable value, Context context)
+            throws IOException, InterruptedException {
+
+        int index = key.get();
+
+        if (index < tblColRefs.length) {
+            mergeDictionary(tblColRefs[index], context);
+        } else {
+            mergeStatistics(context);
+        }
+    }
+
+    /** Merges the distinct dictionaries of one column and emits "tableAlias:column=path" (empty path if none). */
+    private void mergeDictionary(TblColRef col, Context context) throws IOException, InterruptedException {
+        List<DictionaryInfo> dictInfos = Lists.newArrayList();
+        for (CubeSegment segment : mergingSegments) {
+            String dictResPath = segment.getDictResPath(col);
+            if (dictResPath == null) {
+                continue;
+            }
+            DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(dictResPath);
+            if (dictInfo != null && !dictInfos.contains(dictInfo)) {
+                dictInfos.add(dictInfo);
+            }
+        }
+
+        DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
+        String tblCol = col.getTableAlias() + ":" + col.getName();
+        String dictInfoPath = mergedDictInfo == null ? "" : mergedDictInfo.getResourcePath();
+
+        context.write(new IntWritable(-1), new Text(tblCol + "=" + dictInfoPath));
+    }
+
+    /**
+     * Merges the cuboid statistics of all merging segments, writes the result to the statistics
+     * output path and stores it as the statistics resource of the new segment.
+     */
+    private void mergeStatistics(Context context) throws IOException, InterruptedException {
+        if (mergingSegments.isEmpty()) {
+            // would otherwise surface as a division by zero when averaging the sampling percentage
+            throw new IllegalStateException("No merging segments found, cannot merge statistics");
+        }
+
+        final Configuration jobConf = context.getConfiguration();
+        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(new SerializableConfiguration(jobConf), jobConf.get(BatchConstants.ARG_META_URL));
+
+        final String cubeName = jobConf.get(BatchConstants.ARG_CUBE_NAME);
+        final String segmentId = jobConf.get(BatchConstants.ARG_SEGMENT_ID);
+        final String statOutputPath = jobConf.get(MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt());
+        CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+
+        logger.info("Statistics output path: {}", statOutputPath);
+
+        CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
+        ResourceStore rs = ResourceStore.getStore(kylinConfig);
+
+        Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
+        int samplingPercentageSum = 0;
+
+        for (CubeSegment cubeSegment : mergingSegments) {
+            String filePath = cubeSegment.getStatisticsResourcePath();
+            File tempFile = null;
+            try {
+                InputStream is = rs.getResource(filePath).inputStream;
+                FileOutputStream tempFileStream = null;
+                try {
+                    // the statistics are copied to a local file so they can be read as a sequence file
+                    tempFile = File.createTempFile(segmentId, ".seq");
+                    tempFileStream = new FileOutputStream(tempFile);
+                    org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                } finally {
+                    IOUtils.closeStream(is);
+                    IOUtils.closeStream(tempFileStream);
+                }
+
+                samplingPercentageSum += readSegmentStatistics(tempFile, conf, kylinConfig, cuboidHLLMap);
+            } finally {
+                // the local copy is only needed while reading; remove it so tasks do not leak temp files
+                if (tempFile != null && !tempFile.delete()) {
+                    logger.warn("Failed to delete temp statistics file {}", tempFile.getAbsolutePath());
+                }
+            }
+        }
+
+        int averageSamplingPercentage = samplingPercentageSum / mergingSegments.size();
+        CubeStatsWriter.writeCuboidStatistics(conf, new Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
+        Path statisticsFilePath = new Path(statOutputPath, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+
+        FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+        FSDataInputStream fis = fs.open(statisticsFilePath);
+
+        try {
+            // put the statistics to metadata store
+            String statisticsFileName = newSegment.getStatisticsResourcePath();
+            rs.putResource(statisticsFileName, fis, System.currentTimeMillis());
+        } finally {
+            IOUtils.closeStream(fis);
+        }
+
+        context.write(new IntWritable(-1), new Text(""));
+    }
+
+    /**
+     * Reads one local statistics sequence file: records with key 0 carry a sampling percentage,
+     * records with a positive key carry the HLL registers of that cuboid, which are merged into
+     * {@code cuboidHLLMap}.
+     *
+     * @return the sum of the sampling percentages found in the file
+     */
+    private int readSegmentStatistics(File seqFile, Configuration conf, KylinConfig kylinConfig,
+            Map<Long, HLLCounter> cuboidHLLMap) throws IOException, InterruptedException {
+        int samplingPercentageSum = 0;
+        FileSystem fs = HadoopUtil.getFileSystem("file:///" + seqFile.getAbsolutePath());
+        SequenceFile.Reader reader = null;
+        try {
+            //noinspection deprecation
+            reader = new SequenceFile.Reader(fs, new Path(seqFile.getAbsolutePath()), conf);
+            LongWritable keyW = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            BytesWritable valueW = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+            while (reader.next(keyW, valueW)) {
+                if (keyW.get() == 0L) {
+                    // sampling percentage;
+                    samplingPercentageSum += Bytes.toInt(valueW.getBytes());
+                } else if (keyW.get() > 0) {
+                    HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                    ByteArray byteArray = new ByteArray(valueW.getBytes());
+                    hll.readRegisters(byteArray.asBuffer());
+
+                    HLLCounter existing = cuboidHLLMap.get(keyW.get());
+                    if (existing != null) {
+                        existing.merge(hll);
+                    } else {
+                        cuboidHLLMap.put(keyW.get(), hll);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            logger.error("Failed to read statistics from " + seqFile.getAbsolutePath(), e);
+            throw e;
+        } finally {
+            IOUtils.closeStream(reader);
+        }
+        return samplingPercentageSum;
+    }
+
+    /** Looks up every given segment id in the cube, preserving the order of the ids. */
+    private List<CubeSegment> getMergingSegments(CubeInstance cube, String[] segmentIds) {
+        List<CubeSegment> result = Lists.newArrayListWithCapacity(segmentIds.length);
+        for (String id : segmentIds) {
+            result.add(cube.getSegmentById(id));
+        }
+        return result;
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java
new file mode 100644
index 0000000..1eb3c07
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reducer of the merge-dictionary job: turns every "column=path" value into a (column, path)
+ * output record and drops values that do not split into exactly two parts on "=".
+ */
+public class MergeDictionaryReducer extends KylinReducer<IntWritable, Text, Text, Text> {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeDictionaryReducer.class);
+
+    /**
+     * Splits each value on "=" and, when exactly two parts result, logs and writes them as
+     * key and value; the incoming key is not used.
+     */
+    @Override
+    protected void doReduce(IntWritable key, Iterable<Text> values, Context context)
+            throws IOException, InterruptedException {
+        for (Text entry : values) {
+            final String[] parts = entry.toString().split("=");
+            // String.split never returns null, so only the number of parts needs checking
+            if (parts.length != 2) {
+                continue;
+            }
+
+            final String column = parts[0];
+            final String dictPath = parts[1];
+            logger.info("Dictionary for col {}, save at {}", column, dictPath);
+            context.write(new Text(column), new Text(dictPath));
+        }
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
new file mode 100644
index 0000000..bcdb29f
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Job step that attaches merged dictionaries, snapshots and statistics to the new
+ * (merged) cube segment.
+ *
+ * Two metadata locations are involved: the one behind the cube's own config (the
+ * variables suffixed "Hbase") and the one addressed by {@link BatchConstants#ARG_META_URL}
+ * (the variables suffixed "Hdfs"). Dictionaries and statistics are read from the latter
+ * and saved into the former.
+ */
+public class UpdateDictionaryStep extends AbstractExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(UpdateDictionaryStep.class);
+
+    /**
+     * Reads the sequence files under {@link BatchConstants#ARG_DICT_PATH} whose names start
+     * with "part", saves every referenced dictionary into the cube's metadata store and
+     * records its path on a writable copy of the new segment; then copies the snapshot
+     * paths of the last merging segment, copies the segment statistics resource, and
+     * finally persists the segment update through the {@link CubeManager}.
+     *
+     * @param context execution context supplying the Kylin config
+     * @return a success result, or an error result wrapping the {@link IOException}
+     *         if any of the reads or writes fails
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager cubeMgr = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = cubeMgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment newSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+        final List<CubeSegment> mergingSegments = getMergingSegments(cube);
+        final String dictInfoPath = this.getParams().get(BatchConstants.ARG_DICT_PATH);
+        final String metadataUrl = this.getParams().get(BatchConstants.ARG_META_URL);
+
+        final KylinConfig kylinConfHbase = cube.getConfig();
+        final KylinConfig kylinConfHdfs = KylinConfig.createInstanceFromUri(metadataUrl);
+
+        Collections.sort(mergingSegments);
+
+        try {
+            Configuration conf = HadoopUtil.getCurrentConfiguration();
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            ResourceStore hbaseRS = ResourceStore.getStore(kylinConfHbase);
+            ResourceStore hdfsRS = ResourceStore.getStore(kylinConfHdfs);
+            final DictionaryManager dictMgrHdfs = DictionaryManager.getInstance(kylinConfHdfs);
+            final DictionaryManager dictMgrHbase = DictionaryManager.getInstance(kylinConfHbase);
+
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = cube.latestCopyForWrite();
+            CubeSegment newSegCopy = cubeCopy.getSegmentById(newSegment.getUuid());
+
+            // update cube segment dictionary
+            FileStatus[] fileStatuss = fs.listStatus(new Path(dictInfoPath), new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    return path.getName().startsWith("part");
+                }
+            });
+
+            for (FileStatus fileStatus : fileStatuss) {
+                saveDictionariesFrom(fs, fileStatus.getPath(), conf, dictMgrHdfs, dictMgrHbase, cube, newSegCopy);
+            }
+
+            // the list was sorted above, so this is the greatest segment in natural order
+            CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
+            for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
+                newSegCopy.putSnapshotResPath(entry.getKey(), entry.getValue());
+            }
+
+            // update statistics
+            // put the statistics to metadata store
+            String statisticsFileName = newSegment.getStatisticsResourcePath();
+            InputStream statisticsStream = hdfsRS.getResource(statisticsFileName).inputStream;
+            try {
+                hbaseRS.putResource(statisticsFileName, statisticsStream, System.currentTimeMillis());
+            } finally {
+                // release the source stream whether or not the copy succeeded
+                IOUtils.closeStream(statisticsStream);
+            }
+
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToUpdateSegs(newSegCopy);
+            cubeMgr.updateCube(update);
+
+            return ExecuteResult.createSucceed();
+        } catch (IOException e) {
+            logger.error("fail to merge dictionary", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+    /**
+     * Processes one sequence file of (column, dictionary resource path) Text records:
+     * each non-empty path is loaded through {@code dictMgrHdfs}, saved through
+     * {@code dictMgrHbase}, and, when the save returns a dictionary info, its resource
+     * path is registered on {@code newSegCopy} for the column named by the record key
+     * (the key is split on ':' and the first two pieces are handed to
+     * {@code findColumnRef}). The reader is always closed, even when a record fails.
+     */
+    private void saveDictionariesFrom(FileSystem fs, Path filePath, Configuration conf,
+            DictionaryManager dictMgrHdfs, DictionaryManager dictMgrHbase, CubeInstance cube,
+            CubeSegment newSegCopy) throws IOException {
+        SequenceFile.Reader reader = null;
+        try {
+            reader = new SequenceFile.Reader(fs, filePath, conf);
+            Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+            while (reader.next(key, value)) {
+                String tblCol = key.toString();
+                String dictInfoResource = value.toString();
+
+                if (StringUtils.isNotEmpty(dictInfoResource)) {
+                    logger.info(dictInfoResource);
+                    // put dictionary file to metadata store
+                    DictionaryInfo dictInfoHdfs = dictMgrHdfs.getDictionaryInfo(dictInfoResource);
+                    DictionaryInfo dicInfoHbase = dictMgrHbase.trySaveNewDict(dictInfoHdfs.getDictionaryObject(), dictInfoHdfs);
+
+                    if (dicInfoHbase != null) {
+                        String[] tableAndColumn = tblCol.split(":");
+                        TblColRef tblColRef = cube.getDescriptor().findColumnRef(tableAndColumn[0], tableAndColumn[1]);
+                        newSegCopy.putDictResPath(tblColRef, dicInfoHbase.getResourcePath());
+                    }
+                }
+            }
+        } finally {
+            // closeStream tolerates null, which covers a failed reader construction
+            IOUtils.closeStream(reader);
+        }
+    }
+
+    /**
+     * Resolves the merging segment ids found in this step's params to the cube's
+     * segments, in the order the ids are listed.
+     */
+    private List<CubeSegment> getMergingSegments(CubeInstance cube) {
+        List<String> mergingSegmentIds = CubingExecutableUtil.getMergingSegmentIds(this.getParams());
+        List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
+        for (String id : mergingSegmentIds) {
+            result.add(cube.getSegmentById(id));
+        }
+        return result;
+    }
+}