You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/06/02 10:08:25 UTC
[1/2] incubator-kylin git commit: KYLIN-805 Drop intermediate hive
table and rename to Garbage Collection step
Repository: incubator-kylin
Updated Branches:
refs/heads/0.8.0 26e4ff7e0 -> 36af047a7
KYLIN-805 Drop intermediate hive table and rename to Garbage Collection step
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8ad0eabb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8ad0eabb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8ad0eabb
Branch: refs/heads/0.8.0
Commit: 8ad0eabb33ac4b39b4b98e74311b67e1efe70c66
Parents: 26e4ff7
Author: shaofengshi <sh...@apache.org>
Authored: Tue Jun 2 12:20:29 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Jun 2 16:03:01 2015 +0800
----------------------------------------------------------------------
.../kylin/job/constant/ExecutableConstants.java | 2 +-
.../apache/kylin/job/cube/CubingJobBuilder.java | 16 ++-
.../kylin/job/cube/DropOldHTableStep.java | 97 -------------
.../kylin/job/cube/GarbageCollectionStep.java | 138 +++++++++++++++++++
.../job/hadoop/cube/StorageCleanupJob.java | 5 +-
5 files changed, 154 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8ad0eabb/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 49decd1..91ef2e1 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -57,7 +57,7 @@ public final class ExecutableConstants {
public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
- public static final String STEP_NAME_DROP_OLD_HBASE_TABLE = "Drop Old HTables";
+ public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage collection";
public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8ad0eabb/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index c13415e..1e73436 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -80,6 +80,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, convertCuboidToHfileStep.getId(), jobId));
}
+
+ final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
+ final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
+ result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable));
+
return result;
}
@@ -131,7 +136,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
// update cube info
result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
- result.addTask(createDropUnusedHTableStep(mergeSegment, mergingHTables));
+ result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null));
return result;
}
@@ -172,7 +177,7 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
// update cube info
result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
- result.addTask(createDropUnusedHTableStep(seg, mergingHTables));
+ result.addTask(createGarbageCollectionStep(seg, mergingHTables, null));
return result;
}
@@ -580,10 +585,11 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
}
- private DropOldHTableStep createDropUnusedHTableStep(CubeSegment seg, List<String> oldHtables) {
- DropOldHTableStep result = new DropOldHTableStep();
- result.setName(ExecutableConstants.STEP_NAME_DROP_OLD_HBASE_TABLE);
+ private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable) {
+ GarbageCollectionStep result = new GarbageCollectionStep();
+ result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
result.setOldHTables(oldHtables);
+ result.setOldHiveTable(hiveIntermediateTable);
return result;
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8ad0eabb/job/src/main/java/org/apache/kylin/job/cube/DropOldHTableStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/DropOldHTableStep.java b/job/src/main/java/org/apache/kylin/job/cube/DropOldHTableStep.java
deleted file mode 100644
index 1066a39..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/DropOldHTableStep.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cube;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- */
-public class DropOldHTableStep extends AbstractExecutable {
-
- private static final String OLD_HTABLES = "oldHTables";
-
- private static final Logger logger = LoggerFactory.getLogger(DropOldHTableStep.class);
-
- public DropOldHTableStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- Configuration conf = HBaseConfiguration.create();
- try {
- HBaseAdmin admin = new HBaseAdmin(conf);
-
- List<String> oldTables = getOldHTables();
-
- for(String table : oldTables) {
- admin.disableTable(table);
- admin.deleteTable(table);
- logger.debug("Dropped htable: " + table);
- }
-
- } catch (IOException e) {
- logger.error("Failed to drop old htables;", e);
- // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
- return new ExecuteResult(ExecuteResult.State.SUCCEED, e.getLocalizedMessage());
- }
-
- return new ExecuteResult(ExecuteResult.State.SUCCEED);
- }
-
- public void setOldHTables(List<String> ids) {
- setParam(OLD_HTABLES, StringUtils.join(ids, ","));
- }
-
- private List<String> getOldHTables() {
- final String ids = getParam(OLD_HTABLES);
- if (ids != null) {
- final String[] splitted = StringUtils.split(ids, ",");
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
- for (String id: splitted) {
- result.add(id);
- }
- return result;
- } else {
- return Collections.emptyList();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8ad0eabb/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
new file mode 100644
index 0000000..ec8ca3a
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cube;
+
+import com.google.common.collect.Lists;
+import jodd.util.StringUtil;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.kylin.job.cmd.ShellCmdOutput;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Drop the resources that is no longer needed, including intermediate hive table (after cube build) and hbase tables (after cube merge)
+ */
+public class GarbageCollectionStep extends AbstractExecutable {
+
+ private static final String OLD_HTABLES = "oldHTables";
+
+ private static final String OLD_HIVE_TABLE = "oldHiveTable";
+
+ private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
+
+ public GarbageCollectionStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+
+ ExecuteResult.State state = null;
+ StringBuffer output = new StringBuffer();
+
+ final String hiveTable = this.getOldHiveTable();
+ if (StringUtil.isNotEmpty(hiveTable)) {
+ final String dropHiveCMD = "hive -e \"DROP TABLE IF EXISTS " + hiveTable + ";\"";
+ ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
+ try {
+ context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
+ output.append("Hive table " + hiveTable + " is dropped. \n");
+ } catch (IOException e) {
+ logger.error("job:" + getId() + " execute finished with exception", e);
+ output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage());
+ return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
+ }
+ }
+
+
+ List<String> oldTables = getOldHTables();
+ if (oldTables != null && oldTables.size() > 0) {
+ Configuration conf = HBaseConfiguration.create();
+ HBaseAdmin admin = null;
+ try {
+ admin = new HBaseAdmin(conf);
+ for (String table : oldTables) {
+ if (admin.tableExists(table)) {
+ if (admin.isTableEnabled(table)) {
+ admin.disableTable(table);
+ }
+
+ admin.deleteTable(table);
+ }
+ logger.debug("Dropped htable: " + table);
+ output.append("HBase table " + table + " is dropped. \n");
+ }
+
+ } catch (IOException e) {
+ output.append("Got error when drop HBase table, exiting... \n");
+ // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
+ return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
+ } finally {
+ if (admin != null)
+ try {
+ admin.close();
+ } catch (IOException e) {
+ logger.error(e.getLocalizedMessage());
+ }
+ }
+ }
+
+
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+ }
+
+ public void setOldHTables(List<String> ids) {
+ setParam(OLD_HTABLES, StringUtils.join(ids, ","));
+ }
+
+ private List<String> getOldHTables() {
+ final String ids = getParam(OLD_HTABLES);
+ if (ids != null) {
+ final String[] splitted = StringUtils.split(ids, ",");
+ ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+ for (String id : splitted) {
+ result.add(id);
+ }
+ return result;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ public void setOldHiveTable(String hiveTable) {
+ setParam(OLD_HIVE_TABLE, hiveTable);
+ }
+
+ private String getOldHiveTable() {
+ return getParam(OLD_HIVE_TABLE);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8ad0eabb/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
index 2f80f0e..d88b116 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/StorageCleanupJob.java
@@ -142,7 +142,10 @@ public class StorageCleanupJob extends AbstractHadoopJob {
for (String htableName : allTablesNeedToBeDropped) {
log.info("Deleting HBase table " + htableName);
if (hbaseAdmin.tableExists(htableName)) {
- hbaseAdmin.disableTable(htableName);
+ if (hbaseAdmin.isTableEnabled(htableName)) {
+ hbaseAdmin.disableTable(htableName);
+ }
+
hbaseAdmin.deleteTable(htableName);
log.info("Deleted HBase table " + htableName);
} else {
[2/2] incubator-kylin git commit: KYLIN-807 Avoid write conflict
between job engine and stream cube builder
Posted by sh...@apache.org.
KYLIN-807 Avoid write conflict between job engine and stream cube builder
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/36af047a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/36af047a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/36af047a
Branch: refs/heads/0.8.0
Commit: 36af047a7c3ff7a295a59dbdffc05b55bbf4dd7d
Parents: 8ad0eab
Author: shaofengshi <sh...@apache.org>
Authored: Tue Jun 2 15:15:30 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Tue Jun 2 16:04:03 2015 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 82 ++++++++++++++------
.../apache/kylin/cube/CubeManagerCacheTest.java | 2 +-
.../kylin/job/cube/MergeDictionaryStep.java | 4 +-
.../job/cube/UpdateCubeInfoAfterBuildStep.java | 3 +-
.../kylin/job/streaming/CubeStreamBuilder.java | 3 +-
.../kylin/job/BuildCubeWithEngineTest.java | 4 +-
.../java/org/apache/kylin/job/DeployUtil.java | 2 +-
.../job/hadoop/cube/MergeCuboidMapperTest.java | 5 +-
.../job/streaming/CubeStreamBuilderTest.java | 4 +-
.../kylin/rest/controller/CubeController.java | 5 +-
.../apache/kylin/rest/service/CubeService.java | 14 ++--
.../apache/kylin/rest/service/JobService.java | 4 +-
.../kylin/rest/service/CacheServiceTest.java | 4 +-
13 files changed, 85 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 5c651fc..585e503 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -153,7 +153,7 @@ public class CubeManager implements IRealizationProvider {
DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, factColumnsPath);
cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
- updateCube(cubeSeg.getCubeInstance(), false);
+ updateCube(cubeSeg.getCubeInstance(), null, null, Lists.newArrayList(cubeSeg), null, false);
return dictInfo;
}
@@ -191,7 +191,7 @@ public class CubeManager implements IRealizationProvider {
cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
- updateCube(cubeSeg.getCubeInstance(), false);
+ updateCube(cubeSeg.getCubeInstance(), null, null, Lists.newArrayList(cubeSeg), null, false);
return snapshot;
}
@@ -226,7 +226,7 @@ public class CubeManager implements IRealizationProvider {
CubeInstance cube = CubeInstance.create(cubeName, projectName, desc);
cube.setOwner(owner);
- updateCube(cube, false);
+ updateCube(cube, null, null, null, null, false);
ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner);
return cube;
@@ -243,10 +243,53 @@ public class CubeManager implements IRealizationProvider {
* @return
* @throws IOException
*/
- public CubeInstance updateCube(CubeInstance cube, boolean updateProject) throws IOException {
+ public CubeInstance updateCube(CubeInstance cube, final List<CubeSegment> toAddSegs, final List<CubeSegment> toRemoveSegs, final List<CubeSegment> toUpdateSegs, RealizationStatusEnum newStatus, boolean updateProject) throws IOException {
+ return updateCube(cube, toAddSegs, toRemoveSegs, toUpdateSegs, newStatus, updateProject, 0);
+ }
+
+ private CubeInstance updateCube(CubeInstance cube, final List<CubeSegment> toAddSegs, final List<CubeSegment> toRemoveSegs, final List<CubeSegment> toUpdateSegs, RealizationStatusEnum newStatus, boolean updateProject, int retry) throws IOException {
+ if (cube == null)
+ throw new IllegalStateException();
logger.info("Updating cube instance '" + cube.getName());
- getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER);
+
+ if (toAddSegs != null && toAddSegs.size() > 0)
+ cube.getSegments().addAll(toAddSegs);
+
+ if (toRemoveSegs != null && toRemoveSegs.size() > 0)
+ cube.getSegments().removeAll(toRemoveSegs);
+
+ if (toUpdateSegs != null && toUpdateSegs.size() > 0) {
+ for (CubeSegment segment : toUpdateSegs) {
+ for (int i = 0; i < cube.getSegments().size(); i++) {
+ if (cube.getSegments().get(i).getName().equals(segment.getName())) {
+ cube.getSegments().set(i, segment);
+ }
+ }
+
+ }
+ }
+
+ Collections.sort(cube.getSegments());
+
+ if (newStatus != null) {
+ cube.setStatus(newStatus);
+ }
+
+ try {
+ getStore().putResource(cube.getResourcePath(), cube, CUBE_SERIALIZER);
+ } catch (IllegalStateException ise) {
+ logger.error("Get error when update cube " + cube.getName(), ise);
+
+ if (retry >= 3) {
+ logger.error("Retried 3 times till got error, abandoning...");
+ throw ise;
+ }
+
+ retry++;
+ cube = updateCube(reloadCubeLocal(cube.getName()), toAddSegs, toRemoveSegs, toUpdateSegs, newStatus, updateProject, retry);
+ }
+
cubeMap.put(cube.getName(), cube);
if (updateProject) {
@@ -271,7 +314,7 @@ public class CubeManager implements IRealizationProvider {
CubeSegment mergeSegment = newSegment(cube, startDate, endDate);
validateNewSegments(cube, mergeSegment);
- saveCubeSegmentChange(cube, Lists.newArrayList(appendSegment, mergeSegment), null);
+ updateCube(cube, Lists.newArrayList(appendSegment, mergeSegment), null, null, null, false);
return new Pair<CubeSegment, CubeSegment>(appendSegment, mergeSegment);
}
@@ -296,7 +339,7 @@ public class CubeManager implements IRealizationProvider {
validateNewSegments(cube, newSegment);
if (saveChange)
- saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
+ updateCube(cube, Lists.newArrayList(newSegment), null, null, null, false);
return newSegment;
}
@@ -305,7 +348,7 @@ public class CubeManager implements IRealizationProvider {
checkNoBuildingSegment(cube);
CubeSegment newSegment = newSegment(cube, startDate, endDate);
- saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
+ updateCube(cube, Lists.newArrayList(newSegment), null, null, null, false);
return newSegment;
}
@@ -318,23 +361,11 @@ public class CubeManager implements IRealizationProvider {
CubeSegment newSegment = newSegment(cube, range.getFirst(), range.getSecond());
validateNewSegments(cube, newSegment);
- saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
+ updateCube(cube, Lists.newArrayList(newSegment), null, null, null, false);
return newSegment;
}
- protected void saveCubeSegmentChange(CubeInstance cube, List<CubeSegment> toAdd, List<CubeSegment> toRemove) throws IOException {
- if (toAdd != null && toAdd.size() > 0)
- cube.getSegments().addAll(toAdd);
-
- if (toRemove != null && toRemove.size() > 0)
- cube.getSegments().removeAll(toRemove);
-
- Collections.sort(cube.getSegments());
- updateCube(cube, false);
-
- }
-
private Pair<Long, Long> alignMergeRange(CubeInstance cube, long startDate, long endDate) {
List<CubeSegment> readySegments = cube.getSegment(SegmentStatusEnum.READY);
if (readySegments.isEmpty()) {
@@ -539,11 +570,14 @@ public class CubeManager implements IRealizationProvider {
throw new IllegalStateException("For cube " + cube + ", segment " + seg + " should be READY but is not");
}
- cube.setSegments(tobe);
- cube.setStatus(RealizationStatusEnum.READY);
+ List<CubeSegment> toRemoveSegs = Lists.newArrayList();
+ for (CubeSegment segment : cube.getSegments()) {
+ if (!tobe.contains(segment))
+ toRemoveSegs.add(segment);
+ }
logger.info("Promoting cube " + cube + ", new segments " + newSegments);
- updateCube(cube, updateProj);
+ updateCube(cube, Lists.newArrayList(newSegments), toRemoveSegs, null, RealizationStatusEnum.READY, updateProj);
}
public void validateNewSegments(CubeInstance cube, CubeSegment... newSegments) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
index 7c74993..6b96bd4 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
@@ -67,7 +67,7 @@ public class CubeManagerCacheTest extends LocalFileMetadataTestCase {
assertEquals(RealizationStatusEnum.DISABLED, createdCube.getStatus());
createdCube.setStatus(RealizationStatusEnum.DESCBROKEN);
- cubeManager.updateCube(createdCube,true);
+ cubeManager.updateCube(createdCube, null, null, null, null, true);
assertEquals(RealizationStatusEnum.DESCBROKEN, cubeManager.getCube("a_whole_new_cube").getStatus());
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
index d0a7db3..82a8984 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
@@ -62,8 +62,8 @@ public class MergeDictionaryStep extends AbstractExecutable {
makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-
- mgr.updateCube(cube,false);
+
+ mgr.updateCube(cube, null, null, Lists.newArrayList(newSegment), null, false);
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} catch (IOException e) {
logger.error("fail to merge dictionary or lookup snapshots", e);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
index 1472424..e00b36d 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
@@ -19,6 +19,7 @@
package org.apache.kylin.job.cube;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -127,7 +128,7 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
if (segmentReady) {
cubeManager.promoteNewlyBuiltSegments(cube, cube.getSegments().size() == 1, segment);
} else {
- cubeManager.updateCube(cube, false);
+ cubeManager.updateCube(cube, null, null, Lists.newArrayList(segment), null, false);
}
return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index 3986db6..34d4eaf 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -341,8 +341,7 @@ public class CubeStreamBuilder extends StreamBuilder {
CubeInstance cube = CubeManager.getInstance(kylinConfig).reloadCubeLocal(cubeSegment.getCubeInstance().getName());
cube.getSegments().add(cubeSegment);
Collections.sort(cube.getSegments());
-
- CubeManager.getInstance(kylinConfig).updateCube(cube, false);
+ CubeManager.getInstance(kylinConfig).updateCube(cube, Lists.newArrayList(cubeSegment), null, null, null, false);
}
private List<Long> getAllCuboidIds(CubeDesc cubeDesc) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 74c824c..0888994 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -258,8 +258,8 @@ public class BuildCubeWithEngineTest {
private void clearSegment(String cubeName) throws Exception {
CubeInstance cube = cubeManager.getCube(cubeName);
- cube.getSegments().clear();
- cubeManager.updateCube(cube, true);
+ // remove all existing segments
+ cubeManager.updateCube(cube, null, cube.getSegments(), null, null, true);
}
private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/DeployUtil.java b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
index 6d348bd..e238e12 100644
--- a/job/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/job/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -60,7 +60,7 @@ public class DeployUtil {
// update cube desc signature.
for (CubeInstance cube : CubeManager.getInstance(config()).listAllCubes()) {
cube.getDescriptor().setSignature(cube.getDescriptor().calculateSignature());
- CubeManager.getInstance(config()).updateCube(cube,true);
+ CubeManager.getInstance(config()).updateCube(cube, null, null, null, null, true);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
index 9f7e5f9..e9cdb96 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/MergeCuboidMapperTest.java
@@ -18,6 +18,7 @@
package org.apache.kylin.job.hadoop.cube;
+import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
@@ -134,11 +135,13 @@ public class MergeCuboidMapperTest extends LocalFileMetadataTestCase {
// cubeManager.saveResource(segment.getCubeInstance());
// cubeManager.afterCubeUpdated(segment.getCubeInstance());
- cubeManager.updateCube(cube,true);
isFirstSegment = false;
}
+
+ cube = cubeManager.updateCube(cube, null, null, cube.getSegments(), null, true);
+
}
@After
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
index f0ef5e1..96b7cb4 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamBuilderTest.java
@@ -50,8 +50,8 @@ public class CubeStreamBuilderTest {
DeployUtil.deployMetadata();
DeployUtil.overrideJobJarLocations();
final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(CUBE_NAME);
- cube.getSegments().clear();
- CubeManager.getInstance(kylinConfig).updateCube(cube, true);
+ // remove all existing segments
+ CubeManager.getInstance(kylinConfig).updateCube(cube, null, cube.getSegments(), null, null, true);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index a7df432..33a2886 100644
--- a/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.UnknownHostException;
import java.util.*;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.cube.CubeInstance;
@@ -419,10 +420,8 @@ public class CubeController extends BasicController {
CubeSegment segment = deserializeCubeSegment(cubeSegmentRequest);
cubeService.getCubeManager().validateNewSegments(cube, segment);
- cube.getSegments().add(segment);
- Collections.sort(cube.getSegments());
try {
- cubeService.getCubeManager().updateCube(cube, true);
+ cubeService.getCubeManager().updateCube(cube, Lists.newArrayList(segment), null, null, null, true);
} catch (IOException e) {
logger.error("Failed to deal with the request:" + e.getLocalizedMessage(), e);
throw new InternalErrorException("Failed to deal with the request: " + e.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 08e4112..7e38d73 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -150,7 +150,7 @@ public class CubeService extends BasicService {
String owner = SecurityContextHolder.getContext().getAuthentication().getName();
cube.setOwner(owner);
- return getCubeManager().updateCube(cube, true);
+ return getCubeManager().updateCube(cube, null, null, null, null, true);
}
public CubeInstance createCubeAndDesc(String cubeName, String projectName, CubeDesc desc) throws IOException {
@@ -348,7 +348,7 @@ public class CubeService extends BasicService {
cube.setStatus(RealizationStatusEnum.DISABLED);
try {
- return getCubeManager().updateCube(cube, true);
+ return getCubeManager().updateCube(cube, null, null, null, RealizationStatusEnum.DISABLED, true);
} catch (IOException e) {
cube.setStatus(ostatus);
throw e;
@@ -380,12 +380,11 @@ public class CubeService extends BasicService {
throw new JobException("Enable is not allowed with a running job.");
}
if (!cube.getDescriptor().calculateSignature().equals(cube.getDescriptor().getSignature())) {
- this.releaseAllSegments(cube);
+ cube = this.releaseAllSegments(cube);
}
- cube.setStatus(RealizationStatusEnum.READY);
try {
- return getCubeManager().updateCube(cube, true);
+ return getCubeManager().updateCube(cube, null, null, null, RealizationStatusEnum.READY, true);
} catch (IOException e) {
cube.setStatus(ostatus);
throw e;
@@ -520,7 +519,7 @@ public class CubeService extends BasicService {
* @throws IOException
* @throws JobException
*/
- private void releaseAllSegments(CubeInstance cube) throws IOException, JobException {
+ private CubeInstance releaseAllSegments(CubeInstance cube) throws IOException, JobException {
final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null);
for (CubingJob cubingJob : cubingJobs) {
final ExecutableState status = cubingJob.getStatus();
@@ -528,8 +527,7 @@ public class CubeService extends BasicService {
getExecutableManager().discardJob(cubingJob.getId());
}
}
- cube.getSegments().clear();
- CubeManager.getInstance(getConfig()).updateCube(cube, true);
+ return CubeManager.getInstance(getConfig()).updateCube(cube, null, null, cube.getSegments(), null, true);
}
@PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 3c150ef..3b3cbec 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -263,8 +263,8 @@ public class JobService extends BasicService {
CubeInstance cubeInstance = getCubeManager().getCube(jobInstance.getRelatedCube());
final CubeSegment segment = cubeInstance.getSegmentById(segmentId);
if (segment != null && segment.getStatus() == SegmentStatusEnum.NEW) {
- cubeInstance.getSegments().remove(segment);
- getCubeManager().updateCube(cubeInstance, false);
+ // Remove all existing segments
+ getCubeManager().updateCube(cubeInstance, null, cubeInstance.getSegments(), null, null, false);
}
getExecutableManager().discardJob(jobId);
return jobInstance;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/36af047a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
diff --git a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
index 788ce19..e26f387 100644
--- a/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.kylin.rest.service;
+import com.google.common.collect.Lists;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.cache.RemoteCacheUpdater;
import org.apache.kylin.common.restclient.Broadcaster;
@@ -228,8 +229,7 @@ public class CacheServiceTest extends LocalFileMetadataTestCase {
assertEquals(0, cubeManagerB.getCube(cubeName).getSegments().size());
CubeSegment segment = new CubeSegment();
segment.setName("test_segment");
- cube.getSegments().add(segment);
- cubeManager.updateCube(cube, true);
+ cube = cubeManager.updateCube(cube, Lists.newArrayList(segment), null, null, null, true);
//one for cube update, one for project update
assertEquals(2, broadcaster.getCounterAndClear());
waitForCounterAndClear(2);