Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/10 03:41:28 UTC
[kylin] 04/06: KYLIN-3623 Convert cuboid to Parquet in MR
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch kylin-on-parquet
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit bbf9d256436a921299526e14025f5cbf6071a619
Author: Yichen Zhou <zh...@gmail.com>
AuthorDate: Tue Nov 13 14:51:35 2018 +0800
KYLIN-3623 Convert cuboid to Parquet in MR
---
core-common/pom.xml | 2 +-
.../org/apache/kylin/common/KylinConfigBase.java | 16 +
.../apache/kylin/cube/cuboid/CuboidScheduler.java | 29 +-
.../java/org/apache/kylin/gridtable/GTRecord.java | 1 +
.../kylin/engine/mr/common/BatchConstants.java | 1 +
.../java/org/apache/spark/sql/KylinSession.scala | 1 -
.../java/org/apache/spark/sql/SparderEnv.scala | 8 -
storage-parquet/pom.xml | 12 +
.../kylin/storage/parquet/cube/CubeSparkRPC.java | 12 +-
.../storage/parquet/spark/ParquetPayload.java | 1 +
.../parquet/steps/ConvertToParquetReducer.java | 98 ++++++
.../parquet/steps/CuboidToPartitionMapping.java | 165 ++++++++++
.../storage/parquet/steps/MRCubeParquetJob.java | 168 ++++++++++
.../storage/parquet/steps/ParquetConvertor.java | 283 +++++++++++++++++
.../storage/parquet/steps/ParquetJobSteps.java | 7 +-
.../storage/parquet/steps/ParquetMROutput.java | 78 ++++-
.../storage/parquet/steps/ParquetMRSteps.java | 59 ++++
.../storage/parquet/steps/ParquetSparkOutput.java | 4 +-
.../storage/parquet/steps/ParquetSparkSteps.java | 2 +-
.../storage/parquet/steps/SparkCubeParquet.java | 338 ++-------------------
.../java/org/apache/kylin/ext/ItClassLoader.java | 4 +-
.../org/apache/kylin/ext/ItSparkClassLoader.java | 10 +-
.../org/apache/kylin/ext/SparkClassLoader.java | 25 +-
.../org/apache/kylin/ext/TomcatClassLoader.java | 19 +-
24 files changed, 966 insertions(+), 377 deletions(-)
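Overview: the patch routes every cuboid row to a reducer that owns one slice of
that cuboid, then writes each slice out as a standalone Parquet file under the
cuboid's build level. Both the new partitioner and the reducer key everything
off the cuboid id embedded in the row key; a minimal sketch of that extraction,
using the class and constant names from the diff (the helper class itself is
illustrative only):

    import org.apache.kylin.common.util.Bytes;
    import org.apache.kylin.cube.kv.RowConstants;

    public class CuboidIdFromRowKey {
        // Row key layout: [shard id][cuboid id][dimension codes...].
        // The cuboid id is the 8 bytes right after the shard id, which is
        // exactly how ConvertToParquetReducer and CuboidToPartitionMapping
        // read it in the diff below.
        public static long cuboidId(byte[] rowKey) {
            return Bytes.toLong(rowKey, RowConstants.ROWKEY_SHARDID_LEN,
                    RowConstants.ROWKEY_CUBOIDID_LEN);
        }
    }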
diff --git a/core-common/pom.xml b/core-common/pom.xml
index 594e39b..9e1e3ac 100644
--- a/core-common/pom.xml
+++ b/core-common/pom.xml
@@ -104,4 +104,4 @@
<scope>test</scope>
</dependency>
</dependencies>
-</project>
+</project>
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6092834..e3e3e03 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1121,6 +1121,22 @@ abstract public class KylinConfigBase implements Serializable {
}
// ============================================================================
+ // STORAGE.Parquet
+ // ============================================================================
+
+ public float getParquetFileSizeMB() {
+ return Integer.parseInt(getOptional("kylin.storage.parquet.file-size-mb", "100"));
+ }
+
+ public int getParquetMinPartitions() {
+ return Integer.parseInt(getOptional("kylin.storage.parquet.min-partitions", "1"));
+ }
+
+ public int getParquetMaxPartitions() {
+ return Integer.parseInt(getOptional("kylin.storage.parquet.max-partitions", "5000"));
+ }
+
+ // ============================================================================
// ENGINE.MR
// ============================================================================
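The three new options drive Parquet file sizing end to end: the target file
size in MB, plus lower and upper clamps on how many partitions (hence files)
one cuboid may get. They can be overridden like any other setting in
kylin.properties; the lines below simply restate the defaults from the getters
above:

    kylin.storage.parquet.file-size-mb=100
    kylin.storage.parquet.min-partitions=1
    kylin.storage.parquet.max-partitions=5000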
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 17096f6..c1791e1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -20,8 +20,10 @@ package org.apache.kylin.cube.cuboid;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import com.google.common.collect.Maps;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.cube.model.AggregationGroup;
import org.apache.kylin.cube.model.CubeDesc;
@@ -71,6 +73,8 @@ abstract public class CuboidScheduler {
private transient List<List<Long>> cuboidsByLayer;
+ private transient Map<Long, Integer> cuboidLayerMap;
+
public long getBaseCuboidId() {
return Cuboid.getBaseCuboidId(cubeDesc);
}
@@ -125,7 +129,30 @@ abstract public class CuboidScheduler {
public int getBuildLevel() {
return getCuboidsByLayer().size() - 1;
}
-
+
+ public Map<Long, Integer> getCuboidLayerMap() {
+ if (cuboidLayerMap != null){
+ return cuboidLayerMap;
+ }
+ cuboidLayerMap = Maps.newHashMap();
+
+ if (cuboidsByLayer == null) {
+ cuboidsByLayer = getCuboidsByLayer();
+ }
+
+ for (int layerIndex = 0; layerIndex <= getBuildLevel(); layerIndex++){
+ List<Long> layeredCuboids = cuboidsByLayer.get(layerIndex);
+ for (Long cuboidId : layeredCuboids){
+ cuboidLayerMap.put(cuboidId, layerIndex);
+ }
+ }
+
+ int size = getAllCuboidIds().size();
+ int totalNum = cuboidLayerMap.size();
+ Preconditions.checkState(totalNum == size, "Layer map has " + totalNum + " cuboids, but the scheduler reports " + size);
+ return cuboidLayerMap;
+ }
+
/** Returns the key for what this cuboid scheduler is responsible for. */
public String getCuboidCacheKey() {
return cubeDesc.getName();
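getCuboidLayerMap() inverts the per-layer cuboid lists into a single
cuboid-to-layer lookup; ConvertToParquetReducer uses it to place each cuboid's
Parquet files under the right level directory. A hedged usage sketch (obtaining
the scheduler from a loaded segment is assumed):

    import java.util.Map;
    import org.apache.kylin.cube.cuboid.CuboidScheduler;

    public class LayerLookup {
        // Layer 0 holds the base cuboid; each subsequent layer holds the
        // cuboids derived from the layer above it.
        public static int layerOf(CuboidScheduler scheduler, long cuboidId) {
            Map<Long, Integer> byLayer = scheduler.getCuboidLayerMap();
            return byLayer.get(cuboidId);
        }
    }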
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index d7be088..24278c4 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -113,6 +113,7 @@ public class GTRecord implements Comparable<GTRecord> {
}
/** set record to the codes of specified values, reuse given space to hold the codes */
+ @SuppressWarnings("checkstyle:BooleanExpressionComplexity")
public GTRecord setValuesParquet(ImmutableBitSet selectedCols, ByteArray space, Object... values) {
assert selectedCols.cardinality() == values.length;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 66da1b2..33264c8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -107,6 +107,7 @@ public interface BatchConstants {
String ARG_HBASE_CONF_PATH = "hbaseConfPath";
String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
String ARG_COUNTER_OUPUT = "counterOutput";
+ String ARG_CUBOID_TO_PARTITION_MAPPING = "cuboidToPartitionMapping";
/**
* logger and counter
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala b/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala
index 20f86e5..72c4ba5 100644
--- a/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala
+++ b/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala
@@ -107,7 +107,6 @@ object KylinSession extends Logging {
sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
SparkSession.setDefaultSession(null)
- SparkSession.sqlListener.set(null)
}
})
UdfManager.create(session)
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala b/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala
index ecaa5c8..209576d 100644
--- a/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala
+++ b/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala
@@ -57,14 +57,6 @@ object SparderEnv extends Logging {
getSparkSession.sparkContext.conf.get(key)
}
- def getActiveJobs(): Int = {
- SparderEnv.getSparkSession.sparkContext.jobProgressListener.activeJobs.size
- }
-
- def getFailedJobs(): Int = {
- SparderEnv.getSparkSession.sparkContext.jobProgressListener.failedJobs.size
- }
-
def getAsyncResultCore: Int = {
val sparkConf = getSparkSession.sparkContext.getConf
val instances = sparkConf.get("spark.executor.instances").toInt
diff --git a/storage-parquet/pom.xml b/storage-parquet/pom.xml
index 3778031..88ad6f9 100644
--- a/storage-parquet/pom.xml
+++ b/storage-parquet/pom.xml
@@ -103,6 +103,18 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ <version>1.8.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>1.8.1</version>
+ </dependency>
+
</dependencies>
<build>
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java
index 5009a51..62c294d 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java
@@ -73,11 +73,6 @@ public class CubeSparkRPC implements IGTStorage {
JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "");
-<<<<<<< HEAD
-=======
- String cubooidRootPath = jobBuilderSupport.getCuboidRootPath();
-
->>>>>>> 198041d63... KYLIN-3625 Init query
List<List<Long>> layeredCuboids = cubeSegment.getCuboidScheduler().getCuboidsByLayer();
int level = 0;
for (List<Long> levelCuboids : layeredCuboids) {
@@ -87,13 +82,8 @@ public class CubeSparkRPC implements IGTStorage {
level++;
}
-<<<<<<< HEAD
- String dataFolderName;
String parquetRootPath = jobBuilderSupport.getParquetOutputPath();
- dataFolderName = JobBuilderSupport.getCuboidOutputPathsByLevel(parquetRootPath, level) + "/" + cuboid.getId();
-=======
- String dataFolderName = JobBuilderSupport.getCuboidOutputPathsByLevel(cubooidRootPath, level) + "/" + cuboid.getId();
->>>>>>> 198041d63... KYLIN-3625 Init query
+ String dataFolderName = JobBuilderSupport.getCuboidOutputPathsByLevel(parquetRootPath, level) + "/" + cuboid.getId();
builder.setGtScanRequest(scanRequest.toByteArray()).setGtScanRequestId(scanReqId)
.setKylinProperties(KylinConfig.getInstanceFromEnv().exportAllToString())
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
index 9096679..3129b8e 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
@@ -37,6 +37,7 @@ public class ParquetPayload {
private long startTime;
private int storageType;
+ @SuppressWarnings("checkstyle:ParameterNumber")
private ParquetPayload(byte[] gtScanRequest, String gtScanRequestId, String kylinProperties, String realizationId,
String segmentId, String dataFolderName, int maxRecordLength, List<Integer> parquetColumns,
boolean isUseII, String realizationType, String queryId, boolean spillEnabled, long maxScanBytes,
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java
new file mode 100644
index 0000000..c778ee0
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.parquet.steps;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.dimension.IDimensionEncodingMap;
+import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.parquet.example.data.Group;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Created by Yichen on 11/14/18.
+ */
+public class ConvertToParquetReducer extends KylinReducer<Text, Text, NullWritable, Group> {
+ private ParquetConvertor convertor;
+ private MultipleOutputs<NullWritable, Group> mos;
+ private CubeSegment cubeSegment;
+
+ @Override
+ protected void doSetup(Context context) throws IOException, InterruptedException {
+ Configuration conf = context.getConfiguration();
+ super.bindCurrentConfiguration(conf);
+ mos = new MultipleOutputs<>(context);
+
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+ String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+ String segmentId = conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID);
+ CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+ CubeInstance cube = cubeManager.getCube(cubeName);
+ cubeSegment = cube.getSegmentById(segmentId);
+ Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSegment.getCubeDesc());
+ final IDimensionEncodingMap dimEncMap = cubeSegment.getDimensionEncodingMap();
+ SerializableConfiguration sConf = new SerializableConfiguration(conf);
+
+ Map<TblColRef, String> colTypeMap = Maps.newHashMap();
+ Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
+ ParquetConvertor.generateTypeMap(baseCuboid, dimEncMap, cube.getDescriptor(), colTypeMap, meaTypeMap);
+ convertor = new ParquetConvertor(cubeName, segmentId, kylinConfig, sConf, colTypeMap, meaTypeMap);
+ }
+
+ @Override
+ protected void doReduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+ long cuboidId = Bytes.toLong(key.getBytes(), RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+ int layerNumber = cubeSegment.getCuboidScheduler().getCuboidLayerMap().get(cuboidId);
+ int partitionId = context.getTaskAttemptID().getTaskID().getId();
+
+ for (Text value : values) {
+ try {
+ Group group = convertor.parseValueToGroup(key, value);
+ String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel("", layerNumber)
+ + "/" + ParquetJobSteps.getCuboidOutputFileName(cuboidId, partitionId);
+ mos.write(MRCubeParquetJob.BY_LAYER_OUTPUT, null, group, output);
+ } catch (IOException e){
+ throw new IOException(e);
+ }
+ }
+ }
+
+ @Override
+ protected void doCleanup(Context context) throws IOException, InterruptedException {
+ mos.close();
+ }
+}
\ No newline at end of file
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java
new file mode 100644
index 0000000..7fcf95d
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.parquet.steps;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Created by Yichen on 11/12/18.
+ */
+public class CuboidToPartitionMapping implements Serializable {
+ private static final Logger logger = LoggerFactory.getLogger(CuboidToPartitionMapping.class);
+
+ private Map<Long, List<Integer>> cuboidPartitions;
+ private int partitionNum;
+
+ public CuboidToPartitionMapping(Map<Long, List<Integer>> cuboidPartitions) {
+ this.cuboidPartitions = cuboidPartitions;
+ int partitions = 0;
+ for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+ partitions = partitions + entry.getValue().size();
+ }
+ this.partitionNum = partitions;
+ }
+
+ public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig) throws IOException {
+ cuboidPartitions = Maps.newHashMap();
+
+ Set<Long> allCuboidIds = cubeSeg.getCuboidScheduler().getAllCuboidIds();
+
+ calculatePartitionId(cubeSeg, kylinConfig, allCuboidIds);
+ }
+
+ public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig, int level) throws IOException {
+ cuboidPartitions = Maps.newHashMap();
+
+ List<Long> layeredCuboids = cubeSeg.getCuboidScheduler().getCuboidsByLayer().get(level);
+
+ calculatePartitionId(cubeSeg, kylinConfig, layeredCuboids);
+ }
+
+ private void calculatePartitionId(CubeSegment cubeSeg, KylinConfig kylinConfig, Collection<Long> cuboidIds) throws IOException {
+ int position = 0;
+ CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSeg, kylinConfig);
+ for (Long cuboidId : cuboidIds) {
+ int partition = estimateCuboidPartitionNum(cuboidId, cubeStatsReader, kylinConfig);
+ List<Integer> positions = Lists.newArrayListWithCapacity(partition);
+
+ for (int i = position; i < position + partition; i++) {
+ positions.add(i);
+ }
+
+ cuboidPartitions.put(cuboidId, positions);
+ position = position + partition;
+ }
+
+ this.partitionNum = position;
+ }
+
+ public String serialize() throws JsonProcessingException {
+ return JsonUtil.writeValueAsString(cuboidPartitions);
+ }
+
+ public static CuboidToPartitionMapping deserialize(String jsonMapping) throws IOException {
+ Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping, new TypeReference<Map<Long, List<Integer>>>() {});
+ return new CuboidToPartitionMapping(cuboidPartitions);
+ }
+
+ public int getNumPartitions() {
+ return this.partitionNum;
+ }
+
+ public long getCuboidIdByPartition(int partition) {
+ for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+ if (entry.getValue().contains(partition)) {
+ return entry.getKey();
+ }
+ }
+
+ throw new IllegalArgumentException("No cuboidId for partition id: " + partition);
+ }
+
+ public int getPartitionByKey(byte[] key) {
+ long cuboidId = Bytes.toLong(key, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
+ List<Integer> partitions = cuboidPartitions.get(cuboidId);
+ int partitionKey = mod(key, RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN, key.length, partitions.size());
+
+ return partitions.get(partitionKey);
+ }
+
+ private int mod(byte[] src, int start, int end, int total) {
+ int sum = Bytes.hashBytes(src, start, end - start);
+ int mod = sum % total;
+ if (mod < 0)
+ mod += total;
+
+ return mod;
+ }
+
+ public int getPartitionNumForCuboidId(long cuboidId) {
+ return cuboidPartitions.get(cuboidId).size();
+ }
+
+ public String getPartitionFilePrefix(int partition) {
+ long cuboid = getCuboidIdByPartition(partition);
+ int partNum = partition % getPartitionNumForCuboidId(cuboid);
+ String prefix = ParquetJobSteps.getCuboidOutputFileName(cuboid, partNum);
+
+ return prefix;
+ }
+
+ private int estimateCuboidPartitionNum(long cuboidId, CubeStatsReader cubeStatsReader, KylinConfig kylinConfig) {
+ double cuboidSize = cubeStatsReader.estimateCuboidSize(cuboidId);
+ float rddCut = kylinConfig.getParquetFileSizeMB();
+ int partition = (int) (cuboidSize / rddCut);
+ partition = Math.max(kylinConfig.getParquetMinPartitions(), partition);
+ partition = Math.min(kylinConfig.getParquetMaxPartitions(), partition);
+
+ logger.info("cuboid:{}, est_size:{}, partitions:{}", cuboidId, cuboidSize, partition);
+ return partition;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
+ sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ",")).append("]\n");
+ }
+
+ return sb.toString();
+ }
+}
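estimateCuboidPartitionNum() is clamped division: the cuboid's estimated size
over the target file size, held between the min and max partition settings. A
worked sketch with invented sizes:

    public class PartitionEstimate {
        // Mirrors estimateCuboidPartitionNum: size / target, then clamp.
        static int estimate(double cuboidSizeMB, float targetFileMB, int min, int max) {
            int partition = (int) (cuboidSizeMB / targetFileMB);
            partition = Math.max(min, partition);
            return Math.min(max, partition);
        }

        public static void main(String[] args) {
            // A 450 MB cuboid at the 100 MB target gets 4 partitions;
            // a 20 MB cuboid is clamped up to the 1-partition minimum.
            System.out.println(estimate(450d, 100f, 1, 5000)); // 4
            System.out.println(estimate(20d, 100f, 1, 5000));  // 1
        }
    }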
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java
new file mode 100644
index 0000000..7113a7a
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.parquet.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.dimension.IDimensionEncodingMap;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.parquet.example.data.Group;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.parquet.hadoop.example.ExampleOutputFormat;
+import org.apache.parquet.schema.MessageType;
+
+import java.io.IOException;
+
+
+/**
+ * Created by Yichen on 11/9/18.
+ */
+public class MRCubeParquetJob extends AbstractHadoopJob {
+
+ protected static final Logger logger = LoggerFactory.getLogger(MRCubeParquetJob.class);
+
+ final static String BY_LAYER_OUTPUT = "ByLayer";
+ private Options options;
+
+ public MRCubeParquetJob(){
+ options = new Options();
+ options.addOption(OPTION_JOB_NAME);
+ options.addOption(OPTION_CUBE_NAME);
+ options.addOption(OPTION_SEGMENT_ID);
+ options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_OUTPUT_PATH);
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ parseOptions(options, args);
+
+ final Path inputPath = new Path(getOptionValue(OPTION_INPUT_PATH));
+ final Path outputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+ final String cubeName = getOptionValue(OPTION_CUBE_NAME);
+ logger.info("CubeName: ", cubeName);
+ final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+ CubeInstance cube = cubeManager.getCube(cubeName);
+
+ CubeSegment cubeSegment = cube.getSegmentById(segmentId);
+ logger.info("Input path: {}", inputPath);
+ logger.info("Output path: {}", outputPath);
+
+ job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+ setJobClasspath(job, cube.getConfig());
+ CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSegment, kylinConfig);
+ String jsonStr = cuboidToPartitionMapping.serialize();
+ logger.info("Total Partition: {}", cuboidToPartitionMapping.getNumPartitions());
+
+ final IDimensionEncodingMap dimEncMap = cubeSegment.getDimensionEncodingMap();
+
+ Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSegment.getCubeDesc());
+
+ MessageType schema = ParquetConvertor.cuboidToMessageType(baseCuboid, dimEncMap, cubeSegment.getCubeDesc());
+ logger.info("Schema: {}", schema.toString());
+
+ try {
+
+ job.getConfiguration().set(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING, jsonStr);
+
+
+ addInputDirs(inputPath.toString(), job);
+ FileOutputFormat.setOutputPath(job, outputPath);
+
+ job.setJobName("Converting Parquet File for: " + cubeName + " segment " + segmentId);
+ job.setInputFormatClass(SequenceFileInputFormat.class);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+
+ job.setPartitionerClass(CuboidPartitioner.class);
+
+ job.setOutputKeyClass(NullWritable.class);
+ job.setOutputValueClass(Group.class);
+ job.setReducerClass(ConvertToParquetReducer.class);
+ job.setNumReduceTasks(cuboidToPartitionMapping.getNumPartitions());
+
+ MultipleOutputs.addNamedOutput(job, BY_LAYER_OUTPUT, ExampleOutputFormat.class, NullWritable.class, Group.class);
+
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentId);
+ ExampleOutputFormat.setSchema(job, schema);
+ LazyOutputFormat.setOutputFormatClass(job, ExampleOutputFormat.class);
+ attachCubeMetadataWithDict(cube, job.getConfiguration());
+
+ this.deletePath(job.getConfiguration(), outputPath);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ return waitForCompletion(job);
+
+ } finally {
+ if (job != null) {
+ cleanupTempConfFile(job.getConfiguration());
+ }
+ }
+ }
+
+ public static class CuboidPartitioner extends Partitioner<Text, Text> implements Configurable {
+ private Configuration conf;
+ private CuboidToPartitionMapping mapping;
+
+ @Override
+ public void setConf(Configuration configuration) {
+ this.conf = configuration;
+ try {
+ mapping = CuboidToPartitionMapping.deserialize(conf.get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public int getPartition(Text key, Text value, int numPartitions) {
+ return mapping.getPartitionByKey(key.getBytes());
+ }
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ MRCubeParquetJob job = new MRCubeParquetJob();
+ int exitCode = ToolRunner.run(job, args);
+ System.exit(exitCode);
+ }
+}
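The driver serializes the cuboid-to-partition mapping to JSON and ships it
through the Hadoop Configuration; every CuboidPartitioner instance rebuilds it
in setConf(). A minimal round-trip sketch of that contract (building the
mapping itself is assumed to have happened already):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.kylin.engine.mr.common.BatchConstants;
    import org.apache.kylin.storage.parquet.steps.CuboidToPartitionMapping;

    public class MappingRoundTrip {
        // Driver side: serialize once, stash under the new constant.
        static void ship(Configuration conf, CuboidToPartitionMapping mapping)
                throws Exception {
            conf.set(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING, mapping.serialize());
        }

        // Task side: each partitioner deserializes the same JSON, so the
        // driver and all tasks agree on the row-to-reducer assignment.
        static CuboidToPartitionMapping load(Configuration conf) throws Exception {
            return CuboidToPartitionMapping.deserialize(
                    conf.get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
        }
    }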
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java
new file mode 100644
index 0000000..9b9578d
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.storage.parquet.steps;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.kv.RowKeyDecoderParquet;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dimension.AbstractDateDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.dimension.FixedLenDimEnc;
+import org.apache.kylin.dimension.FixedLenHexDimEnc;
+import org.apache.kylin.dimension.IDimensionEncodingMap;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.basic.BasicMeasureType;
+import org.apache.kylin.measure.basic.BigDecimalIngester;
+import org.apache.kylin.measure.basic.DoubleIngester;
+import org.apache.kylin.measure.basic.LongIngester;
+import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.io.api.Binary;
+
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by Yichen on 11/9/18.
+ */
+public class ParquetConvertor {
+ private static final Logger logger = LoggerFactory.getLogger(ParquetConvertor.class);
+
+ public static final String FIELD_CUBOID_ID = "cuboidId";
+ public static final String DATATYPE_DECIMAL = "decimal";
+ public static final String DATATYPE_INT = "int";
+ public static final String DATATYPE_LONG = "long";
+ public static final String DATATYPE_DOUBLE = "double";
+ public static final String DATATYPE_STRING = "string";
+ public static final String DATATYPE_BINARY = "binary";
+
+ private RowKeyDecoder decoder;
+ private BufferedMeasureCodec measureCodec;
+ private Map<TblColRef, String> colTypeMap;
+ private Map<MeasureDesc, String> meaTypeMap;
+ private BigDecimalSerializer serializer;
+ private GroupFactory factory;
+ private List<MeasureDesc> measureDescs;
+
+ public ParquetConvertor(String cubeName, String segmentId, KylinConfig kConfig, SerializableConfiguration sConf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap){
+ KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
+
+ this.colTypeMap = colTypeMap;
+ this.meaTypeMap = meaTypeMap;
+ serializer = new BigDecimalSerializer(DataType.getType(DATATYPE_DECIMAL));
+
+ CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
+ CubeDesc cubeDesc = cubeInstance.getDescriptor();
+ CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+ measureDescs = cubeDesc.getMeasures();
+ decoder = new RowKeyDecoderParquet(cubeSegment);
+ factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(sConf.get()));
+ measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+ }
+
+ protected Group parseValueToGroup(Text rawKey, Text rawValue) throws IOException{
+ Group group = factory.newGroup();
+
+ long cuboidId = decoder.decode(rawKey.getBytes());
+ List<String> values = decoder.getValues();
+ List<TblColRef> columns = decoder.getColumns();
+
+ // write the cuboid id into every row so the file contents can be checked
+ group.append(FIELD_CUBOID_ID, cuboidId);
+
+ for (int i = 0; i < columns.size(); i++) {
+ TblColRef column = columns.get(i);
+ parseColValue(group, column, values.get(i));
+ }
+
+ int[] valueLengths = measureCodec.getCodec().getPeekLength(ByteBuffer.wrap(rawValue.getBytes()));
+
+ int valueOffset = 0;
+ for (int i = 0; i < valueLengths.length; ++i) {
+ MeasureDesc measureDesc = measureDescs.get(i);
+ parseMeaValue(group, measureDesc, rawValue.getBytes(), valueOffset, valueLengths[i]);
+ valueOffset += valueLengths[i];
+ }
+
+ return group;
+ }
+
+ private void parseColValue(final Group group, final TblColRef colRef, final String value) {
+ if (value == null) {
+ logger.error("Dimension value is null for column {}", getColName(colRef));
+ return;
+ }
+ switch (colTypeMap.get(colRef)) {
+ case DATATYPE_INT:
+ group.append(colRef.getTableAlias() + "_" + colRef.getName(), Integer.valueOf(value));
+ break;
+ case DATATYPE_LONG:
+ group.append(colRef.getTableAlias() + "_" + colRef.getName(), Long.valueOf(value));
+ break;
+ default:
+ group.append(colRef.getTableAlias() + "_" + colRef.getName(), Binary.fromString(value));
+ break;
+ }
+ }
+
+ private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) throws IOException {
+ if (value == null) {
+ logger.error("Measure value is null for measure {}", measureDesc.getName());
+ return;
+ }
+ switch (meaTypeMap.get(measureDesc)) {
+ case DATATYPE_LONG:
+ group.append(measureDesc.getName(), BytesUtil.readVLong(ByteBuffer.wrap(value, offset, length)));
+ break;
+ case DATATYPE_DOUBLE:
+ group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
+ break;
+ case DATATYPE_DECIMAL:
+ BigDecimal decimal = serializer.deserialize(ByteBuffer.wrap(value, offset, length));
+ decimal = decimal.setScale(4);
+ group.append(measureDesc.getName(), Binary.fromByteArray(decimal.unscaledValue().toByteArray()));
+ break;
+ default:
+ group.append(measureDesc.getName(), Binary.fromByteArray(value, offset, length));
+ break;
+ }
+ }
+
+ protected static MessageType cuboidToMessageType(Cuboid cuboid, IDimensionEncodingMap dimEncMap, CubeDesc cubeDesc) {
+ Types.MessageTypeBuilder builder = Types.buildMessage();
+
+ List<TblColRef> colRefs = cuboid.getColumns();
+
+ builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(FIELD_CUBOID_ID);
+
+ for (TblColRef colRef : colRefs) {
+ DimensionEncoding dimEnc = dimEncMap.get(colRef);
+ colToMessageType(dimEnc, colRef, builder);
+ }
+
+ MeasureIngester[] aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+
+ for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+ MeasureDesc measureDesc = cubeDesc.getMeasures().get(i);
+ DataType meaDataType = measureDesc.getFunction().getReturnDataType();
+ MeasureType measureType = measureDesc.getFunction().getMeasureType();
+
+ meaColToMessageType(measureType, measureDesc.getName(), meaDataType, aggrIngesters[i], builder);
+ }
+
+ return builder.named(String.valueOf(cuboid.getId()));
+ }
+
+ protected static void generateTypeMap(Cuboid cuboid, IDimensionEncodingMap dimEncMap, CubeDesc cubeDesc, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap){
+ List<TblColRef> colRefs = cuboid.getColumns();
+
+ for (TblColRef colRef : colRefs) {
+ DimensionEncoding dimEnc = dimEncMap.get(colRef);
+ addColTypeMap(dimEnc, colRef, colTypeMap);
+ }
+
+ MeasureIngester[] aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+
+ for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
+ MeasureDesc measureDesc = cubeDesc.getMeasures().get(i);
+ MeasureType measureType = measureDesc.getFunction().getMeasureType();
+ addMeaColTypeMap(measureType, measureDesc, aggrIngesters[i], meaTypeMap);
+ }
+ }
+ private static String getColName(TblColRef colRef) {
+ return colRef.getTableAlias() + "_" + colRef.getName();
+ }
+
+ private static void addColTypeMap(DimensionEncoding dimEnc, TblColRef colRef, Map<TblColRef, String> colTypeMap) {
+ if (dimEnc instanceof AbstractDateDimEnc) {
+ colTypeMap.put(colRef, DATATYPE_LONG);
+ } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
+ DataType colDataType = colRef.getType();
+ if (colDataType.isNumberFamily() || colDataType.isDateTimeFamily()){
+ colTypeMap.put(colRef, DATATYPE_LONG);
+ } else {
+ // stringFamily && default
+ colTypeMap.put(colRef, DATATYPE_STRING);
+ }
+ } else {
+ colTypeMap.put(colRef, DATATYPE_INT);
+ }
+ }
+
+ private static Map<MeasureDesc, String> addMeaColTypeMap(MeasureType measureType, MeasureDesc measureDesc, MeasureIngester aggrIngester, Map<MeasureDesc, String> meaTypeMap) {
+ if (measureType instanceof BasicMeasureType) {
+ MeasureIngester measureIngester = aggrIngester;
+ if (measureIngester instanceof LongIngester) {
+ meaTypeMap.put(measureDesc, DATATYPE_LONG);
+ } else if (measureIngester instanceof DoubleIngester) {
+ meaTypeMap.put(measureDesc, DATATYPE_DOUBLE);
+ } else if (measureIngester instanceof BigDecimalIngester) {
+ meaTypeMap.put(measureDesc, DATATYPE_DECIMAL);
+ } else {
+ meaTypeMap.put(measureDesc, DATATYPE_BINARY);
+ }
+ } else {
+ meaTypeMap.put(measureDesc, DATATYPE_BINARY);
+ }
+ return meaTypeMap;
+ }
+
+ private static void colToMessageType(DimensionEncoding dimEnc, TblColRef colRef, Types.MessageTypeBuilder builder) {
+ if (dimEnc instanceof AbstractDateDimEnc) {
+ builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
+ } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
+ DataType colDataType = colRef.getType();
+ if (colDataType.isNumberFamily() || colDataType.isDateTimeFamily()){
+ builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
+ } else {
+ // stringFamily && default
+ builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
+ }
+ } else {
+ builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(getColName(colRef));
+ }
+ }
+
+ private static void meaColToMessageType(MeasureType measureType, String meaDescName, DataType meaDataType, MeasureIngester aggrIngester, Types.MessageTypeBuilder builder) {
+ if (measureType instanceof BasicMeasureType) {
+ MeasureIngester measureIngester = aggrIngester;
+ if (measureIngester instanceof LongIngester) {
+ builder.required(PrimitiveType.PrimitiveTypeName.INT64).named(meaDescName);
+ } else if (measureIngester instanceof DoubleIngester) {
+ builder.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named(meaDescName);
+ } else if (measureIngester instanceof BigDecimalIngester) {
+ builder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.DECIMAL).precision(meaDataType.getPrecision()).scale(meaDataType.getScale()).named(meaDescName);
+ } else {
+ builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(meaDescName);
+ }
+ } else {
+ builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(meaDescName);
+ }
+ }
+}
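cuboidToMessageType() derives one Parquet schema per segment from the base
cuboid: an int64 cuboidId marker, one optional column per dimension typed by
its encoding, and one required column per measure typed by its ingester. For a
toy cuboid (id 7) with a date-encoded dimension, a dict-encoded dimension, a
SUM(bigint) measure, and a SUM(decimal(19,4)) measure, the schema would print
roughly as follows (all names invented for illustration):

    message 7 {
      optional int64 cuboidId;
      optional int64 TEST_FACT_CAL_DT;
      optional int32 TEST_FACT_LSTG_FORMAT_NAME;
      required int64 TRANS_CNT;
      required binary GMV_SUM (DECIMAL(19,4));
    }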
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
index b47a03a..625737a 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
@@ -32,6 +32,9 @@ public abstract class ParquetJobSteps extends JobBuilderSupport {
super(seg, null);
}
-
- abstract public AbstractExecutable createConvertToParquetStep(String jobId);
+ public static String getCuboidOutputFileName(long cuboid, int partNum) {
+ String fileName = "cuboid_" + cuboid + "_part" + partNum;
+ return fileName;
+ }
+ abstract public AbstractExecutable convertToParquetStep(String jobId);
}
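getCuboidOutputFileName() is the single naming convention shared by the
reducer's MultipleOutputs writes and the query side's prefix lookup, which
keeps the two in agreement; for example:

    // cuboid 255, second slice of that cuboid -> "cuboid_255_part1"
    String name = ParquetJobSteps.getCuboidOutputFileName(255L, 1);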
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
index fe85e24..f54a7a0 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
@@ -26,11 +26,18 @@ import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidScheduler;
import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.MapReduceUtil;
+import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
+import org.apache.kylin.engine.mr.steps.InMemCuboidMapper;
+import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IEngineAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.Map;
+
/**
* Created by Yichen on 10/16/18.
@@ -42,14 +49,7 @@ public class ParquetMROutput implements IMROutput2 {
@Override
public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg) {
- boolean useSpark = seg.getCubeDesc().getEngineType() == IEngineAware.ID_SPARK;
-
-
- // TODO need refactor
- if (!useSpark){
- throw new RuntimeException("Cannot adapt to MR engine");
- }
- final ParquetJobSteps steps = new ParquetSparkSteps(seg);
+ final ParquetJobSteps steps = new ParquetMRSteps(seg);
return new IMRBatchCubingOutputSide2() {
@@ -59,6 +59,7 @@ public class ParquetMROutput implements IMROutput2 {
@Override
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.convertToParquetStep(jobFlow.getId()));
}
@Override
@@ -83,21 +84,76 @@ public class ParquetMROutput implements IMROutput2 {
@Override
public void configureJobOutput(Job job, String output, CubeSegment segment, CuboidScheduler cuboidScheduler,
int level) throws Exception {
+ int reducerNum = 1;
+ Class mapperClass = job.getMapperClass();
+
+ //allow user specially set config for base cuboid step
+ if (mapperClass == HiveToBaseCuboidMapper.class) {
+ for (Map.Entry<String, String> entry : segment.getConfig().getBaseCuboidMRConfigOverride().entrySet()) {
+ job.getConfiguration().set(entry.getKey(), entry.getValue());
+ }
+ }
+ if (mapperClass == HiveToBaseCuboidMapper.class || mapperClass == NDCuboidMapper.class) {
+ reducerNum = MapReduceUtil.getLayeredCubingReduceTaskNum(segment, cuboidScheduler,
+ AbstractHadoopJob.getTotalMapInputMB(job), level);
+ } else if (mapperClass == InMemCuboidMapper.class) {
+ reducerNum = MapReduceUtil.getInmemCubingReduceTaskNum(segment, cuboidScheduler);
+ }
Path outputPath = new Path(output);
FileOutputFormat.setOutputPath(job, outputPath);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setNumReduceTasks(reducerNum);
HadoopUtil.deletePath(job.getConfiguration(), outputPath);
}
}
@Override
public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg) {
- return null;
+ final ParquetJobSteps steps = new ParquetMRSteps(seg);
+ return new IMRBatchMergeOutputSide2() {
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ }
+
+ @Override
+ public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.convertToParquetStep(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+ }
+
+ @Override
+ public IMRMergeOutputFormat getOuputFormat() {
+ return null;
+ }
+ };
}
@Override
public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(CubeSegment seg) {
- return null;
+ final ParquetJobSteps steps = new ParquetMRSteps(seg);
+
+ return new IMRBatchOptimizeOutputSide2() {
+ @Override
+ public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) {
+ }
+
+ @Override
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.convertToParquetStep(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ }
+
+ @Override
+ public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow) {
+ }
+
+ };
}
}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java
new file mode 100644
index 0000000..829f074
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.storage.parquet.steps;
+
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+
+/**
+ * Created by Yichen on 11/8/18.
+ */
+public class ParquetMRSteps extends ParquetJobSteps {
+ public ParquetMRSteps(CubeSegment seg) {
+ super(seg);
+ }
+
+ @Override
+ public AbstractExecutable convertToParquetStep(String jobId) {
+ String cuboidRootPath = getCuboidRootPath(jobId);
+ String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
+
+ final MapReduceExecutable mrExecutable = new MapReduceExecutable();
+ mrExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_PARQUET);
+ mrExecutable.setMapReduceJobClass(MRCubeParquetJob.class);
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, inputPath);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getParquetOutputPath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Parquet_Generator_" + seg.getRealization().getName()+ "_Step");
+
+ mrExecutable.setMapReduceParams(cmd.toString());
+ mrExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_PARQUET);
+ mrExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+
+ return mrExecutable;
+ }
+
+}
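convertToParquetStep() assembles an ordinary Kylin MR step around
MRCubeParquetJob. Assuming the usual string values behind the BatchConstants
argument names, the parameter string it builds would look roughly like this
(paths and ids are placeholders, not taken from the diff):

    -cubename kylin_sales_cube
    -input <working_dir>/kylin-<job_id>/kylin_sales_cube/cuboid/*
    -output <working_dir>/kylin-<job_id>/kylin_sales_cube/parquet
    -segmentId <segment_uuid>
    -jobname Kylin_Parquet_Generator_kylin_sales_cube_Step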
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
index 176afd0..794ede4 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkOutput.java
@@ -36,7 +36,7 @@ public class ParquetSparkOutput implements ISparkOutput {
@Override
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createConvertToParquetStep(jobFlow.getId()));
+ jobFlow.addTask(steps.convertToParquetStep(jobFlow.getId()));
}
@@ -58,7 +58,7 @@ public class ParquetSparkOutput implements ISparkOutput {
@Override
public void addStepPhase2_BuildCube(CubeSegment set, List<CubeSegment> mergingSegments, DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createConvertToParquetStep(jobFlow.getId()));
+ jobFlow.addTask(steps.convertToParquetStep(jobFlow.getId()));
}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkSteps.java
index 296bc68..65bd30a 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkSteps.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetSparkSteps.java
@@ -35,7 +35,7 @@ public class ParquetSparkSteps extends ParquetJobSteps {
}
@Override
- public AbstractExecutable createConvertToParquetStep(String jobId) {
+ public AbstractExecutable convertToParquetStep(String jobId) {
String cuboidRootPath = getCuboidRootPath(jobId);
String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/");
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
index def4d8d..4ad7cb3 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
@@ -17,14 +17,10 @@
*/
package org.apache.kylin.storage.parquet.steps;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
@@ -34,54 +30,26 @@ import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.AbstractApplication;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyDecoder;
-import org.apache.kylin.cube.kv.RowKeyDecoderParquet;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dimension.AbstractDateDimEnc;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.dimension.FixedLenDimEnc;
-import org.apache.kylin.dimension.FixedLenHexDimEnc;
import org.apache.kylin.dimension.IDimensionEncodingMap;
import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.spark.KylinSparkJobListener;
import org.apache.kylin.engine.spark.SparkUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.measure.basic.BigDecimalIngester;
-import org.apache.kylin.measure.basic.DoubleIngester;
-import org.apache.kylin.measure.basic.LongIngester;
-import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
-import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.GroupFactory;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Types;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@@ -93,9 +61,6 @@ import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.List;
import java.util.Map;
@@ -172,11 +137,26 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
logger.info("Input path: {}", inputPath);
logger.info("Output path: {}", outputPath);
+ final Map<TblColRef, String> colTypeMap = Maps.newHashMap();
+ final Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
+
+ final IDimensionEncodingMap dimEncMap = cubeSegment.getDimensionEncodingMap();
+
+ Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSegment.getCubeDesc());
+ MessageType schema = ParquetConvertor.cuboidToMessageType(baseCuboid, dimEncMap, cubeSegment.getCubeDesc());
+ ParquetConvertor.generateTypeMap(baseCuboid, dimEncMap, cubeSegment.getCubeDesc(), colTypeMap, meaTypeMap);
+ GroupWriteSupport.setSchema(schema, job.getConfiguration());
+
+ GenerateGroupRDDFunction groupPairFunction = new GenerateGroupRDDFunction(cubeName, cubeSegment.getUuid(), metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap);
+
+
+ logger.info("Schema: {}", schema.toString());
+
// Read from cuboid and save to parquet
for (int level = 0; level <= totalLevels; level++) {
String cuboidPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(inputPath, level);
allRDDs[level] = SparkUtil.parseInputPath(cuboidPath, fs, sc, Text.class, Text.class);
- saveToParquet(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job, envConfig);
+ saveToParquet(allRDDs[level], groupPairFunction, cubeSegment, outputPath, level, job, envConfig);
}
logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
@@ -190,45 +170,28 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
}
- protected void saveToParquet(JavaPairRDD<Text, Text> rdd, String metaUrl, String cubeName, CubeSegment cubeSeg, String parquetOutput, int level, Job job, KylinConfig kylinConfig) throws Exception {
- final IDimensionEncodingMap dimEncMap = cubeSeg.getDimensionEncodingMap();
-
- Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSeg.getCubeDesc());
-
- final Map<TblColRef, String> colTypeMap = Maps.newHashMap();
- final Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
-
- MessageType schema = cuboidToMessageType(baseCuboid, dimEncMap, cubeSeg.getCubeDesc(), colTypeMap, meaTypeMap);
-
- logger.info("Schema: {}", schema.toString());
-
+ protected void saveToParquet(JavaPairRDD<Text, Text> rdd, GenerateGroupRDDFunction groupRDDFunction, CubeSegment cubeSeg, String parquetOutput, int level, Job job, KylinConfig kylinConfig) throws IOException {
final CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSeg, kylinConfig, level);
logger.info("CuboidToPartitionMapping: {}", cuboidToPartitionMapping.toString());
- JavaPairRDD<Text, Text> repartitionedRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping));
-
String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(parquetOutput, level);
job.setOutputFormatClass(CustomParquetOutputFormat.class);
- GroupWriteSupport.setSchema(schema, job.getConfiguration());
CustomParquetOutputFormat.setOutputPath(job, new Path(output));
CustomParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
CustomParquetOutputFormat.setCuboidToPartitionMapping(job, cuboidToPartitionMapping);
- JavaPairRDD<Void, Group> groupRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping))
- .mapToPair(new GenerateGroupRDDFunction(cubeName, cubeSeg.getUuid(), metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap));
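+ // Route each row to its cuboid's partition range first, then convert the (rowkey, measures) pair into a Parquet Group.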
+ JavaPairRDD<Void, Group> groupRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping)).mapToPair(groupRDDFunction);
groupRDD.saveAsNewAPIHadoopDataset(job.getConfiguration());
}
static class CuboidPartitioner extends Partitioner {
private CuboidToPartitionMapping mapping;
- private boolean enableSharding;
- public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping, boolean enableSharding) {
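+ // Shard-aware routing is gone; the mapping alone derives the partition from the row key.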
+ public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping) {
this.mapping = cuboidToPartitionMapping;
- this.enableSharding =enableSharding;
}
@Override
@@ -239,121 +202,11 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
@Override
public int getPartition(Object key) {
Text textKey = (Text)key;
- return mapping.getPartitionForCuboidId(textKey.getBytes());
- }
- }
-
- public static class CuboidToPartitionMapping implements Serializable {
- private Map<Long, List<Integer>> cuboidPartitions;
- private int partitionNum;
-
- public CuboidToPartitionMapping(Map<Long, List<Integer>> cuboidPartitions) {
- this.cuboidPartitions = cuboidPartitions;
- int partitions = 0;
- for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
- partitions = partitions + entry.getValue().size();
- }
- this.partitionNum = partitions;
- }
-
- public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig, int level) throws IOException {
- cuboidPartitions = Maps.newHashMap();
-
- List<Long> layeredCuboids = cubeSeg.getCuboidScheduler().getCuboidsByLayer().get(level);
- CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSeg, kylinConfig);
-
- int position = 0;
- for (Long cuboidId : layeredCuboids) {
- int partition = estimateCuboidPartitionNum(cuboidId, cubeStatsReader, kylinConfig);
- List<Integer> positions = Lists.newArrayListWithCapacity(partition);
-
- for (int i = position; i < position + partition; i++) {
- positions.add(i);
- }
-
- cuboidPartitions.put(cuboidId, positions);
- position = position + partition;
- }
-
- this.partitionNum = position;
- }
-
- public String serialize() throws JsonProcessingException {
- return JsonUtil.writeValueAsString(cuboidPartitions);
- }
-
- public static CuboidToPartitionMapping deserialize(String jsonMapping) throws IOException {
- Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping, new TypeReference<Map<Long, List<Integer>>>() {});
- return new CuboidToPartitionMapping(cuboidPartitions);
- }
-
- public int getNumPartitions() {
- return this.partitionNum;
- }
-
- public long getCuboidIdByPartition(int partition) {
- for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
- if (entry.getValue().contains(partition)) {
- return entry.getKey();
- }
- }
-
- throw new IllegalArgumentException("No cuboidId for partition id: " + partition);
- }
-
- public int getPartitionForCuboidId(byte[] key) {
- long cuboidId = Bytes.toLong(key, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
- List<Integer> partitions = cuboidPartitions.get(cuboidId);
- int partitionKey = mod(key, RowConstants.ROWKEY_COL_DEFAULT_LENGTH, key.length, partitions.size());
-
- return partitions.get(partitionKey);
- }
-
- private int mod(byte[] src, int start, int end, int total) {
- int sum = Bytes.hashBytes(src, start, end - start);
- int mod = sum % total;
- if (mod < 0)
- mod += total;
-
- return mod;
- }
-
- public int getPartitionNumForCuboidId(long cuboidId) {
- return cuboidPartitions.get(cuboidId).size();
- }
-
- public String getPartitionFilePrefix(int partition) {
- String prefix = "cuboid_";
- long cuboid = getCuboidIdByPartition(partition);
- int partNum = partition % getPartitionNumForCuboidId(cuboid);
- prefix = prefix + cuboid + "_part" + partNum;
-
- return prefix;
- }
-
- private int estimateCuboidPartitionNum(long cuboidId, CubeStatsReader cubeStatsReader, KylinConfig kylinConfig) {
- double cuboidSize = cubeStatsReader.estimateCuboidSize(cuboidId);
- float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
- int partition = (int) (cuboidSize / (rddCut * 10));
- partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
- partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
- logger.info("cuboid:{}, est_size:{}, partitions:{}", cuboidId, cuboidSize, partition);
- return partition;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
- sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ",")).append("]\n");
- }
-
- return sb.toString();
+ return mapping.getPartitionByKey(textKey.getBytes());
}
}
public static class CustomParquetOutputFormat extends ParquetOutputFormat {
- public static final String CUBOID_TO_PARTITION_MAPPING = "cuboidToPartitionMapping";
@Override
public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
@@ -361,7 +214,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
TaskID taskId = context.getTaskAttemptID().getTaskID();
int partition = taskId.getId();
- CuboidToPartitionMapping mapping = CuboidToPartitionMapping.deserialize(context.getConfiguration().get(CUBOID_TO_PARTITION_MAPPING));
+ CuboidToPartitionMapping mapping = CuboidToPartitionMapping.deserialize(context.getConfiguration().get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
return new Path(committer.getWorkPath(), getUniqueFile(context, mapping.getPartitionFilePrefix(partition)+ "-" + getOutputName(context), extension));
}
@@ -369,7 +222,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
public static void setCuboidToPartitionMapping(Job job, CuboidToPartitionMapping cuboidToPartitionMapping) throws IOException {
String jsonStr = cuboidToPartitionMapping.serialize();
- job.getConfiguration().set(CUBOID_TO_PARTITION_MAPPING, jsonStr);
+ job.getConfiguration().set(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING, jsonStr);
}
}
@@ -379,14 +232,10 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
private String segmentId;
private String metaUrl;
private SerializableConfiguration conf;
- private List<MeasureDesc> measureDescs;
- private RowKeyDecoder decoder;
private Map<TblColRef, String> colTypeMap;
private Map<MeasureDesc, String> meaTypeMap;
- private GroupFactory factory;
- private BufferedMeasureCodec measureCodec;
- private BigDecimalSerializer serializer;
- private int count = 0;
+
+ private transient ParquetConvertor convertor;
public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
this.cubeName = cubeName;
@@ -398,17 +247,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
}
private void init() {
- KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
- KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
- CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
- CubeDesc cubeDesc = cubeInstance.getDescriptor();
- CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
- measureDescs = cubeDesc.getMeasures();
- decoder = new RowKeyDecoderParquet(cubeSegment);
- factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(conf.get()));
- measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
- serializer = new BigDecimalSerializer(DataType.getType("decimal"));
- initialized = true;
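+ // Called lazily on the executor: the convertor is transient and is rebuilt there rather than shipped from the driver.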
+ KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
+ convertor = new ParquetConvertor(cubeName, segmentId, kylinConfig, conf, colTypeMap, meaTypeMap);
}
@Override
@@ -421,133 +261,9 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
}
}
}
-
- long cuboid = decoder.decode4Parquet(tuple._1.getBytes());
- List<String> values = decoder.getValues();
- List<TblColRef> columns = decoder.getColumns();
-
- Group group = factory.newGroup();
-
- // for check
- group.append("cuboidId", cuboid);
-
- for (int i = 0; i < columns.size(); i++) {
- TblColRef column = columns.get(i);
- parseColValue(group, column, values.get(i));
- }
-
- count ++;
-
- byte[] encodedBytes = tuple._2().getBytes();
- int[] valueLengths = measureCodec.getCodec().getPeekLength(ByteBuffer.wrap(encodedBytes));
-
- int valueOffset = 0;
- for (int i = 0; i < valueLengths.length; ++i) {
- MeasureDesc measureDesc = measureDescs.get(i);
- parseMeaValue(group, measureDesc, encodedBytes, valueOffset, valueLengths[i]);
- valueOffset += valueLengths[i];
- }
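+ // Row-key decoding and measure parsing now live in ParquetConvertor.parseValueToGroup().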
+ Group group = convertor.parseValueToGroup(tuple._1(), tuple._2());
return new Tuple2<>(null, group);
}
-
- private void parseColValue(final Group group, final TblColRef colRef, final String value) {
- if (value==null) {
- logger.info("value is null");
- return;
- }
- switch (colTypeMap.get(colRef)) {
- case "int":
- group.append(colRef.getTableAlias() + "_" + colRef.getName(), Integer.valueOf(value));
- break;
- case "long":
- group.append(colRef.getTableAlias() + "_" + colRef.getName(), Long.valueOf(value));
- break;
- default:
- group.append(colRef.getTableAlias() + "_" + colRef.getName(), Binary.fromString(value));
- break;
- }
- }
-
- private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) throws IOException {
- if (value==null) {
- logger.info("value is null");
- return;
- }
- switch (meaTypeMap.get(measureDesc)) {
- case "long":
- group.append(measureDesc.getName(), BytesUtil.readVLong(ByteBuffer.wrap(value, offset, length)));
- break;
- case "double":
- group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
- break;
- case "decimal":
- BigDecimal decimal = serializer.deserialize(ByteBuffer.wrap(value, offset, length));
- decimal = decimal.setScale(4);
- group.append(measureDesc.getName(), Binary.fromConstantByteArray(decimal.unscaledValue().toByteArray()));
- break;
- default:
- group.append(measureDesc.getName(), Binary.fromConstantByteArray(value, offset, length));
- break;
- }
- }
- }
-
- private MessageType cuboidToMessageType(Cuboid cuboid, IDimensionEncodingMap dimEncMap, CubeDesc cubeDesc, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
- Types.MessageTypeBuilder builder = Types.buildMessage();
-
- List<TblColRef> colRefs = cuboid.getColumns();
-
- builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named("cuboidId");
-
- for (TblColRef colRef : colRefs) {
- DimensionEncoding dimEnc = dimEncMap.get(colRef);
-
- if (dimEnc instanceof AbstractDateDimEnc) {
- builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
- colTypeMap.put(colRef, "long");
- } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
- org.apache.kylin.metadata.datatype.DataType colDataType = colRef.getType();
- builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
- colTypeMap.put(colRef, "string");
- } else {
- builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(getColName(colRef));
- colTypeMap.put(colRef, "int");
- }
- }
-
- MeasureIngester[] aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
-
- for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
- MeasureDesc measureDesc = cubeDesc.getMeasures().get(i);
- org.apache.kylin.metadata.datatype.DataType meaDataType = measureDesc.getFunction().getReturnDataType();
- MeasureType measureType = measureDesc.getFunction().getMeasureType();
-
- if (measureType instanceof BasicMeasureType) {
- MeasureIngester measureIngester = aggrIngesters[i];
- if (measureIngester instanceof LongIngester) {
- builder.required(PrimitiveType.PrimitiveTypeName.INT64).named(measureDesc.getName());
- meaTypeMap.put(measureDesc, "long");
- } else if (measureIngester instanceof DoubleIngester) {
- builder.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named(measureDesc.getName());
- meaTypeMap.put(measureDesc, "double");
- } else if (measureIngester instanceof BigDecimalIngester) {
- builder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.DECIMAL).precision(meaDataType.getPrecision()).scale(meaDataType.getScale()).named(measureDesc.getName());
- meaTypeMap.put(measureDesc, "decimal");
- } else {
- builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
- meaTypeMap.put(measureDesc, "binary");
- }
- } else {
- builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
- meaTypeMap.put(measureDesc, "binary");
- }
- }
-
- return builder.named(String.valueOf(cuboid.getId()));
- }
-
- private String getColName(TblColRef colRef) {
- return colRef.getTableAlias() + "_" + colRef.getName();
}
}
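Note on the mapping hand-off above: the driver serializes the cuboid-to-partition map to JSON in the
job configuration, and each task restores it to name its output files. A minimal sketch of that
round-trip, using only names visible in this commit (checked exceptions omitted):

    // Driver side: compute the mapping for this level and stash it in the Hadoop configuration.
    CuboidToPartitionMapping mapping = new CuboidToPartitionMapping(cubeSegment, kylinConfig, level);
    job.getConfiguration().set(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING, mapping.serialize());

    // Task side (inside CustomParquetOutputFormat): restore the mapping and derive the
    // per-partition file prefix, e.g. "cuboid_<id>_part<n>".
    CuboidToPartitionMapping restored = CuboidToPartitionMapping.deserialize(
            context.getConfiguration().get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
    String prefix = restored.getPartitionFilePrefix(context.getTaskAttemptID().getTaskID().getId());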
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java
index 0590999..e0a9508 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java
@@ -79,9 +79,9 @@ public class ItClassLoader extends URLClassLoader {
e.printStackTrace();
}
}
- String spark_home = System.getenv("SPARK_HOME");
+ String sparkHome = System.getenv("SPARK_HOME");
try {
- File sparkJar = findFile(spark_home + "/jars", "spark-yarn_.*.jar");
+ File sparkJar = findFile(sparkHome + "/jars", "spark-yarn_.*.jar");
addURL(sparkJar.toURI().toURL());
addURL(new File("../examples/test_case_data/sandbox").toURI().toURL());
} catch (MalformedURLException e) {
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java
index c69ef9c..afca915 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java
@@ -66,15 +66,15 @@ public class ItSparkClassLoader extends URLClassLoader {
}
public void init() throws MalformedURLException {
- String spark_home = System.getenv("SPARK_HOME");
- if (spark_home == null) {
- spark_home = System.getProperty("SPARK_HOME");
- if (spark_home == null) {
+ String sparkHome = System.getenv("SPARK_HOME");
+ if (sparkHome == null) {
+ sparkHome = System.getProperty("SPARK_HOME");
+ if (sparkHome == null) {
throw new RuntimeException(
"Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
}
}
- File file = new File(spark_home + "/jars");
+ File file = new File(sparkHome + "/jars");
File[] jars = file.listFiles();
for (File jar : jars) {
addURL(jar.toURI().toURL());
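Note: ItSparkClassLoader now resolves Spark home as: SPARK_HOME environment variable first, then the
SPARK_HOME JVM system property. Extracted into a helper for illustration only (the committed code
keeps the inline form):

    static String resolveSparkHome() {
        String sparkHome = System.getenv("SPARK_HOME");      // preferred: environment variable
        if (sparkHome == null) {
            sparkHome = System.getProperty("SPARK_HOME");    // fallback: -DSPARK_HOME=... on the JVM
        }
        if (sparkHome == null) {
            throw new RuntimeException(
                    "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
        }
        return sparkHome;
    }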
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
index dba782b..8fe211e 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
@@ -62,25 +62,25 @@ public class SparkClassLoader extends URLClassLoader {
private static Logger logger = LoggerFactory.getLogger(SparkClassLoader.class);
static {
- String sparkclassloader_spark_cl_preempt_classes = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES");
- if (!StringUtils.isEmpty(sparkclassloader_spark_cl_preempt_classes)) {
- SPARK_CL_PREEMPT_CLASSES = StringUtils.split(sparkclassloader_spark_cl_preempt_classes, ",");
+ String sparkclassloaderSparkClPreemptClasses = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES");
+ if (!StringUtils.isEmpty(sparkclassloaderSparkClPreemptClasses)) {
+ SPARK_CL_PREEMPT_CLASSES = StringUtils.split(sparkclassloaderSparkClPreemptClasses, ",");
}
- String sparkclassloader_spark_cl_preempt_files = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_FILES");
- if (!StringUtils.isEmpty(sparkclassloader_spark_cl_preempt_files)) {
- SPARK_CL_PREEMPT_FILES = StringUtils.split(sparkclassloader_spark_cl_preempt_files, ",");
+ String sparkclassloaderSparkClPreemptFiles = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_FILES");
+ if (!StringUtils.isEmpty(sparkclassloaderSparkClPreemptFiles)) {
+ SPARK_CL_PREEMPT_FILES = StringUtils.split(sparkclassloaderSparkClPreemptFiles, ",");
}
- String sparkclassloader_this_cl_precedent_classes = System.getenv("SPARKCLASSLOADER_THIS_CL_PRECEDENT_CLASSES");
- if (!StringUtils.isEmpty(sparkclassloader_this_cl_precedent_classes)) {
- THIS_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloader_this_cl_precedent_classes, ",");
+ String sparkclassloaderThisClPrecedentClasses = System.getenv("SPARKCLASSLOADER_THIS_CL_PRECEDENT_CLASSES");
+ if (!StringUtils.isEmpty(sparkclassloaderThisClPrecedentClasses)) {
+ THIS_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloaderThisClPrecedentClasses, ",");
}
- String sparkclassloader_parent_cl_precedent_classes = System
+ String sparkclassloaderParentClPrecedentClasses = System
.getenv("SPARKCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES");
- if (!StringUtils.isEmpty(sparkclassloader_parent_cl_precedent_classes)) {
- PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloader_parent_cl_precedent_classes, ",");
+ if (!StringUtils.isEmpty(sparkclassloaderParentClPrecedentClasses)) {
+ PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloaderParentClPrecedentClasses, ",");
}
try {
@@ -111,6 +111,7 @@ public class SparkClassLoader extends URLClassLoader {
init();
}
+ @SuppressWarnings("checkstyle:LocalVariableName")
public void init() throws MalformedURLException {
String spark_home = System.getenv("SPARK_HOME");
if (spark_home == null) {
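Note: the static initializers in SparkClassLoader (and in TomcatClassLoader below) repeat a single
pattern: a comma-separated environment variable, when set and non-empty, replaces a compiled-in
class list. A sketch of that pattern; the helper name is illustrative, not part of the commit:

    private static String[] overrideFromEnv(String envName, String[] defaults) {
        String value = System.getenv(envName);
        // Keep the compiled-in defaults unless the operator supplied a non-empty override.
        return StringUtils.isEmpty(value) ? defaults : StringUtils.split(value, ",");
    }
    // e.g. SPARK_CL_PREEMPT_CLASSES = overrideFromEnv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES", SPARK_CL_PREEMPT_CLASSES);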
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java
index 89717ec..be0ecd9 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java
@@ -48,21 +48,21 @@ public class TomcatClassLoader extends ParallelWebappClassLoader {
private static final Set<String> wontFindClasses = new HashSet<>();
static {
- String tomcatclassloader_parent_cl_precedent_classes = System
+ String tomcatclassloaderParentClPrecedentClasses = System
.getenv("TOMCATCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES");
- if (!StringUtils.isEmpty(tomcatclassloader_parent_cl_precedent_classes)) {
- PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloader_parent_cl_precedent_classes, ",");
+ if (!StringUtils.isEmpty(tomcatclassloaderParentClPrecedentClasses)) {
+ PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloaderParentClPrecedentClasses, ",");
}
- String tomcatclassloader_this_cl_precedent_classes = System
+ String tomcatclassloaderThisClPrecedentClasses = System
.getenv("TOMCATCLASSLOADER_THIS_CL_PRECEDENT_CLASSES");
- if (!StringUtils.isEmpty(tomcatclassloader_this_cl_precedent_classes)) {
- THIS_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloader_this_cl_precedent_classes, ",");
+ if (!StringUtils.isEmpty(tomcatclassloaderThisClPrecedentClasses)) {
+ THIS_CL_PRECEDENT_CLASSES = StringUtils.split(tomcatclassloaderThisClPrecedentClasses, ",");
}
- String tomcatclassloader_codegen_classes = System.getenv("TOMCATCLASSLOADER_CODEGEN_CLASSES");
- if (!StringUtils.isEmpty(tomcatclassloader_codegen_classes)) {
- CODEGEN_CLASSES = StringUtils.split(tomcatclassloader_codegen_classes, ",");
+ String tomcatclassloaderCodegenClasses = System.getenv("TOMCATCLASSLOADER_CODEGEN_CLASSES");
+ if (!StringUtils.isEmpty(tomcatclassloaderCodegenClasses)) {
+ CODEGEN_CLASSES = StringUtils.split(tomcatclassloaderCodegenClasses, ",");
}
wontFindClasses.add("Class");
@@ -98,6 +98,7 @@ public class TomcatClassLoader extends ParallelWebappClassLoader {
init();
}
+ @SuppressWarnings("checkstyle:LocalVariableName")
public void init() {
String spark_home = System.getenv("SPARK_HOME");
try {