You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/21 02:29:32 UTC
[7/7] incubator-kylin git commit: KYLIN-878 HBase storage abstraction
for cubing flow
KYLIN-878 HBase storage abstraction for cubing flow
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/22dc5734
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/22dc5734
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/22dc5734
Branch: refs/heads/0.8
Commit: 22dc573445447639daf3a7cb165faf84789444a7
Parents: 6171b0a
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Jul 17 10:27:43 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Tue Jul 21 08:28:13 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/mr/KylinMapper.java | 1 +
.../apache/kylin/common/util/BytesSplitter.java | 9 +-
.../apache/kylin/common/util/HadoopUtil.java | 10 +-
.../java/org/apache/kylin/cube/CubeBuilder.java | 100 ---
.../java/org/apache/kylin/cube/CubeManager.java | 20 +-
.../java/org/apache/kylin/cube/CubeSegment.java | 19 +-
.../java/org/apache/kylin/cube/CubeUpdate.java | 101 +++
.../apache/kylin/cube/CubeManagerCacheTest.java | 2 +-
.../org/apache/kylin/cube/CubeManagerTest.java | 10 +-
.../org/apache/kylin/cube/CubeSegmentsTest.java | 4 +-
.../apache/kylin/engine/BuildEngineFactory.java | 20 +-
.../apache/kylin/engine/IBatchCubingEngine.java | 3 +
.../kylin/engine/mr/BatchCubingJobBuilder.java | 145 +---
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 94 +++
.../kylin/engine/mr/BatchMergeJobBuilder.java | 98 +--
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 100 +++
.../kylin/engine/mr/ByteArrayWritable.java | 166 +++++
.../kylin/engine/mr/GarbageCollectionStep.java | 149 -----
.../org/apache/kylin/engine/mr/IMRInput.java | 14 +-
.../kylin/engine/mr/IMRJobFlowParticipant.java | 34 -
.../org/apache/kylin/engine/mr/IMROutput.java | 58 +-
.../org/apache/kylin/engine/mr/IMROutput2.java | 88 +++
.../kylin/engine/mr/JobBuilderSupport.java | 152 ++---
.../kylin/engine/mr/MRBatchCubingEngine.java | 29 +-
.../kylin/engine/mr/MRBatchCubingEngine2.java | 47 ++
.../java/org/apache/kylin/engine/mr/MRUtil.java | 55 ++
.../kylin/engine/mr/MergeDictionaryStep.java | 197 ------
.../engine/mr/UpdateCubeInfoAfterBuildStep.java | 100 ---
.../engine/mr/UpdateCubeInfoAfterMergeStep.java | 135 ----
.../kylin/engine/mr/steps/InMemCuboidJob.java | 104 +++
.../engine/mr/steps/InMemCuboidMapper.java | 122 ++++
.../engine/mr/steps/InMemCuboidReducer.java | 95 +++
.../mr/steps/MapContextGTRecordWriter.java | 96 +++
.../mr/steps/MergeCuboidFromStorageJob.java | 95 +++
.../mr/steps/MergeCuboidFromStorageMapper.java | 197 ++++++
.../engine/mr/steps/MergeDictionaryStep.java | 197 ++++++
.../engine/mr/steps/MergeStatisticsStep.java | 188 ++++++
.../engine/mr/steps/SaveStatisticsStep.java | 109 +++
.../mr/steps/UpdateCubeInfoAfterBuildStep.java | 101 +++
.../mr/steps/UpdateCubeInfoAfterMergeStep.java | 136 ++++
.../kylin/job/constant/BatchConstants.java | 3 +-
.../kylin/job/engine/JobEngineConfig.java | 4 -
.../kylin/job/hadoop/AbstractHadoopJob.java | 6 +-
.../cardinality/ColumnCardinalityMapper.java | 4 +-
.../cardinality/HiveColumnCardinalityJob.java | 4 +-
.../kylin/job/hadoop/cube/CubeHFileMapper.java | 6 +-
.../apache/kylin/job/hadoop/cube/CuboidJob.java | 45 +-
.../kylin/job/hadoop/cube/CuboidReducer.java | 2 +-
.../job/hadoop/cube/FactDistinctColumnsJob.java | 4 +-
.../cube/FactDistinctColumnsMapperBase.java | 4 +-
.../job/hadoop/cube/HiveToBaseCuboidMapper.java | 28 +-
.../job/hadoop/cube/IIToBaseCuboidMapper.java | 109 ---
.../kylin/job/hadoop/cubev2/InMemCuboidJob.java | 130 ----
.../job/hadoop/cubev2/InMemCuboidMapper.java | 123 ----
.../job/hadoop/cubev2/InMemCuboidReducer.java | 96 ---
.../job/hadoop/cubev2/InMemKeyValueCreator.java | 77 ---
.../hadoop/cubev2/MapContextGTRecordWriter.java | 97 ---
.../hadoop/cubev2/MergeCuboidFromHBaseJob.java | 120 ----
.../cubev2/MergeCuboidFromHBaseMapper.java | 242 -------
.../job/hadoop/cubev2/MergeStatisticsStep.java | 188 ------
.../job/hadoop/cubev2/SaveStatisticsStep.java | 112 ----
.../kylin/job/streaming/CubeStreamConsumer.java | 6 +-
.../apache/kylin/source/hive/HiveMRInput.java | 2 +-
.../java/org/apache/kylin/storage/IStorage.java | 28 -
.../apache/kylin/storage/StorageFactory2.java | 34 +
.../kylin/storage/hbase/HBaseMROutput.java | 38 +-
.../kylin/storage/hbase/HBaseMROutput2.java | 259 ++++++++
.../kylin/storage/hbase/HBaseMRSteps.java | 138 ++++
.../kylin/storage/hbase/HBaseStorage.java | 4 +-
.../storage/hbase/InMemKeyValueCreator.java | 73 ++
.../apache/kylin/storage/hbase/MergeGCStep.java | 121 ++++
.../kylin/job/BuildCubeWithEngineTest.java | 4 +-
.../java/org/apache/kylin/job/DeployUtil.java | 4 +-
.../hadoop/cube/HiveToBaseCuboidMapperTest.java | 6 +-
.../job/hadoop/cube/MergeCuboidMapperTest.java | 4 +-
.../job/streaming/CubeStreamConsumerTest.java | 4 +-
.../kylin/metadata/measure/MeasureCodec.java | 5 -
.../apache/kylin/metadata/model/IBuildable.java | 28 +
.../apache/kylin/source/TableSourceFactory.java | 7 +-
.../kylin/query/enumerator/OLAPEnumerator.java | 4 +-
.../kylin/rest/controller/CubeController.java | 4 +-
.../apache/kylin/rest/service/CubeService.java | 12 +-
.../apache/kylin/rest/service/JobService.java | 4 +-
.../kylin/rest/service/CacheServiceTest.java | 2 +-
.../kylin/storage/ICachableStorageEngine.java | 34 -
.../kylin/storage/ICachableStorageQuery.java | 33 +
.../java/org/apache/kylin/storage/IStorage.java | 28 +
.../kylin/storage/StorageEngineFactory.java | 83 ---
.../apache/kylin/storage/StorageFactory.java | 85 +++
.../AbstractCacheFledgedStorageEngine.java | 6 +-
.../cache/CacheFledgedDynamicStorageEngine.java | 4 +-
.../cache/CacheFledgedStaticStorageEngine.java | 4 +-
.../kylin/storage/cube/CubeStorageEngine.java | 371 -----------
.../kylin/storage/cube/CubeStorageQuery.java | 371 +++++++++++
.../kylin/storage/hbase/CubeStorageEngine.java | 663 -------------------
.../kylin/storage/hbase/CubeStorageQuery.java | 663 +++++++++++++++++++
.../hbase/InvertedIndexStorageEngine.java | 83 ---
.../hbase/InvertedIndexStorageQuery.java | 82 +++
.../storage/hybrid/HybridStorageEngine.java | 4 +-
.../kylin/storage/test/DynamicCacheTest.java | 4 +-
.../kylin/storage/test/ITStorageTest.java | 4 +-
.../kylin/storage/test/StaticCacheTest.java | 4 +-
102 files changed, 4403 insertions(+), 3718 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java b/common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
index a933782..fd0b337 100644
--- a/common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
+++ b/common/src/main/java/org/apache/kylin/common/mr/KylinMapper.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.Mapper;
/**
*/
public class KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
+
protected void bindCurrentConfiguration(Configuration conf) {
HadoopUtil.setCurrentConfiguration(conf);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
index bd16246..7249dcf 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesSplitter.java
@@ -28,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * @author xjiang
*/
public class BytesSplitter {
private static final Logger logger = LoggerFactory.getLogger(BytesSplitter.class);
@@ -79,6 +78,14 @@ public class BytesSplitter {
return bufferSize;
}
+
+ public void setBuffers(byte[][] buffers) {
+ for (int i = 0; i < buffers.length; i++) {
+ splitBuffers[i].value = buffers[i];
+ splitBuffers[i].length = buffers[i].length;
+ }
+ this.bufferSize = buffers.length;
+ }
public byte inferByteRowDelimiter(byte[] bytes, int byteLen, int expectedSplits) throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 8ee1440..fb6b9bb 100644
--- a/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -29,6 +29,7 @@ import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -140,7 +141,6 @@ public class HadoopUtil {
}
/**
- *
* @param table the identifier of hive table, in format <db_name>.<table_name>
* @return a string array with 2 elements: {"db_name", "table_name"}
*/
@@ -151,4 +151,12 @@ public class HadoopUtil {
return new String[] { database, tableName };
}
+
+ public static void deletePath(Configuration conf, Path path) throws IOException {
+ FileSystem fs = FileSystem.get(conf);
+ if (fs.exists(path)) {
+ fs.delete(path, true);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java b/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
deleted file mode 100644
index a828252..0000000
--- a/cube/src/main/java/org/apache/kylin/cube/CubeBuilder.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube;
-
-import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-
-/**
- */
-public class CubeBuilder {
- private CubeInstance cubeInstance;
- private CubeSegment[] toAddSegs = null;
- private CubeSegment[] toRemoveSegs = null;
- private CubeSegment[] toUpdateSegs = null;
- private RealizationStatusEnum status;
- private String owner;
- private int cost = -1;
-
- public CubeBuilder(CubeInstance cubeInstance) {
- this.cubeInstance = cubeInstance;
- }
-
- public CubeInstance getCubeInstance() {
- return cubeInstance;
- }
-
- public CubeBuilder setCubeInstance(CubeInstance cubeInstance) {
- this.cubeInstance = cubeInstance;
- return this;
- }
-
- public CubeSegment[] getToAddSegs() {
- return toAddSegs;
- }
-
- public CubeBuilder setToAddSegs(CubeSegment... toAddSegs) {
- this.toAddSegs = toAddSegs;
- return this;
- }
-
- public CubeSegment[] getToRemoveSegs() {
- return toRemoveSegs;
- }
-
- public CubeBuilder setToRemoveSegs(CubeSegment... toRemoveSegs) {
- this.toRemoveSegs = toRemoveSegs;
- return this;
- }
-
- public CubeSegment[] getToUpdateSegs() {
- return toUpdateSegs;
- }
-
- public CubeBuilder setToUpdateSegs(CubeSegment... toUpdateSegs) {
- this.toUpdateSegs = toUpdateSegs;
- return this;
- }
-
- public RealizationStatusEnum getStatus() {
- return status;
- }
-
- public CubeBuilder setStatus(RealizationStatusEnum status) {
- this.status = status;
- return this;
- }
-
- public String getOwner() {
- return owner;
- }
-
- public CubeBuilder setOwner(String owner) {
- this.owner = owner;
- return this;
- }
-
- public int getCost() {
- return cost;
- }
-
- public CubeBuilder setCost(int cost) {
- this.cost = cost;
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index e85a5fe..cc61dac 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -172,7 +172,7 @@ public class CubeManager implements IRealizationProvider {
if (dictInfo != null) {
cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
- CubeBuilder cubeBuilder = new CubeBuilder(cubeSeg.getCubeInstance());
+ CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance());
cubeBuilder.setToUpdateSegs(cubeSeg);
updateCube(cubeBuilder);
}
@@ -211,7 +211,7 @@ public class CubeManager implements IRealizationProvider {
SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
cubeSeg.putSnapshotResPath(lookupTable, snapshot.getResourcePath());
- CubeBuilder cubeBuilder = new CubeBuilder(cubeSeg.getCubeInstance());
+ CubeUpdate cubeBuilder = new CubeUpdate(cubeSeg.getCubeInstance());
cubeBuilder.setToUpdateSegs(cubeSeg);
updateCube(cubeBuilder);
@@ -248,13 +248,13 @@ public class CubeManager implements IRealizationProvider {
CubeInstance cube = CubeInstance.create(cubeName, projectName, desc);
cube.setOwner(owner);
- updateCube(new CubeBuilder(cube));
+ updateCube(new CubeUpdate(cube));
ProjectManager.getInstance(config).moveRealizationToProject(RealizationType.CUBE, cubeName, projectName, owner);
return cube;
}
- public CubeInstance updateCube(CubeBuilder cubeBuilder) throws IOException {
+ public CubeInstance updateCube(CubeUpdate cubeBuilder) throws IOException {
return updateCube(cubeBuilder, 0);
}
@@ -283,7 +283,7 @@ public class CubeManager implements IRealizationProvider {
return true;
}
- private CubeInstance updateCube(CubeBuilder cubeBuilder, int retry) throws IOException {
+ private CubeInstance updateCube(CubeUpdate cubeBuilder, int retry) throws IOException {
if (cubeBuilder == null || cubeBuilder.getCubeInstance() == null)
throw new IllegalStateException();
@@ -385,7 +385,7 @@ public class CubeManager implements IRealizationProvider {
validateNewSegments(cube, mergeSegment);
- CubeBuilder cubeBuilder = new CubeBuilder(cube).setToAddSegs(appendSegment, mergeSegment);
+ CubeUpdate cubeBuilder = new CubeUpdate(cube).setToAddSegs(appendSegment, mergeSegment);
updateCube(cubeBuilder);
return new Pair<CubeSegment, CubeSegment>(appendSegment, mergeSegment);
@@ -414,7 +414,7 @@ public class CubeManager implements IRealizationProvider {
if (saveChange) {
- CubeBuilder cubeBuilder = new CubeBuilder(cube);
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
cubeBuilder.setToAddSegs(newSegment);
updateCube(cubeBuilder);
}
@@ -425,7 +425,7 @@ public class CubeManager implements IRealizationProvider {
checkNoBuildingSegment(cube);
CubeSegment newSegment = newSegment(cube, startDate, endDate);
- CubeBuilder cubeBuilder = new CubeBuilder(cube);
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
cubeBuilder.setToAddSegs(newSegment);
updateCube(cubeBuilder);
@@ -456,7 +456,7 @@ public class CubeManager implements IRealizationProvider {
validateNewSegments(cube, false, newSegment);
- CubeBuilder cubeBuilder = new CubeBuilder(cube);
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
cubeBuilder.setToAddSegs(newSegment);
updateCube(cubeBuilder);
@@ -684,7 +684,7 @@ public class CubeManager implements IRealizationProvider {
logger.info("Promoting cube " + cube + ", new segments " + Arrays.toString(newSegments) + ", to remove segments " + toRemoveSegs);
- CubeBuilder cubeBuilder = new CubeBuilder(cube);
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
cubeBuilder.setToRemoveSegs(toRemoveSegs.toArray(new CubeSegment[toRemoveSegs.size()])).setToUpdateSegs(newSegments).setStatus(RealizationStatusEnum.READY);
updateCube(cubeBuilder);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 035b5eb..a26c2c9 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -23,10 +23,12 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
import com.fasterxml.jackson.annotation.JsonBackReference;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
+
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.dict.IDictionaryAware;
+import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -37,7 +39,7 @@ import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware {
+public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IBuildable {
@JsonBackReference
private CubeInstance cubeInstance;
@@ -344,4 +346,19 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware {
public static String getStatisticsResourcePath(String cubeName, String cubeSegmentId) {
return ResourceStore.CUBE_STATISTICS_ROOT + "/" + cubeName + "/" + cubeSegmentId + ".seq";
}
+
+ @Override
+ public int getSourceType() {
+ return 0;
+ }
+
+ @Override
+ public int getEngineType() {
+ return 0;
+ }
+
+ @Override
+ public int getStorageType() {
+ return 0;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java b/cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
new file mode 100644
index 0000000..dffaa48
--- /dev/null
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeUpdate.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.cube;
+
+import org.apache.kylin.metadata.realization.RealizationStatusEnum;
+
+/**
+ * Hold changes to a cube so that they can be applied as one unit.
+ */
+public class CubeUpdate {
+ private CubeInstance cubeInstance;
+ private CubeSegment[] toAddSegs = null;
+ private CubeSegment[] toRemoveSegs = null;
+ private CubeSegment[] toUpdateSegs = null;
+ private RealizationStatusEnum status;
+ private String owner;
+ private int cost = -1;
+
+ public CubeUpdate(CubeInstance cubeInstance) {
+ this.cubeInstance = cubeInstance;
+ }
+
+ public CubeInstance getCubeInstance() {
+ return cubeInstance;
+ }
+
+ public CubeUpdate setCubeInstance(CubeInstance cubeInstance) {
+ this.cubeInstance = cubeInstance;
+ return this;
+ }
+
+ public CubeSegment[] getToAddSegs() {
+ return toAddSegs;
+ }
+
+ public CubeUpdate setToAddSegs(CubeSegment... toAddSegs) {
+ this.toAddSegs = toAddSegs;
+ return this;
+ }
+
+ public CubeSegment[] getToRemoveSegs() {
+ return toRemoveSegs;
+ }
+
+ public CubeUpdate setToRemoveSegs(CubeSegment... toRemoveSegs) {
+ this.toRemoveSegs = toRemoveSegs;
+ return this;
+ }
+
+ public CubeSegment[] getToUpdateSegs() {
+ return toUpdateSegs;
+ }
+
+ public CubeUpdate setToUpdateSegs(CubeSegment... toUpdateSegs) {
+ this.toUpdateSegs = toUpdateSegs;
+ return this;
+ }
+
+ public RealizationStatusEnum getStatus() {
+ return status;
+ }
+
+ public CubeUpdate setStatus(RealizationStatusEnum status) {
+ this.status = status;
+ return this;
+ }
+
+ public String getOwner() {
+ return owner;
+ }
+
+ public CubeUpdate setOwner(String owner) {
+ this.owner = owner;
+ return this;
+ }
+
+ public int getCost() {
+ return cost;
+ }
+
+ public CubeUpdate setCost(int cost) {
+ this.cost = cost;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
index 62c628d..4226319 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeManagerCacheTest.java
@@ -66,7 +66,7 @@ public class CubeManagerCacheTest extends LocalFileMetadataTestCase {
assertEquals(0, createdCube.getSegments().size());
assertEquals(RealizationStatusEnum.DISABLED, createdCube.getStatus());
createdCube.setStatus(RealizationStatusEnum.DESCBROKEN);
- CubeBuilder cubeBuilder = new CubeBuilder(createdCube);
+ CubeUpdate cubeBuilder = new CubeUpdate(createdCube);
cubeManager.updateCube(cubeBuilder);
assertEquals(RealizationStatusEnum.DESCBROKEN, cubeManager.getCube("a_whole_new_cube").getStatus());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
index 64c4ea5..fbfad73 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeManagerTest.java
@@ -94,7 +94,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
cube.setAutoMergeTimeRanges(new long[] {2000, 6000});
- mgr.updateCube(new CubeBuilder(cube));
+ mgr.updateCube(new CubeUpdate(cube));
assertTrue(cube.needAutoMerge());
@@ -109,7 +109,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
CubeSegment seg2 = mgr.appendSegments(cube, 2000);
seg2.setStatus(SegmentStatusEnum.READY);
- CubeBuilder cubeBuilder = new CubeBuilder(cube);
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
mgr.updateCube(cubeBuilder);
@@ -140,7 +140,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
CubeInstance cube = mgr.getCube("test_kylin_cube_with_slr_empty");
cube.setAutoMergeTimeRanges(new long[] {2000, 6000});
- mgr.updateCube(new CubeBuilder(cube));
+ mgr.updateCube(new CubeUpdate(cube));
assertTrue(cube.needAutoMerge());
@@ -155,7 +155,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
CubeSegment seg3 = mgr.appendSegments(cube, 2000, 4000, false, false);
seg3.setStatus(SegmentStatusEnum.READY);
- CubeBuilder cubeBuilder = new CubeBuilder(cube);
+ CubeUpdate cubeBuilder = new CubeUpdate(cube);
cubeBuilder.setToAddSegs(seg3);
cubeBuilder.setToUpdateSegs(seg1);
@@ -172,7 +172,7 @@ public class CubeManagerTest extends LocalFileMetadataTestCase {
CubeSegment seg4 = mgr.appendSegments(cube, 4000, 8000, false, false);
seg4.setStatus(SegmentStatusEnum.READY);
- cubeBuilder = new CubeBuilder(cube);
+ cubeBuilder = new CubeUpdate(cube);
cubeBuilder.setToAddSegs(seg4);
mgr.updateCube(cubeBuilder);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git a/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index e3f8638..319ce5f 100644
--- a/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -177,13 +177,13 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
CubeSegment seg3 = mgr.appendSegments(cube, 2000, 3000, false, false);
seg3.setStatus(SegmentStatusEnum.READY);
- CubeBuilder builder = new CubeBuilder(cube).setToAddSegs(seg3);
+ CubeUpdate builder = new CubeUpdate(cube).setToAddSegs(seg3);
mgr.updateCube(builder);
assertEquals(2, cube.getSegments().size());
CubeSegment seg2 = mgr.appendSegments(cube, 1000, 2000, false, false);
- builder = new CubeBuilder(cube).setToAddSegs(seg2);
+ builder = new CubeUpdate(cube).setToAddSegs(seg2);
mgr.updateCube(builder);
assertEquals(3, cube.getSegments().size());
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
index c536deb..d24c99c 100644
--- a/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
+++ b/job/src/main/java/org/apache/kylin/engine/BuildEngineFactory.java
@@ -18,22 +18,36 @@
package org.apache.kylin.engine;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.MRBatchCubingEngine;
+import org.apache.kylin.engine.mr.MRBatchCubingEngine2;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
public class BuildEngineFactory {
- private static final IBatchCubingEngine defaultBatch = new MRBatchCubingEngine();
+ private static IBatchCubingEngine defaultBatchEngine;
+
+ public static IBatchCubingEngine defaultBatchEngine() {
+ if (defaultBatchEngine == null) {
+ KylinConfig conf = KylinConfig.getInstanceFromEnv();
+ if (conf.isCubingInMem()) {
+ defaultBatchEngine = new MRBatchCubingEngine2();
+ } else {
+ defaultBatchEngine = new MRBatchCubingEngine();
+ }
+ }
+ return defaultBatchEngine;
+ }
/** Build a new cube segment, typically its time range appends to the end of current cube. */
public static DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
- return defaultBatch.createBatchCubingJob(newSegment, submitter);
+ return defaultBatchEngine().createBatchCubingJob(newSegment, submitter);
}
/** Merge multiple small segments into a big one. */
public static DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
- return defaultBatch.createBatchMergeJob(mergeSegment, submitter);
+ return defaultBatchEngine().createBatchMergeJob(mergeSegment, submitter);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
index 55a080c..904f557 100644
--- a/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
+++ b/job/src/main/java/org/apache/kylin/engine/IBatchCubingEngine.java
@@ -29,4 +29,7 @@ public interface IBatchCubingEngine {
/** Merge multiple small segments into a big one. */
public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter);
+ public Class<?> getSourceInterface();
+
+ public Class<?> getStorageInterface();
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
index 239ce64..a39ac74 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder.java
@@ -19,111 +19,58 @@
package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.job.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
public class BatchCubingJobBuilder extends JobBuilderSupport {
private final IMRBatchCubingInputSide inputSide;
-
+ private final IMRBatchCubingOutputSide outputSide;
+
public BatchCubingJobBuilder(CubeSegment newSegment, String submitter) {
super(newSegment, submitter);
- this.inputSide = MRBatchCubingEngine.getBatchCubingInputSide(seg);
+ this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+ this.outputSide = MRUtil.getBatchCubingOutputSide(seg);
}
public CubingJob build() {
- final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
- final CubeJoinedFlatTableDesc flatHiveTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
// Phase 1: Create Flat Table
inputSide.addStepPhase1_CreateFlatTable(result);
-
+
// Phase 2: Build Dictionary
- result.addTask(createFactDistinctColumnsStep(flatHiveTableDesc, jobId));
+ result.addTask(createFactDistinctColumnsStep(jobId));
result.addTask(createBuildDictionaryStep(jobId));
-
+
// Phase 3: Build Cube
- if (config.isInMemCubing()) {
- result.addTask(createSaveStatisticsStep(jobId));
-
- // create htable step
- result.addTask(createCreateHTableStep(jobId));
- result.addTask(createInMemCubingStep(flatHiveTableDesc, result.getId()));
- // bulk load step
- result.addTask(createBulkLoadStep(jobId));
- } else {
- final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
- final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
- final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
-
- // base cuboid step
- result.addTask(createBaseCuboidStep(flatHiveTableDesc, cuboidOutputTempPath, jobId));
-
- // n dim cuboid steps
- for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
- int dimNum = totalRowkeyColumnsCount - i;
- result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
- }
- result.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
- // create htable step
- result.addTask(createCreateHTableStep(jobId));
- // generate hfiles step
- result.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
- // bulk load step
- result.addTask(createBulkLoadStep(jobId));
+ final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
+ final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
+ final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
+ // base cuboid step
+ result.addTask(createBaseCuboidStep(cuboidOutputTempPath, jobId));
+ // n dim cuboid steps
+ for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
+ int dimNum = totalRowkeyColumnsCount - i;
+ result.addTask(createNDimensionCuboidStep(cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
}
+ outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
// Phase 4: Update Metadata & Cleanup
result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
- inputSide.addStepPhase4_UpdateMetadataAndCleanup(result);
-
- return result;
- }
-
- private MapReduceExecutable createFactDistinctColumnsStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
- MapReduceExecutable result = new MapReduceExecutable();
- result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
- result.setMapReduceJobClass(FactDistinctColumnsJob.class);
- StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(config.isInMemCubing()));
- appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
- appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
- appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
+ inputSide.addStepPhase4_Cleanup(result);
+ outputSide.addStepPhase4_Cleanup(result);
- result.setMapReduceParams(cmd.toString());
return result;
}
- private HadoopShellExecutable createBuildDictionaryStep(String jobId) {
- // base cuboid job
- HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
- buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
- StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
-
- buildDictionaryStep.setJobParams(cmd.toString());
- buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
- return buildDictionaryStep;
- }
-
- private MapReduceExecutable createBaseCuboidStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String[] cuboidOutputTempPath, String jobId) {
+ private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
// base cuboid job
MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
@@ -134,7 +81,7 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
+ appendExecCmdParameters(cmd, "input", ""); // marks flat table input
appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "level", "0");
@@ -165,52 +112,6 @@ public class BatchCubingJobBuilder extends JobBuilderSupport {
return ndCuboidStep;
}
- private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
- SaveStatisticsStep result = new SaveStatisticsStep();
- result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setStatisticsPath(getStatisticsPath(jobId));
- return result;
- }
-
- private MapReduceExecutable createInMemCubingStep(CubeJoinedFlatTableDesc flatHiveTableDesc, String jobId) {
- // base cuboid job
- MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
- StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
-
- baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
-
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getFlatHiveTableLocation(flatHiveTableDesc, jobId));
- appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "level", "0");
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
- baseCuboidStep.setMapReduceParams(cmd.toString());
- baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
- baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
- return baseCuboidStep;
- }
-
- private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
- final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
- updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
- updateCubeInfoStep.setSegmentId(seg.getUuid());
- updateCubeInfoStep.setCubingJobId(jobId);
- return updateCubeInfoStep;
- }
-
- private String getFlatHiveTableLocation(CubeJoinedFlatTableDesc flatTableDesc, String jobId) {
- return getJobWorkingDir(jobId) + "/" + flatTableDesc.getTableName();
- }
-
private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
String[] paths = new String[groupRowkeyColumnsCount + 1];
for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
new file mode 100644
index 0000000..b6f264e
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
+import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
+import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
+import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+
+public class BatchCubingJobBuilder2 extends JobBuilderSupport {
+
+ private final IMRBatchCubingInputSide inputSide;
+ private final IMRBatchCubingOutputSide2 outputSide;
+
+ public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
+ super(newSegment, submitter);
+ this.inputSide = MRUtil.getBatchCubingInputSide(seg);
+ this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
+ }
+
+ public CubingJob build() {
+ final CubingJob result = CubingJob.createBuildJob(seg, submitter, config);
+ final String jobId = result.getId();
+
+ // Phase 1: Create Flat Table
+ inputSide.addStepPhase1_CreateFlatTable(result);
+
+ // Phase 2: Build Dictionary
+ result.addTask(createFactDistinctColumnsStepWithStats(jobId));
+ result.addTask(createBuildDictionaryStep(jobId));
+ result.addTask(createSaveStatisticsStep(jobId));
+ outputSide.addStepPhase2_BuildDictionary(result);
+
+ // Phase 3: Build Cube
+ result.addTask(createInMemCubingStep(jobId));
+ outputSide.addStepPhase3_BuildCube(result);
+
+ // Phase 4: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
+ inputSide.addStepPhase4_Cleanup(result);
+ outputSide.addStepPhase4_Cleanup(result);
+
+ return result;
+ }
+
+ private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
+ SaveStatisticsStep result = new SaveStatisticsStep();
+ result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
+ result.setCubeName(seg.getCubeInstance().getName());
+ result.setSegmentId(seg.getUuid());
+ result.setStatisticsPath(getStatisticsPath(jobId));
+ return result;
+ }
+
+ private MapReduceExecutable createInMemCubingStep(String jobId) {
+ // base cuboid job
+ MapReduceExecutable cubeStep = new MapReduceExecutable();
+
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd, seg);
+
+ cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
+
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
+ appendExecCmdParameters(cmd, "jobflowid", jobId);
+
+ cubeStep.setMapReduceParams(cmd.toString());
+ cubeStep.setMapReduceJobClass(InMemCuboidJob.class);
+ cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
+ return cubeStep;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
index cb2d8dd..6264ebd 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder.java
@@ -22,23 +22,25 @@ import java.util.List;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
public class BatchMergeJobBuilder extends JobBuilderSupport {
+ private final IMRBatchMergeOutputSide outputSide;
+
public BatchMergeJobBuilder(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
+ this.outputSide = MRUtil.getBatchMergeOutputSide(seg);
}
public CubingJob build() {
- final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
+ final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
final String jobId = result.getId();
final String cuboidRootPath = getCuboidRootPath(jobId);
@@ -46,66 +48,23 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
final List<String> mergingSegmentIds = Lists.newArrayList();
final List<String> mergingCuboidPaths = Lists.newArrayList();
- final List<String> mergingHTables = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingSegmentIds.add(merging.getUuid());
mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
- mergingHTables.add(merging.getStorageLocationIdentifier());
}
+ // Phase 1: Merge Dictionary
result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-
- if (config.isInMemCubing()) {
-
- String mergedStatisticsFolder = getStatisticsPath(jobId);
- result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
-
- // create htable step
- result.addTask(createCreateHTableStep(jobId));
-
- String formattedTables = StringUtil.join(mergingHTables, ",");
- result.addTask(createMergeCuboidDataFromHBaseStep(formattedTables, jobId));
-
- } else {
- // merge cuboid
- String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
- result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
-
- // convert htable
- result.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
- // create htable step
- result.addTask(createCreateHTableStep(jobId));
- // generate hfiles step
- result.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
- }
-
- // bulk load step
- result.addTask(createBulkLoadStep(jobId));
-
- // update cube info
- result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
- result.addTask(createGarbageCollectionStep(mergingHTables, null));
+ // Phase 2: Merge Cube Files
+ String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+ result.addTask(createMergeCuboidDataStep(seg, formattedPath, cuboidRootPath));
+ outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
- return result;
- }
-
- private MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
- MergeDictionaryStep result = new MergeDictionaryStep();
- result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- return result;
- }
+ // Phase 3: Update Metadata & Cleanup
+ result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
+ outputSide.addStepPhase3_Cleanup(result);
- private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
- MergeStatisticsStep result = new MergeStatisticsStep();
- result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- result.setMergedStatisticsPath(mergedStatisticsFolder);
return result;
}
@@ -126,35 +85,4 @@ public class BatchMergeJobBuilder extends JobBuilderSupport {
return mergeCuboidDataStep;
}
-
- private MapReduceExecutable createMergeCuboidDataFromHBaseStep(String inputTableNames, String jobId) {
- MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
- mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", inputTableNames);
- appendExecCmdParameters(cmd, "output", getHFilePath(jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
- mergeCuboidDataStep.setMapReduceParams(cmd.toString());
- mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
- mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
- return mergeCuboidDataStep;
- }
-
- private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
- UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
- result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- result.setCubingJobId(jobId);
- return result;
- }
-
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
new file mode 100644
index 0000000..8a5cb02
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -0,0 +1,100 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.kylin.engine.mr;

import java.util.List;

import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
import org.apache.kylin.engine.mr.steps.MergeCuboidFromStorageJob;
import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
import org.apache.kylin.job.common.MapReduceExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

/**
 * Assembles the segment-merge job flow for the in-memory ("v2") engine:
 * merge dictionaries and statistics, merge cuboid data directly from storage,
 * then update metadata and clean up.
 * <p>
 * NOTE(review): this builder reuses {@link IMRBatchCubingOutputSide2} — the batch
 * <em>cubing</em> output side — for a <em>merge</em> flow; confirm IMROutput2 really
 * intends a shared output side, or whether a dedicated merge output side is planned.
 */
public class BatchMergeJobBuilder2 extends JobBuilderSupport {

    private final IMRBatchCubingOutputSide2 outputSide;

    public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
        super(mergeSegment, submitter);
        this.outputSide = MRUtil.getBatchCubingOutputSide2(seg);
    }

    /**
     * Chains the merge steps into a single {@link CubingJob}. The output-side hook names
     * keep the cubing-flow phase numbering (2 = dictionary, 3 = cube, 4 = cleanup); a merge
     * has no Phase-1 flat table, hence the comment/hook offset below.
     */
    public CubingJob build() {
        final CubingJob result = CubingJob.createMergeJob(seg, submitter, config);
        final String jobId = result.getId();

        final List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
        // a merge needs at least two source segments (size() > 1)
        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
        final List<String> mergingSegmentIds = Lists.newArrayList();
        final List<String> mergingHTables = Lists.newArrayList();
        for (CubeSegment merging : mergingSegments) {
            mergingSegmentIds.add(merging.getUuid());
            mergingHTables.add(merging.getStorageLocationIdentifier());
        }

        // Phase 1: Merge Dictionary
        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, getStatisticsPath(jobId)));
        outputSide.addStepPhase2_BuildDictionary(result);

        // Phase 2: Merge Cube
        String formattedTables = StringUtil.join(mergingHTables, ",");
        result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
        outputSide.addStepPhase3_BuildCube(result);

        // Phase 3: Update Metadata & Cleanup
        result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
        outputSide.addStepPhase4_Cleanup(result);

        return result;
    }

    /** Merges the per-segment cuboid statistics of all source segments into one folder. */
    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
        MergeStatisticsStep result = new MergeStatisticsStep();
        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
        result.setCubeName(seg.getCubeInstance().getName());
        result.setSegmentId(seg.getUuid());
        result.setMergingSegmentIds(mergingSegmentIds);
        result.setMergedStatisticsPath(mergedStatisticsFolder);
        return result;
    }

    /** MR step that reads cuboid data of the source segments straight from storage and merges it. */
    private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
        StringBuilder cmd = new StringBuilder();

        appendMapReduceParameters(cmd, seg);
        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
        appendExecCmdParameters(cmd, "segmentname", seg.getName());
        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
        appendExecCmdParameters(cmd, "jobflowid", jobId);

        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromStorageJob.class);
        // only the cube-size counter is meaningful for a merge (source counters stay empty)
        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
        return mergeCuboidDataStep;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java b/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
new file mode 100644
index 0000000..88bb68c
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
@@ -0,0 +1,166 @@
package org.apache.kylin.engine.mr;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.kylin.common.util.Bytes;

/**
 * A {@link WritableComparable} wrapping a byte array slice (array, offset, length),
 * similar to HBase's ImmutableBytesWritable but without the HBase dependency on the
 * key/value types of MR jobs. An empty instance (null array, zero length) is legal.
 */
public class ByteArrayWritable implements WritableComparable<ByteArrayWritable> {

    private byte[] data;
    private int offset;
    private int length;

    /** Creates an empty instance; {@link #array()} will be null until {@link #set} or {@link #readFields}. */
    public ByteArrayWritable() {
        this(null, 0, 0);
    }

    /** Creates an instance backed by a fresh zero-filled array of the given capacity. */
    public ByteArrayWritable(int capacity) {
        this(new byte[capacity], 0, capacity);
    }

    /** Wraps the whole array (no copy); a null array yields an empty instance. */
    public ByteArrayWritable(byte[] data) {
        this(data, 0, data == null ? 0 : data.length);
    }

    /** Wraps the given slice (no copy). */
    public ByteArrayWritable(byte[] data, int offset, int length) {
        this.data = data;
        this.offset = offset;
        this.length = length;
    }

    /** The backing array (may be null for an empty instance); bytes outside [offset, offset+length) are not part of the value. */
    public byte[] array() {
        return data;
    }

    public int offset() {
        return offset;
    }

    public int length() {
        return length;
    }

    /** Wraps the whole array (no copy); unlike the constructor, a null array throws NPE here. */
    public void set(byte[] array) {
        set(array, 0, array.length);
    }

    /** Wraps the given slice (no copy). */
    public void set(byte[] array, int offset, int length) {
        this.data = array;
        this.offset = offset;
        this.length = length;
    }

    /** Returns the value as a ByteBuffer positioned at 0 (sliced when the value is a sub-range), or null when empty. */
    public ByteBuffer asBuffer() {
        if (data == null)
            return null;
        else if (offset == 0 && length == data.length)
            return ByteBuffer.wrap(data);
        else
            return ByteBuffer.wrap(data, offset, length).slice();
    }

    @Override
    public int hashCode() {
        if (data == null)
            return 0;
        else
            return Bytes.hashCode(data, offset, length);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.length);
        // Guard the zero-length case: 'data' may be null for a default/empty instance and
        // DataOutput.write(null, 0, 0) throws NullPointerException per its contract.
        if (this.length > 0) {
            out.write(this.data, this.offset, this.length);
        }
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.length = in.readInt();
        this.data = new byte[this.length];
        in.readFully(this.data, 0, this.length);
        this.offset = 0;
    }

    // Below methods copied from BytesWritable
    /**
     * Define the sort order of the BytesWritable.
     * @param that The other bytes writable
     * @return Positive if left is bigger than right, 0 if they are equal, and
     *         negative if left is smaller than right.
     */
    public int compareTo(ByteArrayWritable that) {
        return WritableComparator.compareBytes(this.data, this.offset, this.length, that.data, that.offset, that.length);
    }

    /**
     * Compares the bytes in this object to the specified byte array
     * @param that
     * @return Positive if left is bigger than right, 0 if they are equal, and
     *         negative if left is smaller than right.
     */
    public int compareTo(final byte[] that) {
        return WritableComparator.compareBytes(this.data, this.offset, this.length, that, 0, that.length);
    }

    /**
     * Content equality against another ByteArrayWritable or a raw byte[].
     * NOTE: equality against byte[] is intentionally asymmetric (a byte[] never equals
     * this object back) — this mirrors the BytesWritable/ImmutableBytesWritable style.
     * @see java.lang.Object#equals(java.lang.Object)
     */
    @Override
    public boolean equals(Object right_obj) {
        if (right_obj instanceof byte[]) {
            return compareTo((byte[]) right_obj) == 0;
        }
        if (right_obj instanceof ByteArrayWritable) {
            return compareTo((ByteArrayWritable) right_obj) == 0;
        }
        return false;
    }

    /**
     * Hex dump of the value, one space-separated two-digit byte per element.
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder(3 * this.length);
        final int endIdx = this.offset + this.length;
        for (int idx = this.offset; idx < endIdx; idx++) {
            sb.append(' ');
            String num = Integer.toHexString(0xff & this.data[idx]);
            // if it is only one digit, add a leading 0.
            if (num.length() < 2) {
                sb.append('0');
            }
            sb.append(num);
        }
        return sb.length() > 0 ? sb.substring(1) : "";
    }

    /** A Comparator optimized for ImmutableBytesWritable.
     */
    public static class Comparator extends WritableComparator {
        private BytesWritable.Comparator comparator = new BytesWritable.Comparator();

        /** constructor */
        public Comparator() {
            super(ByteArrayWritable.class);
        }

        /**
         * Raw byte-level compare on the serialized form (length-prefixed bytes), delegating
         * to BytesWritable's comparator which shares the same wire format.
         * @see org.apache.hadoop.io.WritableComparator#compare(byte[], int, int, byte[], int, int)
         */
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return comparator.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    static { // register this comparator so shuffle sorting uses the raw-byte fast path
        WritableComparator.define(ByteArrayWritable.class, new Comparator());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
deleted file mode 100644
index d79f35d..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/GarbageCollectionStep.java
+++ /dev/null
@@ -1,149 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.kylin.engine.mr;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.job.cmd.ShellCmdOutput;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

/**
 * Drop the resources that is no longer needed, including intermediate hive table (after cube build) and hbase tables (after cube merge)
 */
@Deprecated // only exists for backward compatibility
public class GarbageCollectionStep extends AbstractExecutable {

    // parameter key: comma-separated list of HBase table names to drop
    private static final String OLD_HTABLES = "oldHTables";

    // parameter key: single intermediate Hive table name to drop
    private static final String OLD_HIVE_TABLE = "oldHiveTable";

    private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);

    public GarbageCollectionStep() {
        super();
    }

    /**
     * Drops the configured Hive table via the hive CLI, then drops the configured HBase
     * tables via HBaseAdmin. A Hive failure aborts with ERROR; an HBase failure also
     * returns ERROR but is expected to be recoverable later (see note below).
     */
    @Override
    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {

        // accumulates a human-readable report returned in the ExecuteResult
        StringBuffer output = new StringBuffer();

        // Step 1: drop the intermediate Hive table (if one was configured) via shell "hive -e"
        final String hiveTable = this.getOldHiveTable();
        if (StringUtils.isNotEmpty(hiveTable)) {
            final String dropHiveCMD = "hive -e \"DROP TABLE IF EXISTS " + hiveTable + ";\"";
            ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
            try {
                context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
                output.append("Hive table " + hiveTable + " is dropped. \n");
            } catch (IOException e) {
                logger.error("job:" + getId() + " execute finished with exception", e);
                output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage());
                return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
            }
        }


        // Step 2: drop the obsolete HBase tables (if any were configured)
        List<String> oldTables = getOldHTables();
        if (oldTables != null && oldTables.size() > 0) {
            String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
            Configuration conf = HBaseConfiguration.create();
            HBaseAdmin admin = null;
            try {
                admin = new HBaseAdmin(conf);
                for (String table : oldTables) {
                    if (admin.tableExists(table)) {
                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
                        String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
                        // only drop tables tagged with THIS Kylin instance's metadata prefix,
                        // so one deployment never deletes another deployment's htables
                        if (metadataUrlPrefix.equalsIgnoreCase(host)) {
                            if (admin.isTableEnabled(table)) {
                                admin.disableTable(table);
                            }
                            admin.deleteTable(table);
                            logger.debug("Dropped htable: " + table);
                            output.append("HBase table " + table + " is dropped. \n");
                        } else {
                            logger.debug("Skip htable: " + table);
                            output.append("Skip htable: " + table + ". \n");
                        }
                    }
                }

            } catch (IOException e) {
                output.append("Got error when drop HBase table, exiting... \n");
                // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
                return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
            } finally {
                if (admin != null)
                    try {
                        admin.close();
                    } catch (IOException e) {
                        logger.error(e.getLocalizedMessage());
                    }
            }
        }


        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
    }

    /** Stores the HBase table names to drop as a comma-joined job parameter. */
    public void setOldHTables(List<String> ids) {
        setParam(OLD_HTABLES, StringUtils.join(ids, ","));
    }

    /** Reads back the comma-joined table-name parameter; empty list when unset. */
    private List<String> getOldHTables() {
        final String ids = getParam(OLD_HTABLES);
        if (ids != null) {
            final String[] splitted = StringUtils.split(ids, ",");
            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
            for (String id : splitted) {
                result.add(id);
            }
            return result;
        } else {
            return Collections.emptyList();
        }
    }

    /** Stores the intermediate Hive table name to drop as a job parameter. */
    public void setOldHiveTable(String hiveTable) {
        setParam(OLD_HIVE_TABLE, hiveTable);
    }

    private String getOldHiveTable() {
        return getParam(OLD_HIVE_TABLE);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 08ed94a..0c39398 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -48,22 +48,22 @@ public interface IMRInput {
/**
* Participate the batch cubing flow as the input side. Responsible for creating
- * intermediate flat table (Phase 1) and clean up if necessary (Phase 4).
+ * intermediate flat table (Phase 1) and clean up any leftover (Phase 4).
*
* - Phase 1: Create Flat Table
- * - Phase 2: Build Dictionary
- * - Phase 3: Build Cube
+ * - Phase 2: Build Dictionary (with FlatTableInputFormat)
+ * - Phase 3: Build Cube (with FlatTableInputFormat)
* - Phase 4: Update Metadata & Cleanup
*/
public interface IMRBatchCubingInputSide {
+ /** Return an InputFormat that reads from the intermediate flat table */
+ public IMRTableInputFormat getFlatTableInputFormat();
+
/** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
/** Add step that does necessary clean up, like delete the intermediate flat table */
- public void addStepPhase4_UpdateMetadataAndCleanup(DefaultChainedExecutable jobFlow);
-
- /** Return an InputFormat that reads from the intermediate flat table */
- public IMRTableInputFormat getFlatTableInputFormat();
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java b/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
deleted file mode 100644
index 6a94920..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMRJobFlowParticipant.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.util.List;
-
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-public interface IMRJobFlowParticipant {
-
- public List<? extends AbstractExecutable> contributePhase1CreateFlatTable(List<? extends AbstractExecutable> steps);
-
- public List<? extends AbstractExecutable> contributePhase2CreateDictionary(List<? extends AbstractExecutable> steps);
-
- public List<? extends AbstractExecutable> contributePhase3BuildCube(List<? extends AbstractExecutable> steps);
-
- public List<? extends AbstractExecutable> contributePhase4UpdateCubeMetadata(List<? extends AbstractExecutable> steps);
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
index 6e3a42c..bc6ee1f 100644
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
@@ -18,9 +18,61 @@
package org.apache.kylin.engine.mr;
-public interface IMROutput {
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
- public IMRJobFlowParticipant createBuildFlowParticipant();
+public interface IMROutput {
- public IMRJobFlowParticipant createMergeFlowParticipant();
+ /** Return a helper to participate in batch cubing job flow. */
+ public IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
+
+ /**
+ * Participate the batch cubing flow as the output side. Responsible for saving
+ * the cuboid output to storage (Phase 3).
+ *
+ * - Phase 1: Create Flat Table
+ * - Phase 2: Build Dictionary
+ * - Phase 3: Build Cube
+ * - Phase 4: Update Metadata & Cleanup
+ */
+ public interface IMRBatchCubingOutputSide {
+
+ /**
+ * Add step that saves cuboid output from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
+ * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * dictionary encoding; Mx is measure value serialization form.
+ */
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+
+ /** Return a helper to participate in batch merge job flow. */
+ public IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
+
+ /**
+ * Participate the batch merge flow as the output side. Responsible for saving
+ * the cuboid output to storage (Phase 2).
+ *
+ * - Phase 1: Merge Dictionary
+ * - Phase 2: Merge Cube
+ * - Phase 3: Update Metadata & Cleanup
+ */
+ public interface IMRBatchMergeOutputSide {
+
+ /**
+ * Add step that saves cuboid output from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
+ * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * dictionary encoding; Mx is measure value serialization form.
+ */
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
new file mode 100644
index 0000000..974e2fc
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -0,0 +1,88 @@
+package org.apache.kylin.engine.mr;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+
+public interface IMROutput2 {
+
+ /** Return a helper to participate in batch cubing job flow. */
+ public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
+
+ /**
+ * Participate the batch cubing flow as the output side.
+ *
+ * - Phase 1: Create Flat Table
+ * - Phase 2: Build Dictionary
+ * - Phase 3: Build Cube (with StorageOutputFormat)
+ * - Phase 4: Update Metadata & Cleanup
+ */
+ public interface IMRBatchCubingOutputSide2 {
+
+ public IMRStorageOutputFormat getStorageOutputFormat();
+
+ /** Add step that executes after build dictionary and before build cube. */
+ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
+
+ /** Add step that executes after build cube. */
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+
+ public IMRBatchMergeInputSide2 getBatchMergeInputSide(CubeSegment seg);
+
+ public interface IMRBatchMergeInputSide2 {
+ public IMRStorageInputFormat getStorageInputFormat();
+ }
+
+ @SuppressWarnings("rawtypes")
+ public interface IMRStorageInputFormat {
+
+ public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
+
+ public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
+
+ public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
+ }
+
+ /** Return a helper to participate in batch merge job flow. */
+ public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
+
+ /**
+ * Participate the batch merge flow as the output side.
+ *
+ * - Phase 1: Merge Dictionary
+ * - Phase 2: Merge Cube (with StorageInputFormat & StorageOutputFormat)
+ * - Phase 3: Update Metadata & Cleanup
+ */
+ public interface IMRBatchMergeOutputSide2 {
+
+ public IMRStorageOutputFormat getStorageOutputFormat();
+
+ /** Add step that executes after merge dictionary and before merge cube. */
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+ /** Add step that executes after merge cube. */
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
+ }
+
+ @SuppressWarnings("rawtypes")
+ public interface IMRStorageOutputFormat {
+ public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
+
+ public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
+ }
+}