Posted to commits@kylin.apache.org by xx...@apache.org on 2020/09/25 07:00:28 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4761 Update some missing values of new segment when merge segments
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new b341568 KYLIN-4761 Update some missing values of new segment when merge segments
b341568 is described below
commit b3415685d5de9abbf47ee12a5fcdcca4a56accdc
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Sat Sep 19 23:04:22 2020 +0800
KYLIN-4761 Update some missing values of new segment when merge segments
(cherry picked from commit 878874ba1d833a60b941c0655b4338618cc32a80)
---
.../kylin/engine/spark/job/NSparkCubingStep.java | 43 +------
.../kylin/engine/spark/job/NSparkExecutable.java | 8 +-
.../kylin/engine/spark/job/NSparkMergingStep.java | 6 +
.../NSparkUpdateMetaAndCleanupAfterMergeStep.java | 41 +++----
.../merger/AfterMergeOrRefreshResourceMerger.java | 133 --------------------
.../kylin/engine/spark/merger/MetadataMerger.java | 40 ------
.../engine/spark/utils/UpdateMetadataUtil.java | 136 +++++++++++++++++++++
.../kylin/engine/spark/job/CubeMergeJob.java | 44 ++++++-
.../engine/spark/LocalWithSparkSessionTest.java | 12 +-
9 files changed, 215 insertions(+), 248 deletions(-)
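The diffstat tells the story: the standalone merger classes (MetadataMerger and its only
implementation, AfterMergeOrRefreshResourceMerger) are deleted, and both the cubing and the
merging step now funnel through a single UpdateMetadataUtil via a hook renamed from
updateMetaAfterBuilding to updateMetaAfterOperation. A minimal sketch of the resulting
pattern, with bodies trimmed to just the hook (the full diff follows below):

    public abstract class NSparkExecutable extends AbstractExecutable {
        // No-op by default; invoked once the submitted Spark job has finished.
        protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
        }
    }

    public class NSparkMergingStep extends NSparkExecutable {
        @Override
        protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
            // After this commit, build and merge steps share one metadata-sync path.
            UpdateMetadataUtil.syncLocalMetadataToRemote(config, this);
        }
    }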
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
index c3234f1..5c073e1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
@@ -18,30 +18,23 @@
package org.apache.kylin.engine.spark.job;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.spark.utils.UpdateMetadataUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
@@ -79,38 +72,8 @@ public class NSparkCubingStep extends NSparkExecutable {
}
@Override
- protected void updateMetaAfterBuilding(KylinConfig config) throws IOException {
- CubeManager cubeManager = CubeManager.getInstance(config);
- CubeInstance currentInstanceCopy = cubeManager.getCube(getCubeName()).latestCopyForWrite();
- KylinConfig kylinDistConfig = MetaDumpUtil.loadKylinConfigFromHdfs(getDistMetaUrl());
- CubeInstance distCube = CubeManager.getInstance(kylinDistConfig).reloadCube(getCubeName());
- CubeUpdate update = new CubeUpdate(currentInstanceCopy);
- Set<String> segmentIds = Sets.newHashSet(org.apache.hadoop.util.StringUtils.split(getParam(MetadataConstants.P_SEGMENT_IDS)));
- CubeSegment toUpdateSegs = distCube.getSegmentById(segmentIds.iterator().next());
-
- List<CubeSegment> tobe = currentInstanceCopy.calculateToBeSegments(toUpdateSegs);
-
- if (!tobe.contains(toUpdateSegs))
- throw new IllegalStateException(
- String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
- currentInstanceCopy.toString(), toUpdateSegs.toString(), tobe.toString()));
-
- toUpdateSegs.setStatus(SegmentStatusEnum.READY);
-
- List<CubeSegment> toRemoveSegs = Lists.newArrayList();
- for (CubeSegment segment : currentInstanceCopy.getSegments()) {
- if (!tobe.contains(segment))
- toRemoveSegs.add(segment);
- }
-
- logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
-
- update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()]))
- .setToUpdateSegs(toUpdateSegs);
- if (currentInstanceCopy.getConfig().isJobAutoReadyCubeEnabled()) {
- update.setStatus(RealizationStatusEnum.READY);
- }
- cubeManager.updateCube(update);
+ protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
+ UpdateMetadataUtil.syncLocalMetadataToRemote(config, this);
}
@Override
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 7f5e4f0..f4efb76 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -266,7 +266,7 @@ public class NSparkExecutable extends AbstractExecutable {
CliCommandExecutor exec = new CliCommandExecutor();
exec.execute(cmd, patternedLogger, jobId);
- updateMetaAfterBuilding(config);
+ updateMetaAfterOperation(config);
//Add metrics information to execute result for JobMetricsFacade
getManager().addJobInfo(getId(), getJobMetricsInfo(config));
Map<String, String> extraInfo = makeExtraInfo(patternedLogger.getInfo());
@@ -278,7 +278,7 @@ public class NSparkExecutable extends AbstractExecutable {
}
}
- protected void updateMetaAfterBuilding(KylinConfig config) throws IOException {
+ protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
}
protected Map<String, String> getJobMetricsInfo(KylinConfig config) {
@@ -343,14 +343,14 @@ public class NSparkExecutable extends AbstractExecutable {
protected void appendSparkConf(StringBuilder sb, String key, String value) {
// Multiple parameters in "--conf" need to be enclosed in single quotes
- sb.append(" --conf '").append(key).append("=").append(value).append("' ");
+ sb.append(" --conf '").append(key).append("=").append(value.trim()).append("' ");
}
private ExecuteResult runLocalMode(String appArgs, KylinConfig config) {
try {
Class<? extends Object> appClz = ClassUtil.forName(getSparkSubmitClassName(), Object.class);
appClz.getMethod("main", String[].class).invoke(null, (Object) new String[] { appArgs });
- updateMetaAfterBuilding(config);
+ updateMetaAfterOperation(config);
//Add metrics information to execute result for JobMetricsFacade
getManager().addJobInfo(getId(), getJobMetricsInfo(config));
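The trim() added to appendSparkConf above is a small hardening: because the value is wrapped
in single quotes, any stray whitespace around it would otherwise survive verbatim into the
--conf argument. A minimal illustration (the padded memory value is a made-up input):

    StringBuilder sb = new StringBuilder("spark-submit");
    // Hypothetical value with accidental padding, e.g. copied from a config template.
    appendSparkConf(sb, "spark.executor.memory", " 4g ");
    // before this patch: spark-submit --conf 'spark.executor.memory= 4g '
    // after:             spark-submit --conf 'spark.executor.memory=4g'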
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java
index 9699f23..90e5953 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java
@@ -18,6 +18,7 @@
package org.apache.kylin.engine.spark.job;
+import java.io.IOException;
import java.util.Arrays;
import java.util.Set;
@@ -25,6 +26,7 @@ import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.spark.utils.UpdateMetadataUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.metadata.MetadataConstants;
import org.slf4j.Logger;
@@ -59,4 +61,8 @@ public class NSparkMergingStep extends NSparkExecutable {
return true;
}
+ @Override
+ protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
+ UpdateMetadataUtil.syncLocalMetadataToRemote(config, this);
+ }
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
index 2ccd810..d01f714 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
@@ -21,13 +21,12 @@ package org.apache.kylin.engine.spark.job;
import java.io.IOException;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
+import org.apache.kylin.engine.spark.utils.UpdateMetadataUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.ExecuteResult;
@@ -44,35 +43,31 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep extends NSparkExecutable {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
String cubeId = getParam(MetadataConstants.P_CUBE_ID);
String mergedSegmentUuid = getParam(CubingExecutableUtil.SEGMENT_ID);
- KylinConfig config = KylinConfig.getInstanceFromEnv();
+ final KylinConfig config = wrapConfig(context);
CubeInstance cube = CubeManager.getInstance(config).getCubeByUuid(cubeId);
- updateMetadataAfterMerge(cubeId);
+ try {
+ // update segments
+ UpdateMetadataUtil.updateMetadataAfterMerge(cubeId, mergedSegmentUuid, config);
+ } catch (IOException e) {
+ throw new ExecuteException("Can not update metadata of cube: " + cube.getName());
+ }
- CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid);
- Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergedSegment);
- // delete segments which were merged
- for (CubeSegment segment : mergingSegments) {
- try {
- PathManager.deleteSegmentParquetStoragePath(cube, segment);
- } catch (IOException e) {
- throw new ExecuteException("Can not delete segment: " + segment.getName() + ", in cube: " + cube.getName());
+ if (config.cleanStorageAfterDelOperation()) {
+ CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid);
+ Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergedSegment);
+ // delete segments which were merged
+ for (CubeSegment segment : mergingSegments) {
+ try {
+ PathManager.deleteSegmentParquetStoragePath(cube, segment);
+ } catch (IOException e) {
+ throw new ExecuteException("Can not delete segment: " + segment.getName() + ", in cube: " + cube.getName());
+ }
}
}
-
return ExecuteResult.createSucceed();
}
- private void updateMetadataAfterMerge(String cubeId) {
- String buildStepUrl = getParam(MetadataConstants.P_OUTPUT_META_URL);
- KylinConfig buildConfig = KylinConfig.createKylinConfig(this.getConfig());
- buildConfig.setMetadataUrl(buildStepUrl);
- ResourceStore resourceStore = ResourceStore.getStore(buildConfig);
- String mergedSegmentId = getParam(CubingExecutableUtil.SEGMENT_ID);
- AfterMergeOrRefreshResourceMerger merger = new AfterMergeOrRefreshResourceMerger(buildConfig);
- merger.merge(cubeId, mergedSegmentId, resourceStore, getParam(MetadataConstants.P_JOB_TYPE));
- }
-
@Override
public void cleanup(ExecuteResult result) throws ExecuteException {
// delete job tmp dir
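Two behavioral changes land in this step: the metadata update is delegated to
UpdateMetadataUtil.updateMetadataAfterMerge(), and deletion of the merged-away parquet
directories is now gated on cleanStorageAfterDelOperation(), making storage cleanup opt-in.
Condensed control flow of the new doWork (a sketch, not the verbatim method):

    final KylinConfig config = wrapConfig(context);
    CubeInstance cube = CubeManager.getInstance(config).getCubeByUuid(cubeId);
    // try/catch wrapping IOExceptions into ExecuteException elided for brevity
    UpdateMetadataUtil.updateMetadataAfterMerge(cubeId, mergedSegmentUuid, config);
    if (config.cleanStorageAfterDelOperation()) {
        CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid);
        for (CubeSegment segment : cube.getMergingSegments(mergedSegment)) {
            PathManager.deleteSegmentParquetStoragePath(cube, segment);
        }
    }
    return ExecuteResult.createSucceed();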
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
deleted file mode 100644
index 2fb1812..0000000
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.merger;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.model.IStorageAware;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.Segments;
-
-import com.google.common.collect.Lists;
-
-public class AfterMergeOrRefreshResourceMerger extends MetadataMerger {
-
- public AfterMergeOrRefreshResourceMerger(KylinConfig config) {
- super(config);
- }
-
- @Override
- public void merge(String cubeId, String segmentId, ResourceStore remoteResourceStore, String jobType) {
-
- CubeManager cubeManager = CubeManager.getInstance(getConfig());
- CubeInstance cubeInstance = cubeManager.getCubeByUuid(cubeId);
- CubeUpdate update = new CubeUpdate(cubeInstance.latestCopyForWrite());
-
- CubeManager distManager = CubeManager.getInstance(remoteResourceStore.getConfig());
- CubeInstance distCube = distManager.getCubeByUuid(cubeId).latestCopyForWrite();
-
- List<CubeSegment> toUpdateSegments = Lists.newArrayList();
-
- CubeSegment mergedSegment = distCube.getSegmentById(segmentId);
- mergedSegment.setStatus(SegmentStatusEnum.READY);
- Map<String, String> additionalInfo = mergedSegment.getAdditionalInfo();
- additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
- mergedSegment.setAdditionalInfo(additionalInfo);
- toUpdateSegments.add(mergedSegment);
-
- List<CubeSegment> toRemoveSegments = getToRemoveSegs(distCube, mergedSegment);
- Collections.sort(toRemoveSegments);
- makeSnapshotForNewSegment(mergedSegment, toRemoveSegments);
-
- if (String.valueOf(CubeBuildTypeEnum.MERGE).equals(jobType)) {
- Optional<Long> reduce = toRemoveSegments.stream().map(CubeSegment::getSizeKB).filter(size -> size != -1)
- .reduce(Long::sum);
- Optional<Long> inputRecords = toRemoveSegments.stream().map(CubeSegment::getInputRecords).filter(records -> records != -1)
- .reduce(Long::sum);
- if (reduce.isPresent()) {
- long totalSourceSize = reduce.get();
- mergedSegment.setSizeKB(totalSourceSize);
- mergedSegment.setInputRecords(inputRecords.get());
- mergedSegment.setLastBuildTime(System.currentTimeMillis());
- }
- }
-
- update.setToRemoveSegs(toRemoveSegments.toArray(new CubeSegment[0]));
- update.setToUpdateSegs(toUpdateSegments.toArray(new CubeSegment[0]));
-
- try {
- cubeManager.updateCube(update);
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
-
- List<CubeSegment> getToRemoveSegs(CubeInstance cube, CubeSegment mergedSegment) {
- Segments tobe = cube.calculateToBeSegments(mergedSegment);
-
- if (!tobe.contains(mergedSegment))
- throw new IllegalStateException(
- "For Cube " + cube + ", segment " + mergedSegment + " is expected but not in the tobe " + tobe);
-
- if (mergedSegment.getStatus() == SegmentStatusEnum.NEW)
- mergedSegment.setStatus(SegmentStatusEnum.READY);
-
- List<CubeSegment> toRemoveSegs = Lists.newArrayList();
- for (CubeSegment s : cube.getSegments()) {
- if (!tobe.contains(s))
- toRemoveSegs.add(s);
- }
-
- return toRemoveSegs;
- }
-
- private void makeSnapshotForNewSegment(CubeSegment newSeg, List<CubeSegment> mergingSegments) {
- CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
- for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
- newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
- }
- }
-
- @Override
- public void merge(AbstractExecutable abstractExecutable) {
- String buildStepUrl = abstractExecutable.getParam(MetadataConstants.P_OUTPUT_META_URL);
- KylinConfig buildConfig = KylinConfig.createKylinConfig(this.getConfig());
- buildConfig.setMetadataUrl(buildStepUrl);
- ResourceStore resourceStore = ResourceStore.getStore(buildConfig);
- String cubeId = abstractExecutable.getParam(MetadataConstants.P_CUBE_ID);
- String segmentId = abstractExecutable.getParam(CubingExecutableUtil.SEGMENT_ID);
- merge(cubeId, segmentId, resourceStore, abstractExecutable.getParam(MetadataConstants.P_JOB_TYPE));
- }
-
-}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java
deleted file mode 100644
index b5e2d9e..0000000
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.merger;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-public abstract class MetadataMerger {
- private final KylinConfig config;
-
- protected MetadataMerger(KylinConfig config) {
- this.config = config;
- }
-
- public KylinConfig getConfig() {
- return config;
- }
-
- public abstract void merge(String cubeId, String segmentIds, ResourceStore remoteResourceStore, String jobType);
-
- public abstract void merge(AbstractExecutable abstractExecutable);
-
-}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
new file mode 100644
index 0000000..ab89f44
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.utils;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.job.NSparkExecutable;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateMetadataUtil {
+
+ protected static final Logger logger = LoggerFactory.getLogger(UpdateMetadataUtil.class);
+
+ public static void syncLocalMetadataToRemote(KylinConfig config,
+ NSparkExecutable nsparkExecutable) throws IOException {
+ String cubeId = nsparkExecutable.getParam(MetadataConstants.P_CUBE_ID);
+ Set<String> segmentIds = Sets.newHashSet(StringUtils.split(
+ nsparkExecutable.getParam(CubingExecutableUtil.SEGMENT_ID), " "));
+ String segmentId = segmentIds.iterator().next();
+ String remoteResourceStore = nsparkExecutable.getDistMetaUrl();
+ String jobType = nsparkExecutable.getParam(MetadataConstants.P_JOB_TYPE);
+
+ CubeManager cubeManager = CubeManager.getInstance(config);
+ CubeInstance currentInstanceCopy = cubeManager.getCubeByUuid(cubeId).latestCopyForWrite();
+
+ // load the meta from local meta path of this job
+ KylinConfig kylinDistConfig = MetaDumpUtil.loadKylinConfigFromHdfs(remoteResourceStore);
+ CubeInstance distCube = CubeManager.getInstance(kylinDistConfig).getCubeByUuid(cubeId);
+ CubeSegment toUpdateSegs = distCube.getSegmentById(segmentId);
+
+ List<CubeSegment> tobeSegments = currentInstanceCopy.calculateToBeSegments(toUpdateSegs);
+ if (!tobeSegments.contains(toUpdateSegs))
+ throw new IllegalStateException(
+ String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
+ currentInstanceCopy.toString(), toUpdateSegs.toString(), tobeSegments.toString()));
+
+ CubeUpdate update = new CubeUpdate(currentInstanceCopy);
+ List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+
+ if (String.valueOf(CubeBuildTypeEnum.MERGE).equals(jobType)) {
+ toUpdateSegs.getSnapshots().clear();
+ // update the snapshot table path
+ for (Map.Entry<String, String> entry :
+ currentInstanceCopy.getLatestReadySegment().getSnapshots().entrySet()) {
+ toUpdateSegs.putSnapshotResPath(entry.getKey(), entry.getValue());
+ }
+ } else {
+ toUpdateSegs.setStatus(SegmentStatusEnum.READY);
+ for (CubeSegment segment : currentInstanceCopy.getSegments()) {
+ if (!tobeSegments.contains(segment))
+ toRemoveSegs.add(segment);
+ }
+ Collections.sort(toRemoveSegs);
+ if (currentInstanceCopy.getConfig().isJobAutoReadyCubeEnabled()) {
+ update.setStatus(RealizationStatusEnum.READY);
+ }
+ }
+
+ logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
+
+ toUpdateSegs.setLastBuildTime(System.currentTimeMillis());
+ update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0]))
+ .setToUpdateSegs(toUpdateSegs);
+ cubeManager.updateCube(update);
+ }
+
+ public static void updateMetadataAfterMerge(String cubeId, String segmentId,
+ KylinConfig config) throws IOException {
+ CubeManager cubeManager = CubeManager.getInstance(config);
+ CubeInstance currentInstanceCopy = cubeManager.getCubeByUuid(cubeId).latestCopyForWrite();
+
+ CubeSegment toUpdateSegs = currentInstanceCopy.getSegmentById(segmentId);
+
+ List<CubeSegment> tobeSegments = currentInstanceCopy.calculateToBeSegments(toUpdateSegs);
+ if (!tobeSegments.contains(toUpdateSegs))
+ throw new IllegalStateException(
+ String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
+ currentInstanceCopy.toString(), toUpdateSegs.toString(), tobeSegments.toString()));
+
+ CubeUpdate update = new CubeUpdate(currentInstanceCopy);
+ List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+
+ toUpdateSegs.setStatus(SegmentStatusEnum.READY);
+ for (CubeSegment segment : currentInstanceCopy.getSegments()) {
+ if (!tobeSegments.contains(segment))
+ toRemoveSegs.add(segment);
+ }
+ Collections.sort(toRemoveSegs);
+ if (currentInstanceCopy.getConfig().isJobAutoReadyCubeEnabled()) {
+ update.setStatus(RealizationStatusEnum.READY);
+ }
+
+ logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
+
+ toUpdateSegs.setLastBuildTime(System.currentTimeMillis());
+ update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0]))
+ .setToUpdateSegs(toUpdateSegs);
+ cubeManager.updateCube(update);
+ }
+}
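The MERGE branch of syncLocalMetadataToRemote is the heart of KYLIN-4761: a merged segment
previously came back without its lookup-table snapshots, so the method now clears and
re-fills them from the latest READY segment before promoting the update. A sketch of that
copy, with a hypothetical table key and resource path for illustration:

    CubeSegment ready = currentInstanceCopy.getLatestReadySegment();
    toUpdateSegs.getSnapshots().clear();
    for (Map.Entry<String, String> e : ready.getSnapshots().entrySet()) {
        // e.g. "DEFAULT.TEST_COUNTRY" -> ".../<uuid>.snapshot" (illustrative values)
        toUpdateSegs.putSnapshotResPath(e.getKey(), e.getValue());
    }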
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index 2772118..af46217 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -27,6 +27,7 @@ import java.util.UUID;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
@@ -34,8 +35,8 @@ import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.storage.StorageFactory;
-import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -57,9 +58,12 @@ import scala.collection.JavaConversions;
public class CubeMergeJob extends SparkApplication {
protected static final Logger logger = LoggerFactory.getLogger(CubeMergeJob.class);
+
private BuildLayoutWithUpdate buildLayoutWithUpdate;
+ private Map<Long, CubeMergeAssist> mergeCuboidsAssist;
private List<CubeSegment> mergingSegments = Lists.newArrayList();
private List<SegmentInfo> mergingSegInfos = Lists.newArrayList();
+ private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
@Override
protected void doExecute() throws Exception {
@@ -74,8 +78,10 @@ public class CubeMergeJob extends SparkApplication {
SegmentInfo segInfo = ManagerHub.getSegmentInfo(config, getParam(MetadataConstants.P_CUBE_ID), segment.getUuid());
mergingSegInfos.add(segInfo);
}
- //merge and save segments
+ // merge segments
mergeSegments(cubeId, newSegmentId);
+ // update segment
+ updateSegmentInfo(cubeId, newSegmentId);
}
private void mergeSegments(String cubeId, String segmentId) throws IOException {
@@ -84,7 +90,7 @@ public class CubeMergeJob extends SparkApplication {
CubeSegment mergedSeg = cube.getSegmentById(segmentId);
SegmentInfo mergedSegInfo = ManagerHub.getSegmentInfo(config, getParam(MetadataConstants.P_CUBE_ID), mergedSeg.getUuid());
- Map<Long, CubeMergeAssist> mergeCuboidsAssist = generateMergeAssist(mergingSegInfos, ss);
+ mergeCuboidsAssist = generateMergeAssist(mergingSegInfos, ss);
for (CubeMergeAssist assist : mergeCuboidsAssist.values()) {
SpanningTree spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(mergedSegInfo.toBuildLayouts()));
Dataset<Row> afterMerge = assist.merge(config, cube.getName());
@@ -178,6 +184,7 @@ public class CubeMergeJob extends SparkApplication {
int partitionNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, config, ss);
layout.setShardNum(partitionNum);
+ cuboidShardNum.put(layoutId, (short)partitionNum);
ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
ss.sparkContext().setJobDescription(null);
QueryExecutionCache.removeQueryExecution(queryExecutionId);
@@ -187,6 +194,37 @@ public class CubeMergeJob extends SparkApplication {
return layout;
}
+ private void updateSegmentInfo(String cubeId, String segmentId) throws IOException {
+ CubeManager cubeManager = CubeManager.getInstance(config);
+ CubeInstance cubeCopy = cubeManager.getCubeByUuid(cubeId).latestCopyForWrite();
+ CubeUpdate update = new CubeUpdate(cubeCopy);
+
+ List<CubeSegment> cubeSegments = Lists.newArrayList();
+ CubeSegment segment = cubeCopy.getSegmentById(segmentId);
+ long totalSourceSize = 0l;
+ long totalInputRecords = 0l;
+ long totalInputRecordsSize = 0l;
+ for (CubeMergeAssist assist : mergeCuboidsAssist.values()) {
+ totalSourceSize += assist.getLayout().getByteSize();
+ }
+ for (CubeSegment toRemoveSeg : mergingSegments) {
+ totalInputRecords += toRemoveSeg.getInputRecords();
+ totalInputRecordsSize += toRemoveSeg.getInputRecordsSize();
+ }
+ // Unit: KB
+ segment.setSizeKB(totalSourceSize / 1024);
+ segment.setInputRecords(totalInputRecords);
+ segment.setInputRecordsSize(totalInputRecordsSize);
+ segment.setLastBuildJobID(getParam(MetadataConstants.P_JOB_ID));
+ segment.setCuboidShardNums(cuboidShardNum);
+ Map<String, String> additionalInfo = segment.getAdditionalInfo();
+ additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
+ segment.setAdditionalInfo(additionalInfo);
+ cubeSegments.add(segment);
+ update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0]));
+ cubeManager.updateCube(update);
+ }
+
@Override
protected String generateInfo() {
return LogJobInfoUtils.dfMergeJobInfo();
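updateSegmentInfo above is where the "missing values" from the commit title get filled on the
Spark side: cuboid byte sizes are summed from the merge assists and stored as KB (hence the
division by 1024), and the input-record counters are summed over the source segments. With
two hypothetical source segments of 10,000 and 15,000 rows, the merged segment would report
25,000 input records:

    long totalSourceSize = 0L, totalInputRecords = 0L;
    for (CubeMergeAssist assist : mergeCuboidsAssist.values()) {
        totalSourceSize += assist.getLayout().getByteSize();  // bytes
    }
    for (CubeSegment seg : mergingSegments) {
        totalInputRecords += seg.getInputRecords();           // e.g. 10000 + 15000
    }
    segment.setSizeKB(totalSourceSize / 1024);                // bytes -> KB
    segment.setInputRecords(totalInputRecords);               // 25000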
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
index 2d26105..1d29074 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
+++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
@@ -203,11 +203,13 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme
NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment, "ADMIN");
execMgr.addJob(mergeJob);
ExecutableState result = wait(mergeJob);
- Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergeSegment);
- for (CubeSegment segment : mergingSegments) {
- String path = PathManager.getSegmentParquetStoragePath(cube, segment.getName(),
- segment.getStorageLocationIdentifier());
- Assert.assertFalse(HadoopUtil.getFileSystem(path).exists(new Path(HadoopUtil.makeURI(path))));
+ if (config.cleanStorageAfterDelOperation()) {
+ Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergeSegment);
+ for (CubeSegment segment : mergingSegments) {
+ String path = PathManager.getSegmentParquetStoragePath(cube, segment.getName(),
+ segment.getStorageLocationIdentifier());
+ Assert.assertFalse(HadoopUtil.getFileSystem(path).exists(new Path(HadoopUtil.makeURI(path))));
+ }
}
checkJobTmpPathDeleted(config, mergeJob);
return result;