You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/09/03 09:18:43 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4742
NullPointerException when auto merge segments if exist discard jobs
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 762e7c8 KYLIN-4742 NullPointerException when auto merge segments if exist discard jobs
762e7c8 is described below
commit 762e7c8379e9aec6162afec73437c7df6f8d6eef
Author: rupengwang <wa...@live.cn>
AuthorDate: Thu Sep 3 15:15:56 2020 +0800
KYLIN-4742 NullPointerException when auto merge segments if exist discard jobs
---
.../main/java/org/apache/kylin/metadata/MetadataConstants.java | 9 +++++----
.../java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java | 1 +
.../java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java | 1 +
.../scala/org/apache/kylin/engine/spark/utils/Repartitioner.java | 2 +-
.../src/main/java/org/apache/kylin/rest/service/CubeService.java | 3 ++-
5 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
index 36d20d2..0527f81 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/MetadataConstants.java
@@ -51,11 +51,12 @@ public interface MetadataConstants {
String P_DATA_RANGE_END = "dataRangeEnd";
String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
String P_CUBOID_NUMBER = "cuboidsNum";
+ String SEGMENT_NAME = "segmentName";
- public static final String TABLE_EXD_CARDINALITY = "cardinality";
- public static final String TABLE_EXD_DELIM = "delim";
- public static final String TABLE_EXD_DEFAULT_VALUE = "unknown";
+ String TABLE_EXD_CARDINALITY = "cardinality";
+ String TABLE_EXD_DELIM = "delim";
+ String TABLE_EXD_DEFAULT_VALUE = "unknown";
- public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
+ String SOURCE_RECORD_COUNT = "sourceRecordCount";
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
index f4e244f..c2e615d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingJob.java
@@ -86,6 +86,7 @@ public class NSparkCubingJob extends CubingJob {
job.setParam(CubingExecutableUtil.SEGMENT_ID,
segments.stream().map(x -> String.valueOf(x.getUuid())).collect(Collectors.joining(" ")));
job.setParam(MetadataConstants.P_JOB_ID, jobId);
+ job.setParam(MetadataConstants.SEGMENT_NAME, segments.iterator().next().getName());
job.setParam(MetadataConstants.P_PROJECT_NAME, job.cube.getProject());
job.setParam(MetadataConstants.P_CUBE_NAME, job.cube.getName());
job.setParam(MetadataConstants.P_TARGET_MODEL, job.getTargetSubject());
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
index 7efc946..63e3e7c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingJob.java
@@ -78,6 +78,7 @@ public class NSparkMergingJob extends CubingJob {
job.setParam(MetadataConstants.P_CUBE_NAME, cube.getName());
job.setParam(MetadataConstants.P_SEGMENT_IDS, String.join(",", job.getTargetSegments()));
job.setParam(CubingExecutableUtil.SEGMENT_ID, mergedSegment.getUuid());
+ job.setParam(MetadataConstants.SEGMENT_NAME, mergedSegment.getName());
job.setParam(MetadataConstants.P_DATA_RANGE_START, mergedSegment.getSegRange().start.toString());
job.setParam(MetadataConstants.P_DATA_RANGE_END, mergedSegment.getSegRange().end.toString());
job.setParam(MetadataConstants.P_OUTPUT_META_URL, cube.getConfig().getMetadataUrl().toString());
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
index a62ead8..e50bd9c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/Repartitioner.java
@@ -42,7 +42,7 @@ import org.apache.kylin.engine.spark.NSparkCubingEngine;
public class Repartitioner {
private static String tempDirSuffix = "_temp";
- protected static final Logger logger = LoggerFactory.getLogger(SparkConfHelper.class);
+ protected static final Logger logger = LoggerFactory.getLogger(Repartitioner.class);
private int MB = 1024 * 1024;
private int shardSize;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index d286cfd..931a14a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -787,7 +787,8 @@ public class CubeService extends BasicService implements InitializingBean {
final List<CubingJob> jobInstanceList = jobService.listJobsByRealizationName(cubeName, projectName,
EnumSet.of(ExecutableState.DISCARDED));
for (CubingJob cubingJob : jobInstanceList) {
- if (cubingJob.getSegmentName().equals(segmentName)) {
+ String jobSegmentName = cubingJob.getSegmentName();
+ if (jobSegmentName != null && jobSegmentName.equals(segmentName)) {
logger.debug("Merge job {} has been discarded before, will not merge.", segmentName);
return true;
}