Posted to commits@kylin.apache.org by sh...@apache.org on 2018/07/26 02:48:51 UTC
[kylin] branch master updated: KYLIN-3452 Optimize spark cubing memory footprint
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 312eaae KYLIN-3452 Optimize spark cubing memory footprint
312eaae is described below
commit 312eaae7d469ed90984fc32f11d9b772e5f5bb89
Author: shaofengshi <sh...@apache.org>
AuthorDate: Sun Jul 15 18:01:33 2018 +0800
KYLIN-3452 Optimize spark cubing memory footprint
---
.../src/main/resources/kylin-defaults.properties | 8 +-
.../java/org/apache/kylin/cube/CubeManager.java | 2 +-
.../apache/kylin/cube/common/RowKeySplitter.java | 38 ++++----
.../java/org/apache/kylin/cube/cuboid/Cuboid.java | 2 +-
.../apache/kylin/cube/cuboid/CuboidManager.java | 15 ++++
.../org/apache/kylin/cube/kv/FuzzyMaskEncoder.java | 2 +-
.../org/apache/kylin/cube/kv/RowKeyDecoder.java | 10 +--
.../org/apache/kylin/cube/kv/RowKeyEncoder.java | 4 +-
.../kylin/engine/mr/common/AbstractHadoopJob.java | 11 ++-
.../kylin/engine/mr/common/NDCuboidBuilder.java | 54 +++++++----
.../kylin/engine/mr/steps/NDCuboidMapper.java | 3 +-
.../kylin/engine/mr/steps/SegmentReEncoder.java | 15 ++--
.../mr/steps/UpdateOldCuboidShardMapper.java | 7 +-
.../kylin/engine/spark/SparkCubingByLayer.java | 46 +++++-----
.../kylin/engine/spark/SparkCubingMerge.java | 29 +++---
.../org/apache/kylin/engine/spark/SparkUtil.java | 18 ++++
.../common/coprocessor/CoprocessorProjector.java | 2 +-
.../kylin/storage/hbase/steps/SparkCubeHFile.java | 100 ++++++++++-----------
18 files changed, 208 insertions(+), 158 deletions(-)
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index b391b5e..c01d3a8 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -290,9 +290,11 @@ kylin.engine.spark.max-partition=5000
kylin.engine.spark-conf.spark.master=yarn
#kylin.engine.spark-conf.spark.submit.deployMode=cluster
kylin.engine.spark-conf.spark.yarn.queue=default
-kylin.engine.spark-conf.spark.executor.memory=1G
-kylin.engine.spark-conf.spark.executor.cores=2
-kylin.engine.spark-conf.spark.executor.instances=1
+kylin.engine.spark-conf.spark.driver.memory=2G
+kylin.engine.spark-conf.spark.executor.memory=4G
+kylin.engine.spark-conf.spark.executor.instances=40
+kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=1024
+kylin.engine.spark-conf.spark.shuffle.service.enabled=true
kylin.engine.spark-conf.spark.eventLog.enabled=true
kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
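Editor's note: the new defaults roughly quadruple the executor heap, add explicit driver memory and YARN off-heap overhead, and enable the external shuffle service. A minimal sketch of what the same keys mean when set programmatically (shown only for illustration; Kylin itself applies them from these properties at submit time):

    import org.apache.spark.SparkConf;

    SparkConf sparkConf = new SparkConf()
            .set("spark.driver.memory", "2G")                  // driver heap
            .set("spark.executor.memory", "4G")                // executor heap
            .set("spark.executor.instances", "40")             // executor count on YARN
            .set("spark.yarn.executor.memoryOverhead", "1024") // off-heap MB per executor
            .set("spark.shuffle.service.enabled", "true");     // external shuffle service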
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 9d377d9..a4ed9c3 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -125,7 +125,7 @@ public class CubeManager implements IRealizationProvider {
private DictionaryAssist dictAssist = new DictionaryAssist();
private CubeManager(KylinConfig cfg) throws IOException {
- logger.info("Initializing CubeManager with config " + config);
+ logger.info("Initializing CubeManager with config " + cfg);
this.config = cfg;
this.cubeMap = new CaseInsensitiveStringCache<CubeInstance>(config, "cube");
this.crud = new CachedCrudAssist<CubeInstance>(getStore(), ResourceStore.CUBE_RESOURCE_ROOT, CubeInstance.class,
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
index d0015a7..264c7a5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/common/RowKeySplitter.java
@@ -18,8 +18,8 @@
package org.apache.kylin.cube.common;
+import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.CubeDimEncMap;
@@ -32,18 +32,17 @@ import org.apache.kylin.metadata.model.TblColRef;
public class RowKeySplitter implements java.io.Serializable {
- private CubeSegment cubeSegment;
private CubeDesc cubeDesc;
private RowKeyColumnIO colIO;
- private SplittedBytes[] splitBuffers;
+ private ByteArray[] splitBuffers;
private int[] splitOffsets;
private int bufferSize;
private boolean enableSharding;
private short shardId;
- public SplittedBytes[] getSplitBuffers() {
+ public ByteArray[] getSplitBuffers() {
return splitBuffers;
}
@@ -67,7 +66,6 @@ public class RowKeySplitter implements java.io.Serializable {
this(cubeSeg, cubeSeg.getCubeDesc().getRowkey().getRowKeyColumns().length + 2, cubeSeg.getConfig().getDimensionEncodingMaxLength());
}
public RowKeySplitter(CubeSegment cubeSeg, int splitLen, int bytesLen) {
- this.cubeSegment = cubeSeg;
this.enableSharding = cubeSeg.isEnableSharding();
this.cubeDesc = cubeSeg.getCubeDesc();
IDimensionEncodingMap dimEncoding = new CubeDimEncMap(cubeSeg);
@@ -78,11 +76,8 @@ public class RowKeySplitter implements java.io.Serializable {
this.colIO = new RowKeyColumnIO(dimEncoding);
- this.splitBuffers = new SplittedBytes[splitLen];
+ this.splitBuffers = new ByteArray[splitLen];
this.splitOffsets = new int[splitLen];
- for (int i = 0; i < splitLen; i++) {
- this.splitBuffers[i] = new SplittedBytes(bytesLen);
- }
this.bufferSize = 0;
}
@@ -93,6 +88,11 @@ public class RowKeySplitter implements java.io.Serializable {
return null;
}
+
+ public long parseCuboid(byte[] bytes) {
+ int offset = enableSharding ? RowConstants.ROWKEY_SHARDID_LEN : 0;
+ return Bytes.toLong(bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
+ }
/**
* @param bytes
* @return cuboid ID
@@ -103,21 +103,18 @@ public class RowKeySplitter implements java.io.Serializable {
if (enableSharding) {
// extract shard
- SplittedBytes shardSplit = this.splitBuffers[this.bufferSize++];
- shardSplit.length = RowConstants.ROWKEY_SHARDID_LEN;
- System.arraycopy(bytes, offset, shardSplit.value, 0, RowConstants.ROWKEY_SHARDID_LEN);
+ ByteArray shardSplit = new ByteArray(bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
+ this.splitBuffers[this.bufferSize++] = shardSplit;
offset += RowConstants.ROWKEY_SHARDID_LEN;
//lastSplittedShard = Bytes.toShort(shardSplit.value, 0, shardSplit.length);
- shardId = Bytes.toShort(shardSplit.value);
+ shardId = Bytes.toShort(shardSplit.array(), shardSplit.offset());
}
// extract cuboid id
- SplittedBytes cuboidIdSplit = this.splitBuffers[this.bufferSize++];
- cuboidIdSplit.length = RowConstants.ROWKEY_CUBOIDID_LEN;
- System.arraycopy(bytes, offset, cuboidIdSplit.value, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+ ByteArray cuboidIdSplit = new ByteArray(bytes, offset, RowConstants.ROWKEY_CUBOIDID_LEN);
+ this.splitBuffers[this.bufferSize++] = cuboidIdSplit;
offset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
- long lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.value, 0, cuboidIdSplit.length);
+ long lastSplittedCuboidId = Bytes.toLong(cuboidIdSplit.array(), cuboidIdSplit.offset(), RowConstants.ROWKEY_CUBOIDID_LEN);
Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, lastSplittedCuboidId);
// rowkey columns
@@ -125,9 +122,8 @@ public class RowKeySplitter implements java.io.Serializable {
splitOffsets[i] = offset;
TblColRef col = cuboid.getColumns().get(i);
int colLength = colIO.getColumnLength(col);
- SplittedBytes split = this.splitBuffers[this.bufferSize++];
- split.length = colLength;
- System.arraycopy(bytes, offset, split.value, 0, colLength);
+ ByteArray split = new ByteArray(bytes, offset, colLength);
+ this.splitBuffers[this.bufferSize++] = split;
offset += colLength;
}
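Editor's note: this is the heart of the change. RowKeySplitter used to copy every row-key field into preallocated SplittedBytes buffers on each split() call; it now exposes zero-copy ByteArray views over the original key bytes. Schematically (names as in the diff above):

    // Before: one arraycopy per column per row.
    SplittedBytes split = splitBuffers[bufferSize++];
    split.length = colLength;
    System.arraycopy(bytes, offset, split.value, 0, colLength);

    // After: a view (array/offset/length) over the original row key; no copy.
    splitBuffers[bufferSize++] = new ByteArray(bytes, offset, colLength);

Consumers must now respect the view's offset, which is why readers throughout this commit change from Bytes.toShort(split.value) to Bytes.toShort(split.array(), split.offset()) and similar.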
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
index 7d332bb..699b865 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/Cuboid.java
@@ -54,7 +54,7 @@ public class Cuboid implements Comparable<Cuboid>, Serializable {
// for mandatory cuboid, no need to translate cuboid
public static Cuboid findForMandatory(CubeDesc cube, long cuboidID) {
- return new Cuboid(cube, cuboidID, cuboidID);
+ return CuboidManager.getInstance(cube.getConfig()).findMandatoryId(cube, cuboidID);
}
public static Cuboid findCuboid(CuboidScheduler cuboidScheduler, Set<TblColRef> dimensions,
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java
index 6efc87f..648447c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidManager.java
@@ -25,6 +25,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import com.google.common.collect.Maps;
+import org.apache.kylin.cube.model.CubeDesc;
/**
* A cuboid cache.
@@ -65,6 +66,20 @@ public class CuboidManager {
return cuboid;
}
+ public Cuboid findMandatoryId(CubeDesc cubeDesc, long cuboidID) {
+ Map<Long, Cuboid> cubeCache = schedulerCuboidCache.get(cubeDesc.getName());
+ if (cubeCache == null) {
+ cubeCache = Maps.newConcurrentMap();
+ schedulerCuboidCache.put(cubeDesc.getName(), cubeCache);
+ }
+ Cuboid cuboid = cubeCache.get(cuboidID);
+ if (cuboid == null) {
+ cuboid = new Cuboid(cubeDesc, cuboidID, cuboidID);
+ cubeCache.put(cuboidID, cuboid);
+ }
+ return cuboid;
+ }
+
public void clearCache(String cacheKey) {
schedulerCuboidCache.remove(cacheKey);
}
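Editor's note: Cuboid.findForMandatory previously allocated a new Cuboid on every call (once per row per child cuboid in the mappers); it now consults a per-cube cache in CuboidManager so each distinct cuboid ID is materialized once per JVM. The committed code uses explicit get/put and tolerates the benign race of two threads creating the same immutable Cuboid; a sketch of the same memoization with computeIfAbsent (assuming a Java 8 runtime):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    Map<String, Map<Long, Cuboid>> schedulerCuboidCache = new ConcurrentHashMap<>();
    Map<Long, Cuboid> cubeCache =
            schedulerCuboidCache.computeIfAbsent(cubeDesc.getName(), k -> new ConcurrentHashMap<>());
    Cuboid cuboid = cubeCache.computeIfAbsent(cuboidID, id -> new Cuboid(cubeDesc, id, id));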
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
index 0cbb7d2..dd7157d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java
@@ -65,7 +65,7 @@ public class FuzzyMaskEncoder extends RowKeyEncoder {
}
@Override
- protected void fillHeader(byte[] bytes) {
+ public void fillHeader(byte[] bytes) {
int offset = 0;
if (enableSharding) {
Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 923ddb8..71ad4bf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -56,13 +56,13 @@ public class RowKeyDecoder {
long cuboidId = rowKeySplitter.split(bytes);
initCuboid(cuboidId);
- SplittedBytes[] splits = rowKeySplitter.getSplitBuffers();
+ ByteArray[] splits = rowKeySplitter.getSplitBuffers();
int offset = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboid id part
for (int i = 0; i < this.cuboid.getColumns().size(); i++) {
TblColRef col = this.cuboid.getColumns().get(i);
- collectValue(col, splits[offset].value, splits[offset].length);
+ collectValue(col, splits[offset].array(), splits[offset].offset(), splits[offset].length());
offset++;
}
@@ -76,8 +76,8 @@ public class RowKeyDecoder {
this.cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
}
- private void collectValue(TblColRef col, byte[] valueBytes, int length) throws IOException {
- String strValue = colIO.readColumnString(col, valueBytes, 0, length);
+ private void collectValue(TblColRef col, byte[] valueBytes, int offset, int length) throws IOException {
+ String strValue = colIO.readColumnString(col, valueBytes, offset, length);
values.add(strValue);
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
index 40cda76..8a2c484 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java
@@ -158,9 +158,9 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder implements java.io.Seri
return bytes;
}
- protected void fillHeader(byte[] bytes) {
- int offset = 0;
+ public void fillHeader(byte[] bytes) {
+ int offset = 0;
if (enableSharding) {
short shard = calculateShard(bytes);
BytesUtil.writeShort(shard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index fa3c22e..716eafe 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -75,6 +75,8 @@ import org.apache.kylin.metadata.project.ProjectManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
+
@SuppressWarnings("static-access")
public abstract class AbstractHadoopJob extends Configured implements Tool {
private static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
@@ -127,6 +129,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
private static final String MAP_REDUCE_CLASSPATH = "mapreduce.application.classpath";
+ private static final Map<String, KylinConfig> kylinConfigCache = Maps.newConcurrentMap();
+
protected static void runJob(Tool job, String[] args) {
try {
int exitCode = ToolRunner.run(job, args);
@@ -488,6 +492,11 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
if (!uri.contains("@hdfs"))
throw new IllegalArgumentException("meta url should like @hdfs schema");
+ if (kylinConfigCache.get(uri) != null) {
+ logger.info("KylinConfig cached for : {}", uri);
+ return kylinConfigCache.get(uri);
+ }
+
logger.info("Ready to load KylinConfig from uri: {}", uri);
KylinConfig config;
FileSystem fs;
@@ -505,7 +514,7 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
// limitation of MR API. It works because MR task runs its own process. Do not copy.
@SuppressWarnings("unused")
SetAndUnsetThreadLocalConfig shouldAutoClose = KylinConfig.setAndUnsetThreadLocalConfig(config);
-
+ kylinConfigCache.put(uri, config);
return config;
}
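Editor's note: loadKylinConfigFromHdfs is invoked from task initialization on every executor, and each call previously re-read metadata from HDFS. The new static concurrent map memoizes the loaded config per metadata URL within a JVM. In outline (doLoadFromHdfs is a hypothetical stand-in for the existing load logic, not a method in the patch):

    private static final Map<String, KylinConfig> kylinConfigCache = Maps.newConcurrentMap();

    KylinConfig cached = kylinConfigCache.get(uri);
    if (cached != null)
        return cached;                         // already loaded in this JVM
    KylinConfig loaded = doLoadFromHdfs(uri);  // hypothetical helper: the original load path
    kylinConfigCache.put(uri, loaded);         // concurrent double-load is harmless here
    return loaded;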
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
index 4d9885e..6ad2619 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/NDCuboidBuilder.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -43,8 +42,7 @@ public class NDCuboidBuilder implements Serializable {
protected CubeSegment cubeSegment;
private RowKeySplitter rowKeySplitter;
private RowKeyEncoderProvider rowKeyEncoderProvider;
- private byte[] newKeyBodyBuf = new byte[RowConstants.ROWKEY_BUFFER_SIZE];
- private ByteArray newKeyBuf = ByteArray.allocate(RowConstants.ROWKEY_BUFFER_SIZE);
+ private ByteArray newKeyBodyBuf = null;
public NDCuboidBuilder(CubeSegment cubeSegment) {
this(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
@@ -56,11 +54,42 @@ public class NDCuboidBuilder implements Serializable {
this.rowKeySplitter = new RowKeySplitter(cubeSegment);
}
+ /**
+ * Build the new key, return a reused ByteArray object. Suitable for MR
+ * @param parentCuboid
+ * @param childCuboid
+ * @param splitBuffers
+ * @return
+ */
+ public Pair<Integer, ByteArray> buildKey(Cuboid parentCuboid, Cuboid childCuboid, ByteArray[] splitBuffers) {
+ RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
+ int fullKeySize = rowkeyEncoder.getBytesLength();
+ if (newKeyBodyBuf == null || newKeyBodyBuf.length() < fullKeySize) {
+ newKeyBodyBuf = new ByteArray(fullKeySize);
+ }
+
+ buildKeyInternal(parentCuboid, childCuboid, splitBuffers, newKeyBodyBuf);
+ return new Pair<>(Integer.valueOf(fullKeySize), newKeyBodyBuf);
- public Pair<Integer, ByteArray> buildKey(Cuboid parentCuboid, Cuboid childCuboid, SplittedBytes[] splitBuffers) {
+ }
+
+ /**
+ * Build the new key, return a new ByteArray object each time. Suitable for spark
+ * @param parentCuboid
+ * @param childCuboid
+ * @param splitBuffers
+ * @return
+ */
+ public ByteArray buildKey2(Cuboid parentCuboid, Cuboid childCuboid, ByteArray[] splitBuffers) {
RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
+ int fullKeySize = rowkeyEncoder.getBytesLength();
+ ByteArray newKey = new ByteArray(fullKeySize);
+ buildKeyInternal(parentCuboid, childCuboid, splitBuffers, newKey);
+ return newKey;
+ }
- int offset = 0;
+ private void buildKeyInternal(Cuboid parentCuboid, Cuboid childCuboid, ByteArray[] splitBuffers, ByteArray newKeyBodyBuf) {
+ RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(childCuboid);
// rowkey columns
long mask = Long.highestOneBit(parentCuboid.getId());
@@ -68,28 +97,21 @@ public class NDCuboidBuilder implements Serializable {
long childCuboidId = childCuboid.getId();
long parentCuboidIdActualLength = (long)Long.SIZE - Long.numberOfLeadingZeros(parentCuboid.getId());
int index = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
+ int offset = RowConstants.ROWKEY_SHARDID_LEN + RowConstants.ROWKEY_CUBOIDID_LEN; // skip shard and cuboidId
for (int i = 0; i < parentCuboidIdActualLength; i++) {
if ((mask & parentCuboidId) > 0) {// if the this bit position equals
// 1
if ((mask & childCuboidId) > 0) {// if the child cuboid has this
// column
- System.arraycopy(splitBuffers[index].value, 0, newKeyBodyBuf, offset, splitBuffers[index].length);
- offset += splitBuffers[index].length;
+ System.arraycopy(splitBuffers[index].array(), splitBuffers[index].offset(), newKeyBodyBuf.array(), offset, splitBuffers[index].length());
+ offset += splitBuffers[index].length();
}
index++;
}
mask = mask >> 1;
}
- int fullKeySize = rowkeyEncoder.getBytesLength();
- while (newKeyBuf.array().length < fullKeySize) {
- newKeyBuf = new ByteArray(newKeyBuf.length() * 2);
- }
- newKeyBuf.setLength(fullKeySize);
-
- rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, offset), newKeyBuf);
-
- return new Pair<>(Integer.valueOf(fullKeySize), newKeyBuf);
+ rowkeyEncoder.fillHeader(newKeyBodyBuf.array());
}
}
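Editor's note: the split into buildKey and buildKey2 follows from how each engine consumes the key, as the javadoc above states. In MR, context.write serializes the output key before the next call, so one reused buffer is safe; in Spark, the Tuple2 objects returned from flatMapToPair are retained until the shuffle, so a shared buffer would be overwritten and each key needs its own array. Schematically:

    // MR path: buffer is reused; the writable is serialized immediately.
    Pair<Integer, ByteArray> r = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, splits);
    outputKey.set(r.getSecond().array(), 0, r.getFirst());
    context.write(outputKey, value);

    // Spark path: tuples outlive this call, so every key gets a fresh ByteArray.
    ByteArray key = ndCuboidBuilder.buildKey2(parentCuboid, childCuboid, splits);
    tuples.add(new Tuple2<>(key, measures));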
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
index f2ec34c..0581f4e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java
@@ -106,9 +106,10 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
logger.info("Parent cuboid: " + parentCuboid.getId() + "; Children: " + myChildren);
}
+ Pair<Integer, ByteArray> result;
for (Long child : myChildren) {
Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
- Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
+ result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid, rowKeySplitter.getSplitBuffers());
outputKey.set(result.getSecond().array(), 0, result.getFirst());
context.write(outputKey, value);
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
index 856a59f..35fa8af 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/SegmentReEncoder.java
@@ -30,7 +30,6 @@ import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.common.RowKeySplitter;
import org.apache.kylin.cube.cuboid.Cuboid;
@@ -181,7 +180,7 @@ public class SegmentReEncoder implements Serializable {
Cuboid cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
- SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
+ ByteArray[] splittedByteses = rowKeySplitter.getSplitBuffers();
int bufOffset = 0;
int bodySplitOffset = rowKeySplitter.getBodySplitOffset();
@@ -217,8 +216,8 @@ public class SegmentReEncoder implements Serializable {
System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
}
- int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].value, 0,
- splittedByteses[useSplit].length);
+ int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[useSplit].array(), splittedByteses[useSplit].offset(),
+ splittedByteses[useSplit].length());
int idInMergedDict;
//int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
@@ -233,15 +232,15 @@ public class SegmentReEncoder implements Serializable {
bufOffset += mergedDict.getSizeOfId();
} else {
// keep as it is
- while (splittedByteses[useSplit].length > newKeyBodyBuf.length - bufOffset) {
+ while (splittedByteses[useSplit].length() > newKeyBodyBuf.length - bufOffset) {
byte[] oldBuf = newKeyBodyBuf;
newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
}
- System.arraycopy(splittedByteses[useSplit].value, 0, newKeyBodyBuf, bufOffset,
- splittedByteses[useSplit].length);
- bufOffset += splittedByteses[useSplit].length;
+ System.arraycopy(splittedByteses[useSplit].array(), splittedByteses[useSplit].offset(), newKeyBodyBuf, bufOffset,
+ splittedByteses[useSplit].length());
+ bufOffset += splittedByteses[useSplit].length();
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
index 35b8924..3d18bd6 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateOldCuboidShardMapper.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
@@ -98,15 +97,15 @@ public class UpdateOldCuboidShardMapper extends KylinMapper<Text, Text, Text, Te
mos.write(outputKey, value, generateFileName(baseOutputPath));
}
- private int buildKey(Cuboid cuboid, SplittedBytes[] splitBuffers) {
+ private int buildKey(Cuboid cuboid, ByteArray[] splitBuffers) {
RowKeyEncoder rowkeyEncoder = rowKeyEncoderProvider.getRowkeyEncoder(cuboid);
int startIdx = rowKeySplitter.getBodySplitOffset(); // skip shard and cuboidId
int endIdx = startIdx + Long.bitCount(cuboid.getId());
int offset = 0;
for (int i = startIdx; i < endIdx; i++) {
- System.arraycopy(splitBuffers[i].value, 0, newKeyBodyBuf, offset, splitBuffers[i].length);
- offset += splitBuffers[i].length;
+ System.arraycopy(splitBuffers[i].array(), splitBuffers[i].offset(), newKeyBodyBuf, offset, splitBuffers[i].length());
+ offset += splitBuffers[i].length();
}
int fullKeySize = rowkeyEncoder.getBytesLength();
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index e95e433..cb3af31 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -20,14 +20,12 @@ package org.apache.kylin.engine.spark;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
@@ -38,7 +36,6 @@ import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeDescManager;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -138,21 +135,19 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
JavaSparkContext sc = new JavaSparkContext(conf);
sc.sc().addSparkListener(jobListener);
HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
+ SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
-
KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
final CubeDesc cubeDesc = cubeInstance.getDescriptor();
final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
- Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
- confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
- final Job job = Job.getInstance(confOverwrite);
-
logger.info("RDD input path: {}", inputPath);
logger.info("RDD Output path: {}", outputPath);
- setHadoopConf(job, cubeSegment, metaUrl);
+
+ final Job job = Job.getInstance(sConf.get());
+ SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
int countMeasureIndex = 0;
for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
@@ -227,11 +222,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job, envConfig);
+ PairFlatMapFunction flatMapFunction = new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf);
// aggregate to ND cuboids
for (level = 1; level <= totalLevels; level++) {
partition = SparkUtil.estimateLayerPartitionNum(level, cubeStatsReader, envConfig);
- allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl, sConf))
- .reduceByKey(reducerFunction2, partition).persist(storageLevel);
+
+ allRDDs[level] = allRDDs[level - 1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, partition)
+ .persist(storageLevel);
allRDDs[level - 1].unpersist();
if (envConfig.isSparkSanityCheckEnabled() == true) {
sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
@@ -241,13 +238,9 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
allRDDs[totalLevels].unpersist();
logger.info("Finished on calculating all level cuboids.");
logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
- HadoopUtil.deleteHDFSMeta(metaUrl);
+ //HadoopUtil.deleteHDFSMeta(metaUrl);
}
- protected void setHadoopConf(Job job, CubeSegment segment, String metaUrl) throws Exception {
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- }
protected JavaPairRDD<ByteArray, Object[]> prepareOutput(JavaPairRDD<ByteArray, Object[]> rdd, KylinConfig config,
@@ -277,6 +270,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
synchronized (SparkCubingByLayer.class) {
if (initialized == false) {
KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
codec = new BufferedMeasureCodec(desc.getMeasures());
initialized = true;
@@ -315,6 +309,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
synchronized (SparkCubingByLayer.class) {
if (initialized == false) {
KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
CubeDesc cubeDesc = cubeInstance.getDescriptor();
CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -353,6 +348,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
public void init() {
KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
cubeDesc = cubeInstance.getDescriptor();
aggregators = new MeasureAggregators(cubeDesc.getMeasures());
@@ -422,6 +418,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
public void init() {
KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
this.cubeSegment = cubeInstance.getSegmentById(segmentId);
this.cubeDesc = cubeInstance.getDescriptor();
@@ -430,7 +427,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
@Override
- public Iterator<Tuple2<ByteArray, Object[]>> call(Tuple2<ByteArray, Object[]> tuple2) throws Exception {
+ public Iterator<Tuple2<ByteArray, Object[]>> call(final Tuple2<ByteArray, Object[]> tuple2) throws Exception {
if (initialized == false) {
synchronized (SparkCubingByLayer.class) {
if (initialized == false) {
@@ -441,26 +438,23 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
}
byte[] key = tuple2._1().array();
- long cuboidId = rowKeySplitter.split(key);
- Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
-
- Collection<Long> myChildren = cubeSegment.getCuboidScheduler().getSpanningCuboid(cuboidId);
+ long cuboidId = rowKeySplitter.parseCuboid(key);
+ final List<Long> myChildren = cubeSegment.getCuboidScheduler().getSpanningCuboid(cuboidId);
// if still empty or null
if (myChildren == null || myChildren.size() == 0) {
return EMTPY_ITERATOR.iterator();
}
+ rowKeySplitter.split(key);
+ final Cuboid parentCuboid = Cuboid.findForMandatory(cubeDesc, cuboidId);
List<Tuple2<ByteArray, Object[]>> tuples = new ArrayList(myChildren.size());
for (Long child : myChildren) {
Cuboid childCuboid = Cuboid.findForMandatory(cubeDesc, child);
- Pair<Integer, ByteArray> result = ndCuboidBuilder.buildKey(parentCuboid, childCuboid,
+ ByteArray result = ndCuboidBuilder.buildKey2(parentCuboid, childCuboid,
rowKeySplitter.getSplitBuffers());
- byte[] newKey = new byte[result.getFirst()];
- System.arraycopy(result.getSecond().array(), 0, newKey, 0, result.getFirst());
-
- tuples.add(new Tuple2<>(new ByteArray(newKey), tuple2._2()));
+ tuples.add(new Tuple2<>(result, tuple2._2()));
}
return tuples.iterator();
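Editor's note: two savings in this flat map. First, parseCuboid reads only the fixed-length cuboid ID from the key header, so rows whose cuboid spawns no children return early without paying for a full row-key split. Second, buildKey2 writes each child key straight into its own ByteArray, dropping the extra arraycopy the old Pair-based path needed. The reordered flow, in outline:

    long cuboidId = rowKeySplitter.parseCuboid(key);   // cheap header read only
    List<Long> children = cubeSegment.getCuboidScheduler().getSpanningCuboid(cuboidId);
    if (children == null || children.isEmpty())
        return EMTPY_ITERATOR.iterator();              // skip the full split entirely
    rowKeySplitter.split(key);                         // split columns only when needed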
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
index 7fd4123..74a2313 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingMerge.java
@@ -24,13 +24,11 @@ import java.util.List;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
import org.apache.kylin.common.util.HadoopUtil;
@@ -116,6 +114,7 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
JavaSparkContext sc = new JavaSparkContext(conf);
+ SparkUtil.modifySparkHadoopConfiguration(sc.sc()); // set dfs.replication=2 and enable compress
KylinSparkJobListener jobListener = new KylinSparkJobListener();
sc.sc().addSparkListener(jobListener);
@@ -131,12 +130,9 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
logger.info("Input path: {}", inputPath);
logger.info("Output path: {}", outputPath);
- Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
- confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
- final Job job = Job.getInstance(confOverwrite);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(Text.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ final Job job = Job.getInstance(sConf.get());
+
+ SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl);
final MeasureAggregators aggregators = new MeasureAggregators(cubeDesc.getMeasures());
final Function2 reduceFunction = new Function2<Object[], Object[], Object[]>() {
@@ -157,12 +153,17 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
throws Exception {
if (initialized == false) {
- synchronized (SparkCubingByLayer.class) {
+ synchronized (SparkCubingMerge.class) {
if (initialized == false) {
- KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
- CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
- codec = new BufferedMeasureCodec(desc.getMeasures());
- initialized = true;
+ synchronized (SparkCubingMerge.class) {
+ if (initialized == false) {
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+ KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
+ CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
+ codec = new BufferedMeasureCodec(desc.getMeasures());
+ initialized = true;
+ }
+ }
}
}
}
@@ -231,7 +232,7 @@ public class SparkCubingMerge extends AbstractApplication implements Serializabl
// output the data size to console, job engine will parse and save the metric
// please note: this mechanism won't work when spark.submit.deployMode=cluster
logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
- //HadoopUtil.deleteHDFSMeta(metaUrl);
+ // HadoopUtil.deleteHDFSMeta(metaUrl);
}
static class ReEncodCuboidFunction implements PairFunction<Tuple2<Text, Text>, Text, Object[]> {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index e7c6ee6..31eebc8 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -24,6 +24,9 @@ import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
@@ -33,6 +36,7 @@ import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.storage.StorageFactory;
+import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -114,4 +118,18 @@ public class SparkUtil {
return partition;
}
+
+ public static void setHadoopConfForCuboid(Job job, CubeSegment segment, String metaUrl) throws Exception {
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ }
+
+ public static void modifySparkHadoopConfiguration(SparkContext sc) throws Exception {
+ sc.hadoopConfiguration().set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+ sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress", "true");
+ sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
+ sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.DefaultCodec"); // or org.apache.hadoop.io.compress.SnappyCodec
+ }
+
}
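Editor's note: these two helpers centralize what SparkCubingByLayer and SparkCubingMerge previously configured inline: replication of 2 for intermediate cuboid files (they are re-creatable, so durability is traded for write throughput) and BLOCK-level SequenceFile compression with DefaultCodec (Snappy is an alternative where native libraries are available). Typical call sites, matching the diffs above:

    JavaSparkContext sc = new JavaSparkContext(conf);
    SparkUtil.modifySparkHadoopConfiguration(sc.sc());           // replication=2, BLOCK compression
    final Job job = Job.getInstance(sConf.get());
    SparkUtil.setHadoopConfForCuboid(job, cubeSegment, metaUrl); // Text/Text SequenceFile output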
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
index f6332f4..3f8ea53 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
@@ -40,7 +40,7 @@ public class CoprocessorProjector {
RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) {
@Override
- protected void fillHeader(byte[] bytes) {
+ public void fillHeader(byte[] bytes) {
Arrays.fill(bytes, 0, this.getHeaderLength(), (byte) 0xff);
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index c435d9d..fd8459f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -158,42 +158,6 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
logger.info("Input path: {}", inputPath);
logger.info("Output path: {}", outputPath);
-
- JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
- final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
- if (quickPath) {
- hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
- @Override
- public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
- KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
- textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
- return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
- }
- });
- } else {
- hfilerdd = inputRDDs
- .flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
- @Override
- public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
- throws Exception {
-
- List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
- Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
- inputCodec.decode(
- ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
- inputMeasures);
-
- for (int i = 0; i < cfNum; i++) {
- KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
- result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
- outputValue));
- }
-
- return result.iterator();
- }
- });
- }
-
// read partition split keys
List<RowKeyWritable> keys = new ArrayList<>();
try (SequenceFile.Reader reader = new SequenceFile.Reader(fs, partitionFilePath, sc.hadoopConfiguration())) {
@@ -207,40 +171,70 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
}
logger.info("There are " + keys.size() + " split keys, totally " + (keys.size() + 1) + " hfiles");
-
- final JavaPairRDD<ImmutableBytesWritable, KeyValue> hfilerdd2 = hfilerdd
- .repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
- RowKeyWritable.RowKeyComparator.INSTANCE)
- .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
- @Override
- public Tuple2<ImmutableBytesWritable, KeyValue> call(
- Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
- return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
- rowKeyWritableKeyValueTuple2._2);
- }
- });
-
Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
HadoopUtil.healSickConfig(hbaseConf);
Job job = new Job(hbaseConf, cubeSegment.getStorageLocationIdentifier());
+ job.getConfiguration().set("spark.hadoop.dfs.replication", "3"); // HFile, replication=3
HTable table = new HTable(hbaseConf, cubeSegment.getStorageLocationIdentifier());
try {
HFileOutputFormat2.configureIncrementalLoadMap(job, table);
} catch (IOException ioe) {
// this can be ignored.
- logger.debug(ioe.getMessage());
+ logger.debug(ioe.getMessage(), ioe);
}
FileOutputFormat.setOutputPath(job, new Path(outputPath));
- hfilerdd2.saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+ JavaPairRDD<Text, Text> inputRDDs = SparkUtil.parseInputPath(inputPath, fs, sc, Text.class, Text.class);
+ final JavaPairRDD<RowKeyWritable, KeyValue> hfilerdd;
+ if (quickPath) {
+ hfilerdd = inputRDDs.mapToPair(new PairFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+ @Override
+ public Tuple2<RowKeyWritable, KeyValue> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
+ KeyValue outputValue = keyValueCreators.get(0).create(textTextTuple2._1,
+ textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength());
+ return new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()), outputValue);
+ }
+ });
+ } else {
+ hfilerdd = inputRDDs.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, RowKeyWritable, KeyValue>() {
+ @Override
+ public Iterator<Tuple2<RowKeyWritable, KeyValue>> call(Tuple2<Text, Text> textTextTuple2)
+ throws Exception {
+
+ List<Tuple2<RowKeyWritable, KeyValue>> result = Lists.newArrayListWithExpectedSize(cfNum);
+ Object[] inputMeasures = new Object[cubeDesc.getMeasures().size()];
+ inputCodec.decode(ByteBuffer.wrap(textTextTuple2._2.getBytes(), 0, textTextTuple2._2.getLength()),
+ inputMeasures);
+
+ for (int i = 0; i < cfNum; i++) {
+ KeyValue outputValue = keyValueCreators.get(i).create(textTextTuple2._1, inputMeasures);
+ result.add(new Tuple2<>(new RowKeyWritable(outputValue.createKeyOnly(false).getKey()),
+ outputValue));
+ }
+
+ return result.iterator();
+ }
+ });
+ }
+
+ hfilerdd.repartitionAndSortWithinPartitions(new HFilePartitioner(keys),
+ RowKeyWritable.RowKeyComparator.INSTANCE)
+ .mapToPair(new PairFunction<Tuple2<RowKeyWritable, KeyValue>, ImmutableBytesWritable, KeyValue>() {
+ @Override
+ public Tuple2<ImmutableBytesWritable, KeyValue> call(
+ Tuple2<RowKeyWritable, KeyValue> rowKeyWritableKeyValueTuple2) throws Exception {
+ return new Tuple2<>(new ImmutableBytesWritable(rowKeyWritableKeyValueTuple2._2.getKey()),
+ rowKeyWritableKeyValueTuple2._2);
+ }
+ }).saveAsNewAPIHadoopDataset(job.getConfiguration());
// output the data size to console, job engine will parse and save the metric
// please note: this mechanism won't work when spark.submit.deployMode=cluster
logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
- HadoopUtil.deleteHDFSMeta(metaUrl);
+ //HadoopUtil.deleteHDFSMeta(metaUrl);
}
-
static class HFilePartitioner extends Partitioner {
private List<RowKeyWritable> keys;