You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/01/23 09:32:25 UTC
[38/50] [abbrv] incubator-kylin git commit: KYLIN-567, coding done, ready for testing
KYLIN-567, coding done, ready for testing
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8072fb77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8072fb77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8072fb77
Branch: refs/heads/inverted-index
Commit: 8072fb7729d78017a062d249451f7c61c27ef3ca
Parents: 8da37ea
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Jan 23 14:13:32 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Jan 23 14:13:32 2015 +0800
----------------------------------------------------------------------
.../java/com/kylinolap/cube/CubeManager.java | 151 ++++++++++++++---
.../kylinolap/cube/CubeSegmentValidator.java | 161 -------------------
.../java/com/kylinolap/cube/model/CubeDesc.java | 13 +-
.../kylinolap/job/cube/MergeDictionaryStep.java | 14 +-
.../job/cube/UpdateCubeInfoAfterBuildStep.java | 7 +-
.../job/cube/UpdateCubeInfoAfterMergeStep.java | 40 ++---
6 files changed, 164 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/cube/src/main/java/com/kylinolap/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/CubeManager.java b/cube/src/main/java/com/kylinolap/cube/CubeManager.java
index be22ea0..70fb8b6 100644
--- a/cube/src/main/java/com/kylinolap/cube/CubeManager.java
+++ b/cube/src/main/java/com/kylinolap/cube/CubeManager.java
@@ -18,6 +18,7 @@ package com.kylinolap.cube;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
@@ -32,6 +33,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.kylinolap.common.KylinConfig;
import com.kylinolap.common.persistence.JsonSerializer;
@@ -41,6 +43,7 @@ import com.kylinolap.common.restclient.Broadcaster;
import com.kylinolap.common.restclient.CaseInsensitiveStringCache;
import com.kylinolap.cube.model.CubeBuildTypeEnum;
import com.kylinolap.cube.model.CubeDesc;
+import com.kylinolap.cube.model.CubePartitionDesc;
import com.kylinolap.cube.model.DimensionDesc;
import com.kylinolap.dict.DateStrDictionary;
import com.kylinolap.dict.Dictionary;
@@ -244,12 +247,7 @@ public class CubeManager implements IRealizationProvider {
public CubeInstance updateCube(CubeInstance cube) throws IOException {
logger.info("Updating cube instance '" + cube.getName());
-
- // save resource
saveResource(cube);
-
- logger.info("Cube with " + cube.getSegments().size() + " segments is saved");
-
return cube;
}
@@ -294,9 +292,9 @@ public class CubeManager implements IRealizationProvider {
}
}
}
- CubeSegment newSegment = buildSegment(cubeInstance, start, end);
- validateNewSegments(cubeInstance, CubeBuildTypeEnum.MERGE, newSegment);
+ CubeSegment newSegment = newSegment(cubeInstance, start, end);
+ validateNewSegments(cubeInstance, newSegment);
cubeInstance.getSegments().add(newSegment);
Collections.sort(cubeInstance.getSegments());
@@ -315,15 +313,16 @@ public class CubeManager implements IRealizationProvider {
if (cubeInstance.getDescriptor().getCubePartitionDesc().isPartitioned()) {
if (readySegments.isEmpty()) {
startDate = cubeInstance.getDescriptor().getCubePartitionDesc().getPartitionDateStart();
- newSegment = buildSegment(cubeInstance, startDate, endDate);
+ newSegment = newSegment(cubeInstance, startDate, endDate);
} else {
startDate = readySegments.get(readySegments.size() - 1).getDateRangeEnd();
- newSegment = buildSegment(cubeInstance, startDate, endDate);
+ newSegment = newSegment(cubeInstance, startDate, endDate);
}
} else {
- newSegment = buildSegment(cubeInstance, 0, Long.MAX_VALUE);
+ newSegment = newSegment(cubeInstance, 0, Long.MAX_VALUE);
}
- validateNewSegments(cubeInstance, CubeBuildTypeEnum.BUILD, newSegment);
+
+ validateNewSegments(cubeInstance, newSegment);
cubeInstance.getSegments().add(newSegment);
Collections.sort(cubeInstance.getSegments());
@@ -444,13 +443,7 @@ public class CubeManager implements IRealizationProvider {
removeCubeCache(droppedCube);
}
- /**
- * @param cubeInstance
- * @param startDate (pass 0 if full build)
- * @param endDate (pass 0 if full build)
- * @return
- */
- private CubeSegment buildSegment(CubeInstance cubeInstance, long startDate, long endDate) {
+ private CubeSegment newSegment(CubeInstance cubeInstance, long startDate, long endDate) {
CubeSegment segment = new CubeSegment();
String incrementalSegName = CubeSegment.getSegmentName(startDate, endDate);
segment.setUuid(UUID.randomUUID().toString());
@@ -482,19 +475,125 @@ public class CubeManager implements IRealizationProvider {
return tableName;
}
+ public void promoteNewlyBuiltSegments(CubeInstance cube, CubeSegment... newSegments) throws IOException {
+ List<CubeSegment> tobe = calculateToBeSegments(cube);
+
+ for (CubeSegment seg : newSegments) {
+ if (tobe.contains(seg) == false)
+ throw new IllegalStateException("For cube " + cube + ", segment " + seg + " is expected but not in the tobe " + tobe);
+
+ if (StringUtils.isBlank(seg.getStorageLocationIdentifier()))
+ throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing StorageLocationIdentifier");
+
+ if (StringUtils.isBlank(seg.getLastBuildJobID()))
+ throw new IllegalStateException("For cube " + cube + ", segment " + seg + " missing LastBuildJobID");
+
+ seg.setStatus(SegmentStatusEnum.READY);
+ }
+
+ for (CubeSegment seg : tobe) {
+ if (isReady(seg) == false)
+ throw new IllegalStateException("For cube " + cube + ", segment " + seg + " should be READY but is not");
+ }
+
+ cube.setSegments(tobe);
+ cube.setStatus(RealizationStatusEnum.READY);
+
+ logger.info("Promoting cube " + cube + ", new segments " + newSegments);
+ saveResource(cube);
+ }
+
+ private void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
+ List<CubeSegment> tobe = calculateToBeSegments(cube, newSegments);
+ List<CubeSegment> newList = Arrays.asList(newSegments);
+ if (tobe.containsAll(newList) == false) {
+ throw new IllegalStateException("For cube " + cube + ", the new segments " + newList + " do not fit in its current " + cube.getSegments() + "; the resulted tobe is " + tobe);
+ }
+ }
+
/**
+ * Smartly figure out the TOBE segments once all new segments are built.
+ * - Ensures no gap, no overlap
+ * - Favors new segments over the old
+ * - Favors big segments over the small
*/
- private void validateNewSegments(CubeInstance cubeInstance, CubeBuildTypeEnum buildType, CubeSegment newSegment) {
- if (cubeInstance.getDescriptor().getCubePartitionDesc().isPartitioned() == false) {
- // do nothing for non-incremental build
- return;
+ private List<CubeSegment> calculateToBeSegments(CubeInstance cube, CubeSegment... extraSegments) {
+ CubeDesc cubeDesc = cube.getDescriptor();
+ CubePartitionDesc partDesc = cubeDesc.getCubePartitionDesc();
+ if (partDesc.isPartitioned() == false) {
+ return cube.getSegments(); // do nothing for non-incremental build
}
- if (newSegment.getDateRangeEnd() <= newSegment.getDateRangeStart()) {
- throw new IllegalStateException(" end date.");
+
+ List<CubeSegment> tobe = Lists.newArrayList(cube.getSegments());
+ if (extraSegments != null)
+ tobe.addAll(Arrays.asList(extraSegments));
+ if (tobe.size() == 0)
+ return tobe;
+
+ // sort by start time, then end time
+ Collections.sort(tobe);
+
+ // check first segment start time
+ CubeSegment firstSeg = tobe.get(0);
+ if (firstSeg.getDateRangeStart() != partDesc.getPartitionDateStart()) {
+ throw new IllegalStateException("For " + cube + ", the first segment, " + firstSeg + ", must start at " + partDesc.getPartitionDateStart());
}
- CubeSegmentValidator cubeSegmentValidator = CubeSegmentValidator.getCubeSegmentValidator(buildType);
- cubeSegmentValidator.validate(cubeInstance, newSegment);
+ for (int i = 0, j = 1; j < tobe.size();) {
+ CubeSegment is = tobe.get(i);
+ CubeSegment js = tobe.get(j);
+
+ // drop i unless it is either ready or new
+ if (!isNew(is) && !isReady(is)) {
+ tobe.remove(i);
+ continue;
+ }
+
+ // drop j unless it is either ready or new
+ if (!isNew(js) && !isReady(js)) {
+ tobe.remove(j);
+ continue;
+ }
+
+ // if i and j compete, i.e. both start at the same date
+ if (is.getDateRangeStart() == js.getDateRangeStart()) {
+ // if both new or ready, favor the bigger segment
+ if (isReady(is) && isReady(js) || isNew(is) && isNew(js)) {
+ if (is.getDateRangeEnd() <= js.getDateRangeEnd()) {
+ tobe.remove(i);
+ } else {
+ tobe.remove(j);
+ }
+ }
+ // otherwise, favor the new segment
+ else if (isNew(is)) {
+ tobe.remove(j);
+ } else {
+ tobe.remove(i);
+ }
+ continue;
+ }
+
+ // if i and j are in sequence, i.e. j starts exactly where i ends, advance to the next pair
+ if (is.getDateRangeEnd() == js.getDateRangeStart()) {
+ i++;
+ j++;
+ continue;
+ }
+
+ // otherwise j neither competes with i nor directly follows it, so j does not fit; drop it
+ tobe.remove(j);
+ }
+
+ return tobe;
+ }
+
+ private boolean isReady(CubeSegment seg) {
+ return seg.getStatus() == SegmentStatusEnum.READY;
+ }
+
+ private boolean isNew(CubeSegment seg) {
+ return seg.getStatus() == SegmentStatusEnum.NEW || seg.getStatus() == SegmentStatusEnum.READY_PENDING;
}
private void loadAllCubeInstance() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java b/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java
deleted file mode 100644
index c2c6495..0000000
--- a/cube/src/main/java/com/kylinolap/cube/CubeSegmentValidator.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.cube;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-
-import com.kylinolap.cube.model.CubeBuildTypeEnum;
-import com.kylinolap.cube.model.CubeDesc;
-import com.kylinolap.cube.model.CubePartitionDesc;
-import com.kylinolap.cube.model.DimensionDesc;
-import com.kylinolap.dict.DictionaryManager;
-import com.kylinolap.metadata.model.SegmentStatusEnum;
-import com.kylinolap.metadata.model.TblColRef;
-
-/**
- * @author xduo
- */
-public abstract class CubeSegmentValidator {
-
- private CubeSegmentValidator() {
- }
-
- public static CubeSegmentValidator getCubeSegmentValidator(CubeBuildTypeEnum buildType) {
- switch (buildType) {
- case MERGE:
- return new MergeOperationValidator();
- case BUILD:
- return new BuildOperationValidator();
- default:
- throw new RuntimeException("invalid build type:" + buildType);
- }
- }
-
- abstract void validate(CubeInstance cubeInstance, CubeSegment newSegment);
-
- private static class MergeOperationValidator extends CubeSegmentValidator {
-
- private void checkContingency(CubeInstance cubeInstance, CubeSegment newSegment) {
- if (cubeInstance.getSegments().size() < 2) {
- throw new IllegalStateException("No segments to merge.");
- }
- CubeSegment startSeg = null;
- CubeSegment endSeg = null;
- for (CubeSegment segment : cubeInstance.getSegments()) {
- if (segment.getDateRangeStart() == newSegment.getDateRangeStart()) {
- startSeg = segment;
- }
- if (segment.getDateRangeEnd() == newSegment.getDateRangeEnd()) {
- endSeg = segment;
- }
- }
-
- if (null == startSeg || null == endSeg || startSeg.getDateRangeStart() >= endSeg.getDateRangeStart()) {
- throw new IllegalStateException("Invalid date range.");
- }
- }
-
- private void checkLoopTableConsistency(CubeInstance cube, CubeSegment newSegment) {
-
- DictionaryManager dictMgr = DictionaryManager.getInstance(cube.getConfig());
- List<CubeSegment> segmentList = cube.getMergingSegments(newSegment);
-
- HashSet<TblColRef> cols = new HashSet<TblColRef>();
- CubeDesc cubeDesc = cube.getDescriptor();
- for (DimensionDesc dim : cubeDesc.getDimensions()) {
- for (TblColRef col : dim.getColumnRefs()) {
- // include those dictionaries that do not need mergning
- try {
- if (newSegment.getCubeDesc().getRowkey().isUseDictionary(col)) {
- String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
- if (!cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
- cols.add(col);
- }
- }
- } catch (IOException e) {
- throw new IllegalStateException("checkLoopTableConsistency not passed when allocating a new segment.");
- }
- }
- }
-
- // check if all dictionaries on lookup table columns are identical
- for (TblColRef col : cols) {
- String dictOfFirstSegment = null;
- for (CubeSegment segment : segmentList) {
- String temp = segment.getDictResPath(col);
- if (temp == null) {
- throw new IllegalStateException("Dictionary is null on column: " + col + " Segment: " + segment);
- }
-
- if (dictOfFirstSegment == null) {
- dictOfFirstSegment = temp;
- } else {
- if (!dictOfFirstSegment.equalsIgnoreCase(temp)) {
- throw new IllegalStateException("Segments with different dictionaries(on lookup table) cannot be merged");
- }
- }
- }
- }
-
- // check if all segments' snapshot are identical
- CubeSegment firstSegment = null;
- for (CubeSegment segment : segmentList) {
- if (firstSegment == null) {
- firstSegment = segment;
- } else {
- Collection<String> a = firstSegment.getSnapshots().values();
- Collection<String> b = segment.getSnapshots().values();
- if (!((a.size() == b.size()) && a.containsAll(b)))
- throw new IllegalStateException("Segments with different snapshots cannot be merged");
- }
- }
-
- }
-
- @Override
- public void validate(CubeInstance cubeInstance, CubeSegment newSegment) {
- this.checkContingency(cubeInstance, newSegment);
- this.checkLoopTableConsistency(cubeInstance, newSegment);
- }
- }
-
- private static class BuildOperationValidator extends CubeSegmentValidator {
-
- @Override
- void validate(CubeInstance cubeInstance, CubeSegment newSegment) {
- List<CubeSegment> readySegments = cubeInstance.getSegments(SegmentStatusEnum.READY);
- CubePartitionDesc cubePartitionDesc = cubeInstance.getDescriptor().getCubePartitionDesc();
- final long initStartDate = cubePartitionDesc.isPartitioned() ? cubePartitionDesc.getPartitionDateStart() : 0;
- long startDate = initStartDate;
- for (CubeSegment readySegment: readySegments) {
- if (startDate == readySegment.getDateRangeStart() && startDate < readySegment.getDateRangeEnd()) {
- startDate = readySegment.getDateRangeEnd();
- } else {
- throw new IllegalStateException("there is gap in cube segments");
- }
- }
- if (newSegment.getDateRangeStart() == startDate && startDate < newSegment.getDateRangeEnd()) {
- return;
- }
- throw new IllegalStateException("invalid segment date range from " + newSegment.getDateRangeStart() + " to " + newSegment.getDateRangeEnd());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/cube/src/main/java/com/kylinolap/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/model/CubeDesc.java b/cube/src/main/java/com/kylinolap/cube/model/CubeDesc.java
index 999a964..5301081 100644
--- a/cube/src/main/java/com/kylinolap/cube/model/CubeDesc.java
+++ b/cube/src/main/java/com/kylinolap/cube/model/CubeDesc.java
@@ -31,11 +31,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import com.kylinolap.common.util.CaseInsensitiveStringMap;
-import com.kylinolap.metadata.model.*;
import org.apache.commons.lang.ArrayUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.hbase.util.Strings;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -47,11 +45,18 @@ import com.kylinolap.common.KylinConfig;
import com.kylinolap.common.persistence.ResourceStore;
import com.kylinolap.common.persistence.RootPersistentEntity;
import com.kylinolap.common.util.Array;
+import com.kylinolap.common.util.CaseInsensitiveStringMap;
import com.kylinolap.common.util.JsonUtil;
import com.kylinolap.metadata.MetadataConstances;
import com.kylinolap.metadata.MetadataManager;
+import com.kylinolap.metadata.model.ColumnDesc;
+import com.kylinolap.metadata.model.DataModelDesc;
+import com.kylinolap.metadata.model.DataType;
import com.kylinolap.metadata.model.FunctionDesc;
+import com.kylinolap.metadata.model.JoinDesc;
+import com.kylinolap.metadata.model.MeasureDesc;
import com.kylinolap.metadata.model.ParameterDesc;
+import com.kylinolap.metadata.model.TableDesc;
import com.kylinolap.metadata.model.TblColRef;
/**
@@ -705,7 +710,7 @@ public class CubeDesc extends RootPersistentEntity {
}
// verify holistic count distinct as a dependent measure
- if (m.isHolisticCountDistinct() && Strings.isEmpty(m.getDependentMeasureRef())) {
+ if (m.isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) {
throw new IllegalStateException(m + " is a holistic count distinct but it has no DependentMeasureRef defined!");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/job/src/main/java/com/kylinolap/job/cube/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cube/MergeDictionaryStep.java b/job/src/main/java/com/kylinolap/job/cube/MergeDictionaryStep.java
index c13bdc0..a243364 100644
--- a/job/src/main/java/com/kylinolap/job/cube/MergeDictionaryStep.java
+++ b/job/src/main/java/com/kylinolap/job/cube/MergeDictionaryStep.java
@@ -46,10 +46,14 @@ public class MergeDictionaryStep extends AbstractExecutable {
final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
final List<CubeSegment> mergingSegments = getMergingSegments(cube);
+ Collections.sort(mergingSegments);
+
try {
+ checkLookupSnapshotsMustIncremental(mergingSegments);
+
makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-
+
mgr.updateCube(cube);
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} catch (IOException e) {
@@ -67,6 +71,11 @@ public class MergeDictionaryStep extends AbstractExecutable {
return result;
}
+ private void checkLookupSnapshotsMustIncremental(List<CubeSegment> mergingSegments) {
+
+ // FIXME check each newer snapshot has only NEW rows but no MODIFIED rows
+ }
+
/**
* For the new segment, we need to create dictionaries for it, too. For
* those dictionaries on fact table, create it by merging underlying
@@ -130,7 +139,8 @@ public class MergeDictionaryStep extends AbstractExecutable {
* @param newSeg
*/
private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) {
- for (Map.Entry<String, String> entry : mergingSegments.get(0).getSnapshots().entrySet()) {
+ CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
+ for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterBuildStep.java
index 5e9acac..1ac2257 100644
--- a/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterBuildStep.java
+++ b/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterBuildStep.java
@@ -14,8 +14,6 @@ import com.kylinolap.job.exception.ExecuteException;
import com.kylinolap.job.execution.ExecutableContext;
import com.kylinolap.job.execution.ExecuteResult;
import com.kylinolap.job.impl.threadpool.AbstractExecutable;
-import com.kylinolap.metadata.model.SegmentStatusEnum;
-import com.kylinolap.metadata.realization.RealizationStatusEnum;
/**
* Created by qianzhou on 1/4/15.
@@ -102,17 +100,14 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
Preconditions.checkState(StringUtils.isNotEmpty(cubeSizeString), "Can't get cube segment size.");
long size = Long.parseLong(cubeSizeString) / 1024;
-
segment.setLastBuildJobID(getCubingJobId());
segment.setLastBuildTime(System.currentTimeMillis());
segment.setSizeKB(size);
segment.setSourceRecords(sourceCount);
segment.setSourceRecordsSize(sourceSize);
- segment.setStatus(SegmentStatusEnum.READY);
- cube.setStatus(RealizationStatusEnum.READY);
try {
- cubeManager.updateCube(cube);
+ cubeManager.promoteNewlyBuiltSegments(cube, segment);
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} catch (IOException e) {
logger.error("fail to update cube after build", e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8072fb77/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterMergeStep.java
index 90c22ab..9e0fd63 100644
--- a/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterMergeStep.java
+++ b/job/src/main/java/com/kylinolap/job/cube/UpdateCubeInfoAfterMergeStep.java
@@ -1,5 +1,12 @@
package com.kylinolap.job.cube;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.kylinolap.common.KylinConfig;
@@ -12,13 +19,6 @@ import com.kylinolap.job.exception.ExecuteException;
import com.kylinolap.job.execution.ExecutableContext;
import com.kylinolap.job.execution.ExecuteResult;
import com.kylinolap.job.impl.threadpool.AbstractExecutable;
-import com.kylinolap.metadata.model.SegmentStatusEnum;
-import org.apache.commons.lang.StringUtils;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
/**
* Created by qianzhou on 1/7/15.
@@ -43,10 +43,6 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
final CubeInstance cube = cubeManager.getCube(getCubeName());
- List<String> mergingSegmentIds = getMergingSegmentIds();
- if (mergingSegmentIds.isEmpty()) {
- return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
- }
CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
if (mergedSegment == null) {
return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
@@ -55,30 +51,28 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
Preconditions.checkState(StringUtils.isNotEmpty(cubeSizeString), "Can't get cube segment size.");
long cubeSize = Long.parseLong(cubeSizeString) / 1024;
- List<CubeSegment> toBeRemoved = Lists.newArrayListWithExpectedSize(mergingSegmentIds.size());
- for (CubeSegment segment : cube.getSegments()) {
- if (mergingSegmentIds.contains(segment.getUuid())) {
- toBeRemoved.add(segment);
- }
+ List<String> mergingSegmentIds = getMergingSegmentIds();
+ if (mergingSegmentIds.isEmpty()) {
+ return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
}
-
+
long sourceCount = 0L;
long sourceSize = 0L;
- for (CubeSegment segment : toBeRemoved) {
+ for (String id : mergingSegmentIds) {
+ CubeSegment segment = cube.getSegmentById(id);
sourceCount += segment.getSourceRecords();
sourceSize += segment.getSourceRecordsSize();
}
- //update segment info
+
+ // update segment info
mergedSegment.setSizeKB(cubeSize);
mergedSegment.setSourceRecords(sourceCount);
mergedSegment.setSourceRecordsSize(sourceSize);
mergedSegment.setLastBuildJobID(getCubingJobId());
- mergedSegment.setStatus(SegmentStatusEnum.READY);
mergedSegment.setLastBuildTime(System.currentTimeMillis());
- //remove old segment
- cube.getSegments().removeAll(toBeRemoved);
+
try {
- cubeManager.updateCube(cube);
+ cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
return new ExecuteResult(ExecuteResult.State.SUCCEED);
} catch (IOException e) {
logger.error("fail to update cube after merge", e);