You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/05/31 00:09:12 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4996 Merge cuboid statistics in merge job for kylin4

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 2edeaea  KYLIN-4996 Merge cuboid statistics in merge job for kylin4
2edeaea is described below

commit 2edeaea625ce5a1e8bfbc73a72fbafccbfb420ff
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Wed May 19 15:27:25 2021 +0800

    KYLIN-4996 Merge cuboid statistics in merge job for kylin4
---
 .../kylin/job/constant/ExecutableConstants.java    |   2 +-
 .../kylin/job/execution/ExecutableManager.java     |   7 +-
 .../kylin/engine/spark/job/JobStepFactory.java     |   3 +
 .../apache/kylin/engine/spark/job/JobStepType.java |   2 +-
 .../spark/job/NSparkMergeStatisticsStep.java       | 162 +++++++++++++++++++++
 .../kylin/engine/spark/job/NSparkMergingJob.java   |   3 +
 .../org/apache/kylin/rest/service/JobService.java  |   6 +-
 7 files changed, 176 insertions(+), 9 deletions(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 0d5e482..4d079d8 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -92,7 +92,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Global Dict - replace intermediate table";
 
     public static final String FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
-    //kylin on parquetv2
+    //kylin on parquet v2
     public static final String STEP_NAME_DETECT_RESOURCE = "Detect Resource";
     public static final String STEP_NAME_BUILD_CUBOID_FROM_PARENT_CUBOID = "Build recommend cuboid from parent cuboid";
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 1c9e81c..2643b86 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -271,6 +271,9 @@ public class ExecutableManager {
         AbstractExecutable jobInstance = getJob(jobId);
         String outputStorePath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(jobInstance.getParam(MetadataConstants.P_PROJECT_NAME), stepId);
         ExecutableOutputPO jobOutput = getJobOutputFromHDFS(outputStorePath);
+        if (jobOutput == null) {
+            return null;
+        }
         assertOutputNotNull(jobOutput, outputStorePath);
 
         if (Objects.nonNull(jobOutput.getLogPath())) {
@@ -291,9 +294,7 @@ public class ExecutableManager {
             Path path = new Path(resPath);
             FileSystem fs = HadoopUtil.getWorkingFileSystem();
             if (!fs.exists(path)) {
-                ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
-                executableOutputPO.setContent("job output not found, please check kylin.log");
-                return executableOutputPO;
+                return null;
             }
 
             din = fs.open(path);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
index ead4223..669ca65 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepFactory.java
@@ -45,6 +45,9 @@ public class JobStepFactory {
         case OPTIMIZING:
             step = new NSparkOptimizingStep(OptimizeBuildJob.class.getName());
             break;
+        case MERGE_STATISTICS:
+            step = new NSparkMergeStatisticsStep();
+            break;
         case CLEAN_UP_AFTER_MERGE:
             step = new NSparkUpdateMetaAndCleanupAfterMergeStep();
             break;
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
index 3b4142d..3dbfee2 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/JobStepType.java
@@ -21,7 +21,7 @@ package org.apache.kylin.engine.spark.job;
 public enum JobStepType {
     RESOURCE_DETECT,
 
-    CLEAN_UP_AFTER_MERGE, CUBING, MERGING, OPTIMIZING,
+    CLEAN_UP_AFTER_MERGE, CUBING, MERGING, MERGE_STATISTICS, OPTIMIZING,
 
     FILTER_RECOMMEND_CUBOID
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergeStatisticsStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergeStatisticsStep.java
new file mode 100644
index 0000000..1884cd4
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergeStatisticsStep.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.job;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+/** Job step that merges the cuboid statistics of the merging segments into the merged segment. */
+public class NSparkMergeStatisticsStep extends NSparkExecutable {
+    private static final Logger logger = LoggerFactory.getLogger(NSparkMergeStatisticsStep.class);
+
+    private List<CubeSegment> mergingSegments = Lists.newArrayList();
+    protected Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+
+    public NSparkMergeStatisticsStep() {
+        this.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
+    }
+
+    /**
+     * Copies the statistics resource of every merging segment to a local temp file, folds the
+     * sampling percentage, source record count and per-key HLL counters together, and stores the
+     * merged statistics under the merged segment's statistics resource path.
+     * @param context execution context from which the Kylin config is obtained
+     * @return a succeed result, or an error result when no segment is being merged or I/O fails
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        String jobId = getParam(MetadataConstants.P_JOB_ID);
+        String cubeId = getParam(MetadataConstants.P_CUBE_ID);
+        String mergedSegmentUuid = getParam(MetadataConstants.P_SEGMENT_IDS);
+        final KylinConfig kylinConfig = wrapConfig(context);
+        CubeInstance cube = CubeManager.getInstance(kylinConfig).getCubeByUuid(cubeId);
+        CubeSegment mergedSeg = cube.getSegmentById(mergedSegmentUuid);
+        String jobTmpDir = kylinConfig.getJobTmpDir(cube.getProject()) + "/" + jobId;
+        Path statisticsDir = new Path(jobTmpDir + "/" + ResourceStore.CUBE_STATISTICS_ROOT + "/"
+                + cubeId + "/" + mergedSeg.getUuid() + "/");
+
+        mergingSegments = cube.getMergingSegments(mergedSeg);
+        if (mergingSegments == null || mergingSegments.isEmpty()) {
+            // Averaging the sampling percentage below would otherwise divide by zero.
+            return ExecuteResult.createError(
+                    new IllegalStateException("no merging segments found for segment " + mergedSegmentUuid));
+        }
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
+        ResourceStore rs = ResourceStore.getStore(kylinConfig);
+        try {
+            int averageSamplingPercentage = 0;
+            long sourceRecordCount = 0;
+            for (CubeSegment segment : mergingSegments) {
+                String segmentId = segment.getUuid();
+                String fileKey = CubeSegment.getStatisticsResourcePath(cube.getName(), segmentId);
+                File tempFile = File.createTempFile(segmentId, ".seq");
+                // Everything after the temp file exists runs inside try/finally so it is always removed.
+                try {
+                    try (InputStream is = rs.getResource(fileKey).content();
+                            FileOutputStream tempFileStream = new FileOutputStream(tempFile)) {
+                        org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                    }
+                    FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
+                    try (SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+                            new Path(tempFile.getAbsolutePath()), conf)) {
+                        LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                        BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+                        while (reader.next(key, value)) {
+                            long statsKey = key.get();
+                            if (statsKey == 0L) {
+                                // sampling percentage
+                                averageSamplingPercentage += Bytes.toInt(value.getBytes());
+                            } else if (statsKey == -3) {
+                                // source record count; non-positive values are ignored
+                                long perSourceRecordCount = Bytes.toLong(value.getBytes());
+                                if (perSourceRecordCount > 0) {
+                                    sourceRecordCount += perSourceRecordCount;
+                                }
+                            } else if (statsKey > 0) {
+                                // HLL registers keyed by statsKey; merge into any counter seen earlier
+                                HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                                hll.readRegisters(new ByteArray(value.getBytes()).asBuffer());
+                                HLLCounter existing = cuboidHLLMap.get(statsKey);
+                                if (existing != null) {
+                                    existing.merge(hll);
+                                } else {
+                                    cuboidHLLMap.put(statsKey, hll);
+                                }
+                            }
+                        }
+                    }
+                } catch (RuntimeException e) {
+                    logger.error("fail to read cuboid statistics of segment " + segmentId, e);
+                    throw e;
+                } finally {
+                    if (!tempFile.delete()) {
+                        logger.warn("fail to delete temp statistics file " + tempFile.getAbsolutePath());
+                    }
+                }
+            }
+            averageSamplingPercentage = averageSamplingPercentage / mergingSegments.size();
+            CubeStatsWriter.writeCuboidStatistics(conf, statisticsDir, cuboidHLLMap,
+                    averageSamplingPercentage, sourceRecordCount);
+            Path statisticsFilePath = new Path(statisticsDir, BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+            FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+            try (FSDataInputStream is = fs.open(statisticsFilePath)) {
+                // put the statistics to metadata store
+                rs.putResource(mergedSeg.getStatisticsResourcePath(), is, System.currentTimeMillis());
+            }
+            return ExecuteResult.createSucceed();
+        } catch (IOException e) {
+            logger.error("fail to merge cuboid statistics", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
index c52c89c..9a53067 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
@@ -87,6 +87,9 @@ public class NSparkMergingJob extends CubingJob {
 
         JobStepFactory.addStep(job, JobStepType.RESOURCE_DETECT, cube);
         JobStepFactory.addStep(job, JobStepType.MERGING, cube);
+        if (KylinConfig.getInstanceFromEnv().isSegmentStatisticsEnabled()) {
+            JobStepFactory.addStep(job, JobStepType.MERGE_STATISTICS, cube);
+        }
         JobStepFactory.addStep(job, JobStepType.CLEAN_UP_AFTER_MERGE, cube);
 
         return job;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 3cfcc78..7619d36 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -501,8 +501,7 @@ public class JobService extends BasicService implements InitializingBean {
 
     public String getJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
-        AbstractExecutable job = executableManager.getJob(jobId);
-        if (job instanceof CheckpointExecutable) {
+        if (executableManager.getOutputFromHDFSByJobId(jobId, stepId) == null) {
             return executableManager.getOutput(stepId).getVerboseMsg();
         }
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
@@ -510,8 +509,7 @@ public class JobService extends BasicService implements InitializingBean {
 
     public String getAllJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
-        AbstractExecutable job = executableManager.getJob(jobId);
-        if (job instanceof CheckpointExecutable) {
+        if (executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE) == null) {
             return executableManager.getOutput(stepId).getVerboseMsg();
         }
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg();