You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/11/23 05:53:08 UTC
[06/18] kylin git commit: APACHE-KYLIN-2733: Introduce optimize job
for adjusting cuboid set
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
new file mode 100644
index 0000000..0379f64
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.job.exception.JobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MapReduceUtil {
+
+ private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class);
+
+ /**
+ * @param cuboidScheduler specified can provide more flexibility
+ * */
+ public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler,
+ double totalMapInputMB, int level)
+ throws ClassNotFoundException, IOException, InterruptedException, JobException {
+ CubeDesc cubeDesc = cubeSegment.getCubeDesc();
+ KylinConfig kylinConfig = cubeDesc.getConfig();
+
+ double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+ double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+ logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level "
+ + level);
+
+ CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig);
+
+ double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
+
+ if (level == -1) {
+ //merge case
+ double estimatedSize = cubeStatsReader.estimateCubeSize();
+ adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
+ logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize,
+ totalMapInputMB, adjustedCurrentLayerSizeEst);
+ } else if (level == 0) {
+ //base cuboid case TODO: the estimation could be very WRONG because it has no correction
+ adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
+ logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
+ } else {
+ parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
+ currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
+ adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
+ logger.debug(
+ "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}",
+ totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
+ }
+
+ // number of reduce tasks
+ int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
+
+ // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+ if (cubeDesc.hasMemoryHungryMeasures()) {
+ logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
+ numReduceTasks = numReduceTasks * 4;
+ }
+
+ // at least 1 reducer by default
+ numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+ // no more than 500 reducer by default
+ numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+ return numReduceTasks;
+ }
+
+ public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler)
+ throws IOException {
+ KylinConfig kylinConfig = cubeSeg.getConfig();
+
+ Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap();
+ double totalSizeInM = 0;
+ for (Double cuboidSize : cubeSizeMap.values()) {
+ totalSizeInM += cuboidSize;
+ }
+
+ double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
+ double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
+
+ // number of reduce tasks
+ int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio);
+
+ // at least 1 reducer by default
+ numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
+ // no more than 500 reducer by default
+ numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
+
+ logger.info("Having total map input MB " + Math.round(totalSizeInM));
+ logger.info("Having per reduce MB " + perReduceInputMB);
+ logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
+ return numReduceTasks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
index 9c805a8..a5a1ba8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java
@@ -38,7 +38,7 @@ public class StatisticsDecisionUtil {
protected static final Logger logger = LoggerFactory.getLogger(StatisticsDecisionUtil.class);
public static void decideCubingAlgorithm(CubingJob cubingJob, CubeSegment seg) throws IOException {
- CubeStatsReader cubeStats = new CubeStatsReader(seg, seg.getConfig());
+ CubeStatsReader cubeStats = new CubeStatsReader(seg, null, seg.getConfig());
decideCubingAlgorithm(cubingJob, seg, cubeStats.getMapperOverlapRatioOfFirstBuild(),
cubeStats.getMapperNumberOfFirstBuild());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 1ae6cd0..0ad4b9e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -60,7 +60,7 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
@Override
protected void doSetup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+ cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
final KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
new file mode 100644
index 0000000..b60076c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob {
+
+ private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidJob.class);
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_OUTPUT_PATH);
+ options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
+ options.addOption(OPTION_CUBOID_MODE);
+ parseOptions(options, args);
+
+ job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ String cubeName = getOptionValue(OPTION_CUBE_NAME);
+ String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+ Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+ String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+ String cuboidMode = getOptionValue(OPTION_CUBOID_MODE);
+
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ CubeSegment cubeSegment = cube.getSegmentById(segmentID);
+
+ job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidMode);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+ job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+ logger.info("Starting: " + job.getJobName());
+
+ setJobClasspath(job, cube.getConfig());
+
+ setupMapper(input);
+ setupReducer(output, 1);
+
+ attachSegmentMetadataWithDict(cubeSegment, job.getConfiguration());
+
+ return waitForCompletion(job);
+
+ } catch (Exception e) {
+ logger.error("error in CalculateStatsFromBaseCuboidJob", e);
+ printUsage(options);
+ throw e;
+ } finally {
+ if (job != null)
+ cleanupTempConfFile(job.getConfiguration());
+ }
+ }
+
+ private void setupMapper(Path input) throws IOException {
+ FileInputFormat.setInputPaths(job, input);
+ job.setMapperClass(CalculateStatsFromBaseCuboidMapper.class);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ }
+
+ private void setupReducer(Path output, int numberOfReducers) throws IOException {
+ job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Text.class);
+ job.setNumReduceTasks(numberOfReducers);
+
+ FileOutputFormat.setOutputPath(job, output);
+ job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+ deletePath(job.getConfiguration(), output);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java
new file mode 100644
index 0000000..1b32944
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapper.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinVersion;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+public class CalculateStatsFromBaseCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
+    private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidMapper.class);
+
+    protected int nRowKey;
+    protected long baseCuboidId;
+
+    private int samplingPercentage;
+    // position inside the current window of 100 base cuboid rows
+    private int rowCount = 0;
+    private long[] rowHashCodesLong = null;
+    //about details of the new algorithm, please see KYLIN-2518
+    private boolean isUsePutRowKeyToHllNewAlgorithm;
+
+    // one counter per entry of cuboidIds, same index
+    private HLLCounter[] allCuboidsHLL = null;
+    private Long[] cuboidIds;
+    // per cuboid, the row key column indexes it is made of
+    private Integer[][] allCuboidsBitSet = null;
+    private HashFunction hf = null;
+
+    RowKeyDecoder rowKeyDecoder;
+
+    protected Text outputKey = new Text();
+    protected Text outputValue = new Text();
+
+    /**
+     * Loads the cube and segment named in the job configuration, resolves the cuboids of the
+     * configured cuboid mode, creates one HLL counter per cuboid and picks the hashing algorithm
+     * according to the cube descriptor version (see KYLIN-2518).
+     */
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        final Configuration conf = context.getConfiguration();
+        HadoopUtil.setCurrentConfiguration(conf);
+        final KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        final CubeInstance cube = CubeManager.getInstance(config).getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
+        final CubeDesc cubeDesc = cube.getDescriptor();
+        final CubeSegment cubeSegment = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
+
+        baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+        nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+        final Set<Long> cuboidIdSet = cube.getCuboidsByMode(conf.get(BatchConstants.CFG_CUBOID_MODE));
+        cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+        allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey);
+
+        samplingPercentage = Integer.parseInt(conf.get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+
+        final int hllPrecision = cubeDesc.getConfig().getCubeStatsHLLPrecision();
+        allCuboidsHLL = new HLLCounter[cuboidIds.length];
+        for (int idx = 0; idx < allCuboidsHLL.length; idx++) {
+            allCuboidsHLL[idx] = new HLLCounter(hllPrecision);
+        }
+
+        //for KYLIN-2518 backward compatibility
+        isUsePutRowKeyToHllNewAlgorithm = !KylinVersion.isBefore200(cubeDesc.getVersion());
+        if (isUsePutRowKeyToHllNewAlgorithm) {
+            rowHashCodesLong = new long[nRowKey];
+            hf = Hashing.murmur3_128();
+            logger.info(
+                    "Found KylinVersion : {}. Use new algorithm for cuboid sampling. About the details of the new algorithm, please refer to KYLIN-2518",
+                    cubeDesc.getVersion());
+        } else {
+            hf = Hashing.murmur3_32();
+            logger.info("Found KylinVersion : {}. Use old algorithm for cuboid sampling.", cubeDesc.getVersion());
+        }
+
+        rowKeyDecoder = new RowKeyDecoder(cubeSegment);
+    }
+
+    /**
+     * Feeds a sampled base cuboid row into the HLL counters of all cuboids.
+     * <p>
+     * Rows of other cuboids are ignored. Base cuboid rows are counted in windows of 100; a row is
+     * sampled while its position in the window is below the sampling percentage.
+     */
+    @Override
+    public void doMap(Text key, Text value, Context context) throws InterruptedException, IOException {
+        if (rowKeyDecoder.decode(key.getBytes()) != baseCuboidId) {
+            return; // Skip data from cuboids which are not the base cuboid
+        }
+
+        final List<String> decodedColumns = rowKeyDecoder.getValues();
+
+        if (rowCount < samplingPercentage) {
+            Preconditions.checkArgument(nRowKey == decodedColumns.size());
+
+            final String[] row = decodedColumns.toArray(new String[decodedColumns.size()]);
+            if (!isUsePutRowKeyToHllNewAlgorithm) {
+                putRowKeyToHLLOld(row);
+            } else {
+                putRowKeyToHLLNew(row);
+            }
+        }
+
+        // advance inside the window, wrapping back to 0 after the 100th row
+        rowCount = (rowCount == 99) ? 0 : rowCount + 1;
+    }
+
+    /**
+     * Old sampling algorithm: hashes every row key column to bytes, then for each cuboid hashes the
+     * concatenation of its columns' hashes and adds the result to that cuboid's counter.
+     */
+    public void putRowKeyToHLLOld(String[] row) {
+        //generate hash for each row key column
+        final byte[][] columnHashes = new byte[nRowKey][];
+        for (int col = 0; col < nRowKey; col++) {
+            final Hasher columnHasher = hf.newHasher();
+            final String cell = row[col];
+            // a null cell is hashed as the int 0
+            columnHashes[col] = (cell != null ? columnHasher.putString(cell) : columnHasher.putInt(0)).hash()
+                    .asBytes();
+        }
+
+        // use the row key column hash to get a consolidated hash for each cuboid
+        for (int cuboidIdx = 0; cuboidIdx < cuboidIds.length; cuboidIdx++) {
+            final Hasher cuboidHasher = hf.newHasher();
+            for (Integer columnIdx : allCuboidsBitSet[cuboidIdx]) {
+                cuboidHasher.putBytes(columnHashes[columnIdx]);
+            }
+
+            allCuboidsHLL[cuboidIdx].add(cuboidHasher.hash().asBytes());
+        }
+    }
+
+    /**
+     * New sampling algorithm (KYLIN-2518): hashes every row key column to a long, then for each
+     * cuboid adds up the hashes of its columns and feeds the sum directly into that cuboid's counter.
+     */
+    private void putRowKeyToHLLNew(String[] row) {
+        //generate hash for each row key column
+        for (int col = 0; col < nRowKey; col++) {
+            // a null cell is hashed as the string "0"
+            final String cell = row[col] == null ? "0" : row[col];
+            final byte[] hashed = hf.newHasher().putString(cell).hash().asBytes();
+            //add column ordinal to the hash value to distinguish between (a,b) and (b,a)
+            rowHashCodesLong[col] = Bytes.toLong(hashed) + col;
+        }
+
+        // use the row key column hash to get a consolidated hash for each cuboid
+        final int cuboidCount = allCuboidsBitSet.length;
+        for (int cuboidIdx = 0; cuboidIdx < cuboidCount; cuboidIdx++) {
+            long combined = 0;
+            for (Integer columnIdx : allCuboidsBitSet[cuboidIdx]) {
+                combined += rowHashCodesLong[columnIdx];
+            }
+            allCuboidsHLL[cuboidIdx].addHashDirectly(combined);
+        }
+    }
+
+    /**
+     * Emits one record per cuboid: the key is the cuboid id as bytes, the value holds the registers
+     * of that cuboid's HLL counter.
+     */
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        final ByteBuffer registerBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+        for (int cuboidIdx = 0; cuboidIdx < cuboidIds.length; cuboidIdx++) {
+            outputKey.set(Bytes.toBytes(cuboidIds[cuboidIdx]));
+
+            // the buffer is reused, so reset it before serializing the next counter
+            registerBuf.clear();
+            allCuboidsHLL[cuboidIdx].writeRegisters(registerBuf);
+            outputValue.set(registerBuf.array(), 0, registerBuf.position());
+
+            context.write(outputKey, outputValue);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java
new file mode 100644
index 0000000..489dac4
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidReducer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class CalculateStatsFromBaseCuboidReducer extends KylinReducer<Text, Text, NullWritable, Text> {
+
+    private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidReducer.class);
+
+    private KylinConfig cubeConfig;
+    protected long baseCuboidId;
+    // merged counter per cuboid id
+    protected Map<Long, HLLCounter> cuboidHLLMap = null;
+    // one estimate per incoming base cuboid counter
+    private List<Long> baseCuboidRowCountInMappers;
+    // sum of the estimates of all incoming counters, before merging
+    private long totalRowsBeforeMerge = 0;
+
+    private String output = null;
+    private int samplingPercentage;
+
+    /**
+     * Loads the cube named in the job configuration and reads the output path and the sampling
+     * percentage from that configuration.
+     */
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+
+        final Configuration conf = context.getConfiguration();
+        final KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        final CubeInstance cube = CubeManager.getInstance(config).getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
+
+        cubeConfig = cube.getConfig();
+        baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
+        baseCuboidRowCountInMappers = Lists.newLinkedList();
+
+        output = conf.get(BatchConstants.CFG_OUTPUT_PATH);
+        samplingPercentage = Integer.parseInt(conf.get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+
+        cuboidHLLMap = Maps.newHashMap();
+    }
+
+    /**
+     * Merges all HLL counters received for one cuboid id and keeps track of the estimates seen
+     * before merging.
+     */
+    @Override
+    public void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+        final long cuboidId = Bytes.toLong(key.getBytes());
+        logger.info("Cuboid id to be processed: " + cuboidId);
+
+        final boolean isBaseCuboid = cuboidId == baseCuboidId;
+        for (Text serialized : values) {
+            final HLLCounter incoming = new HLLCounter(cubeConfig.getCubeStatsHLLPrecision());
+            incoming.readRegisters(ByteBuffer.wrap(serialized.getBytes(), 0, serialized.getLength()));
+
+            final long estimate = incoming.getCountEstimate();
+            if (isBaseCuboid) {
+                baseCuboidRowCountInMappers.add(estimate);
+            }
+            totalRowsBeforeMerge += estimate;
+
+            // the first counter of a cuboid becomes the accumulator, later ones are merged into it
+            final HLLCounter merged = cuboidHLLMap.get(cuboidId);
+            if (merged == null) {
+                cuboidHLLMap.put(cuboidId, incoming);
+            } else {
+                merged.merge(incoming);
+            }
+        }
+    }
+
+    /**
+     * Writes the merged cuboid statistics to the output path, together with the sampling
+     * percentage, the number of base cuboid counters received and the overlap ratio (sum of
+     * estimates before merging divided by sum of estimates after merging, or 0 when the latter is 0).
+     */
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        long grandTotal = 0;
+        for (Map.Entry<Long, HLLCounter> entry : cuboidHLLMap.entrySet()) {
+            grandTotal += entry.getValue().getCountEstimate();
+        }
+
+        final double mapperOverlapRatio;
+        if (grandTotal == 0) {
+            mapperOverlapRatio = 0;
+        } else {
+            mapperOverlapRatio = (double) totalRowsBeforeMerge / grandTotal;
+        }
+
+        CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(output), //
+                cuboidHLLMap, samplingPercentage, baseCuboidRowCountInMappers.size(), mapperOverlapRatio);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
new file mode 100644
index 0000000..3341be9
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CopyDictionaryStep.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class CopyDictionaryStep extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(CopyDictionaryStep.class);
+
+ public CopyDictionaryStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ final CubeManager mgr = CubeManager.getInstance(context.getConfig());
+ final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+ CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment);
+ Preconditions.checkNotNull(oldSegment,
+ "cannot find the original segment to be optimized by " + optimizeSegment);
+
+ // --- Copy dictionary
+ optimizeSegment.getDictionaries().putAll(oldSegment.getDictionaries());
+ optimizeSegment.getSnapshots().putAll(oldSegment.getSnapshots());
+ optimizeSegment.getRowkeyStats().addAll(oldSegment.getRowkeyStats());
+
+ try {
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToUpdateSegs(optimizeSegment);
+ mgr.updateCube(cubeBuilder);
+ } catch (IOException e) {
+ logger.error("fail to merge dictionary or lookup snapshots", e);
+ return ExecuteResult.createError(e);
+ }
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
index 65c5869..e06077a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java
@@ -38,6 +38,7 @@ import javax.annotation.Nullable;
public class CubingExecutableUtil {
public static final String CUBE_NAME = "cubeName";
+ public static final String SEGMENT_NAME = "segmentName";
public static final String SEGMENT_ID = "segmentId";
public static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
public static final String STATISTICS_PATH = "statisticsPath";
@@ -61,6 +62,14 @@ public class CubingExecutableUtil {
return params.get(CUBE_NAME);
}
+ public static void setSegmentName(String segmentName, Map<String, String> params) {
+ params.put(SEGMENT_NAME, segmentName);
+ }
+
+ public static String getSegmentName(Map<String, String> params) {
+ return params.get(SEGMENT_NAME);
+ }
+
public static void setSegmentId(String segmentId, Map<String, String> params) {
params.put(SEGMENT_ID, segmentId);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index 6a8ba4c..b49b639 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -28,12 +28,15 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
import org.apache.kylin.job.execution.ExecutableManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,18 +53,27 @@ public class CuboidJob extends AbstractHadoopJob {
private boolean skipped = false;
+ private CuboidScheduler cuboidScheduler;
+
@Override
public boolean isSkipped() {
return skipped;
}
- private boolean checkSkip(String cubingJobId) {
+ private boolean checkSkip(String cubingJobId, int level) {
if (cubingJobId == null)
return false;
ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
CubingJob cubingJob = (CubingJob) execMgr.getJob(cubingJobId);
skipped = cubingJob.isLayerCubing() == false;
+ if (!skipped) {
+ skipped = (level > cuboidScheduler.getBuildLevel());
+ if (skipped) {
+ logger.info("Job level: " + level + " for " + cubingJobId + "[" + cubingJobId
+ + "] exceeds real cuboid tree levels : " + cuboidScheduler.getBuildLevel());
+ }
+ }
return skipped;
}
@@ -80,6 +92,7 @@ public class CuboidJob extends AbstractHadoopJob {
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_NCUBOID_LEVEL);
options.addOption(OPTION_CUBING_JOB_ID);
+ options.addOption(OPTION_CUBOID_MODE);
parseOptions(options, args);
String output = getOptionValue(OPTION_OUTPUT_PATH);
@@ -87,12 +100,18 @@ public class CuboidJob extends AbstractHadoopJob {
int nCuboidLevel = Integer.parseInt(getOptionValue(OPTION_NCUBOID_LEVEL));
String segmentID = getOptionValue(OPTION_SEGMENT_ID);
String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
+ String cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
+ if (cuboidModeName == null) {
+ cuboidModeName = CuboidModeEnum.CURRENT.toString();
+ }
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(cubeName);
CubeSegment segment = cube.getSegmentById(segmentID);
- if (checkSkip(cubingJobId)) {
+ cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(segment, cuboidModeName);
+
+ if (checkSkip(cubingJobId, nCuboidLevel)) {
logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segmentID + "[" + segmentID + "]");
return 0;
}
@@ -104,7 +123,7 @@ public class CuboidJob extends AbstractHadoopJob {
setJobClasspath(job, cube.getConfig());
// add metadata to distributed cache
- attachSegmentMetadataWithDict(segment, job.getConfiguration());
+ attachSegmentMetadataWithAll(segment, job.getConfiguration());
// Mapper
job.setMapperClass(this.mapperClass);
@@ -122,12 +141,13 @@ public class CuboidJob extends AbstractHadoopJob {
// set output
IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat();
- outputFormat.configureJobOutput(job, output, segment, nCuboidLevel);
+ outputFormat.configureJobOutput(job, output, segment, cuboidScheduler, nCuboidLevel);
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
job.getConfiguration().setInt(BatchConstants.CFG_CUBE_CUBOID_LEVEL, nCuboidLevel);
+ job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidModeName);
return waitForCompletion(job);
} finally {
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index 2965e2f..e9fd3bd 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -21,13 +21,13 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.List;
+import java.util.Set;
import org.apache.hadoop.io.Text;
import org.apache.kylin.common.KylinVersion;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.cuboid.CuboidUtil;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.hllc.HLLCounter;
@@ -37,7 +37,6 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
@@ -56,7 +55,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
protected boolean collectStatistics = false;
- protected CuboidScheduler cuboidScheduler = null;
protected int nRowKey;
private Integer[][] allCuboidsBitSet = null;
private HLLCounter[] allCuboidsHLL = null;
@@ -86,15 +84,11 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
if (collectStatistics) {
samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
- cuboidScheduler = cubeDesc.getInitialCuboidScheduler();
nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
- List<Long> cuboidIdList = Lists.newArrayList();
- List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
- addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
-
- allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
- cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
+ Set<Long> cuboidIdSet = cubeSeg.getCuboidScheduler().getAllCuboidIds();
+ cuboidIds = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+ allCuboidsBitSet = CuboidUtil.getCuboidBitSet(cuboidIds, nRowKey);
allCuboidsHLL = new HLLCounter[cuboidIds.length];
for (int i = 0; i < cuboidIds.length; i++) {
@@ -129,27 +123,6 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB
}
- private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
- allCuboids.add(cuboidId);
- Integer[] indice = new Integer[Long.bitCount(cuboidId)];
-
- long mask = Long.highestOneBit(baseCuboidId);
- int position = 0;
- for (int i = 0; i < nRowKey; i++) {
- if ((mask & cuboidId) > 0) {
- indice[position] = i;
- position++;
- }
- mask = mask >> 1;
- }
-
- allCuboidsBitSet.add(indice);
- Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
- for (Long childId : children) {
- addCuboidBitSet(childId, allCuboidsBitSet, allCuboids);
- }
- }
-
@Override
public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 74c8c2c..0f65a3e 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -110,6 +110,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
if (collectStatistics && (taskId == numberOfTasks - 1)) {
// hll
isStatistics = true;
+ baseCuboidId = cube.getCuboidScheduler().getBaseCuboidId();
baseCuboidRowCountInMappers = Lists.newArrayList();
cuboidHLLMap = Maps.newHashMap();
samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
new file mode 100644
index 0000000..97f9dc1
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataJob.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Map-only Hadoop job (zero reduce tasks) that feeds the sequence files under the input path
+ * to {@link FilterRecommendCuboidDataMapper} and writes sequence files under the output path.
+ */
+public class FilterRecommendCuboidDataJob extends AbstractHadoopJob {
+
+    private static final Logger logger = LoggerFactory.getLogger(FilterRecommendCuboidDataJob.class);
+
+    /**
+     * Parses the command line, configures the job and blocks until it completes.
+     *
+     * @param args command line carrying the job name, cube name, segment id, input path and output path
+     * @return the value returned by {@code waitForCompletion(job)}
+     * @throws Exception any failure during set-up or execution; it is logged, the usage is printed,
+     *         and the exception is rethrown
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            // The segment id on the command line identifies the optimize segment; the metadata
+            // attached below belongs to the original segment it was derived from.
+            CubeSegment optSegment = cube.getSegmentById(segmentID);
+            CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
+
+            logger.info("Starting: " + job.getJobName());
+
+            setJobClasspath(job, cube.getConfig());
+
+            // Mapper
+            job.setMapperClass(FilterRecommendCuboidDataMapper.class);
+
+            // Reducer: none, the mapper output is the job output
+            job.setNumReduceTasks(0);
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            // Input
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+            FileInputFormat.setInputPaths(job, input);
+            // Output
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            FileOutputFormat.setOutputPath(job, output);
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            // add metadata to distributed cache
+            attachSegmentMetadataWithDict(originalSegment, job.getConfiguration());
+
+            // Remove any previous output before the job is submitted.
+            this.deletePath(job.getConfiguration(), output);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            // Name this job in the log so a failure is not attributed to CuboidJob.
+            logger.error("error in FilterRecommendCuboidDataJob", e);
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
new file mode 100644
index 0000000..d8ae3ab
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FilterRecommendCuboidDataMapper.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Mapper that keeps only the rows whose cuboid id is either the base cuboid or a member of the
+ * cube's recommended cuboid set, and writes them through {@link MultipleOutputs} into a
+ * sub-directory of the job output ({@code PathNameCuboidBase} for base-cuboid rows,
+ * {@code PathNameCuboidOld} for the others).
+ */
+public class FilterRecommendCuboidDataMapper extends KylinMapper<Text, Text, Text, Text> {
+
+    // Parameterized with the mapper's output key/value types instead of being used as a raw type.
+    private MultipleOutputs<Text, Text> mos;
+
+    private RowKeySplitter rowKeySplitter;
+    private long baseCuboid;
+    private Set<Long> recommendCuboids;
+
+    /**
+     * Loads the cube metadata and prepares the row key splitter, the base cuboid id and the
+     * recommended cuboid set.
+     *
+     * @throws NullPointerException if the cube has no recommended cuboid set
+     */
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        mos = new MultipleOutputs<Text, Text>(context);
+
+        String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+        String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cube = cubeManager.getCube(cubeName);
+        CubeSegment optSegment = cube.getSegmentById(segmentID);
+        CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
+
+        // The input rows are split with the layout of the original segment.
+        // NOTE(review): the meaning of the 65 / 255 arguments is defined by RowKeySplitter - confirm there.
+        rowKeySplitter = new RowKeySplitter(originalSegment, 65, 255);
+        baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+
+        recommendCuboids = cube.getCuboidsRecommend();
+        Preconditions.checkNotNull(recommendCuboids, "The recommend cuboid map could not be null");
+    }
+
+    /**
+     * Writes the row to the matching sub-directory, or drops it when its cuboid is neither the
+     * base cuboid nor a recommended one.
+     */
+    @Override
+    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
+        long cuboidID = rowKeySplitter.split(key.getBytes());
+        if (cuboidID != baseCuboid && !recommendCuboids.contains(cuboidID)) {
+            return;
+        }
+
+        String baseOutputPath = PathNameCuboidOld;
+        if (cuboidID == baseCuboid) {
+            baseOutputPath = PathNameCuboidBase;
+        }
+        mos.write(key, value, generateFileName(baseOutputPath));
+    }
+
+    /**
+     * Closes the multiple outputs and, when the base-cuboid directory does not exist under the job
+     * output directory, creates it together with an empty sequence file named {@code part-m-00000}.
+     */
+    @Override
+    public void doCleanup(Context context) throws IOException, InterruptedException {
+        mos.close();
+
+        Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
+        FileSystem fs = FileSystem.get(context.getConfiguration());
+        if (!fs.exists(outputDirBase)) {
+            // presumably so that later steps always find a readable base-cuboid input - confirm against the job chain
+            fs.mkdirs(outputDirBase);
+            SequenceFile
+                    .createWriter(context.getConfiguration(),
+                            SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
+                            SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
+                    .close();
+        }
+    }
+
+    /** Builds the base output path handed to {@link MultipleOutputs}: {@code subDir + "/part"}. */
+    private String generateFileName(String subDir) {
+        return subDir + "/part";
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
new file mode 100644
index 0000000..62109f4
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidJob.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Hadoop job that runs {@link InMemCuboidFromBaseCuboidMapper} and
+ * {@link InMemCuboidFromBaseCuboidReducer} over the sequence files under the input path and
+ * writes {@code Text}/{@code Text} sequence files under the output path.
+ */
+public class InMemCuboidFromBaseCuboidJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidFromBaseCuboidJob.class);
+
+    private boolean skipped = false;
+
+    /** Returns whether the last {@link #run(String[])} decided to skip the job. */
+    @Override
+    public boolean isSkipped() {
+        return skipped;
+    }
+
+    /**
+     * Decides whether this step is skipped: it is when the cubing job identified by
+     * {@code cubingJobId} does not use in-memory cubing. Without a job id nothing is skipped.
+     */
+    private boolean checkSkip(String cubingJobId) {
+        if (cubingJobId == null)
+            return false;
+
+        ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubingJob cubingJob = (CubingJob) execMgr.getJob(cubingJobId);
+        skipped = !cubingJob.isInMemCubing();
+        return skipped;
+    }
+
+    /**
+     * Parses the command line, configures the job and blocks until it completes.
+     *
+     * @param args command line carrying the job name, cube name, segment id, output path,
+     *        cubing job id, input path and cuboid mode
+     * @return {@code 0} when the job is skipped, otherwise the value returned by
+     *         {@code waitForCompletion(job)}
+     * @throws Exception any failure during set-up or execution; it is logged, the usage is printed,
+     *         and the exception is rethrown
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_CUBOID_MODE);
+            parseOptions(options, args);
+
+            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            String output = getOptionValue(OPTION_OUTPUT_PATH);
+
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment cubeSeg = cube.getSegmentById(segmentID);
+            String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
+
+            // The cuboid mode is optional on the command line and defaults to CURRENT.
+            String cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
+            if (cuboidModeName == null) {
+                cuboidModeName = CuboidModeEnum.CURRENT.toString();
+            }
+
+            CuboidScheduler cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSeg, cuboidModeName);
+
+            if (checkSkip(cubingJobId)) {
+                logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + cubeSeg);
+                return 0;
+            }
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            logger.info("Starting: " + job.getJobName());
+
+            setJobClasspath(job, cube.getConfig());
+
+            // add metadata to distributed cache
+            attachSegmentMetadataWithAll(cubeSeg, job.getConfiguration());
+
+            // set job configuration
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidModeName);
+
+            String input = getOptionValue(OPTION_INPUT_PATH);
+            FileInputFormat.setInputPaths(job, new Path(input));
+            job.setInputFormatClass(SequenceFileInputFormat.class);
+
+            // set mapper
+            job.setMapperClass(InMemCuboidFromBaseCuboidMapper.class);
+            job.setMapOutputKeyClass(ByteArrayWritable.class);
+            job.setMapOutputValueClass(ByteArrayWritable.class);
+
+            // set output
+            job.setReducerClass(InMemCuboidFromBaseCuboidReducer.class);
+            job.setNumReduceTasks(MapReduceUtil.getInmemCubingReduceTaskNum(cubeSeg, cuboidScheduler));
+
+            // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            Path outputPath = new Path(output);
+            FileOutputFormat.setOutputPath(job, outputPath);
+
+            // Remove any previous output before the job is submitted.
+            HadoopUtil.deletePath(job.getConfiguration(), outputPath);
+
+            return waitForCompletion(job);
+        } catch (Exception e) {
+            // Name this job in the log so a failure is not attributed to CuboidJob.
+            logger.error("error in InMemCuboidFromBaseCuboidJob", e);
+            printUsage(options);
+            throw e;
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    /** Command line entry point: runs the job through {@link ToolRunner} and exits with its code. */
+    public static void main(String[] args) throws Exception {
+        InMemCuboidFromBaseCuboidJob job = new InMemCuboidFromBaseCuboidJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
new file mode 100644
index 0000000..73a39d6
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidMapper.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnitForBaseCuboid;
+import org.apache.kylin.cube.kv.CubeDimEncMap;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * In-memory cubing mapper whose input records are {@link Text} key/value pairs; each pair is
+ * flattened into one {@link ByteArray} record (key without its preamble, followed by the value)
+ * and handed to the cube builder through {@link InputConverterUnitForBaseCuboid}.
+ */
+public class InMemCuboidFromBaseCuboidMapper
+        extends InMemCuboidMapperBase<Text, Text, ByteArrayWritable, ByteArrayWritable, ByteArray> {
+    private static final Log logger = LogFactory.getLog(InMemCuboidFromBaseCuboidMapper.class);
+
+    // Scratch buffer reused for every record; sized in doSetup from the base cuboid's GTInfo.
+    private ByteBuffer keyValueBuffer;
+    // Number of leading key bytes skipped when a record is assembled (the row key preamble size).
+    private int keyOffset;
+
+    /**
+     * Runs the base class set-up, then allocates the record buffer with the maximum record length
+     * of the base cuboid and reads the row key preamble size from the segment.
+     */
+    @Override
+    protected void doSetup(Mapper.Context context) throws IOException {
+        super.doSetup(context);
+
+        long baseCuboid = Cuboid.getBaseCuboidId(cubeDesc);
+        GTInfo gtInfo = CubeGridTable.newGTInfo(Cuboid.findForMandatory(cubeDesc, baseCuboid),
+                new CubeDimEncMap(cubeDesc, dictionaryMap));
+        keyValueBuffer = ByteBuffer.allocate(gtInfo.getMaxRecordLength());
+        keyOffset = cubeSegment.getRowKeyPreambleSize();
+    }
+
+    /** Returns the converter unit used for records produced by {@link #getRecordFromKeyValue}. */
+    @Override
+    protected InputConverterUnit<ByteArray> getInputConverterUnit() {
+        return new InputConverterUnitForBaseCuboid();
+    }
+
+    /**
+     * Creates a {@link DoggedCubeBuilder} for the given scheduler and submits its build runnable
+     * to a single daemon thread.
+     *
+     * @return the future of the submitted cube building task
+     */
+    @Override
+    protected Future getCubingThreadFuture(Context context, Map<TblColRef, Dictionary<String>> dictionaryMap,
+            int reserveMemoryMB, CuboidScheduler cuboidScheduler, InputConverterUnit<ByteArray> inputConverterUnit) {
+        AbstractInMemCubeBuilder cubeBuilder = new DoggedCubeBuilder(cuboidScheduler, flatDesc, dictionaryMap);
+        cubeBuilder.setReserveMemoryMB(reserveMemoryMB);
+        cubeBuilder.setConcurrentThreads(taskThreadCount);
+
+        ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("inmemory-cube-building-from-base-cuboid-mapper-%d").build());
+        return executorService.submit(cubeBuilder.buildAsRunnable(queue, inputConverterUnit,
+                new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
+    }
+
+    /**
+     * Concatenates the key (minus its first {@code keyOffset} bytes) and the value into a freshly
+     * allocated {@link ByteArray}.
+     */
+    @Override
+    protected ByteArray getRecordFromKeyValue(Text key, Text value) {
+        keyValueBuffer.clear();
+        // Text.getBytes() exposes the whole backing array, which can be longer than the current
+        // record when the Text instance is reused; only the first getLength() bytes are valid.
+        keyValueBuffer.put(key.getBytes(), keyOffset, key.getLength() - keyOffset);
+        keyValueBuffer.put(value.getBytes(), 0, value.getLength());
+
+        byte[] keyValue = new byte[keyValueBuffer.position()];
+        System.arraycopy(keyValueBuffer.array(), 0, keyValue, 0, keyValueBuffer.position());
+
+        return new ByteArray(keyValue);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidReducer.java
new file mode 100644
index 0000000..fbc45d9
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidFromBaseCuboidReducer.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+/**
+ * Reducer registered by {@code InMemCuboidFromBaseCuboidJob}; it adds nothing of its own and
+ * inherits all behavior from {@link InMemCuboidReducer}.
+ */
+public class InMemCuboidFromBaseCuboidReducer extends InMemCuboidReducer {
+ // intentionally empty
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index 73a2eb9..b0ea7b7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -93,7 +93,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
setJobClasspath(job, cube.getConfig());
// add metadata to distributed cache
- attachSegmentMetadataWithDict(segment, job.getConfiguration());
+ attachSegmentMetadataWithAll(segment, job.getConfiguration());
// set job configuration
job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
@@ -116,7 +116,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
// set output
IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat();
- outputFormat.configureJobOutput(job, output, segment, 0);
+ outputFormat.configureJobOutput(job, output, segment, segment.getCuboidScheduler(), 0);
return waitForCompletion(job);
} finally {
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
index 43f95e5..7b4738b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
@@ -33,8 +33,6 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.cuboid.DefaultCuboidScheduler;
-import org.apache.kylin.cube.cuboid.TreeCuboidSchedulerManager;
import org.apache.kylin.cube.inmemcubing.ConsumeBlockingQueueController;
import org.apache.kylin.cube.inmemcubing.InputConverterUnit;
import org.apache.kylin.cube.model.CubeDesc;
@@ -43,7 +41,7 @@ import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
@@ -114,11 +112,7 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
}
String cuboidModeName = conf.get(BatchConstants.CFG_CUBOID_MODE);
- CuboidScheduler cuboidScheduler = TreeCuboidSchedulerManager.getTreeCuboidScheduler(cubeDesc, //
- CuboidStatsReaderUtil.readCuboidStatsFromSegment(cube.getCuboidsByMode(cuboidModeName), cubeSegment));
- if (cuboidScheduler == null) {
- cuboidScheduler = new DefaultCuboidScheduler(cubeDesc);
- }
+ CuboidScheduler cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSegment, cuboidModeName);
taskThreadCount = config.getCubeAlgorithmInMemConcurrentThreads();
reserveMemoryMB = calculateReserveMB(conf);
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
index 2058bc9..60d0870 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -95,7 +95,7 @@ public abstract class KVGTRecordWriter implements ICuboidWriter {
protected abstract void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws IOException;
private void initVariables(Long cuboidId) {
- rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeSegment, cuboidId));
+ rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findForMandatory(cubeDesc, cuboidId));
keyBuf = rowKeyEncoder.createBuf();
dimensions = Long.bitCount(cuboidId);
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 5a72faf..4f68293 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -91,7 +91,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
protected void doSetup(Context context) throws IOException, InterruptedException {
super.bindCurrentConfiguration(context.getConfiguration());
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+ cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
config = AbstractHadoopJob.loadKylinPropsAndMetadata();
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
new file mode 100644
index 0000000..e97c6bb
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsWithOldStep.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * Job step that merges the cuboid statistics produced for an optimize segment with the
+ * statistics of the original segment being optimized, keeps only the entries for the
+ * cube's recommended cuboids, and stores the merged result both on the file system and in
+ * the metadata resource store under the optimize segment's statistics path.
+ */
+public class MergeStatisticsWithOldStep extends AbstractExecutable {
+
+    private static final Logger logger = LoggerFactory.getLogger(MergeStatisticsWithOldStep.class);
+
+    /** Per-cuboid HLL counters accumulated from every statistics reader fed to this step. */
+    protected Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+
+    public MergeStatisticsWithOldStep() {
+        super();
+    }
+
+    /**
+     * Merges the statistics of the optimize segment and of its original segment, writes the
+     * merged statistics restricted to the recommended cuboids, and then lets
+     * {@link StatisticsDecisionUtil} decide the cubing algorithm for the cubing job.
+     *
+     * @param context execution context supplying the Kylin configuration
+     * @return a {@code SUCCEED} result, or an error result wrapping the {@link IOException}
+     *         raised while reading or writing statistics
+     * @throws NullPointerException if the original segment to optimize or the cube's set of
+     *         recommended cuboids cannot be found
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        final CubeManager mgr = CubeManager.getInstance(context.getConfig());
+        final CubeInstance cube = mgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment optimizeSegment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+        final String statsInputPath = CubingExecutableUtil.getStatisticsPath(this.getParams());
+
+        CubeSegment oldSegment = optimizeSegment.getCubeInstance().getOriginalSegmentToOptimize(optimizeSegment);
+        Preconditions.checkNotNull(oldSegment,
+                "cannot find the original segment to be optimized by " + optimizeSegment);
+
+        KylinConfig kylinConf = cube.getConfig();
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
+        ResourceStore rs = ResourceStore.getStore(kylinConf);
+        // Sum of the two sampling percentages; halved below once both sources are read.
+        int averageSamplingPercentage = 0;
+
+        try {
+            //1. Add statistics from optimized segment
+            Path statisticsFilePath = new Path(statsInputPath,
+                    BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            FileSystem hdfs = FileSystem.get(conf);
+            if (!hdfs.exists(statisticsFilePath)) {
+                throw new IOException("File " + statisticsFilePath + " does not exists");
+            }
+
+            CubeStatsReader optimizeSegmentStatsReader = new CubeStatsReader(optimizeSegment, null,
+                    optimizeSegment.getConfig(), statisticsFilePath);
+            averageSamplingPercentage += optimizeSegmentStatsReader.getSamplingPercentage();
+            addFromCubeStatsReader(optimizeSegmentStatsReader);
+
+            //2. Add statistics from old segment
+            CubeStatsReader oldSegmentStatsReader = new CubeStatsReader(oldSegment, null, oldSegment.getConfig());
+            averageSamplingPercentage += oldSegmentStatsReader.getSamplingPercentage();
+            addFromCubeStatsReader(oldSegmentStatsReader);
+
+            logger.info("Cuboid set with stats info: {}", cuboidHLLMap.keySet());
+            //3. Store merged statistics for recommend cuboids
+            averageSamplingPercentage = averageSamplingPercentage / 2;
+            Set<Long> cuboidsRecommend = cube.getCuboidsRecommend();
+            // Fail with an explicit message instead of a bare NPE on size() below when the
+            // cube carries no recommended cuboid set.
+            Preconditions.checkNotNull(cuboidsRecommend,
+                    "cannot find the recommended cuboids of cube " + cube);
+
+            Map<Long, HLLCounter> resultCuboidHLLMap = Maps.newHashMapWithExpectedSize(cuboidsRecommend.size());
+            for (Long cuboid : cuboidsRecommend) {
+                HLLCounter hll = cuboidHLLMap.get(cuboid);
+                if (hll == null) {
+                    // Recommended cuboids without any collected stats are skipped, not fatal.
+                    logger.warn("Cannot get the row count stats for cuboid {}", cuboid);
+                } else {
+                    resultCuboidHLLMap.put(cuboid, hll);
+                }
+            }
+
+            String resultDir = CubingExecutableUtil.getMergedStatisticsPath(this.getParams());
+            CubeStatsWriter.writeCuboidStatistics(conf, new Path(resultDir), resultCuboidHLLMap,
+                    averageSamplingPercentage);
+
+            try (FSDataInputStream mergedStats = hdfs
+                    .open(new Path(resultDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME))) {
+                // put the statistics to metadata store
+                String statisticsFileName = optimizeSegment.getStatisticsResourcePath();
+                rs.putResource(statisticsFileName, mergedStats, System.currentTimeMillis());
+            }
+
+            //By default, the cube optimization will use in-memory cubing
+            CubingJob cubingJob = (CubingJob) getManager()
+                    .getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+            StatisticsDecisionUtil.decideCubingAlgorithm(cubingJob, optimizeSegment);
+
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } catch (IOException e) {
+            logger.error("fail to merge cuboid statistics", e);
+            return ExecuteResult.createError(e);
+        }
+
+    }
+
+    /**
+     * Folds every per-cuboid HLL counter exposed by the reader into {@link #cuboidHLLMap}:
+     * a counter for a cuboid already present is merged into the existing one, otherwise the
+     * reader's counter instance itself is stored.
+     *
+     * @param cubeStatsReader source of the per-cuboid HLL counters to add
+     */
+    private void addFromCubeStatsReader(CubeStatsReader cubeStatsReader) {
+        for (Map.Entry<Long, HLLCounter> entry : cubeStatsReader.getCuboidRowHLLCounters().entrySet()) {
+            HLLCounter existing = cuboidHLLMap.get(entry.getKey());
+            if (existing != null) {
+                existing.merge(entry.getValue());
+            } else {
+                cuboidHLLMap.put(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index 9f2a6ad..359bd4c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -35,6 +35,7 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
import org.apache.kylin.engine.mr.common.NDCuboidBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,8 +51,8 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private Text outputKey = new Text();
private String cubeName;
private String segmentID;
- private CubeSegment cubeSegment;
private CubeDesc cubeDesc;
+ private CubeSegment cubeSegment;
private CuboidScheduler cuboidScheduler;
private int handleCounter;
@@ -65,17 +66,18 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
protected void doSetup(Context context) throws IOException {
super.bindCurrentConfiguration(context.getConfiguration());
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+ cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+ String cuboidModeName = context.getConfiguration().get(BatchConstants.CFG_CUBOID_MODE);
KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeSegment = cube.getSegmentById(segmentID);
cubeDesc = cube.getDescriptor();
+ cubeSegment = cube.getSegmentById(segmentID);
ndCuboidBuilder = new NDCuboidBuilder(cubeSegment);
// initialize CubiodScheduler
- cuboidScheduler = cubeSegment.getCuboidScheduler();
+ cuboidScheduler = CuboidSchedulerUtil.getCuboidSchedulerByMode(cubeSegment, cuboidModeName);
rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
}
@@ -104,7 +106,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
}
for (Long child : myChildren) {
- Cuboid childCuboid = Cuboid.findById(cuboidScheduler, child);
+ Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
outputKey.set(result.getSecond().array(), 0, result.getFirst());
context.write(outputKey, value);