Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/16 15:03:02 UTC
[kylin] 01/02: KYLIN-3441 Merge cube segments in Spark
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4bda69ebbe5f25cfa9f94310efcdbb734df486b3
Author: shaofengshi <sh...@apache.org>
AuthorDate: Wed Jul 11 18:40:35 2018 +0800
KYLIN-3441 Merge cube segments in Spark
---
.../java/org/apache/kylin/common/KylinConfig.java | 4 +
.../org/apache/kylin/common/util/HadoopUtil.java | 7 +
.../java/org/apache/kylin/cube/CubeInstance.java | 10 +
.../org/apache/kylin/cube/DimensionRangeInfo.java | 3 +-
.../apache/kylin/engine/mr/JobBuilderSupport.java | 13 ++
.../kylin/engine/mr/steps/MergeCuboidMapper.java | 196 ++--------------
...ergeCuboidMapper.java => SegmentReEncoder.java} | 178 ++++++++-------
.../kylin/engine/spark/KylinKryoRegistrator.java | 8 +
.../kylin/engine/spark/KylinSparkJobListener.java | 2 +-
.../engine/spark/SparkBatchMergeJobBuilder2.java | 45 +++-
.../kylin/engine/spark/SparkCubingByLayer.java | 28 +--
.../kylin/engine/spark/SparkCubingMerge.java | 251 +++++++++++++++++++++
.../apache/kylin/engine/spark/SparkExecutable.java | 23 +-
.../org/apache/kylin/engine/spark/SparkUtil.java | 63 +++++-
.../template/cube_desc/kylin_sales_cube.json | 3 +-
examples/test_case_data/sandbox/kylin.properties | 8 +-
.../hbase/steps/HBaseMROutput2Transition.java | 36 +--
.../hbase/steps/HBaseSparkOutputTransition.java | 3 -
.../kylin/storage/hbase/steps/HBaseSparkSteps.java | 18 +-
.../kylin/storage/hbase/steps/SparkCubeHFile.java | 42 +---
20 files changed, 570 insertions(+), 371 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 4b3b7c3..e09ce26 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -414,6 +414,10 @@ public class KylinConfig extends KylinConfigBase {
KylinConfig base = base();
if (base != this)
return base.getManager(clz);
+
+ if (managersCache == null) {
+ managersCache = new ConcurrentHashMap<>();
+ }
Object mgr = managersCache.get(clz);
if (mgr != null)
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index ed6ef89..3cb43fc 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -186,4 +187,10 @@ public class HadoopUtil {
}
return result;
}
+
+ public static void deleteHDFSMeta(String metaUrl) throws IOException {
+ String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
+ HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
+ logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
+ }
}
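The new HadoopUtil.deleteHDFSMeta helper parses the "path" parameter out of a StorageURL-style metadata URL and recursively deletes that directory; both Spark jobs touched by this commit call it to clean up the job metadata dumped to HDFS. A minimal usage sketch (the URL value is illustrative, not taken from the commit):

    // metaUrl has the form <identifier>@hdfs,path=<hdfs directory>
    String metaUrl = "kylin_metadata@hdfs,path=/kylin/kylin_metadata/kylin-<job-id>/metadata";
    HadoopUtil.deleteHDFSMeta(metaUrl); // removes the dumped metadata directory recursively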
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index c0f3536..ff86e7d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -707,4 +707,14 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return newCube;
}
+ public static CubeSegment findSegmentWithJobId(String jobID, CubeInstance cubeInstance) {
+ for (CubeSegment segment : cubeInstance.getSegments()) {
+ String lastBuildJobID = segment.getLastBuildJobID();
+ if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
+ return segment;
+ }
+ }
+ throw new IllegalStateException("No segment's last build job ID equals " + jobID);
+ }
+
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
index 0b0d1c4..cde8d3f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/DimensionRangeInfo.java
@@ -18,6 +18,7 @@
package org.apache.kylin.cube;
+import java.io.Serializable;
import java.util.Map;
import org.apache.kylin.metadata.datatype.DataTypeOrder;
@@ -31,7 +32,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class DimensionRangeInfo {
+public class DimensionRangeInfo implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(DimensionRangeInfo.class);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index a1b2cfe..42e0f42 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -20,6 +20,8 @@ package org.apache.kylin.engine.mr;
import java.io.IOException;
import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
@@ -57,6 +59,7 @@ public class JobBuilderSupport {
final public static String PathNameCuboidBase = "base_cuboid";
final public static String PathNameCuboidOld = "old";
final public static String PathNameCuboidInMem = "in_memory";
+ final public static Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
public JobBuilderSupport(CubeSegment seg, String submitter) {
Preconditions.checkNotNull(seg, "segment cannot be null");
@@ -324,4 +327,14 @@ public class JobBuilderSupport {
return getRealizationRootPath(jobId) + "/metadata";
}
+
+ public static String extractJobIDFromPath(String path) {
+ Matcher matcher = JOB_NAME_PATTERN.matcher(path);
+ // check the first occurrence
+ if (matcher.find()) {
+ return matcher.group(1);
+ } else {
+ throw new IllegalStateException("Can not extract job ID from file path : " + path);
+ }
+ }
}
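JOB_NAME_PATTERN captures the job UUID embedded in a cuboid path of the form .../kylin-<uuid>/...; together with CubeInstance.findSegmentWithJobId above, it lets the Spark merge job map each input folder back to the segment that produced it. A hedged sketch with a made-up path:

    String path = "/kylin/kylin_metadata/kylin-0c30b0c4-bef9-4599-bc4c-f7872fa03c7f/kylin_sales_cube/cuboid/";
    String jobId = JobBuilderSupport.extractJobIDFromPath(path);
    // jobId = "0c30b0c4-bef9-4599-bc4c-f7872fa03c7f"
    CubeSegment sourceSeg = CubeInstance.findSegmentWithJobId(jobId, cubeInstance);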
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 0283c21..d63c8b8 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -19,41 +19,20 @@
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyEncoder;
-import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.engine.mr.IMROutput2;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
/**
* @author ysong1, honma
@@ -61,179 +40,34 @@ import com.google.common.collect.Maps;
@SuppressWarnings({ "rawtypes", "unchecked" })
public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
- private KylinConfig config;
- private String cubeName;
- private String segmentID;
- private CubeManager cubeManager;
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private CubeSegment mergedCubeSegment;
- private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
-
- private Text outputKey = new Text();
-
- private byte[] newKeyBodyBuf;
- private ByteArray newKeyBuf;
- private RowKeySplitter rowKeySplitter;
- private RowKeyEncoderProvider rowKeyEncoderProvider;
-
-
- // for re-encode measures that use dictionary
- private List<Pair<Integer, MeasureIngester>> dictMeasures;
- private Map<TblColRef, Dictionary<String>> oldDicts;
- private Map<TblColRef, Dictionary<String>> newDicts;
- private List<MeasureDesc> measureDescs;
- private BufferedMeasureCodec codec;
- private Object[] measureObjs;
- private Text outputValue;
+ private SegmentReEncoder reEncoder;
+ private Pair<Text, Text> newPair;
@Override
protected void doSetup(Context context) throws IOException, InterruptedException {
super.bindCurrentConfiguration(context.getConfiguration());
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
- segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
-
- config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+ String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
+ String segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
- cubeManager = CubeManager.getInstance(config);
- cube = cubeManager.getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- mergedCubeSegment = cube.getSegmentById(segmentID);
+ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
- // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
- newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];// size will auto-grow
- newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+ CubeManager cubeManager = CubeManager.getInstance(config);
+ CubeInstance cube = cubeManager.getCube(cubeName);
+ CubeDesc cubeDesc = cube.getDescriptor();
+ CubeSegment mergedCubeSegment = cube.getSegmentById(segmentID);
// decide which source segment
FileSplit fileSplit = (FileSplit) context.getInputSplit();
- IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment).getOuputFormat();
- sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
-
- rowKeySplitter = new RowKeySplitter(sourceCubeSegment);
- rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
-
- measureDescs = cubeDesc.getMeasures();
- codec = new BufferedMeasureCodec(measureDescs);
- measureObjs = new Object[measureDescs.size()];
- outputValue = new Text();
-
- dictMeasures = Lists.newArrayList();
- oldDicts = Maps.newHashMap();
- newDicts = Maps.newHashMap();
- for (int i = 0; i < measureDescs.size(); i++) {
- MeasureDesc measureDesc = measureDescs.get(i);
- MeasureType measureType = measureDesc.getFunction().getMeasureType();
- List<TblColRef> columns = measureType.getColumnsNeedDictionary(measureDesc.getFunction());
- boolean needReEncode = false;
- for (TblColRef col : columns) {
- //handle the column that all records is null
- if (sourceCubeSegment.getDictionary(col) == null) {
- continue;
- }
-
- oldDicts.put(col, sourceCubeSegment.getDictionary(col));
- newDicts.put(col, mergedCubeSegment.getDictionary(col));
- if (!sourceCubeSegment.getDictionary(col).equals(mergedCubeSegment.getDictionary(col))) {
- needReEncode = true;
- }
- }
- if (needReEncode) {
- dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
- }
- }
+ IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment)
+ .getOuputFormat();
+ CubeSegment sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
+ reEncoder = new SegmentReEncoder(cubeDesc, sourceCubeSegment, mergedCubeSegment, config);
}
@Override
public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
- long cuboidID = rowKeySplitter.split(key.getBytes());
- Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
- RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
-
- SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
- int bufOffset = 0;
- int bodySplitOffset = rowKeySplitter.getBodySplitOffset();
-
- for (int i = 0; i < cuboid.getColumns().size(); ++i) {
- int useSplit = i + bodySplitOffset;
- TblColRef col = cuboid.getColumns().get(i);
-
- if (cubeDesc.getRowkey().isUseDictionary(col)) {
- // if dictionary on fact table column, needs rewrite
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- Dictionary<String> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
- // handle the dict of all merged segments is null
- if (mergedDict == null) {
- continue;
- }
-
- Dictionary<String> sourceDict;
- // handle the column that all records is null
- if (sourceCubeSegment.getDictionary(col) == null) {
- BytesUtil.writeUnsigned(mergedDict.nullId(), newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
- bufOffset += mergedDict.getSizeOfId();
- continue;
- } else {
- sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
- }
-
- while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfId() > newKeyBodyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBodyBuf;
- newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
- }
-
- int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length);
- int idInMergedDict;
-
- //int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
- String v = sourceDict.getValueFromId(idInSourceDict);
- if (v == null) {
- idInMergedDict = mergedDict.nullId();
- } else {
- idInMergedDict = mergedDict.getIdFromValue(v);
- }
-
- BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
- bufOffset += mergedDict.getSizeOfId();
- } else {
- // keep as it is
- while (splittedByteses[useSplit].length > newKeyBodyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBodyBuf;
- newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
- }
-
- System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length);
- bufOffset += splittedByteses[useSplit].length;
- }
- }
-
- int fullKeySize = rowkeyEncoder.getBytesLength();
- while (newKeyBuf.array().length < fullKeySize) {
- newKeyBuf = new ByteArray(newKeyBuf.length() * 2);
- }
- newKeyBuf.setLength(fullKeySize);
-
- rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
- outputKey.set(newKeyBuf.array(), 0, fullKeySize);
-
- // re-encode measures if dictionary is used
- if (dictMeasures.size() > 0) {
- codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
- for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
- int i = pair.getFirst();
- MeasureIngester ingester = pair.getSecond();
- measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
- }
- ByteBuffer valueBuf = codec.encode(measureObjs);
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
- value = outputValue;
- }
-
- context.write(outputKey, value);
+ newPair = reEncoder.reEncode(key, value);
+ context.write(newPair.getFirst(), newPair.getSecond());
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
old mode 100755
new mode 100644
similarity index 68%
copy from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
index 0283c21..856a59f
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
@@ -6,33 +6,31 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
+import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -41,11 +39,6 @@ import org.apache.kylin.cube.kv.RowKeyEncoder;
import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.measure.MeasureType;
@@ -56,67 +49,44 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
- * @author ysong1, honma
+ * Re-encode the cuboid from old segment (before merge) to new segment (after merge).
*/
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
-
- private KylinConfig config;
- private String cubeName;
- private String segmentID;
- private CubeManager cubeManager;
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private CubeSegment mergedCubeSegment;
- private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
-
- private Text outputKey = new Text();
+public class SegmentReEncoder implements Serializable {
+ private volatile transient boolean initialized = false;
+ private CubeSegment mergingSeg;
+ private CubeSegment mergedSeg;
private byte[] newKeyBodyBuf;
private ByteArray newKeyBuf;
private RowKeySplitter rowKeySplitter;
private RowKeyEncoderProvider rowKeyEncoderProvider;
-
// for re-encode measures that use dictionary
private List<Pair<Integer, MeasureIngester>> dictMeasures;
private Map<TblColRef, Dictionary<String>> oldDicts;
private Map<TblColRef, Dictionary<String>> newDicts;
private List<MeasureDesc> measureDescs;
private BufferedMeasureCodec codec;
- private Object[] measureObjs;
- private Text outputValue;
-
- @Override
- protected void doSetup(Context context) throws IOException, InterruptedException {
- super.bindCurrentConfiguration(context.getConfiguration());
-
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME);
- segmentID = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_ID);
-
- config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
- cubeManager = CubeManager.getInstance(config);
- cube = cubeManager.getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- mergedCubeSegment = cube.getSegmentById(segmentID);
+ private CubeDesc cubeDesc;
+ private KylinConfig kylinConfig;
+
+ public SegmentReEncoder(CubeDesc cubeDesc, CubeSegment mergingSeg, CubeSegment mergedSeg, KylinConfig kylinConfig) {
+ this.cubeDesc = cubeDesc;
+ this.mergingSeg = mergingSeg;
+ this.mergedSeg = mergedSeg;
+ this.kylinConfig = kylinConfig;
+ init();
+ }
- // int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+ private void init() {
newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];// size will auto-grow
newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
- // decide which source segment
- FileSplit fileSplit = (FileSplit) context.getInputSplit();
- IMROutput2.IMRMergeOutputFormat outputFormat = MRUtil.getBatchMergeOutputSide2(mergedCubeSegment).getOuputFormat();
- sourceCubeSegment = outputFormat.findSourceSegment(fileSplit, cube);
-
- rowKeySplitter = new RowKeySplitter(sourceCubeSegment);
- rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
+ rowKeySplitter = new RowKeySplitter(mergingSeg);
+ rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedSeg);
measureDescs = cubeDesc.getMeasures();
codec = new BufferedMeasureCodec(measureDescs);
- measureObjs = new Object[measureDescs.size()];
- outputValue = new Text();
dictMeasures = Lists.newArrayList();
oldDicts = Maps.newHashMap();
@@ -128,13 +98,13 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
boolean needReEncode = false;
for (TblColRef col : columns) {
//handle the column that all records is null
- if (sourceCubeSegment.getDictionary(col) == null) {
+ if (mergingSeg.getDictionary(col) == null) {
continue;
}
- oldDicts.put(col, sourceCubeSegment.getDictionary(col));
- newDicts.put(col, mergedCubeSegment.getDictionary(col));
- if (!sourceCubeSegment.getDictionary(col).equals(mergedCubeSegment.getDictionary(col))) {
+ oldDicts.put(col, mergingSeg.getDictionary(col));
+ newDicts.put(col, mergedSeg.getDictionary(col));
+ if (!mergingSeg.getDictionary(col).equals(mergedSeg.getDictionary(col))) {
needReEncode = true;
}
}
@@ -142,10 +112,71 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
}
}
+ initialized = true;
+ }
+
+ /**
+ * Re-encode with both dimension and measure in encoded (Text) format.
+ * @param key
+ * @param value
+ * @return
+ * @throws IOException
+ */
+ public Pair<Text, Text> reEncode(Text key, Text value) throws IOException {
+ if (initialized == false) {
+ throw new IllegalStateException("Not initialized");
+ }
+ Object[] measureObjs = new Object[measureDescs.size()];
+ // re-encode measures if dictionary is used
+ if (dictMeasures.size() > 0) {
+ codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
+ for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+ int i = pair.getFirst();
+ MeasureIngester ingester = pair.getSecond();
+ measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
+ }
+
+ ByteBuffer valueBuf = codec.encode(measureObjs);
+ byte[] resultValue = new byte[valueBuf.position()];
+ System.arraycopy(valueBuf.array(), 0, resultValue, 0, valueBuf.position());
+
+ return Pair.newPair(processKey(key), new Text(resultValue));
+ } else {
+ return Pair.newPair(processKey(key), value);
+ }
}
- @Override
- public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
+ /**
+ * Re-encode with measures in Object[] format.
+ * @param key
+ * @param value
+ * @return
+ * @throws IOException
+ */
+ public Pair<Text, Object[]> reEncode2(Text key, Text value) throws IOException {
+ if (initialized == false) {
+ throw new IllegalStateException("Not initialized");
+ }
+
+ Object[] measureObjs = new Object[measureDescs.size()];
+ codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
+ // re-encode measures if dictionary is used
+ if (dictMeasures.size() > 0) {
+ for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+ int i = pair.getFirst();
+ MeasureIngester ingester = pair.getSecond();
+ measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
+ }
+
+ ByteBuffer valueBuf = codec.encode(measureObjs);
+ byte[] resultValue = new byte[valueBuf.position()];
+ System.arraycopy(valueBuf.array(), 0, resultValue, 0, valueBuf.position());
+
+ }
+ return Pair.newPair(processKey(key), measureObjs);
+ }
+
+ private Text processKey(Text key) throws IOException {
long cuboidID = rowKeySplitter.split(key.getBytes());
Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
@@ -160,8 +191,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
if (cubeDesc.getRowkey().isUseDictionary(col)) {
// if dictionary on fact table column, needs rewrite
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- Dictionary<String> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+ DictionaryManager dictMgr = DictionaryManager.getInstance(kylinConfig);
+ Dictionary<String> mergedDict = dictMgr.getDictionary(mergedSeg.getDictResPath(col));
// handle the dict of all merged segments is null
if (mergedDict == null) {
@@ -170,12 +201,12 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
Dictionary<String> sourceDict;
// handle the column that all records is null
- if (sourceCubeSegment.getDictionary(col) == null) {
+ if (mergingSeg.getDictionary(col) == null) {
BytesUtil.writeUnsigned(mergedDict.nullId(), newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
bufOffset += mergedDict.getSizeOfId();
continue;
} else {
- sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+ sourceDict = dictMgr.getDictionary(mergingSeg.getDictResPath(col));
}
while (sourceDict.getSizeOfValue() > newKeyBodyBuf.length - bufOffset || //
@@ -186,7 +217,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
}
- int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0, splittedByteses[useSplit].length);
+ int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0,
+ splittedByteses[useSplit].length);
int idInMergedDict;
//int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
@@ -207,7 +239,8 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
}
- System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset, splittedByteses[useSplit].length);
+ System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset,
+ splittedByteses[useSplit].length);
bufOffset += splittedByteses[useSplit].length;
}
}
@@ -219,21 +252,10 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
newKeyBuf.setLength(fullKeySize);
rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
- outputKey.set(newKeyBuf.array(), 0, fullKeySize);
- // re-encode measures if dictionary is used
- if (dictMeasures.size() > 0) {
- codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
- for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
- int i = pair.getFirst();
- MeasureIngester ingester = pair.getSecond();
- measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
- }
- ByteBuffer valueBuf = codec.encode(measureObjs);
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
- value = outputValue;
- }
+ byte[] resultKey = new byte[fullKeySize];
+ System.arraycopy(newKeyBuf.array(), 0, resultKey, 0, fullKeySize);
- context.write(outputKey, value);
+ return new Text(resultKey);
}
}
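SegmentReEncoder is the re-encoding logic extracted from MergeCuboidMapper so the MapReduce and Spark merge paths can share it: reEncode returns the rowkey and measures both as Text (what the MR mapper writes out), while reEncode2 keeps the measures as Object[] so Spark can aggregate them with reduceByKey before the final encoding. A minimal usage sketch, assuming key/value are one (Text, Text) record read from a merging segment's cuboid files:

    SegmentReEncoder reEncoder = new SegmentReEncoder(cubeDesc, mergingSeg, mergedSeg, kylinConfig);
    Pair<Text, Text> encoded = reEncoder.reEncode(key, value);       // MR path: fully re-encoded key and value
    Pair<Text, Object[]> decoded = reEncoder.reEncode2(key, value);  // Spark path: decoded measures for aggregation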
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
index 90c52c2..a13f96a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -96,6 +96,14 @@ public class KylinKryoRegistrator implements KryoRegistrator {
kyroClasses.add(Class.class);
+ //shaded classes
+ addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MutableRoaringArray");
+ addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MutableRoaringBitmap");
+ addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MappeableArrayContainer");
+ addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.MappeableBitmapContainer");
+ addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.ImmutableRoaringBitmap");
+ addClassQuitely(kyroClasses, "org.apache.kylin.job.shaded.org.roaringbitmap.buffer.ImmutableRoaringArray");
+
addClassQuitely(kyroClasses, "com.google.common.collect.EmptyImmutableList");
addClassQuitely(kyroClasses, "java.nio.HeapShortBuffer");
addClassQuitely(kyroClasses, "java.nio.HeapLongBuffer");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
index 7976c3b..b8821b6 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinSparkJobListener.java
@@ -29,7 +29,7 @@ public class KylinSparkJobListener extends SparkListener implements Serializable
@Override
public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
- if (taskEnd.taskMetrics().outputMetrics() != null) {
+ if (taskEnd != null && taskEnd.taskMetrics() != null && taskEnd.taskMetrics().outputMetrics() != null) {
metrics.bytesWritten += taskEnd.taskMetrics().outputMetrics().bytesWritten();
metrics.recordsWritten += taskEnd.taskMetrics().outputMetrics().recordsWritten();
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index b68f6e86..97861a3 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -18,11 +18,17 @@
package org.apache.kylin.engine.spark;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +67,9 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
outputSide.addStepPhase1_MergeDictionary(result);
+ // merge cube
+ result.addTask(createMergeCuboidDataStep(cubeSegment, mergingSegments, jobId));
+
// Phase 2: Merge Cube Files
outputSide.addStepPhase2_BuildCube(seg, mergingSegments, result);
@@ -70,5 +79,39 @@ public class SparkBatchMergeJobBuilder2 extends JobBuilderSupport {
return result;
}
-
+
+ public SparkExecutable createMergeCuboidDataStep(CubeSegment seg, List<CubeSegment> mergingSegments, String jobID) {
+
+ final List<String> mergingCuboidPaths = Lists.newArrayList();
+ for (CubeSegment merging : mergingSegments) {
+ mergingCuboidPaths.add(getCuboidRootPath(merging));
+ }
+ String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+ String outputPath = getCuboidRootPath(jobID);
+
+ final SparkExecutable sparkExecutable = new SparkExecutable();
+ sparkExecutable.setClassName(SparkCubingMerge.class.getName());
+ sparkExecutable.setParam(SparkCubingMerge.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
+ sparkExecutable.setParam(SparkCubingMerge.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
+ sparkExecutable.setParam(SparkCubingMerge.OPTION_INPUT_PATH.getOpt(), formattedPath);
+ sparkExecutable.setParam(SparkCubingMerge.OPTION_META_URL.getOpt(),
+ getSegmentMetadataUrl(seg.getConfig(), jobID));
+ sparkExecutable.setParam(SparkCubingMerge.OPTION_OUTPUT_PATH.getOpt(), outputPath);
+
+ sparkExecutable.setJobId(jobID);
+ sparkExecutable.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
+
+ StringBuilder jars = new StringBuilder();
+
+ StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
+ sparkExecutable.setJars(jars.toString());
+
+ return sparkExecutable;
+ }
+
+ public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
+ Map<String, String> param = new HashMap<>();
+ param.put("path", getDumpMetadataPath(jobId));
+ return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
+ }
}
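getSegmentMetadataUrl rewrites the cube's metadata URL so the Spark step loads metadata from the HDFS dump (via loadKylinConfigFromHdfs in SparkCubingMerge below) rather than from the live metadata store. Roughly, with illustrative values:

    // <identifier>@hdfs,path=<working-dir>/kylin-<jobId>/metadata
    String metaUrl = getSegmentMetadataUrl(seg.getConfig(), jobID);
    // e.g. "kylin_metadata@hdfs,path=/kylin/kylin_metadata/kylin-<jobId>/metadata"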
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index f842fd5..0cc8a3b 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,7 +17,6 @@
*/
package org.apache.kylin.engine.spark;
-import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -34,7 +33,6 @@ import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
@@ -47,7 +45,6 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
import org.apache.kylin.cube.model.CubeDesc;
@@ -137,7 +134,9 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
JavaSparkContext sc = new JavaSparkContext(conf);
+ sc.sc().addSparkListener(jobListener);
HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
@@ -221,7 +220,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
int level = 0;
- int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
+ int partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
// aggregate to calculate base cuboid
allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
@@ -230,7 +229,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
// aggregate to ND cuboids
for (level = 1; level <= totalLevels; level++) {
- partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
+ partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf))
.reduceByKey(reducerFunction2, partition).persist(storageLevel);
if (envConfig.isSparkSanityCheckEnabled() == true) {
@@ -241,7 +240,8 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
allRDDs[totalLevels].unpersist();
logger.info("Finished on calculating all level cuboids.");
- // deleteHDFSMeta(metaUrl);
+ logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+ HadoopUtil.deleteHDFSMeta(metaUrl);
}
protected void setHadoopConf(Job job, CubeSegment segment, String metaUrl) throws Exception {
@@ -249,15 +249,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
job.setOutputValueClass(Text.class);
}
- protected int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
- double baseCuboidSize = statsReader.estimateLayerSize(level);
- float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
- int partition = (int) (baseCuboidSize / rddCut);
- partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
- partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
- logger.info("Partition for spark cubing: {}", partition);
- return partition;
- }
protected JavaPairRDD<ByteArray, Object[]> prepareOutput(JavaPairRDD<ByteArray, Object[]> rdd, KylinConfig config,
CubeSegment segment, int level) {
@@ -417,7 +408,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
private String metaUrl;
private CubeSegment cubeSegment;
private CubeDesc cubeDesc;
- private CuboidScheduler cuboidScheduler;
private NDCuboidBuilder ndCuboidBuilder;
private RowKeySplitter rowKeySplitter;
private volatile transient boolean initialized = false;
@@ -435,7 +425,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
this.cubeSegment = cubeInstance.getSegmentById(segmentId);
this.cubeDesc = cubeInstance.getDescriptor();
- this.cuboidScheduler = cubeSegment.getCuboidScheduler();
this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
this.rowKeySplitter = new RowKeySplitter(cubeSegment);
}
@@ -508,9 +497,4 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
return count;
}
- protected void deleteHDFSMeta(String metaUrl) throws IOException {
- String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
- HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
- logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
- }
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
new file mode 100644
index 0000000..8d78920
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.StringSplitter;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.engine.mr.steps.SegmentReEncoder;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.PairFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import scala.Tuple2;
+
+/**
+ */
+public class SparkCubingMerge extends AbstractApplication implements Serializable {
+
+ protected static final Logger logger = LoggerFactory.getLogger(SparkCubingMerge.class);
+
+ public static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+ .isRequired(true).withDescription("Cube Name").create(BatchConstants.ARG_CUBE_NAME);
+ public static final Option OPTION_SEGMENT_ID = OptionBuilder.withArgName("segment").hasArg().isRequired(true)
+ .withDescription("Cube Segment Id").create("segmentId");
+ public static final Option OPTION_META_URL = OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+ .withDescription("HDFS metadata url").create("metaUrl");
+ public static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
+ .isRequired(true).withDescription("HFile output path").create(BatchConstants.ARG_OUTPUT);
+ public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
+ .isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
+
+ private Options options;
+
+ public SparkCubingMerge() {
+ options = new Options();
+ options.addOption(OPTION_META_URL);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_OUTPUT_PATH);
+ }
+
+ @Override
+ protected Options getOptions() {
+ return options;
+ }
+
+ @Override
+ protected void execute(OptionsHelper optionsHelper) throws Exception {
+ final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+ final String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
+ final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+ final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+
+ Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1") };
+
+ SparkConf conf = new SparkConf().setAppName("Merge segments for cube:" + cubeName + ", segment " + segmentId);
+ //serialization conf
+ conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
+ conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+
+ JavaSparkContext sc = new JavaSparkContext(conf);
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
+ sc.sc().addSparkListener(jobListener);
+
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+ final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
+ final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+ final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+ final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ final CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, envConfig);
+
+ logger.info("Input path: {}", inputPath);
+ logger.info("Output path: {}", outputPath);
+
+ String[] inputFolders = StringSplitter.split(inputPath, ",");
+
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ List<JavaPairRDD<Text, Object[]>> mergingSegs = Lists.newArrayList();
+ for (int i = 0; i < inputFolders.length; i++) {
+ String path = inputFolders[i];
+ CubeSegment sourceSegment = findSourceSegment(path, cubeInstance);
+
+ // each folder will be an RDD; union them
+ List<JavaPairRDD> cuboidRdds = SparkUtil.parseInputPath(path, fs, sc, Text.class, Text.class);
+ JavaPairRDD<Text, Text> segRdd = sc.union(cuboidRdds.toArray(new JavaPairRDD[cuboidRdds.size()]));
+
+ // re-encode with new dictionaries
+ JavaPairRDD<Text, Object[]> newEcoddedRdd = segRdd.mapToPair(new ReEncodCuboidFunction(cubeName,
+ sourceSegment.getUuid(), cubeSegment.getUuid(), metaUrl, sConf));
+ mergingSegs.add(newEcoddedRdd);
+ }
+
+ final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
+
+ // reduce
+ JavaPairRDD<Text, Object[]> mergedRdd = sc.union(mergingSegs.toArray(new JavaPairRDD[mergingSegs.size()]))
+ .reduceByKey(new Function2<Object[], Object[], Object[]>() {
+ @Override
+ public Object[] call(Object[] input1, Object[] input2) throws Exception {
+ Object[] measureObjs = new Object[input1.length];
+ aggregators.aggregate(input1, input2, measureObjs);
+ return measureObjs;
+ }
+ }, SparkUtil.estimateTotalPartitionNum(cubeStatsReader, envConfig));
+
+ Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
+ confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+ final Job job = Job.getInstance(confOverwrite);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+ mergedRdd.mapToPair(
+ new PairFunction<Tuple2<Text, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+ private volatile transient boolean initialized = false;
+ BufferedMeasureCodec codec;
+
+ @Override
+ public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(
+ Tuple2<Text, Object[]> tuple2) throws Exception {
+
+ if (initialized == false) {
+ synchronized (SparkCubingByLayer.class) {
+ if (initialized == false) {
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+ codec = new BufferedMeasureCodec(desc.getMeasures());
+ initialized = true;
+ }
+ }
+ }
+ ByteBuffer valueBuf = codec.encode(tuple2._2());
+ byte[] encodedBytes = new byte[valueBuf.position()];
+ System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
+ return new Tuple2<>(tuple2._1(), new org.apache.hadoop.io.Text(encodedBytes));
+ }
+
+ }).saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+ // output the data size to console, job engine will parse and save the metric
+ // please note: this mechanism won't work when spark.submit.deployMode=cluster
+ logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+ HadoopUtil.deleteHDFSMeta(metaUrl);
+ }
+
+ static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
+ private volatile transient boolean initialized = false;
+ private String cubeName;
+ private String sourceSegmentId;
+ private String mergedSegmentId;
+ private String metaUrl;
+ private SerializableConfiguration conf;
+ private transient KylinConfig kylinConfig;
+ private transient SegmentReEncoder segmentReEncoder = null;
+ private transient Pair<Text, Object[]> encodedPari = null;
+
+ ReEncodCuboidFunction(String cubeName, String sourceSegmentId, String mergedSegmentId, String metaUrl,
+ SerializableConfiguration conf) {
+ this.cubeName = cubeName;
+ this.sourceSegmentId = sourceSegmentId;
+ this.mergedSegmentId = mergedSegmentId;
+ this.metaUrl = metaUrl;
+ this.conf = conf;
+ }
+
+ private void init() {
+ this.kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ final CubeInstance cube = CubeManager.getInstance(kylinConfig).getCube(cubeName);
+ final CubeDesc cubeDesc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cube.getDescName());
+ final CubeSegment sourceSeg = cube.getSegmentById(sourceSegmentId);
+ final CubeSegment mergedSeg = cube.getSegmentById(mergedSegmentId);
+ this.segmentReEncoder = new SegmentReEncoder(cubeDesc, sourceSeg, mergedSeg, kylinConfig);
+ }
+
+ @Override
+ public Tuple2<Text, Object[]> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+ if (initialized == false) {
+ synchronized (ReEncodCuboidFunction.class) {
+ if (initialized == false) {
+ init();
+ initialized = true;
+ }
+ }
+ }
+ encodedPari = segmentReEncoder.reEncode2(textTextTuple2._1, textTextTuple2._2);
+ return new Tuple2(encodedPari.getFirst(), encodedPari.getSecond());
+ }
+ }
+
+ private CubeSegment findSourceSegment(String filePath, CubeInstance cube) {
+ String jobID = JobBuilderSupport.extractJobIDFromPath(filePath);
+ return CubeInstance.findSegmentWithJobId(jobID, cube);
+ }
+
+}
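Read end to end, SparkCubingMerge implements the merge as a three-stage pipeline; a comment sketch of the flow, using the names from the class above:

    // 1. per merging segment: sc.sequenceFile(cuboid folder) -> mapToPair(ReEncodCuboidFunction)
    //    re-encodes rowkeys and dictionary-based measures against the merged segment's dictionaries
    // 2. sc.union(all segment RDDs).reduceByKey(MeasureAggregators)  // combine rows that now share a rowkey
    // 3. mapToPair(encode with BufferedMeasureCodec).saveAsNewAPIHadoopDataset(SequenceFileOutputFormat)
    //    writes the merged cuboid files under the output path for the storage output steps to consume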
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 90442a4..961fd6c 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -20,7 +20,9 @@ package org.apache.kylin.engine.spark;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -44,6 +46,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.Segments;
import org.slf4j.LoggerFactory;
/**
@@ -132,9 +135,17 @@ public class SparkExecutable extends AbstractExecutable {
String segmentID = this.getParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt());
CubeSegment segment = cube.getSegmentById(segmentID);
+ Segments<CubeSegment> mergingSeg = cube.getMergingSegments(segment);
try {
- attachSegmentMetadataWithDict(segment);
+ if (mergingSeg == null || mergingSeg.size() == 0) {
+ attachSegmentMetadataWithDict(segment);
+ } else {
+ List<CubeSegment> allRelatedSegs = new ArrayList();
+ allRelatedSegs.add(segment);
+ allRelatedSegs.addAll(mergingSeg);
+ attachSegmentsMetadataWithDict(allRelatedSegs);
+ }
} catch (IOException e) {
throw new ExecuteException("meta dump failed");
}
@@ -197,6 +208,16 @@ public class SparkExecutable extends AbstractExecutable {
dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig());
}
+ private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) throws IOException {
+ Set<String> dumpList = new LinkedHashSet<>();
+ dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segments.get(0).getCubeInstance()));
+ for (CubeSegment segment : segments) {
+ dumpList.addAll(segment.getDictionaryPaths());
+ dumpList.add(segment.getStatisticsResourcePath());
+ }
+ dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segments.get(0).getConfig());
+ }
+
private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig)
throws IOException {
File tmp = File.createTempFile("kylin_job_meta", "");
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index a4e17c3..da207ee 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -18,17 +18,24 @@
package org.apache.kylin.engine.spark;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.GenericOptionsParser;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.IMROutput2;
-import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.storage.StorageFactory;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import com.google.common.collect.Lists;
public class SparkUtil {
@@ -37,10 +44,6 @@ public class SparkUtil {
return SourceManager.createEngineAdapter(seg, ISparkInput.class).getBatchCubingInputSide(flatDesc);
}
- private static TableDesc getTableDesc(String tableName, String prj) {
- return TableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName, prj);
- }
-
public static ISparkOutput.ISparkBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
return StorageFactory.createEngineAdapter(seg, ISparkOutput.class).getBatchCubingOutputSide(seg);
}
@@ -57,7 +60,47 @@ public class SparkUtil {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchOptimizeOutputSide(seg);
}
- private static synchronized GenericOptionsParser getParser(Configuration conf, String[] args) throws Exception {
- return new GenericOptionsParser(conf, args);
+ public static List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc, Class keyClass,
+ Class valueClass) throws IOException {
+ List<JavaPairRDD> inputRDDs = Lists.newArrayList();
+ Path inputHDFSPath = new Path(inputPath);
+ FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
+ boolean hasDir = false;
+ for (FileStatus stat : fileStatuses) {
+ if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
+ hasDir = true;
+ inputRDDs.add(sc.sequenceFile(stat.getPath().toString(), keyClass, valueClass));
+ }
+ }
+
+ if (!hasDir) {
+ inputRDDs.add(sc.sequenceFile(inputHDFSPath.toString(), keyClass, valueClass));
+ }
+
+ return inputRDDs;
}
+
+
+ public static int estimateLayerPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
+ double baseCuboidSize = statsReader.estimateLayerSize(level);
+ float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
+ int partition = (int) (baseCuboidSize / rddCut);
+ partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
+ partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+ return partition;
+ }
+
+
+ public static int estimateTotalPartitionNum(CubeStatsReader statsReader, KylinConfig kylinConfig) {
+ double totalSize = 0;
+ for (double x: statsReader.getCuboidSizeMap().values()) {
+ totalSize += x;
+ }
+ float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
+ int partition = (int) (totalSize / rddCut);
+ partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
+ partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+ return partition;
+ }
+
}
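
Both estimators divide an estimated size in MB by kylin.engine.spark.rdd-partition-cut-mb and clamp the result between getSparkMinPartition() and getSparkMaxPartition(). A rough usage sketch with a worked number; the CubeStatsReader construction, the variable names and the concrete sizes are illustrative, not taken from this patch:

    CubeStatsReader statsReader = new CubeStatsReader(cubeSegment, envConfig); // assumed constructor
    int parts = SparkUtil.estimateLayerPartitionNum(level, statsReader, envConfig);
    // e.g. a 2400 MB layer with rdd-partition-cut-mb = 10 gives 2400 / 10 = 240,
    // then the value is clamped into [min-partition, max-partition] before repartition(parts)
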
diff --git a/examples/sample_cube/template/cube_desc/kylin_sales_cube.json b/examples/sample_cube/template/cube_desc/kylin_sales_cube.json
index 124fc73..aff174e 100644
--- a/examples/sample_cube/template/cube_desc/kylin_sales_cube.json
+++ b/examples/sample_cube/template/cube_desc/kylin_sales_cube.json
@@ -275,7 +275,8 @@
"engine_type" : %default_engine_type%,
"storage_type" : %default_storage_type%,
"override_kylin_properties" : {
- "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true"
+ "kylin.cube.aggrgroup.is-mandatory-only-valid" : "true",
+ "kylin.engine.spark.rdd-partition-cut-mb" : "500"
},
"cuboid_black_list" : [ ],
"parent_forward" : 3,
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index a3aa0f4..edced8f 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -159,10 +159,10 @@ kylin.security.acl.admin-role=admin
# Help info, format{name|displayName|link}, optional
kylin.web.help.length=4
-kylin.web.help.0=start|Getting Started|http://kylin.apache.org/docs21/tutorial/kylin_sample.html
-kylin.web.help.1=odbc|ODBC Driver|http://kylin.apache.org/docs21/tutorial/odbc.html
-kylin.web.help.2=tableau|Tableau Guide|http://kylin.apache.org/docs21/tutorial/tableau_91.html
-kylin.web.help.3=onboard|Cube Design Tutorial|http://kylin.apache.org/docs21/howto/howto_optimize_cubes.html
+kylin.web.help.0=start|Getting Started|http://kylin.apache.org/docs/tutorial/kylin_sample.html
+kylin.web.help.1=odbc|ODBC Driver|http://kylin.apache.org/docs/tutorial/odbc.html
+kylin.web.help.2=tableau|Tableau Guide|http://kylin.apache.org/docs/tutorial/tableau_91.html
+kylin.web.help.3=onboard|Cube Design Tutorial|http://kylin.apache.org/docs/howto/howto_optimize_cubes.html
#allow user to export query result
kylin.web.export-allow-admin=true
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 94a60f8..a68643f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -20,8 +20,6 @@ package org.apache.kylin.storage.hbase.steps;
import java.util.List;
import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
@@ -35,6 +33,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.MapReduceUtil;
import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
@@ -139,8 +138,10 @@ public class HBaseMROutput2Transition implements IMROutput2 {
}
@Override
- public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
+ public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
+ DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(
+ steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}
@@ -157,9 +158,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
};
}
- public static class HBaseMergeMROutputFormat implements IMRMergeOutputFormat{
-
- private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
+ public static class HBaseMergeMROutputFormat implements IMRMergeOutputFormat {
@Override
public void configureJobInput(Job job, String input) throws Exception {
@@ -181,29 +180,10 @@ public class HBaseMROutput2Transition implements IMROutput2 {
@Override
public CubeSegment findSourceSegment(FileSplit fileSplit, CubeInstance cube) {
String filePath = fileSplit.getPath().toString();
- String jobID = extractJobIDFromPath(filePath);
- return findSegmentWithUuid(jobID, cube);
- }
-
- private static String extractJobIDFromPath(String path) {
- Matcher matcher = JOB_NAME_PATTERN.matcher(path);
- // check the first occurrence
- if (matcher.find()) {
- return matcher.group(1);
- } else {
- throw new IllegalStateException("Can not extract job ID from file path : " + path);
- }
+ String jobID = JobBuilderSupport.extractJobIDFromPath(filePath);
+ return CubeInstance.findSegmentWithJobId(jobID, cube);
}
- private static CubeSegment findSegmentWithUuid(String jobID, CubeInstance cubeInstance) {
- for (CubeSegment segment : cubeInstance.getSegments()) {
- String lastBuildJobID = segment.getLastBuildJobID();
- if (lastBuildJobID != null && lastBuildJobID.equalsIgnoreCase(jobID)) {
- return segment;
- }
- }
- throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
- }
}
public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(final CubeSegment seg) {
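
The inline UUID regex and the private segment scan are replaced by the shared helpers JobBuilderSupport.extractJobIDFromPath and CubeInstance.findSegmentWithJobId. An illustrative walk-through; the path and the job ID are made up:

    String filePath = "hdfs://.../kylin-0a8b71d5-9f3c-4d2e-8b1a-77aa66bb55cc/cube/cuboid/part-00000";
    String jobId = JobBuilderSupport.extractJobIDFromPath(filePath);     // "0a8b71d5-9f3c-4d2e-8b1a-77aa66bb55cc"
    CubeSegment source = CubeInstance.findSegmentWithJobId(jobId, cube); // segment whose last build job matches
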
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
index 08f58af..e6c3ee8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkOutputTransition.java
@@ -21,7 +21,6 @@ package org.apache.kylin.storage.hbase.steps;
import java.util.List;
import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
import org.apache.kylin.engine.spark.ISparkOutput;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.slf4j.Logger;
@@ -80,8 +79,6 @@ public class HBaseSparkOutputTransition implements ISparkOutput {
@Override
public void addStepPhase2_BuildCube(CubeSegment seg, List<CubeSegment> mergingSegments,
DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(
- steps.createMergeCuboidDataStep(seg, mergingSegments, jobFlow.getId(), MergeCuboidJob.class));
jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index c2794ff..622a0e8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -46,19 +46,25 @@ public class HBaseSparkSteps extends HBaseJobSteps {
sparkExecutable.setParam(SparkCubeHFile.OPTION_META_URL.getOpt(),
jobBuilder2.getSegmentMetadataUrl(seg.getConfig(), jobId));
sparkExecutable.setParam(SparkCubeHFile.OPTION_OUTPUT_PATH.getOpt(), getHFilePath(jobId));
- sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(), getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
+ sparkExecutable.setParam(SparkCubeHFile.OPTION_PARTITION_FILE_PATH.getOpt(),
+ getRowkeyDistributionOutputPath(jobId) + "/part-r-00000_hfile");
sparkExecutable.setJobId(jobId);
StringBuilder jars = new StringBuilder();
StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.KeyValue.class));
- StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.class));
- StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.regionserver.BloomType.class));
- StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.class)); //hbase-protocal.jar
- StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar(org.apache.hadoop.hbase.CompatibilityFactory.class)); //hbase-hadoop-compact.jar
+ StringUtil.appendWithSeparator(jars,
+ ClassUtil.findContainingJar(org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.class));
+ StringUtil.appendWithSeparator(jars,
+ ClassUtil.findContainingJar(org.apache.hadoop.hbase.regionserver.BloomType.class));
+ StringUtil.appendWithSeparator(jars,
+ ClassUtil.findContainingJar(org.apache.hadoop.hbase.protobuf.generated.HFileProtos.class)); //hbase-protocol.jar

+ StringUtil.appendWithSeparator(jars,
+ ClassUtil.findContainingJar(org.apache.hadoop.hbase.CompatibilityFactory.class)); //hbase-hadoop-compat.jar
StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("org.htrace.HTraceConfiguration", null)); // htrace-core.jar
StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("org.apache.htrace.Trace", null)); // htrace-core.jar
- StringUtil.appendWithSeparator(jars, ClassUtil.findContainingJar("com.yammer.metrics.core.MetricsRegistry", null)); // metrics-core.jar
+ StringUtil.appendWithSeparator(jars,
+ ClassUtil.findContainingJar("com.yammer.metrics.core.MetricsRegistry", null)); // metrics-core.jar
StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
sparkExecutable.setJars(jars.toString());
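
Every HBase dependency is resolved at runtime with ClassUtil.findContainingJar, either from a loaded class or from a fully qualified class name, and the resulting paths are accumulated with StringUtil.appendWithSeparator before being handed to the Spark executable. A condensed sketch of the two lookup overloads used above (variable names are illustrative):

    String hbaseCommonJar = ClassUtil.findContainingJar(org.apache.hadoop.hbase.KeyValue.class);  // by class
    String htraceJar = ClassUtil.findContainingJar("org.htrace.HTraceConfiguration", null);       // by name
    StringBuilder jars = new StringBuilder();
    StringUtil.appendWithSeparator(jars, hbaseCommonJar);
    StringUtil.appendWithSeparator(jars, htraceJar);
    // jars.toString() is the joined list passed to sparkExecutable.setJars(...)
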
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index a23156c..0136d4e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -30,7 +30,6 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
@@ -42,8 +41,8 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
@@ -57,6 +56,7 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.spark.KylinSparkJobListener;
+import org.apache.kylin.engine.spark.SparkUtil;
import org.apache.kylin.measure.MeasureCodec;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.spark.Partitioner;
@@ -94,8 +94,6 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
private Options options;
- private KylinSparkJobListener jobListener;
-
public SparkCubeHFile() {
options = new Options();
options.addOption(OPTION_INPUT_PATH);
@@ -104,7 +102,6 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
options.addOption(OPTION_META_URL);
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_PARTITION_FILE_PATH);
- jobListener = new KylinSparkJobListener();
}
@Override
@@ -130,6 +127,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
+ KylinSparkJobListener jobListener = new KylinSparkJobListener();
JavaSparkContext sc = new JavaSparkContext(conf);
sc.sc().addSparkListener(jobListener);
final FileSystem fs = partitionFilePath.getFileSystem(sc.hadoopConfiguration());
@@ -161,7 +159,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
logger.info("Input path: {}", inputPath);
logger.info("Output path: {}", outputPath);
- List<JavaPairRDD> inputRDDs = parseInputPath(inputPath, fs, sc);
+ List<JavaPairRDD> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
final JavaPairRDD<Text, Text> allCuboidFile = sc.union(inputRDDs.toArray(new JavaPairRDD[inputRDDs.size()]));
final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
if (quickPath) {
@@ -236,33 +234,15 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
logger.debug(ioe.getMessage());
}
- hfilerdd2.saveAsNewAPIHadoopFile(outputPath, ImmutableBytesWritable.class, KeyValue.class,
- HFileOutputFormat2.class, job.getConfiguration());
+ FileOutputFormat.setOutputPath(job, new Path(outputPath));
+ hfilerdd2.saveAsNewAPIHadoopDataset(job.getConfiguration());
// output the data size to console, job engine will parse and save the metric
// please note: this mechanism won't work when spark.submit.deployMode=cluster
- System.out.println("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
- deleteHDFSMeta(metaUrl);
+ logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+ HadoopUtil.deleteHDFSMeta(metaUrl);
}
- private List<JavaPairRDD> parseInputPath(String inputPath, FileSystem fs, JavaSparkContext sc) throws IOException {
- List<JavaPairRDD> inputRDDs = Lists.newArrayList();
- Path inputHDFSPath = new Path(inputPath);
- FileStatus[] fileStatuses = fs.listStatus(inputHDFSPath);
- boolean hasDir = false;
- for (FileStatus stat : fileStatuses) {
- if (stat.isDirectory() && !stat.getPath().getName().startsWith("_")) {
- hasDir = true;
- inputRDDs.add(sc.sequenceFile(stat.getPath().toString(), Text.class, Text.class));
- }
- }
-
- if (!hasDir) {
- inputRDDs.add(sc.sequenceFile(inputHDFSPath.toString(), Text.class, Text.class));
- }
-
- return inputRDDs;
- }
static class HFilePartitioner extends Partitioner {
private List<RowKeyWritable> keys;
@@ -298,10 +278,4 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
}
}
- protected void deleteHDFSMeta(String metaUrl) throws IOException {
- String realHdfsPath = StorageURL.valueOf(metaUrl).getParameter("path");
- HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
- logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
- }
-
}
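
With the private copy removed, cleanup goes through HadoopUtil.deleteHDFSMeta, which extracts the path parameter from the job's metadata StorageURL and deletes that directory recursively. An illustrative call; the URL format below is an example, not taken from this patch:

    String metaUrl = "kylin_metadata@hdfs,path=hdfs://sandbox/kylin/metadata/kylin_job_meta_example"; // assumed format
    HadoopUtil.deleteHDFSMeta(metaUrl); // removes hdfs://sandbox/kylin/metadata/kylin_job_meta_example and its contents
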