Posted to commits@kylin.apache.org by li...@apache.org on 2017/11/23 05:53:07 UTC
[05/18] kylin git commit: APACHE-KYLIN-2733: Introduce optimize job for adjusting cuboid set
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
deleted file mode 100644
index 5c0555a..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ReducerNumSizing.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ReducerNumSizing {
-
- private static final Logger logger = LoggerFactory.getLogger(ReducerNumSizing.class);
-
- public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, double totalMapInputMB, int level) throws ClassNotFoundException, IOException, InterruptedException, JobException {
- CubeDesc cubeDesc = cubeSegment.getCubeDesc();
- KylinConfig kylinConfig = cubeDesc.getConfig();
-
- double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
- double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio();
- logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + level);
-
- CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, kylinConfig);
-
- double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst;
-
- if (level == -1) {
- //merge case
- double estimatedSize = cubeStatsReader.estimateCubeSize();
- adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize;
- logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, totalMapInputMB, adjustedCurrentLayerSizeEst);
- } else if (level == 0) {
- //base cuboid case TODO: the estimation could be very WRONG because it has no correction
- adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0);
- logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst);
- } else {
- parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1);
- currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level);
- adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst;
- logger.debug("totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst);
- }
-
- // number of reduce tasks
- int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99);
-
- // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
- if (cubeDesc.hasMemoryHungryMeasures()) {
- logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures");
- numReduceTasks = numReduceTasks * 4;
- }
-
- // at least 1 reducer by default
- numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
- // no more than 500 reducer by default
- numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
- return numReduceTasks;
- }
-
- public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg) throws IOException {
- KylinConfig kylinConfig = cubeSeg.getConfig();
-
- Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, kylinConfig).getCuboidSizeMap();
- double totalSizeInM = 0;
- for (Double cuboidSize : cubeSizeMap.values()) {
- totalSizeInM += cuboidSize;
- }
-
- double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB();
-
- // number of reduce tasks
- int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB);
-
- // at least 1 reducer by default
- numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks);
- // no more than 500 reducer by default
- numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks);
-
- logger.info("Having total map input MB " + Math.round(totalSizeInM));
- logger.info("Having per reduce MB " + perReduceInputMB);
- logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks);
- return numReduceTasks;
- }
-}
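
For reference, the reducer sizing this deleted class performed (and that MapReduceUtil takes over later in this patch) reduces to a few lines of arithmetic. A self-contained sketch with illustrative numbers; the config defaults shown are assumptions, not values read from kylin.properties:

public class ReducerSizingSketch {
    public static void main(String[] args) {
        double perReduceInputMB = 500.0;     // assumed default of kylin.engine.mr.reduce-input-mb
        double reduceCountRatio = 1.0;       // assumed default of kylin.engine.mr.reduce-count-ratio
        double adjustedLayerSizeMB = 2600.0; // estimated output size of the current cuboid layer

        // Round up so every started chunk of perReduceInputMB gets its own reducer.
        int numReduceTasks = (int) Math.round(adjustedLayerSizeMB / perReduceInputMB * reduceCountRatio + 0.99);

        boolean hasMemoryHungryMeasures = true; // e.g. COUNT_DISTINCT measures
        if (hasMemoryHungryMeasures) {
            numReduceTasks *= 4; // multiply by 4 to boost parallelism
        }

        // Clamp to the configured [min, max] range, 1 and 500 by default.
        numReduceTasks = Math.max(1, Math.min(500, numReduceTasks));
        System.out.println("reducers = " + numReduceTasks); // prints: reducers = 24
    }
}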
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
new file mode 100644
index 0000000..ed61b4a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterCheckpointStep.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.CuboidStatsReaderUtil;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class UpdateCubeInfoAfterCheckpointStep extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterCheckpointStep.class);
+
+ public UpdateCubeInfoAfterCheckpointStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+ final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+
+ Set<Long> recommendCuboids = cube.getCuboidsRecommend();
+ try {
+ List<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING);
+ Map<Long, Long> recommendCuboidsWithStats = CuboidStatsReaderUtil
+ .readCuboidStatsFromSegments(recommendCuboids, newSegments);
+ if (recommendCuboidsWithStats == null) {
+ throw new RuntimeException("Fail to get statistics info for recommended cuboids after optimization!!!");
+ }
+ cubeManager.promoteCheckpointOptimizeSegments(cube, recommendCuboidsWithStats,
+ newSegments.toArray(new CubeSegment[newSegments.size()]));
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ } catch (Exception e) {
+ logger.error("fail to update cube after build", e);
+ return ExecuteResult.createError(e);
+ }
+ }
+
+}
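
The step above follows the standard AbstractExecutable contract: do the work inside doWork(), report SUCCEED, and convert any failure into an error result for the job engine instead of letting the exception propagate. A minimal sketch of just that pattern, assuming Kylin's job-execution classes on the classpath; the body is a placeholder, not the actual promotion logic:

import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;

public class SkeletonStep extends AbstractExecutable {
    @Override
    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
        try {
            // ... perform the metadata update here ...
            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
        } catch (Exception e) {
            return ExecuteResult.createError(e); // surfaced to the job engine as a step failure
        }
    }
}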
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
new file mode 100644
index 0000000..13c4f40
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterOptimizeStep.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class UpdateCubeInfoAfterOptimizeStep extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(UpdateCubeInfoAfterOptimizeStep.class);
+
+ public UpdateCubeInfoAfterOptimizeStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
+ final CubeInstance cube = cubeManager.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+ final CubeSegment segment = cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+
+ CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(segment);
+ long sourceCount = originalSegment.getInputRecords();
+ long sourceSizeBytes = originalSegment.getInputRecordsSize();
+
+ CubingJob cubingJob = (CubingJob) getManager().getJob(CubingExecutableUtil.getCubingJobId(this.getParams()));
+ long cubeSizeBytes = cubingJob.findCubeSizeBytes();
+
+ segment.setLastBuildJobID(CubingExecutableUtil.getCubingJobId(this.getParams()));
+ segment.setLastBuildTime(System.currentTimeMillis());
+ segment.setSizeKB(cubeSizeBytes / 1024);
+ segment.setInputRecords(sourceCount);
+ segment.setInputRecordsSize(sourceSizeBytes);
+
+ try {
+ cubeManager.promoteNewlyOptimizeSegments(cube, segment);
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+ } catch (IOException e) {
+ logger.error("fail to update cube after build", e);
+ return ExecuteResult.createError(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
new file mode 100644
index 0000000..0cd7264
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardJob.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class UpdateOldCuboidShardJob extends AbstractHadoopJob {
+
+ private static final Logger logger = LoggerFactory.getLogger(UpdateOldCuboidShardJob.class);
+
+ @Override
+ public int run(String[] args) throws Exception {
+ Options options = new Options();
+ try {
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_OUTPUT_PATH);
+ parseOptions(options, args);
+
+ job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+ String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+ Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+ Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+ CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+ CubeSegment optSegment = cube.getSegmentById(segmentID);
+ CubeSegment originalSegment = cube.getOriginalSegmentToOptimize(optSegment);
+
+ logger.info("Starting: " + job.getJobName());
+
+ setJobClasspath(job, cube.getConfig());
+
+ // Mapper
+ job.setMapperClass(UpdateOldCuboidShardMapper.class);
+
+ // Reducer
+ job.setNumReduceTasks(0);
+
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ // Input
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ FileInputFormat.setInputPaths(job, input);
+ // Output
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ FileOutputFormat.setOutputPath(job, output);
+
+ // set job configuration
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+ // add metadata to distributed cache
+ attachSegmentsMetadataWithDict(Lists.newArrayList(optSegment, originalSegment), job.getConfiguration());
+
+ this.deletePath(job.getConfiguration(), output);
+
+ return waitForCompletion(job);
+ } catch (Exception e) {
+ logger.error("error in CuboidJob", e);
+ printUsage(options);
+ throw e;
+ } finally {
+ if (job != null)
+ cleanupTempConfFile(job.getConfiguration());
+ }
+ }
+
+}
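
Since the job extends AbstractHadoopJob (a Hadoop Tool), it can be driven through ToolRunner like the other Kylin MR jobs. A hedged invocation sketch; the flag names follow AbstractHadoopJob's usual option conventions but are assumptions here, and all paths and IDs are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class RunUpdateOldCuboidShard {
    public static void main(String[] args) throws Exception {
        String[] jobArgs = {
                "-jobname", "Kylin_Update_Old_Cuboid_Shard_my_cube",  // OPTION_JOB_NAME
                "-cubename", "my_cube",                               // OPTION_CUBE_NAME
                "-segmentid", "c2bf9b0e-1c4a-4f13-ae69-000000000000", // OPTION_SEGMENT_ID
                "-input", "hdfs:///kylin/old_segment/cuboid/*",       // OPTION_INPUT_PATH
                "-output", "hdfs:///kylin/opt_segment/cuboid",        // OPTION_OUTPUT_PATH
        };
        // ToolRunner parses generic Hadoop options, then calls run(String[]) above.
        int exitCode = ToolRunner.run(new Configuration(), new UpdateOldCuboidShardJob(), jobArgs);
        System.exit(exitCode);
    }
}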
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
new file mode 100644
index 0000000..58b553e
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidBase;
+import static org.apache.kylin.engine.mr.JobBuilderSupport.PathNameCuboidOld;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.common.RowKeySplitter;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.kv.RowKeyEncoder;
+import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Text> {
+
+ private static final Logger logger = LoggerFactory.getLogger(UpdateOldCuboidShardMapper.class);
+
+ private MultipleOutputs mos;
+ private long baseCuboid;
+
+ private CubeDesc cubeDesc;
+ private RowKeySplitter rowKeySplitter;
+ private RowKeyEncoderProvider rowKeyEncoderProvider;
+
+ private Text outputKey = new Text();
+ private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
+ private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+
+ @Override
+ protected void doSetup(Context context) throws IOException {
+ super.bindCurrentConfiguration(context.getConfiguration());
+ mos = new MultipleOutputs(context);
+
+ String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+ String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+ CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+ CubeSegment cubeSegment = cube.getSegmentById(segmentID);
+ CubeSegment oldSegment = cube.getOriginalSegmentToOptimize(cubeSegment);
+
+ cubeDesc = cube.getDescriptor();
+ baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+
+ rowKeySplitter = new RowKeySplitter(oldSegment, 65, 256);
+ rowKeyEncoderProvider = new RowKeyEncoderProvider(cubeSegment);
+ }
+
+ @Override
+ public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
+ long cuboidID = rowKeySplitter.split(key.getBytes());
+
+ Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
+ int fullKeySize = buildKey(cuboid, rowKeySplitter.getSplitBuffers());
+ outputKey.set(newKeyBuf.array(), 0, fullKeySize);
+
+ String baseOutputPath = PathNameCuboidOld;
+ if (cuboidID == baseCuboid) {
+ baseOutputPath = PathNameCuboidBase;
+ }
+ mos.write(outputKey, value, generateFileName(baseOutputPath));
+ }
+
+ private int buildKey(Cuboid cuboid, SplittedBytes[] splitBuffers) {
+ RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
+
+ int startIdx = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
+ int endIdx = startIdx + Long.bitCount(cuboid.getId());
+ int offset = 0;
+ for (int i = startIdx; i < endIdx; i++) {
+ System.arraycopy(splitBuffers[i].value, 0, newKeyBodyBuf, offset, splitBuffers[i].length);
+ offset += splitBuffers[i].length;
+ }
+
+ int fullKeySize = rowkeyEncoder.getBytesLength();
+ while (newKeyBuf.array().length < fullKeySize) {
+ newKeyBuf = new ByteArray(newKeyBuf.length() * 2);
+ }
+ newKeyBuf.setLength(fullKeySize);
+
+ rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
+
+ return fullKeySize;
+ }
+
+ @Override
+ public void doCleanup(Context context) throws IOException, InterruptedException {
+ mos.close();
+
+ Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR), PathNameCuboidBase);
+ FileSystem fs = FileSystem.get(context.getConfiguration());
+ if (!fs.exists(outputDirBase)) {
+ fs.mkdirs(outputDirBase);
+ SequenceFile
+ .createWriter(context.getConfiguration(),
+ SequenceFile.Writer.file(new Path(outputDirBase, "part-m-00000")),
+ SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class))
+ .close();
+ }
+ }
+
+ private String generateFileName(String subDir) {
+ return subDir + "/part";
+ }
+}
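
The heart of the mapper is a rowkey rewrite: split off the old segment's shard and cuboid-ID header, keep the dimension body unchanged, and let the new segment's RowKeyEncoder stamp on a fresh header (including the new shard). A simplified, standalone sketch of that layout transformation; the header sizes are illustrative, whereas Kylin derives the real lengths from its rowkey codec:

import java.util.Arrays;

public class RowkeyRewriteSketch {
    static final int SHARD_LEN = 2;   // illustrative shard header length
    static final int CUBOID_LEN = 8;  // illustrative cuboid-ID header length

    // Keep the body bytes, emit them under a header computed for the target segment.
    static byte[] rewrite(byte[] oldKey, byte[] newHeader) {
        int bodyLen = oldKey.length - SHARD_LEN - CUBOID_LEN;
        byte[] newKey = new byte[newHeader.length + bodyLen];
        System.arraycopy(newHeader, 0, newKey, 0, newHeader.length);
        System.arraycopy(oldKey, SHARD_LEN + CUBOID_LEN, newKey, newHeader.length, bodyLen);
        return newKey;
    }

    public static void main(String[] args) {
        byte[] oldKey = { 0, 1, 0, 0, 0, 0, 0, 0, 0, 42, 7, 8, 9 }; // shard 1, cuboid 42, body {7,8,9}
        byte[] newHeader = { 0, 3, 0, 0, 0, 0, 0, 0, 0, 42 };       // shard 3, same cuboid
        System.out.println(Arrays.toString(rewrite(oldKey, newHeader)));
    }
}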
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 06cc988..bdf0633 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -233,7 +233,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
- outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, level);
+ outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, cubeSeg.getCuboidScheduler(), level);
prepareOutput(rdd, kylinConfig, cubeSeg, level).mapToPair(
new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index ad00706..77bd498 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -38,6 +38,7 @@ import org.apache.kylin.dimension.DimensionEncodingFactory;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.SegmentRange;
@@ -51,6 +52,7 @@ import org.apache.kylin.rest.exception.NotFoundException;
import org.apache.kylin.rest.request.CubeRequest;
import org.apache.kylin.rest.request.JobBuildRequest;
import org.apache.kylin.rest.request.JobBuildRequest2;
+import org.apache.kylin.rest.request.JobOptimizeRequest;
import org.apache.kylin.rest.response.EnvelopeResponse;
import org.apache.kylin.rest.response.GeneralResponse;
import org.apache.kylin.rest.response.HBaseResponse;
@@ -362,6 +364,67 @@ public class CubeController extends BasicController {
}
}
+ /**
+ * Send an optimize cube job
+ *
+ * @param cubeName Cube name
+ * @return JobInstance of CheckpointExecutable
+ */
+ @RequestMapping(value = "/{cubeName}/optimize", method = { RequestMethod.PUT })
+ @ResponseBody
+ public JobInstance optimize(@PathVariable String cubeName, @RequestBody JobOptimizeRequest jobOptimizeRequest) {
+ try {
+ String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
+ CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
+
+ if (cube == null) {
+ throw new InternalErrorException("Cannot find cube " + cubeName);
+ }
+ logger.info("cuboid recommend:" + jobOptimizeRequest.getCuboidsRecommend());
+ return jobService.submitOptimizeJob(cube, jobOptimizeRequest.getCuboidsRecommend(), submitter).getFirst();
+ } catch (BadRequestException e) {
+ logger.error(e.getLocalizedMessage(), e);
+ throw e;
+ } catch (JobException e) {
+ logger.error(e.getLocalizedMessage(), e);
+ throw new BadRequestException(e.getLocalizedMessage());
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage(), e);
+ throw new InternalErrorException(e.getLocalizedMessage());
+ }
+ }
+
+ /**
+ * Send an optimize cube segment job
+ *
+ * @param cubeName Cube name
+ * @param segmentID ID of the segment to be optimized
+ */
+ @RequestMapping(value = "/{cubeName}/recover_segment_optimize/{segmentID}", method = { RequestMethod.PUT })
+ @ResponseBody
+ public JobInstance recoverSegmentOptimize(@PathVariable String cubeName, @PathVariable String segmentID) {
+ try {
+ String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
+ CubeInstance cube = jobService.getCubeManager().getCube(cubeName);
+ if (cube == null) {
+ throw new InternalErrorException("Cannot find cube " + cubeName);
+ }
+
+ CubeSegment segment = cube.getSegmentById(segmentID);
+ if (segment == null) {
+ throw new InternalErrorException("Cannot find segment '" + segmentID + "'");
+ }
+
+ return jobService.submitRecoverSegmentOptimizeJob(segment, submitter);
+ } catch (JobException e) {
+ logger.error(e.getLocalizedMessage(), e);
+ throw new BadRequestException(e.getLocalizedMessage());
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage(), e);
+ throw new InternalErrorException(e.getLocalizedMessage());
+ }
+ }
+
@RequestMapping(value = "/{cubeName}/disable", method = { RequestMethod.PUT }, produces = { "application/json" })
@ResponseBody
public CubeInstance disableCube(@PathVariable String cubeName) {
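
Both new endpoints take a PUT; the optimize call carries a JSON body matching JobOptimizeRequest (added below). A hedged client sketch using java.net.http (Java 11+); the base path /kylin/api/cubes, port, and credentials are assumptions about a typical deployment, while the "optimize" path segment comes from the @RequestMapping above. Note the recommended set must include the base cuboid, as JobService's checkAllowOptimization (later in this patch) enforces:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Base64;

public class OptimizeCubeClient {
    public static void main(String[] args) throws Exception {
        String body = "{\"cuboidsRecommend\": [255, 239, 189]}"; // 255 = base cuboid of an 8-dimension cube
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:7070/kylin/api/cubes/my_cube/optimize"))
                .header("Content-Type", "application/json")
                .header("Authorization",
                        "Basic " + Base64.getEncoder().encodeToString("ADMIN:KYLIN".getBytes()))
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body()); // JobInstance JSON on success
    }
}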
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java
new file mode 100644
index 0000000..51e8e7c
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/JobOptimizeRequest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.request;
+
+import java.util.Set;
+
+public class JobOptimizeRequest {
+
+ private Set<Long> cuboidsRecommend;
+
+ public Set<Long> getCuboidsRecommend() {
+ return cuboidsRecommend;
+ }
+
+ public void setCuboidsRecommend(Set<Long> cuboidsRecommend) {
+ this.cuboidsRecommend = cuboidsRecommend;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index a18aaf3..d5805a1 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -72,6 +72,8 @@ import org.springframework.stereotype.Component;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/**
* Stateless & lightweight service facade of cube management functions.
@@ -525,6 +527,8 @@ public class CubeService extends BasicService implements InitializingBean {
CubeUpdate update = new CubeUpdate(cube);
update.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()]));
+ update.setCuboids(Maps.<Long, Long> newHashMap());
+ update.setCuboidsRecommend(Sets.<Long> newHashSet());
CubeManager.getInstance(getConfig()).updateCube(update);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 529f3b8..9f2d0d9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -34,11 +34,13 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.directory.api.util.Strings;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.model.CubeBuildTypeEnum;
import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.common.JobInfoConverter;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -48,6 +50,7 @@ import org.apache.kylin.job.SchedulerFactory;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobTimeFilterEnum;
import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.exception.SchedulerException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.CheckpointExecutable;
@@ -72,6 +75,7 @@ import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
+import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import com.google.common.base.Function;
@@ -273,6 +277,137 @@ public class JobService extends BasicService implements InitializingBean {
return jobInstance;
}
+ public Pair<JobInstance, List<JobInstance>> submitOptimizeJob(CubeInstance cube, Set<Long> cuboidsRecommend,
+ String submitter) throws IOException, JobException {
+
+ Pair<JobInstance, List<JobInstance>> result = submitOptimizeJobInternal(cube, cuboidsRecommend, submitter);
+ accessService.init(result.getFirst(), null);
+ accessService.inherit(result.getFirst(), cube);
+ for (JobInstance jobInstance : result.getSecond()) {
+ accessService.init(jobInstance, null);
+ accessService.inherit(jobInstance, cube);
+ }
+
+ return result;
+ }
+
+ private Pair<JobInstance, List<JobInstance>> submitOptimizeJobInternal(CubeInstance cube,
+ Set<Long> cuboidsRecommend, String submitter) throws IOException {
+ Message msg = MsgPicker.getMsg();
+
+ if (cube.getStatus() == RealizationStatusEnum.DESCBROKEN) {
+ throw new BadRequestException(String.format(msg.getBUILD_BROKEN_CUBE(), cube.getName()));
+ }
+
+ checkCubeDescSignature(cube);
+ checkAllowOptimization(cube, cuboidsRecommend);
+
+ CubeSegment[] optimizeSegments = null;
+ try {
+ /** Add optimize segments */
+ optimizeSegments = getCubeManager().optimizeSegments(cube, cuboidsRecommend);
+ List<JobInstance> optimizeJobInstances = Lists.newLinkedList();
+
+ /** Add optimize jobs */
+ List<AbstractExecutable> optimizeJobList = Lists.newArrayListWithExpectedSize(optimizeSegments.length);
+ for (CubeSegment optimizeSegment : optimizeSegments) {
+ DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, submitter);
+ getExecutableManager().addJob(optimizeJob);
+
+ optimizeJobList.add(optimizeJob);
+ optimizeJobInstances.add(getSingleJobInstance(optimizeJob));
+ }
+
+ /** Add checkpoint job for batch jobs */
+ CheckpointExecutable checkpointJob = new BatchOptimizeJobCheckpointBuilder(cube, submitter).build();
+ checkpointJob.addTaskListForCheck(optimizeJobList);
+
+ getExecutableManager().addJob(checkpointJob);
+
+ return new Pair<>(getCheckpointJobInstance(checkpointJob), optimizeJobInstances);
+ } catch (Exception e) {
+ if (optimizeSegments != null) {
+ logger.error("Job submission might failed for NEW segments {}, will clean the NEW segments from cube",
+ optimizeSegments);
+ try {
+ // Remove these segments
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
+ cubeBuilder.setToRemoveSegs(optimizeSegments);
+ getCubeManager().updateCube(cubeBuilder);
+ } catch (Exception ee) {
+ // swallow the exception
+ logger.error("Clean New segments failed, ignoring it", e);
+ }
+ }
+ throw e;
+ }
+ }
+
+ public JobInstance submitRecoverSegmentOptimizeJob(CubeSegment segment, String submitter)
+ throws IOException, JobException {
+ CubeInstance cubeInstance = segment.getCubeInstance();
+
+ checkCubeDescSignature(cubeInstance);
+
+ String cubeName = cubeInstance.getName();
+ List<JobInstance> jobInstanceList = searchJobsByCubeName(cubeName, null,
+ Lists.newArrayList(JobStatusEnum.NEW, JobStatusEnum.PENDING, JobStatusEnum.ERROR),
+ JobTimeFilterEnum.ALL, JobSearchMode.CHECKPOINT_ONLY);
+ if (jobInstanceList.size() > 1) {
+ throw new IllegalStateException("Exist more than one CheckpointExecutable for cube " + cubeName);
+ } else if (jobInstanceList.size() == 0) {
+ throw new IllegalStateException("There's no CheckpointExecutable for cube " + cubeName);
+ }
+ CheckpointExecutable checkpointExecutable = (CheckpointExecutable) getExecutableManager()
+ .getJob(jobInstanceList.get(0).getId());
+
+ AbstractExecutable toBeReplaced = null;
+ for (AbstractExecutable taskForCheck : checkpointExecutable.getSubTasksForCheck()) {
+ if (taskForCheck instanceof CubingJob) {
+ CubingJob subCubingJob = (CubingJob) taskForCheck;
+ String segmentName = CubingExecutableUtil.getSegmentName(subCubingJob.getParams());
+ if (segmentName != null && segmentName.equals(segment.getName())) {
+ String segmentID = CubingExecutableUtil.getSegmentId(subCubingJob.getParams());
+ CubeSegment beingOptimizedSegment = cubeInstance.getSegmentById(segmentID);
+ if (beingOptimizedSegment != null) { // beingOptimizedSegment exists & should not be recovered
+ throw new IllegalStateException("Segment " + beingOptimizedSegment.getName() + "-"
+ + beingOptimizedSegment.getUuid()
+ + " still exists. Please delete it or discard the related optimize job first!!!");
+ }
+ toBeReplaced = taskForCheck;
+ break;
+ }
+ }
+ }
+ if (toBeReplaced == null) {
+ throw new IllegalStateException("There's no CubingJob for segment " + segment.getName()
+ + " in CheckpointExecutable " + checkpointExecutable.getName());
+ }
+
+ /** Add CubingJob for the related segment **/
+ CubeSegment optimizeSegment = getCubeManager().appendSegment(cubeInstance, segment.getTSRange());
+ CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+ cubeBuilder.setToAddSegs(optimizeSegment);
+ getCubeManager().updateCube(cubeBuilder);
+
+ DefaultChainedExecutable optimizeJob = EngineFactory.createBatchOptimizeJob(optimizeSegment, submitter);
+
+ getExecutableManager().addJob(optimizeJob);
+
+ JobInstance optimizeJobInstance = getSingleJobInstance(optimizeJob);
+ accessService.init(optimizeJobInstance, null);
+ accessService.inherit(optimizeJobInstance, cubeInstance);
+
+ /** Update the checkpoint job */
+ checkpointExecutable.getSubTasksForCheck().set(checkpointExecutable.getSubTasksForCheck().indexOf(toBeReplaced),
+ optimizeJob);
+
+ getExecutableManager().updateCheckpointJob(checkpointExecutable.getId(),
+ checkpointExecutable.getSubTasksForCheck());
+
+ return optimizeJobInstance;
+ }
+
private void checkCubeDescSignature(CubeInstance cube) {
Message msg = MsgPicker.getMsg();
@@ -281,8 +416,25 @@ public class JobService extends BasicService implements InitializingBean {
String.format(msg.getINCONSISTENT_CUBE_DESC_SIGNATURE(), cube.getDescriptor()));
}
+ private void checkAllowOptimization(CubeInstance cube, Set<Long> cuboidsRecommend) {
+ long baseCuboid = cube.getCuboidScheduler().getBaseCuboidId();
+ if (!cuboidsRecommend.contains(baseCuboid)) {
+ throw new BadRequestException("The recommend cuboids should contain the base cuboid " + baseCuboid);
+ }
+ Set<Long> currentCuboidSet = cube.getCuboidScheduler().getAllCuboidIds();
+ if (currentCuboidSet.equals(cuboidsRecommend)) {
+ throw new BadRequestException(
+ "The recommend cuboids are the same as the current cuboids. It's no need to do optimization.");
+ }
+ }
+
public JobInstance getJobInstance(String uuid) {
- return getSingleJobInstance(getExecutableManager().getJob(uuid));
+ AbstractExecutable job = getExecutableManager().getJob(uuid);
+ if (job instanceof CheckpointExecutable) {
+ return getCheckpointJobInstance(job);
+ } else {
+ return getSingleJobInstance(job);
+ }
}
public Output getOutput(String id) {
@@ -362,21 +514,90 @@ public class JobService extends BasicService implements InitializingBean {
getExecutableManager().discardJob(job.getId());
return job;
}
- CubeInstance cubeInstance = getCubeManager().getCube(job.getRelatedCube());
+
+ logger.info("Cancel job [" + job.getId() + "] trigger by "
+ + SecurityContextHolder.getContext().getAuthentication().getName());
+ if (job.getStatus() == JobStatusEnum.FINISHED) {
+ throw new IllegalStateException(
+ "The job " + job.getId() + " has already been finished and cannot be discarded.");
+ }
+ if (job.getStatus() == JobStatusEnum.DISCARDED) {
+ return job;
+ }
+
+ AbstractExecutable executable = getExecutableManager().getJob(job.getId());
+ if (executable instanceof CubingJob) {
+ cancelCubingJobInner((CubingJob) executable);
+ } else if (executable instanceof CheckpointExecutable) {
+ cancelCheckpointJobInner((CheckpointExecutable) executable);
+ } else {
+ getExecutableManager().discardJob(executable.getId());
+ }
+ return job;
+ }
+
+ private void cancelCubingJobInner(CubingJob cubingJob) throws IOException {
+ CubeInstance cubeInstance = getCubeManager().getCube(CubingExecutableUtil.getCubeName(cubingJob.getParams()));
// might not be a cube job
- final String segmentIds = job.getRelatedSegment();
- for (String segmentId : StringUtils.split(segmentIds)) {
- final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
- if (segment != null && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getTSRange().end.v == 0)) {
- // Remove this segments
+ final String segmentIds = CubingExecutableUtil.getSegmentId(cubingJob.getParams());
+ if (!StringUtils.isEmpty(segmentIds)) {
+ List<CubeSegment> toRemoveSegments = Lists.newLinkedList();
+ for (String segmentId : StringUtils.split(segmentIds)) {
+ final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
+ if (segment != null
+ && (segment.getStatus() == SegmentStatusEnum.NEW || segment.getTSRange().end.v == 0)) {
+ // Remove this segment
+ toRemoveSegments.add(segment);
+ }
+ }
+ if (!toRemoveSegments.isEmpty()) {
CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
- cubeBuilder.setToRemoveSegs(segment);
+ cubeBuilder.setToRemoveSegs(toRemoveSegments.toArray(new CubeSegment[toRemoveSegments.size()]));
getCubeManager().updateCube(cubeBuilder);
}
}
- getExecutableManager().discardJob(job.getId());
+ getExecutableManager().discardJob(cubingJob.getId());
+ }
- return job;
+ private void cancelCheckpointJobInner(CheckpointExecutable checkpointExecutable) throws IOException {
+ List<String> segmentIdList = Lists.newLinkedList();
+ List<String> jobIdList = Lists.newLinkedList();
+ jobIdList.add(checkpointExecutable.getId());
+ setRelatedIdList(checkpointExecutable, segmentIdList, jobIdList);
+
+ CubeInstance cubeInstance = getCubeManager()
+ .getCube(CubingExecutableUtil.getCubeName(checkpointExecutable.getParams()));
+ if (!segmentIdList.isEmpty()) {
+ List<CubeSegment> toRemoveSegments = Lists.newLinkedList();
+ for (String segmentId : segmentIdList) {
+ final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
+ if (segment != null && segment.getStatus() != SegmentStatusEnum.READY) {
+ toRemoveSegments.add(segment);
+ }
+ }
+
+ CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
+ cubeBuilder.setToRemoveSegs(toRemoveSegments.toArray(new CubeSegment[toRemoveSegments.size()]));
+ cubeBuilder.setCuboidsRecommend(Sets.<Long> newHashSet()); // clear the recommended cuboid set
+ getCubeManager().updateCube(cubeBuilder);
+ }
+
+ for (String jobId : jobIdList) {
+ getExecutableManager().discardJob(jobId);
+ }
+ }
+
+ private void setRelatedIdList(CheckpointExecutable checkpointExecutable, List<String> segmentIdList,
+ List<String> jobIdList) {
+ for (AbstractExecutable taskForCheck : checkpointExecutable.getSubTasksForCheck()) {
+ jobIdList.add(taskForCheck.getId());
+ if (taskForCheck instanceof CubingJob) {
+ segmentIdList.addAll(Lists
+ .newArrayList(StringUtils.split(CubingExecutableUtil.getSegmentId(taskForCheck.getParams()))));
+ } else if (taskForCheck instanceof CheckpointExecutable) {
+ setRelatedIdList((CheckpointExecutable) taskForCheck, segmentIdList, jobIdList);
+ }
+ }
}
public JobInstance pauseJob(JobInstance job) {
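
Cancelling a checkpoint job has to walk the whole task tree: every sub-task contributes its job ID for discarding, and every cubing sub-task contributes its segment IDs for cleanup, recursing into nested checkpoints. The same traversal on a generic task tree, as a standalone sketch with simplified types (Kylin's real tasks are AbstractExecutables keyed by their params):

import java.util.ArrayList;
import java.util.List;

public class TaskTreeSketch {
    interface Task { String id(); }
    record Cubing(String id, List<String> segmentIds) implements Task {}
    record Checkpoint(String id, List<Task> subTasks) implements Task {}

    // Depth-first collection of job IDs and segment IDs, mirroring setRelatedIdList().
    static void collect(Checkpoint cp, List<String> segmentIds, List<String> jobIds) {
        for (Task t : cp.subTasks()) {
            jobIds.add(t.id());
            if (t instanceof Cubing c) {
                segmentIds.addAll(c.segmentIds());
            } else if (t instanceof Checkpoint nested) {
                collect(nested, segmentIds, jobIds);
            }
        }
    }

    public static void main(String[] args) {
        Checkpoint cp = new Checkpoint("cp-1", List.of(
                new Cubing("job-a", List.of("seg-1")),
                new Cubing("job-b", List.of("seg-2"))));
        List<String> segs = new ArrayList<>();
        List<String> jobs = new ArrayList<>(List.of(cp.id())); // the checkpoint discards itself first
        collect(cp, segs, jobs);
        System.out.println(jobs + " " + segs); // [cp-1, job-a, job-b] [seg-1, seg-2]
    }
}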
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index c41df06..838112f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
@@ -67,6 +68,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
CubeInstance cube = null;
CubeDesc cubeDesc = null;
String segmentID = null;
+ String cuboidModeName = null;
KylinConfig kylinConfig;
Path partitionFilePath;
@@ -78,6 +80,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_PARTITION_FILE_PATH);
options.addOption(OPTION_STATISTICS_ENABLED);
+ options.addOption(OPTION_CUBOID_MODE);
parseOptions(options, args);
partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
@@ -89,13 +92,27 @@ public class CreateHTableJob extends AbstractHadoopJob {
cubeDesc = cube.getDescriptor();
kylinConfig = cube.getConfig();
segmentID = getOptionValue(OPTION_SEGMENT_ID);
+ cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
CubeSegment cubeSegment = cube.getSegmentById(segmentID);
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
byte[][] splitKeys;
if (statsEnabled) {
- final Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
+ Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, null, kylinConfig).getCuboidSizeMap();
+ Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
+ if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
+ Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size());
+ for (Long cuboid : buildingCuboids) {
+ Double cuboidSize = cuboidSizeMap.get(cuboid);
+ if (cuboidSize == null) {
+ logger.warn("Cuboid " + cuboid + " has no size estimate; defaulting to 0");
+ cuboidSize = 0.0;
+ }
+ optimizedCuboidSizeMap.put(cuboid, cuboidSize);
+ }
+ cuboidSizeMap = optimizedCuboidSizeMap;
+ }
splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment, partitionFilePath.getParent());
} else {
splitKeys = getRegionSplits(conf, partitionFilePath);
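
The new branch in CreateHTableJob narrows the full cuboid size map down to just the cuboids selected by the given mode, substituting 0 for any cuboid that lacks a size estimate so region splitting still proceeds. The same filtering in isolation, with illustrative names and values:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class CuboidSizeFilterSketch {
    // Keep only the cuboids being built; missing size estimates default to 0 MB.
    static Map<Long, Double> filterByMode(Map<Long, Double> allSizes, Set<Long> buildingCuboids) {
        Map<Long, Double> result = new HashMap<>(buildingCuboids.size());
        for (Long cuboid : buildingCuboids) {
            result.put(cuboid, allSizes.getOrDefault(cuboid, 0.0));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Double> all = Map.of(255L, 120.5, 239L, 40.2);
        System.out.println(filterByMode(all, Set.of(255L, 189L))); // {255=120.5, 189=0.0} (order may vary)
    }
}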
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 31cb189..db3f7f4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -31,13 +31,15 @@ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
import org.apache.kylin.engine.mr.steps.InMemCuboidMapper;
import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
-import org.apache.kylin.engine.mr.steps.ReducerNumSizing;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,13 +95,15 @@ public class HBaseMROutput2Transition implements IMROutput2 {
}
@Override
- public void configureJobOutput(Job job, String output, CubeSegment segment, int level) throws Exception {
+ public void configureJobOutput(Job job, String output, CubeSegment segment, CuboidScheduler cuboidScheduler,
+ int level) throws Exception {
int reducerNum = 1;
Class mapperClass = job.getMapperClass();
if (mapperClass == HiveToBaseCuboidMapper.class || mapperClass == NDCuboidMapper.class) {
- reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), level);
+ reducerNum = MapReduceUtil.getLayeredCubingReduceTaskNum(segment, cuboidScheduler,
+ AbstractHadoopJob.getTotalMapInputMB(job), level);
} else if (mapperClass == InMemCuboidMapper.class) {
- reducerNum = ReducerNumSizing.getInmemCubingReduceTaskNum(segment);
+ reducerNum = MapReduceUtil.getInmemCubingReduceTaskNum(segment, cuboidScheduler);
}
Path outputPath = new Path(output);
FileOutputFormat.setOutputPath(job, outputPath);
@@ -149,7 +153,8 @@ public class HBaseMROutput2Transition implements IMROutput2 {
@Override
public void configureJobOutput(Job job, String output, CubeSegment segment) throws Exception {
- int reducerNum = ReducerNumSizing.getLayeredCubingReduceTaskNum(segment, AbstractHadoopJob.getTotalMapInputMB(job), -1);
+ int reducerNum = MapReduceUtil.getLayeredCubingReduceTaskNum(segment, segment.getCuboidScheduler(),
+ AbstractHadoopJob.getTotalMapInputMB(job), -1);
job.setNumReduceTasks(reducerNum);
Path outputPath = new Path(output);
@@ -185,4 +190,30 @@ public class HBaseMROutput2Transition implements IMROutput2 {
throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
}
}
+
+ public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(final CubeSegment seg) {
+ return new IMRBatchOptimizeOutputSide2() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ @Override
+ public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId(), CuboidModeEnum.RECOMMEND));
+ }
+
+ @Override
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+ jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+ }
+
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ steps.addOptimizeGarbageCollectionSteps(jobFlow);
+ }
+
+ @Override
+ public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow) {
+ steps.addCheckpointGarbageCollectionSteps(jobFlow);
+ }
+ };
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/020c4e78/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 6f69e8c..13e7dc4 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
@@ -32,6 +33,7 @@ import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.storage.hbase.HBaseConnection;
import com.google.common.base.Preconditions;
@@ -82,7 +84,15 @@ public class HBaseMRSteps extends JobBuilderSupport {
return createCreateHTableStep(jobId, true);
}
+ public HadoopShellExecutable createCreateHTableStepWithStats(String jobId, CuboidModeEnum cuboidMode) {
+ return createCreateHTableStep(jobId, true, cuboidMode);
+ }
+
private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats) {
+ return createCreateHTableStep(jobId, withStats, CuboidModeEnum.CURRENT);
+ }
+
+ private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats, CuboidModeEnum cuboidMode) {
HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
StringBuilder cmd = new StringBuilder();
@@ -90,6 +100,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
createHtableStep.setJobParams(cmd.toString());
createHtableStep.setJobClass(CreateHTableJob.class);
@@ -167,6 +178,35 @@ public class HBaseMRSteps extends JobBuilderSupport {
return result;
}
+ public MergeGCStep createOptimizeGCStep() {
+ MergeGCStep result = new MergeGCStep();
+ result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+ result.setOldHTables(getOptimizeHTables());
+ return result;
+ }
+
+ public List<CubeSegment> getOptimizeSegments() {
+ CubeInstance cube = (CubeInstance) seg.getRealization();
+ List<CubeSegment> newSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY_PENDING));
+ List<CubeSegment> oldSegments = Lists.newArrayListWithExpectedSize(newSegments.size());
+ for (CubeSegment segment : newSegments) {
+ oldSegments.add(cube.getOriginalSegmentToOptimize(segment));
+ }
+ return oldSegments;
+ }
+
+ public List<String> getOptimizeHTables() {
+ return getOldHTables(getOptimizeSegments());
+ }
+
+ public List<String> getOldHTables(final List<CubeSegment> oldSegments) {
+ final List<String> oldHTables = Lists.newArrayListWithExpectedSize(oldSegments.size());
+ for (CubeSegment segment : oldSegments) {
+ oldHTables.add(segment.getStorageLocationIdentifier());
+ }
+ return oldHTables;
+ }
+
public List<String> getMergingHTables() {
final List<CubeSegment> mergingSegments = ((CubeInstance) seg.getRealization()).getMergingSegments((CubeSegment) seg);
Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge, target segment " + seg);
@@ -187,6 +227,18 @@ public class HBaseMRSteps extends JobBuilderSupport {
return mergingHDFSPaths;
}
+ public List<String> getOptimizeHDFSPaths() {
+ return getOldHDFSPaths(getOptimizeSegments());
+ }
+
+ public List<String> getOldHDFSPaths(final List<CubeSegment> oldSegments) {
+ final List<String> oldHDFSPaths = Lists.newArrayListWithExpectedSize(oldSegments.size());
+ for (CubeSegment oldSegment : oldSegments) {
+ oldHDFSPaths.add(getJobWorkingDir(oldSegment.getLastBuildJobID()));
+ }
+ return oldHDFSPaths;
+ }
+
public String getHFilePath(String jobId) {
return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/hfile/");
}
@@ -195,6 +247,36 @@ public class HBaseMRSteps extends JobBuilderSupport {
return HBaseConnection.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getRealization().getName() + "/rowkey_stats");
}
+ public void addOptimizeGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+ String jobId = jobFlow.getId();
+
+ List<String> toDeletePaths = new ArrayList<>();
+ toDeletePaths.add(getOptimizationRootPath(jobId));
+
+ HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
+ step.setDeletePaths(toDeletePaths);
+ step.setJobId(jobId);
+
+ jobFlow.addTask(step);
+ }
+
+ public void addCheckpointGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+ String jobId = jobFlow.getId();
+
+ jobFlow.addTask(createOptimizeGCStep());
+
+ List<String> toDeletePaths = new ArrayList<>();
+ toDeletePaths.addAll(getOptimizeHDFSPaths());
+
+ HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+ step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
+ step.setDeletePaths(toDeletePaths);
+ step.setJobId(jobId);
+
+ jobFlow.addTask(step);
+ }
+
public void addMergingGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
String jobId = jobFlow.getId();