You are viewing a plain-text version of this content. (The canonical link to it was attached to the word "here" in the original page and was lost in this plain-text copy.)
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/09/03 09:18:43 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4742 NullPointerException when auto merge segments if exist discard jobs

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 762e7c8  KYLIN-4742 NullPointerException when auto merge segments if exist discard jobs
762e7c8 is described below

commit 762e7c8379e9aec6162afec73437c7df6f8d6eef
Author: rupengwang <wa...@live.cn>
AuthorDate: Thu Sep 3 15:15:56 2020 +0800

    KYLIN-4742 NullPointerException when auto-merging segments if discarded jobs exist
---
 .../main/java/org/apache/kylin/metadata/MetadataConstants.java   | 9 +++++----
 .../java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java  | 1 +
 .../java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java | 1 +
 .../scala/org/apache/kylin/engine/spark/utils/Repartitioner.java | 2 +-
 .../src/main/java/org/apache/kylin/rest/service/CubeService.java | 3 ++-
 5 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
index 36d20d2..0527f81 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
@@ -51,11 +51,12 @@ public interface MetadataConstants {
     String P_DATA_RANGE_END = "dataRangeEnd";
     String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
     String P_CUBOID_NUMBER = "cuboidsNum";
+    String SEGMENT_NAME = "segmentName";
 
-    public static final String TABLE_EXD_CARDINALITY = "cardinality";
-    public static final String TABLE_EXD_DELIM = "delim";
-    public static final String TABLE_EXD_DEFAULT_VALUE = "unknown";
+    String TABLE_EXD_CARDINALITY = "cardinality";
+    String TABLE_EXD_DELIM = "delim";
+    String TABLE_EXD_DEFAULT_VALUE = "unknown";
 
-    public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
+    String SOURCE_RECORD_COUNT = "sourceRecordCount";
 
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index f4e244f..c2e615d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -86,6 +86,7 @@ public class NSparkCubingJob extends CubingJob {
         job.setParam(CubingExecutableUtil.SEGMENT_ID,
                 segments.stream().map(x -> String.valueOf(x.getUuid())).collect(Collectors.joining(" ")));
         job.setParam(MetadataConstants.P_JOB_ID, jobId);
+        job.setParam(MetadataConstants.SEGMENT_NAME, segments.iterator().next().getName());
         job.setParam(MetadataConstants.P_PROJECT_NAME, job.cube.getProject());
         job.setParam(MetadataConstants.P_CUBE_NAME, job.cube.getName());
         job.setParam(MetadataConstants.P_TARGET_MODEL, job.getTargetSubject());
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
index 7efc946..63e3e7c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
@@ -78,6 +78,7 @@ public class NSparkMergingJob extends CubingJob {
         job.setParam(MetadataConstants.P_CUBE_NAME, cube.getName());
         job.setParam(MetadataConstants.P_SEGMENT_IDS, String.join(",", job.getTargetSegments()));
         job.setParam(CubingExecutableUtil.SEGMENT_ID, mergedSegment.getUuid());
+        job.setParam(MetadataConstants.SEGMENT_NAME, mergedSegment.getName());
         job.setParam(MetadataConstants.P_DATA_RANGE_START, mergedSegment.getSegRange().start.toString());
         job.setParam(MetadataConstants.P_DATA_RANGE_END, mergedSegment.getSegRange().end.toString());
         job.setParam(MetadataConstants.P_OUTPUT_META_URL, cube.getConfig().getMetadataUrl().toString());
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
index a62ead8..e50bd9c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -42,7 +42,7 @@ import org.apache.kylin.engine.spark.NSparkCubingEngine;
 
 public class Repartitioner {
     private static String tempDirSuffix = "_temp";
-    protected static final Logger logger = LoggerFactory.getLogger(SparkConfHelper.class);
+    protected static final Logger logger = LoggerFactory.getLogger(Repartitioner.class);
 
     private int MB = 1024 * 1024;
     private int shardSize;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index d286cfd..931a14a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -787,7 +787,8 @@ public class CubeService extends BasicService implements InitializingBean {
         final List<CubingJob> jobInstanceList = jobService.listJobsByRealizationName(cubeName, projectName,
                 EnumSet.of(ExecutableState.DISCARDED));
         for (CubingJob cubingJob : jobInstanceList) {
-            if (cubingJob.getSegmentName().equals(segmentName)) {
+            String jobSegmentName = cubingJob.getSegmentName();
+            if (jobSegmentName != null && jobSegmentName.equals(segmentName)) {
                 logger.debug("Merge job {} has been discarded before, will not merge.", segmentName);
                 return true;
             }