Posted to commits@kylin.apache.org by xx...@apache.org on 2021/05/31 00:09:12 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4996 Merge cuboid statistics in merge job for kylin4
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 2edeaea KYLIN-4996 Merge cuboid statistics in merge job for kylin4
2edeaea is described below
commit 2edeaea625ce5a1e8bfbc73a72fbafccbfb420ff
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Wed May 19 15:27:25 2021 +0800
KYLIN-4996 Merge cuboid statistics in merge job for kylin4
---
.../kylin/job/constant/ExecutableConstants.java | 2 +-
.../kylin/job/execution/ExecutableManager.java | 7 +-
.../kylin/engine/spark/job/JobStepFactory.java | 3 +
.../apache/kylin/engine/spark/job/JobStepType.java | 2 +-
.../spark/job/NSparkMergeStatisticsStep.java | 162 +++++++++++++++++++++
.../kylin/engine/spark/job/NSparkMergingJob.java | 3 +
.../org/apache/kylin/rest/service/JobService.java | 6 +-
7 files changed, 176 insertions(+), 9 deletions(-)
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 0d5e482..4d079d8 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -92,7 +92,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Global Dict - replace intermediate table";
     public static final String FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
-    //kylin on parquetv2
+    //kylin on parquet v2
     public static final String STEP_NAME_DETECT_RESOURCE = "Detect Resource";
     public static final String STEP_NAME_BUILD_CUBOID_FROM_PARENT_CUBOID = "Build recommend cuboid from parent cuboid";
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 1c9e81c..2643b86 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -271,6 +271,9 @@ public class ExecutableManager {
         AbstractExecutable jobInstance = getJob(jobId);
         String outputStorePath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(jobInstance.getParam(MetadataConstants.P_PROJECT_NAME), stepId);
         ExecutableOutputPO jobOutput = getJobOutputFromHDFS(outputStorePath);
+        if (jobOutput == null) {
+            return null;
+        }
         assertOutputNotNull(jobOutput, outputStorePath);
         if (Objects.nonNull(jobOutput.getLogPath())) {
@@ -291,9 +294,7 @@ public class ExecutableManager {
         Path path = new Path(resPath);
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
         if (!fs.exists(path)) {
-            ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
-            executableOutputPO.setContent("job output not found, please check kylin.log");
-            return executableOutputPO;
+            return null;
         }
         din = fs.open(path);
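The two hunks above change the contract of the HDFS output readers: a missing output file now surfaces as null instead of a fabricated placeholder ExecutableOutputPO, which moves the fallback decision to callers (see the JobService change at the end of this commit). A minimal caller-side sketch, assuming ExecutableOutputPO exposes the usual getContent() counterpart to the setContent() seen above:

    // Hypothetical caller under the new contract: the placeholder message that
    // getJobOutputFromHDFS used to fabricate is now the caller's responsibility.
    ExecutableOutputPO output = getJobOutputFromHDFS(outputStorePath);
    String content = (output != null)
            ? output.getContent()
            : "job output not found, please check kylin.log";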
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
index ead4223..669ca65 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
@@ -45,6 +45,9 @@ public class JobStepFactory {
             case OPTIMIZING:
                 step = new NSparkOptimizingStep(OptimizeBuildJob.class.getName());
                 break;
+            case MERGE_STATISTICS:
+                step = new NSparkMergeStatisticsStep();
+                break;
             case CLEAN_UP_AFTER_MERGE:
                 step = new NSparkUpdateMetaAndCleanupAfterMergeStep();
                 break;
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
index 3b4142d..3dbfee2 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
@@ -21,7 +21,7 @@ package org.apache.kylin.engine.spark.job;
 public enum JobStepType {
     RESOURCE_DETECT,
-    CLEAN_UP_AFTER_MERGE, CUBING, MERGING, OPTIMIZING,
+    CLEAN_UP_AFTER_MERGE, CUBING, MERGING, MERGE_STATISTICS, OPTIMIZING,
     FILTER_RECOMMEND_CUBOID
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergeStatisticsStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergeStatisticsStep.java
new file mode 100644
index 0000000..1884cd4
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergeStatisticsStep.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+public class NSparkMergeStatisticsStep extends NSparkExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(NSparkMergeStatisticsStep.class);
+
+    private List<CubeSegment> mergingSegments = Lists.newArrayList();
+    protected Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+
+    public NSparkMergeStatisticsStep() {
+        this.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        String jobId = getParam(MetadataConstants.P_JOB_ID);
+        String cubeId = getParam(MetadataConstants.P_CUBE_ID);
+
+        String mergedSegmentUuid = getParam(MetadataConstants.P_SEGMENT_IDS);
+        final KylinConfig kylinConfig = wrapConfig(context);
+        CubeInstance cube = CubeManager.getInstance(kylinConfig).getCubeByUuid(cubeId);
+        CubeSegment mergedSeg = cube.getSegmentById(mergedSegmentUuid);
+
+        String jobTmpDir = kylinConfig.getJobTmpDir(cube.getProject()) + "/" + jobId;
+        Path statisticsDir = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/"
+                + cubeId + "/" + mergedSeg.getUuid() + "/");
+
+        mergingSegments = cube.getMergingSegments(mergedSeg);
+
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
+        ResourceStore rs = ResourceStore.getStore(kylinConfig);
+        try {
+            int averageSamplingPercentage = 0;
+            long sourceRecordCount = 0;
+            for (CubeSegment segment : mergingSegments) {
+                String segmentId = segment.getUuid();
+                String fileKey = CubeSegment
+                        .getStatisticsResourcePath(cube.getName(), segmentId);
+                InputStream is = rs.getResource(fileKey).content();
+                File tempFile = null;
+                FileOutputStream tempFileStream = null;
+                try {
+                    tempFile = File.createTempFile(segmentId, ".seq");
+                    tempFileStream = new FileOutputStream(tempFile);
+                    org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                } finally {
+                    IOUtils.closeStream(is);
+                    IOUtils.closeStream(tempFileStream);
+                }
+
+                FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+                SequenceFile.Reader reader = null;
+                try {
+                    reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
+                    LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                    while (reader.next(key, value)) {
+                        if (key.get() == 0L) {
+                            // sampling percentage
+                            averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                        } else if (key.get() == -3) {
+                            long perSourceRecordCount = Bytes.toLong(value.getBytes());
+                            if (perSourceRecordCount > 0) {
+                                sourceRecordCount += perSourceRecordCount;
+                            }
+                        } else if (key.get() > 0) {
+                            HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                            ByteArray byteArray = new ByteArray(value.getBytes());
+                            hll.readRegisters(byteArray.asBuffer());
+
+                            if (cuboidHLLMap.get(key.get()) != null) {
+                                cuboidHLLMap.get(key.get()).merge(hll);
+                            } else {
+                                cuboidHLLMap.put(key.get(), hll);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    throw e;
+                } finally {
+                    IOUtils.closeStream(reader);
+                    if (tempFile != null)
+                        tempFile.delete();
+                }
+            }
+            averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+            CubeStatsWriter.writeCuboidStatistics(conf, statisticsDir, cuboidHLLMap,
+                    averageSamplingPercentage, sourceRecordCount);
+            Path statisticsFilePath = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+            FSDataInputStream is = fs.open(statisticsFilePath);
+            try {
+                // put the statistics to metadata store
+                String statisticsFileName = mergedSeg.getStatisticsResourcePath();
+                rs.putResource(statisticsFileName, is, System.currentTimeMillis());
+            } finally {
+                IOUtils.closeStream(is);
+            }
+
+            return ExecuteResult.createSucceed();
+        } catch (IOException e) {
+            logger.error("fail to merge cuboid statistics", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+}
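The loop in doWork above reads each segment's statistics SequenceFile and dispatches on the key: 0 carries the sampling percentage, -3 the source record count, and positive keys are cuboid IDs whose values hold serialized HLL registers. The distinct-count merge is sound because HLLCounter is a mergeable sketch; a minimal standalone illustration (the class name, precision 14, and sample values are made up for this sketch):

    import org.apache.kylin.measure.hllc.HLLCounter;

    public class HllMergeSketch {
        public static void main(String[] args) {
            // Two per-segment counters for the same cuboid.
            HLLCounter segA = new HLLCounter(14);
            HLLCounter segB = new HLLCounter(14);
            for (int i = 0; i < 1000; i++) {
                segA.add("row-" + i);         // rows sampled in segment A
                segB.add("row-" + (i + 500)); // partially overlapping rows in segment B
            }
            // merge() takes a register-wise maximum, so the roughly 500 overlapping
            // rows are not double counted; cuboidHLLMap relies on the same property.
            segA.merge(segB);
            System.out.println("estimated distinct rows: " + segA.getCountEstimate());
        }
    }

By contrast, sourceRecordCount is merged by plain summation, and averageSamplingPercentage is summed and then divided by the number of merging segments.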
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
index c52c89c..9a53067 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
@@ -87,6 +87,9 @@ public class NSparkMergingJob extends CubingJob {
         JobStepFactory.addStep(job, JobStepType.RESOURCE_DETECT, cube);
         JobStepFactory.addStep(job, JobStepType.MERGING, cube);
+        if (KylinConfig.getInstanceFromEnv().isSegmentStatisticsEnabled()) {
+            JobStepFactory.addStep(job, JobStepType.MERGE_STATISTICS, cube);
+        }
         JobStepFactory.addStep(job, JobStepType.CLEAN_UP_AFTER_MERGE, cube);
         return job;
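With this guard in place, a merge job is assembled as the step chain below (order reconstructed from the addStep calls above; the new step is simply omitted when segment statistics are disabled):

    // Step chain of NSparkMergingJob after this change:
    //   1. JobStepType.RESOURCE_DETECT
    //   2. JobStepType.MERGING
    //   3. JobStepType.MERGE_STATISTICS     (new in this commit, config-gated)
    //   4. JobStepType.CLEAN_UP_AFTER_MERGE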
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 3cfcc78..7619d36 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -501,8 +501,7 @@ public class JobService extends BasicService implements InitializingBean {
     public String getJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
-        AbstractExecutable job = executableManager.getJob(jobId);
-        if (job instanceof CheckpointExecutable) {
+        if (executableManager.getOutputFromHDFSByJobId(jobId, stepId) == null) {
             return executableManager.getOutput(stepId).getVerboseMsg();
         }
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
@@ -510,8 +509,7 @@
     public String getAllJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
-        AbstractExecutable job = executableManager.getJob(jobId);
-        if (job instanceof CheckpointExecutable) {
+        if (executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE) == null) {
             return executableManager.getOutput(stepId).getVerboseMsg();
         }
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg();
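On the non-null path each getter now reads the HDFS output twice, once for the null probe and once for the message. An equivalent single-read form, sketched under the assumption that getOutputFromHDFSByJobId returns the same Output type whose getVerboseMsg() is called above:

    // Sketch only, not part of the commit: the same fallback with one HDFS read.
    public String getJobStepOutput(String jobId, String stepId) {
        ExecutableManager executableManager = getExecutableManager();
        Output hdfsOutput = executableManager.getOutputFromHDFSByJobId(jobId, stepId);
        if (hdfsOutput == null) {
            // HDFS output missing: fall back to the metadata-store output.
            return executableManager.getOutput(stepId).getVerboseMsg();
        }
        return hdfsOutput.getVerboseMsg();
    }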