Posted to commits@kylin.apache.org by xx...@apache.org on 2020/09/25 07:00:28 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4761 Update some missing values of the new segment when merging segments

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new b341568  KYLIN-4761 Update some missing values of the new segment when merging segments
b341568 is described below

commit b3415685d5de9abbf47ee12a5fcdcca4a56accdc
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Sat Sep 19 23:04:22 2020 +0800

    KYLIN-4761 Update some missing values of the new segment when merging segments
    
    (cherry picked from commit 878874ba1d833a60b941c0655b4338618cc32a80)
---
 .../kylin/engine/spark/job/NSparkCubingStep.java   |  43 +------
 .../kylin/engine/spark/job/NSparkExecutable.java   |   8 +-
 .../kylin/engine/spark/job/NSparkMergingStep.java  |   6 +
 .../NSparkUpdateMetaAndCleanupAfterMergeStep.java  |  41 +++----
 .../merger/AfterMergeOrRefreshResourceMerger.java  | 133 --------------------
 .../kylin/engine/spark/merger/MetadataMerger.java  |  40 ------
 .../engine/spark/utils/UpdateMetadataUtil.java     | 136 +++++++++++++++++++++
 .../kylin/engine/spark/job/CubeMergeJob.java       |  44 ++++++-
 .../engine/spark/LocalWithSparkSessionTest.java    |  12 +-
 9 files changed, 215 insertions(+), 248 deletions(-)
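
In outline: the AfterMergeOrRefreshResourceMerger/MetadataMerger pair is deleted in favor of a static UpdateMetadataUtil, CubeMergeJob gains a step that writes the merged segment's statistics, and the per-step metadata hook is renamed and shared. The hook, as it appears verbatim in both step classes below:

    @Override
    protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
        // NSparkCubingStep and NSparkMergingStep now share one code path:
        // sync the job's local metadata back to the server-side cube instance.
        UpdateMetadataUtil.syncLocalMetadataToRemote(config, this);
    }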

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
index c3234f1..5c073e1 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkCubingStep.java
@@ -18,30 +18,23 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.spark.utils.UpdateMetadataUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 
@@ -79,38 +72,8 @@ public class NSparkCubingStep extends NSparkExecutable {
     }
 
     @Override
-    protected void updateMetaAfterBuilding(KylinConfig config) throws IOException {
-        CubeManager cubeManager = CubeManager.getInstance(config);
-        CubeInstance currentInstanceCopy = cubeManager.getCube(getCubeName()).latestCopyForWrite();
-        KylinConfig kylinDistConfig = MetaDumpUtil.loadKylinConfigFromHdfs(getDistMetaUrl());
-        CubeInstance distCube = CubeManager.getInstance(kylinDistConfig).reloadCube(getCubeName());
-        CubeUpdate update = new CubeUpdate(currentInstanceCopy);
-        Set<String> segmentIds = Sets.newHashSet(org.apache.hadoop.util.StringUtils.split(getParam(MetadataConstants.P_SEGMENT_IDS)));
-        CubeSegment toUpdateSegs = distCube.getSegmentById(segmentIds.iterator().next());
-
-        List<CubeSegment> tobe = currentInstanceCopy.calculateToBeSegments(toUpdateSegs);
-
-        if (!tobe.contains(toUpdateSegs))
-            throw new IllegalStateException(
-                    String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
-                            currentInstanceCopy.toString(), toUpdateSegs.toString(), tobe.toString()));
-
-        toUpdateSegs.setStatus(SegmentStatusEnum.READY);
-
-        List<CubeSegment> toRemoveSegs = Lists.newArrayList();
-        for (CubeSegment segment : currentInstanceCopy.getSegments()) {
-            if (!tobe.contains(segment))
-                toRemoveSegs.add(segment);
-        }
-
-        logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
-
-        update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()]))
-                .setToUpdateSegs(toUpdateSegs);
-        if (currentInstanceCopy.getConfig().isJobAutoReadyCubeEnabled()) {
-            update.setStatus(RealizationStatusEnum.READY);
-        }
-        cubeManager.updateCube(update);
+    protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
+        UpdateMetadataUtil.syncLocalMetadataToRemote(config, this);
     }
 
     @Override
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 7f5e4f0..f4efb76 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -266,7 +266,7 @@ public class NSparkExecutable extends AbstractExecutable {
 
             CliCommandExecutor exec = new CliCommandExecutor();
             exec.execute(cmd, patternedLogger, jobId);
-            updateMetaAfterBuilding(config);
+            updateMetaAfterOperation(config);
             //Add metrics information to execute result for JobMetricsFacade
             getManager().addJobInfo(getId(), getJobMetricsInfo(config));
             Map<String, String> extraInfo = makeExtraInfo(patternedLogger.getInfo());
@@ -278,7 +278,7 @@ public class NSparkExecutable extends AbstractExecutable {
         }
     }
 
-    protected void updateMetaAfterBuilding(KylinConfig config) throws IOException {
+    protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
     }
 
     protected Map<String, String> getJobMetricsInfo(KylinConfig config) {
@@ -343,14 +343,14 @@ public class NSparkExecutable extends AbstractExecutable {
 
     protected void appendSparkConf(StringBuilder sb, String key, String value) {
         // Multiple parameters in "--conf" need to be enclosed in single quotes
-        sb.append(" --conf '").append(key).append("=").append(value).append("' ");
+        sb.append(" --conf '").append(key).append("=").append(value.trim()).append("' ");
     }
 
     private ExecuteResult runLocalMode(String appArgs, KylinConfig config) {
         try {
             Class<? extends Object> appClz = ClassUtil.forName(getSparkSubmitClassName(), Object.class);
             appClz.getMethod("main", String[].class).invoke(null, (Object) new String[] { appArgs });
-            updateMetaAfterBuilding(config);
+            updateMetaAfterOperation(config);
 
             //Add metrics information to execute result for JobMetricsFacade
             getManager().addJobInfo(getId(), getJobMetricsInfo(config));
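
One behavioral tweak above is easy to miss: appendSparkConf() now trims the value before quoting, so surrounding whitespace in a configured property can no longer leak inside the single quotes. A self-contained sketch with hypothetical key and value (illustrative, not from the commit):

    public class SparkConfQuoting {
        public static void main(String[] args) {
            StringBuilder sb = new StringBuilder("spark-submit");
            String key = "spark.executor.extraJavaOptions";
            String value = " -Dfile.encoding=UTF-8 -Duser.timezone=GMT ";
            // Same append chain as appendSparkConf(): quoting keeps the
            // multi-token value as a single --conf argument to spark-submit.
            sb.append(" --conf '").append(key).append("=").append(value.trim()).append("' ");
            System.out.println(sb);
            // spark-submit --conf 'spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Duser.timezone=GMT'
        }
    }
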
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java
index 9699f23..90e5953 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkMergingStep.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.engine.spark.job;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Set;
 
@@ -25,6 +26,7 @@ import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.spark.utils.UpdateMetadataUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
@@ -59,4 +61,8 @@ public class NSparkMergingStep extends NSparkExecutable {
         return true;
     }
 
+    @Override
+    protected void updateMetaAfterOperation(KylinConfig config) throws IOException {
+        UpdateMetadataUtil.syncLocalMetadataToRemote(config, this);
+    }
 }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
index 2ccd810..d01f714 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkUpdateMetaAndCleanupAfterMergeStep.java
@@ -21,13 +21,12 @@ package org.apache.kylin.engine.spark.job;
 import java.io.IOException;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.engine.spark.merger.AfterMergeOrRefreshResourceMerger;
 import org.apache.kylin.engine.spark.metadata.cube.PathManager;
+import org.apache.kylin.engine.spark.utils.UpdateMetadataUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.ExecuteResult;
@@ -44,35 +43,31 @@ public class NSparkUpdateMetaAndCleanupAfterMergeStep extends NSparkExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         String cubeId = getParam(MetadataConstants.P_CUBE_ID);
         String mergedSegmentUuid = getParam(CubingExecutableUtil.SEGMENT_ID);
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final KylinConfig config = wrapConfig(context);
         CubeInstance cube = CubeManager.getInstance(config).getCubeByUuid(cubeId);
 
-        updateMetadataAfterMerge(cubeId);
+        try {
+            // update segments
+            UpdateMetadataUtil.updateMetadataAfterMerge(cubeId, mergedSegmentUuid, config);
+        } catch (IOException e) {
+            throw new ExecuteException("Can not update metadata of cube: " + cube.getName());
+        }
 
-        CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid);
-        Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergedSegment);
-        // delete segments which were merged
-        for (CubeSegment segment : mergingSegments) {
-            try {
-                PathManager.deleteSegmentParquetStoragePath(cube, segment);
-            } catch (IOException e) {
-                throw new ExecuteException("Can not delete segment: " + segment.getName() + ", in cube: " + cube.getName());
+        if (config.cleanStorageAfterDelOperation()) {
+            CubeSegment mergedSegment = cube.getSegmentById(mergedSegmentUuid);
+            Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergedSegment);
+            // delete segments which were merged
+            for (CubeSegment segment : mergingSegments) {
+                try {
+                    PathManager.deleteSegmentParquetStoragePath(cube, segment);
+                } catch (IOException e) {
+                    throw new ExecuteException("Can not delete segment: " + segment.getName() + ", in cube: " + cube.getName());
+                }
             }
         }
-
         return ExecuteResult.createSucceed();
     }
 
-    private void updateMetadataAfterMerge(String cubeId) {
-        String buildStepUrl = getParam(MetadataConstants.P_OUTPUT_META_URL);
-        KylinConfig buildConfig = KylinConfig.createKylinConfig(this.getConfig());
-        buildConfig.setMetadataUrl(buildStepUrl);
-        ResourceStore resourceStore = ResourceStore.getStore(buildConfig);
-        String mergedSegmentId = getParam(CubingExecutableUtil.SEGMENT_ID);
-        AfterMergeOrRefreshResourceMerger merger = new AfterMergeOrRefreshResourceMerger(buildConfig);
-        merger.merge(cubeId, mergedSegmentId, resourceStore, getParam(MetadataConstants.P_JOB_TYPE));
-    }
-
     @Override
     public void cleanup(ExecuteResult result) throws ExecuteException {
         // delete job tmp dir
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
deleted file mode 100644
index 2fb1812..0000000
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/AfterMergeOrRefreshResourceMerger.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.merger;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.model.IStorageAware;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.Segments;
-
-import com.google.common.collect.Lists;
-
-public class AfterMergeOrRefreshResourceMerger extends MetadataMerger {
-
-    public AfterMergeOrRefreshResourceMerger(KylinConfig config) {
-        super(config);
-    }
-
-    @Override
-    public void merge(String cubeId, String segmentId, ResourceStore remoteResourceStore, String jobType) {
-
-        CubeManager cubeManager = CubeManager.getInstance(getConfig());
-        CubeInstance cubeInstance = cubeManager.getCubeByUuid(cubeId);
-        CubeUpdate update = new CubeUpdate(cubeInstance.latestCopyForWrite());
-
-        CubeManager distManager = CubeManager.getInstance(remoteResourceStore.getConfig());
-        CubeInstance distCube = distManager.getCubeByUuid(cubeId).latestCopyForWrite();
-
-        List<CubeSegment> toUpdateSegments = Lists.newArrayList();
-
-        CubeSegment mergedSegment = distCube.getSegmentById(segmentId);
-        mergedSegment.setStatus(SegmentStatusEnum.READY);
-        Map<String, String> additionalInfo = mergedSegment.getAdditionalInfo();
-        additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
-        mergedSegment.setAdditionalInfo(additionalInfo);
-        toUpdateSegments.add(mergedSegment);
-
-        List<CubeSegment> toRemoveSegments = getToRemoveSegs(distCube, mergedSegment);
-        Collections.sort(toRemoveSegments);
-        makeSnapshotForNewSegment(mergedSegment, toRemoveSegments);
-
-        if (String.valueOf(CubeBuildTypeEnum.MERGE).equals(jobType)) {
-            Optional<Long> reduce = toRemoveSegments.stream().map(CubeSegment::getSizeKB).filter(size -> size != -1)
-                    .reduce(Long::sum);
-            Optional<Long> inputRecords = toRemoveSegments.stream().map(CubeSegment::getInputRecords).filter(records -> records != -1)
-                    .reduce(Long::sum);
-            if (reduce.isPresent()) {
-                long totalSourceSize = reduce.get();
-                mergedSegment.setSizeKB(totalSourceSize);
-                mergedSegment.setInputRecords(inputRecords.get());
-                mergedSegment.setLastBuildTime(System.currentTimeMillis());
-            }
-        }
-
-        update.setToRemoveSegs(toRemoveSegments.toArray(new CubeSegment[0]));
-        update.setToUpdateSegs(toUpdateSegments.toArray(new CubeSegment[0]));
-
-        try {
-            cubeManager.updateCube(update);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-    }
-
-    List<CubeSegment> getToRemoveSegs(CubeInstance cube, CubeSegment mergedSegment) {
-        Segments tobe = cube.calculateToBeSegments(mergedSegment);
-
-        if (!tobe.contains(mergedSegment))
-            throw new IllegalStateException(
-                    "For Cube " + cube + ", segment " + mergedSegment + " is expected but not in the tobe " + tobe);
-
-        if (mergedSegment.getStatus() == SegmentStatusEnum.NEW)
-            mergedSegment.setStatus(SegmentStatusEnum.READY);
-
-        List<CubeSegment> toRemoveSegs = Lists.newArrayList();
-        for (CubeSegment s : cube.getSegments()) {
-            if (!tobe.contains(s))
-                toRemoveSegs.add(s);
-        }
-
-        return toRemoveSegs;
-    }
-
-    private void makeSnapshotForNewSegment(CubeSegment newSeg, List<CubeSegment> mergingSegments) {
-        CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
-        for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
-            newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
-        }
-    }
-
-    @Override
-    public void merge(AbstractExecutable abstractExecutable) {
-        String buildStepUrl = abstractExecutable.getParam(MetadataConstants.P_OUTPUT_META_URL);
-        KylinConfig buildConfig = KylinConfig.createKylinConfig(this.getConfig());
-        buildConfig.setMetadataUrl(buildStepUrl);
-        ResourceStore resourceStore = ResourceStore.getStore(buildConfig);
-        String cubeId = abstractExecutable.getParam(MetadataConstants.P_CUBE_ID);
-        String segmentId = abstractExecutable.getParam(CubingExecutableUtil.SEGMENT_ID);
-        merge(cubeId, segmentId, resourceStore, abstractExecutable.getParam(MetadataConstants.P_JOB_TYPE));
-    }
-
-}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java
deleted file mode 100644
index b5e2d9e..0000000
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/merger/MetadataMerger.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.merger;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-public abstract class MetadataMerger {
-    private final KylinConfig config;
-
-    protected MetadataMerger(KylinConfig config) {
-        this.config = config;
-    }
-
-    public KylinConfig getConfig() {
-        return config;
-    }
-
-    public abstract void merge(String cubeId, String segmentIds, ResourceStore remoteResourceStore, String jobType);
-
-    public abstract void merge(AbstractExecutable abstractExecutable);
-
-}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
new file mode 100644
index 0000000..ab89f44
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/UpdateMetadataUtil.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.utils;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.spark.job.NSparkExecutable;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateMetadataUtil {
+
+    protected static final Logger logger = LoggerFactory.getLogger(UpdateMetadataUtil.class);
+
+    public static void syncLocalMetadataToRemote(KylinConfig config,
+                                         NSparkExecutable nsparkExecutable) throws IOException {
+        String cubeId = nsparkExecutable.getParam(MetadataConstants.P_CUBE_ID);
+        Set<String> segmentIds = Sets.newHashSet(StringUtils.split(
+                nsparkExecutable.getParam(CubingExecutableUtil.SEGMENT_ID), " "));
+        String segmentId = segmentIds.iterator().next();
+        String remoteResourceStore = nsparkExecutable.getDistMetaUrl();
+        String jobType = nsparkExecutable.getParam(MetadataConstants.P_JOB_TYPE);
+
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance currentInstanceCopy = cubeManager.getCubeByUuid(cubeId).latestCopyForWrite();
+
+        // load the meta from local meta path of this job
+        KylinConfig kylinDistConfig = MetaDumpUtil.loadKylinConfigFromHdfs(remoteResourceStore);
+        CubeInstance distCube = CubeManager.getInstance(kylinDistConfig).getCubeByUuid(cubeId);
+        CubeSegment toUpdateSegs = distCube.getSegmentById(segmentId);
+
+        List<CubeSegment> tobeSegments = currentInstanceCopy.calculateToBeSegments(toUpdateSegs);
+        if (!tobeSegments.contains(toUpdateSegs))
+            throw new IllegalStateException(
+                    String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
+                            currentInstanceCopy.toString(), toUpdateSegs.toString(), tobeSegments.toString()));
+
+        CubeUpdate update = new CubeUpdate(currentInstanceCopy);
+        List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+
+        if (String.valueOf(CubeBuildTypeEnum.MERGE).equals(jobType)) {
+            toUpdateSegs.getSnapshots().clear();
+            // update the snapshot table path
+            for (Map.Entry<String, String> entry :
+                    currentInstanceCopy.getLatestReadySegment().getSnapshots().entrySet()) {
+                toUpdateSegs.putSnapshotResPath(entry.getKey(), entry.getValue());
+            }
+        } else {
+            toUpdateSegs.setStatus(SegmentStatusEnum.READY);
+            for (CubeSegment segment : currentInstanceCopy.getSegments()) {
+                if (!tobeSegments.contains(segment))
+                    toRemoveSegs.add(segment);
+            }
+            Collections.sort(toRemoveSegs);
+            if (currentInstanceCopy.getConfig().isJobAutoReadyCubeEnabled()) {
+                update.setStatus(RealizationStatusEnum.READY);
+            }
+        }
+
+        logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
+
+        toUpdateSegs.setLastBuildTime(System.currentTimeMillis());
+        update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0]))
+                .setToUpdateSegs(toUpdateSegs);
+        cubeManager.updateCube(update);
+    }
+
+    public static void updateMetadataAfterMerge(String cubeId, String segmentId,
+                                                KylinConfig config) throws IOException {
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance currentInstanceCopy = cubeManager.getCubeByUuid(cubeId).latestCopyForWrite();
+
+        CubeSegment toUpdateSegs = currentInstanceCopy.getSegmentById(segmentId);
+
+        List<CubeSegment> tobeSegments = currentInstanceCopy.calculateToBeSegments(toUpdateSegs);
+        if (!tobeSegments.contains(toUpdateSegs))
+            throw new IllegalStateException(
+                    String.format(Locale.ROOT, "For cube %s, segment %s is expected but not in the tobe %s",
+                            currentInstanceCopy.toString(), toUpdateSegs.toString(), tobeSegments.toString()));
+
+        CubeUpdate update = new CubeUpdate(currentInstanceCopy);
+        List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+
+        toUpdateSegs.setStatus(SegmentStatusEnum.READY);
+        for (CubeSegment segment : currentInstanceCopy.getSegments()) {
+            if (!tobeSegments.contains(segment))
+                toRemoveSegs.add(segment);
+        }
+        Collections.sort(toRemoveSegs);
+        if (currentInstanceCopy.getConfig().isJobAutoReadyCubeEnabled()) {
+            update.setStatus(RealizationStatusEnum.READY);
+        }
+
+        logger.info("Promoting cube {}, new segment {}, to remove segments {}", currentInstanceCopy, toUpdateSegs, toRemoveSegs);
+
+        toUpdateSegs.setLastBuildTime(System.currentTimeMillis());
+        update.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[0]))
+                .setToUpdateSegs(toUpdateSegs);
+        cubeManager.updateCube(update);
+    }
+}
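
Two details of UpdateMetadataUtil are worth spelling out. First, for MERGE jobs syncLocalMetadataToRemote() does not promote the new segment to READY; it only copies snapshot paths from the latest READY segment, and the actual promotion happens later in updateMetadataAfterMerge(), invoked from the cleanup step. Second, both methods share the same bookkeeping: every existing segment absent from calculateToBeSegments() is queued for removal in the same CubeUpdate that updates the merged segment. A self-contained sketch of that bookkeeping, with hypothetical segment names (not from the commit):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class ToBeSketch {
        // Mirrors the loop over currentInstanceCopy.getSegments() above:
        // whatever is missing from the "to be" list is scheduled for removal.
        static <T> List<T> toRemove(List<T> current, List<T> tobe) {
            List<T> out = new ArrayList<>();
            for (T s : current) {
                if (!tobe.contains(s)) {
                    out.add(s);
                }
            }
            return out;
        }

        public static void main(String[] args) {
            // Hypothetical: S1 [0,10) and S2 [10,20) were merged into M [0,20).
            List<String> current = Arrays.asList("S1", "S2", "M");
            List<String> tobe = Arrays.asList("M"); // what calculateToBeSegments would keep
            System.out.println(toRemove(current, tobe)); // prints [S1, S2]
        }
    }
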
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index 2772118..af46217 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -27,6 +27,7 @@ import java.util.UUID;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.engine.spark.metadata.SegmentInfo;
 import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
 import org.apache.kylin.engine.spark.metadata.cube.PathManager;
@@ -34,8 +35,8 @@ import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.storage.StorageFactory;
-import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -57,9 +58,12 @@ import scala.collection.JavaConversions;
 
 public class CubeMergeJob extends SparkApplication {
     protected static final Logger logger = LoggerFactory.getLogger(CubeMergeJob.class);
+
     private BuildLayoutWithUpdate buildLayoutWithUpdate;
+    private Map<Long, CubeMergeAssist> mergeCuboidsAssist;
     private List<CubeSegment> mergingSegments = Lists.newArrayList();
     private List<SegmentInfo> mergingSegInfos = Lists.newArrayList();
+    private Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
 
     @Override
     protected void doExecute() throws Exception {
@@ -74,8 +78,10 @@ public class CubeMergeJob extends SparkApplication {
             SegmentInfo segInfo = ManagerHub.getSegmentInfo(config, getParam(MetadataConstants.P_CUBE_ID), segment.getUuid());
             mergingSegInfos.add(segInfo);
         }
-        //merge and save segments
+        // merge segments
         mergeSegments(cubeId, newSegmentId);
+        // update segment
+        updateSegmentInfo(cubeId, newSegmentId);
     }
 
     private void mergeSegments(String cubeId, String segmentId) throws IOException {
@@ -84,7 +90,7 @@ public class CubeMergeJob extends SparkApplication {
         CubeSegment mergedSeg = cube.getSegmentById(segmentId);
         SegmentInfo mergedSegInfo = ManagerHub.getSegmentInfo(config, getParam(MetadataConstants.P_CUBE_ID), mergedSeg.getUuid());
 
-        Map<Long, CubeMergeAssist> mergeCuboidsAssist = generateMergeAssist(mergingSegInfos, ss);
+        mergeCuboidsAssist = generateMergeAssist(mergingSegInfos, ss);
         for (CubeMergeAssist assist : mergeCuboidsAssist.values()) {
             SpanningTree spanningTree = new ForestSpanningTree(JavaConversions.asJavaCollection(mergedSegInfo.toBuildLayouts()));
             Dataset<Row> afterMerge = assist.merge(config, cube.getName());
@@ -178,6 +184,7 @@ public class CubeMergeJob extends SparkApplication {
 
         int partitionNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, config, ss);
         layout.setShardNum(partitionNum);
+        cuboidShardNum.put(layoutId, (short)partitionNum);
         ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
         ss.sparkContext().setJobDescription(null);
         QueryExecutionCache.removeQueryExecution(queryExecutionId);
@@ -187,6 +194,37 @@ public class CubeMergeJob extends SparkApplication {
         return layout;
     }
 
+    private void updateSegmentInfo(String cubeId, String segmentId) throws IOException {
+        CubeManager cubeManager = CubeManager.getInstance(config);
+        CubeInstance cubeCopy = cubeManager.getCubeByUuid(cubeId).latestCopyForWrite();
+        CubeUpdate update = new CubeUpdate(cubeCopy);
+
+        List<CubeSegment> cubeSegments = Lists.newArrayList();
+        CubeSegment segment = cubeCopy.getSegmentById(segmentId);
+        long totalSourceSize = 0l;
+        long totalInputRecords = 0l;
+        long totalInputRecordsSize = 0l;
+        for (CubeMergeAssist assist : mergeCuboidsAssist.values()) {
+            totalSourceSize += assist.getLayout().getByteSize();
+        }
+        for (CubeSegment toRemoveSeg : mergingSegments) {
+            totalInputRecords += toRemoveSeg.getInputRecords();
+            totalInputRecordsSize += toRemoveSeg.getInputRecordsSize();
+        }
+        // Unit: KB
+        segment.setSizeKB(totalSourceSize / 1024);
+        segment.setInputRecords(totalInputRecords);
+        segment.setInputRecordsSize(totalInputRecordsSize);
+        segment.setLastBuildJobID(getParam(MetadataConstants.P_JOB_ID));
+        segment.setCuboidShardNums(cuboidShardNum);
+        Map<String, String> additionalInfo = segment.getAdditionalInfo();
+        additionalInfo.put("storageType", "" + IStorageAware.ID_PARQUET);
+        segment.setAdditionalInfo(additionalInfo);
+        cubeSegments.add(segment);
+        update.setToUpdateSegs(cubeSegments.toArray(new CubeSegment[0]));
+        cubeManager.updateCube(update);
+    }
+
     @Override
     protected String generateInfo() {
         return LogJobInfoUtils.dfMergeJobInfo();
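
The new updateSegmentInfo() is the heart of KYLIN-4761: it back-fills the statistics a merged segment used to lose, summing layout byte sizes (stored in KB), input-record counts and sizes from the source segments, and attaching the build job id and the per-cuboid shard numbers collected during repartitioning. The unit handling, with hypothetical figures:

    public class MergeStatsSketch {
        public static void main(String[] args) {
            // Hypothetical layout byte sizes: 1 MiB + 1 MiB + 512 KiB.
            long totalSourceSize = 1_048_576L + 1_048_576L + 524_288L;
            // Hypothetical input-record counts from two source segments.
            long totalInputRecords = 1_000L + 2_500L;
            System.out.println(totalSourceSize / 1024); // 2560 -> segment.setSizeKB(...)
            System.out.println(totalInputRecords);      // 3500 -> segment.setInputRecords(...)
        }
    }
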
diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
index 2d26105..1d29074 100644
--- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
+++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java
@@ -203,11 +203,13 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme
         NSparkMergingJob mergeJob = NSparkMergingJob.merge(mergeSegment,  "ADMIN");
         execMgr.addJob(mergeJob);
         ExecutableState result = wait(mergeJob);
-        Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergeSegment);
-        for (CubeSegment segment : mergingSegments) {
-            String path = PathManager.getSegmentParquetStoragePath(cube, segment.getName(),
-                    segment.getStorageLocationIdentifier());
-            Assert.assertFalse(HadoopUtil.getFileSystem(path).exists(new Path(HadoopUtil.makeURI(path))));
+        if (config.cleanStorageAfterDelOperation()) {
+            Segments<CubeSegment> mergingSegments = cube.getMergingSegments(mergeSegment);
+            for (CubeSegment segment : mergingSegments) {
+                String path = PathManager.getSegmentParquetStoragePath(cube, segment.getName(),
+                        segment.getStorageLocationIdentifier());
+                Assert.assertFalse(HadoopUtil.getFileSystem(path).exists(new Path(HadoopUtil.makeURI(path))));
+            }
         }
         checkJobTmpPathDeleted(config, mergeJob);
         return result;