Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/10 03:41:30 UTC
[kylin] 06/06: KYLIN-3625 code review
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch kylin-on-parquet
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4d79e0de1593660fa808fcf842592c5fe8dc9b15
Author: shaofengshi <sh...@apache.org>
AuthorDate: Mon Dec 10 11:40:21 2018 +0800
KYLIN-3625 code review
---
.../org/apache/kylin/common/KylinConfigBase.java | 2 +-
.../src/main/resources/kylin-defaults.properties | 12 +-
.../java/org/apache/kylin/gridtable/GTRecord.java | 45 ---
.../java/org/apache/kylin/gridtable/GTUtil.java | 36 ++
.../filter/BuiltInFunctionTupleFilter.java | 16 +-
.../kylin/metadata/filter/CaseTupleFilter.java | 6 +-
.../kylin/metadata/filter/ColumnTupleFilter.java | 2 +-
.../kylin/metadata/filter/CompareTupleFilter.java | 8 +-
.../kylin/metadata/filter/ConstantTupleFilter.java | 2 +-
.../kylin/metadata/filter/DynamicTupleFilter.java | 2 +-
.../kylin/metadata/filter/ExtractTupleFilter.java | 2 +-
.../kylin/metadata/filter/LogicalTupleFilter.java | 4 +-
.../apache/kylin/metadata/filter/TupleFilter.java | 2 +-
.../metadata/filter/UDF/MassInTupleFilter.java | 2 +-
.../metadata/filter/UnsupportedTupleFilter.java | 2 +-
.../storage/gtrecord/CubeScanRangePlanner.java | 2 +-
.../storage/path/DefaultStoragePathBuilder.java | 1 -
.../kylin/storage/path/IStoragePathBuilder.java | 8 +-
.../apache/kylin/engine/mr/JobBuilderSupport.java | 4 +-
.../engine/spark/SparkCubingByLayerParquet.java | 433 ---------------------
.../apache/kylin/rest/job/StorageCleanupJob.java | 2 +-
.../execute/d861b8b7-c773-47ab-bb1e-c8782ae8d930 | 2 +-
.../org/apache/kylin/source/hive/HiveMRInput.java | 10 +-
.../apache/kylin/source/hive/HiveSparkInput.java | 2 +-
.../apache/kylin/source/hive/HiveMRInputTest.java | 4 +-
.../apache/kylin/source/kafka/KafkaMRInput.java | 4 +-
.../apache/kylin/source/kafka/KafkaSparkInput.java | 2 +-
.../storage/hbase/lookup/HBaseLookupMRSteps.java | 2 +-
.../hbase/steps/HDFSPathGarbageCollectionStep.java | 2 +-
.../kylin/storage/hbase/util/CubeMigrationCLI.java | 4 +-
.../storage/hbase/util/StorageCleanupJob.java | 2 +-
.../kylin/storage/parquet/ParquetStorage.java | 2 +-
.../storage/parquet/cube/CubeStorageQuery.java | 9 -
.../storage/parquet/spark/ParquetPayload.java | 5 +-
.../kylin/storage/parquet/spark/ParquetTask.java | 57 +--
.../storage/parquet/spark/SparkSubmitter.java | 9 +-
.../spark/gtscanner/ParquetRecordGTScanner.java | 54 ++-
.../gtscanner/ParquetRecordGTScanner4Cube.java | 34 +-
.../parquet/steps/ConvertToParquetReducer.java | 21 +-
.../parquet/steps/CuboidToPartitionMapping.java | 25 +-
.../storage/parquet/steps/MRCubeParquetJob.java | 12 +-
.../storage/parquet/steps/ParquetConvertor.java | 9 +-
.../storage/parquet/steps/ParquetJobSteps.java | 6 +-
.../storage/parquet/steps/ParquetMROutput.java | 6 -
.../storage/parquet/steps/ParquetMRSteps.java | 1 -
.../storage/parquet/steps/SparkCubeParquet.java | 61 +--
.../apache/kylin/ext/DebugTomcatClassLoader.java | 2 +-
.../org/apache/kylin/ext/SparkClassLoader.java | 4 +-
.../org/apache/kylin/tool/CubeMigrationCLI.java | 4 +-
webapp/app/js/model/cubeConfig.js | 4 +-
50 files changed, 239 insertions(+), 713 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 2633ddf..e7f7236 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -1864,7 +1864,7 @@ abstract public class KylinConfigBase implements Serializable {
return getOptional("kylin.source.jdbc.adaptor");
}
- public String getStorageSystemPathBuilderClz() {
+ public String getStoragePathBuilder() {
return getOptional("storage.path.builder", "org.apache.kylin.storage.path.DefaultStoragePathBuilder");
}
}
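[Note: The renamed getter still returns a fully qualified class name, which callers turn into an instance via reflection (Kylin's ClassUtil.newInstance, as the later hunks show). A minimal sketch of that pattern, using plain java.lang.reflect instead of Kylin's helper; the factory class here is illustrative only:

    import org.apache.kylin.storage.path.IStoragePathBuilder;

    public class PathBuilderFactory {
        // Instantiate the configured IStoragePathBuilder from its class name.
        // "clzName" would come from KylinConfig.getStoragePathBuilder().
        public static IStoragePathBuilder create(String clzName) {
            try {
                return (IStoragePathBuilder) Class.forName(clzName)
                        .getDeclaredConstructor()
                        .newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + clzName, e);
            }
        }
    }
]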
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 8115a50..d0a4ae2 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -357,23 +357,21 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
## for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kylin.storage.columnar.spark-conf" and append here
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dzipkin.collector-hostname=${ZIPKIN_HOSTNAME} -Dzipkin.collector-port=${ZIPKIN_SCRIBE_PORT} -DinfluxDB.address=${INFLUXDB_ADDRESS} -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkap.spark.identifier=${KAP_SPARK_IDENTIFIER} -Dkap.hdfs.working.dir=${KAP_HDFS_WORKING_DIR} -Dkap.metadata.url=${KAP_METADATA_IDENTIFIER} -XX:MaxDirectMemorySize=896M -Dsparder.dict.cache.size=${SPARDER [...]
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
-#kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
+kylin.storage.columnar.spark-conf.yarn.am.memory=512m
kylin.storage.columnar.spark-conf.spark.driver.memory=1g
kylin.storage.columnar.spark-conf.spark.executor.memory=1g
-kylin.storage.columnar.spark-conf.spark.yarn.executor.memoryOverhead=512
-kylin.storage.columnar.spark-conf.yarn.am.memory=512m
kylin.storage.columnar.spark-conf.spark.executor.cores=1
kylin.storage.columnar.spark-conf.spark.executor.instances=1
+kylin.storage.columnar.spark-conf.spark.yarn.executor.memoryOverhead=378
+kylin.storage.columnar.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
kylin.storage.columnar.spark-conf.spark.task.maxFailures=1
-kylin.storage.columnar.spark-conf.spark.ui.port=4041
kylin.storage.columnar.spark-conf.spark.locality.wait=0s
kylin.storage.columnar.spark-conf.spark.sql.dialect=hiveql
-kylin.storage.columnar.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
kylin.storage.columnar.spark-conf.hive.execution.engine=MR
kylin.storage.columnar.spark-conf.spark.scheduler.listenerbus.eventqueue.size=100000000
kylin.storage.columnar.spark-conf.spark.master=yarn-client
kylin.storage.columnar.spark-conf.spark.broadcast.compress=false
-
+#kylin.storage.columnar.spark-conf.spark.yarn.archive=hdfs://namenode:8020/kylin/spark/spark-libs.jar
\ No newline at end of file
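[Note: As the comment in this properties file says, any Spark property can be forwarded by prefixing it with "kylin.storage.columnar.spark-conf". A rough sketch of how such prefixed entries could be stripped and applied to a SparkConf; this loader class is hypothetical, not Kylin's actual configuration code:

    import java.util.Properties;
    import org.apache.spark.SparkConf;

    public class SparkConfLoader {
        static final String PREFIX = "kylin.storage.columnar.spark-conf.";

        // Copy every "kylin.storage.columnar.spark-conf.*" entry into a SparkConf,
        // dropping the Kylin prefix so Spark sees its native property names.
        public static SparkConf fromKylinProps(Properties props) {
            SparkConf conf = new SparkConf();
            for (String key : props.stringPropertyNames()) {
                if (key.startsWith(PREFIX)) {
                    conf.set(key.substring(PREFIX.length()), props.getProperty(key));
                }
            }
            return conf;
        }
    }
]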
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 24278c4..8f0e0fb 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -20,17 +20,7 @@ package org.apache.kylin.gridtable;
import com.google.common.base.Preconditions;
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.dimension.DictionaryDimEnc;
-import org.apache.kylin.measure.bitmap.BitmapSerializer;
-import org.apache.kylin.measure.dim.DimCountDistincSerializer;
-import org.apache.kylin.measure.extendedcolumn.ExtendedColumnSerializer;
-import org.apache.kylin.measure.hllc.HLLCSerializer;
-import org.apache.kylin.measure.percentile.PercentileSerializer;
-import org.apache.kylin.measure.raw.RawSerializer;
-import org.apache.kylin.measure.topn.TopNCounterSerializer;
-import org.apache.kylin.metadata.datatype.DataTypeSerializer;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -112,41 +102,6 @@ public class GTRecord implements Comparable<GTRecord> {
return this;
}
- /** set record to the codes of specified values, reuse given space to hold the codes */
- @SuppressWarnings("checkstyle:BooleanExpressionComplexity")
- public GTRecord setValuesParquet(ImmutableBitSet selectedCols, ByteArray space, Object... values) {
- assert selectedCols.cardinality() == values.length;
-
- ByteBuffer buf = space.asBuffer();
- int pos = buf.position();
- for (int i = 0; i < selectedCols.trueBitCount(); i++) {
- int c = selectedCols.trueBitAt(i);
-
- DataTypeSerializer serializer = info.codeSystem.getSerializer(c);
- if (serializer instanceof DictionaryDimEnc.DictionarySerializer) {
- int len = serializer.peekLength(buf);
- BytesUtil.writeUnsigned((Integer) values[i], len, buf);
- int newPos = buf.position();
- cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
- pos = newPos;
- } else if (serializer instanceof TopNCounterSerializer ||
- serializer instanceof HLLCSerializer ||
- serializer instanceof BitmapSerializer ||
- serializer instanceof ExtendedColumnSerializer ||
- serializer instanceof PercentileSerializer ||
- serializer instanceof DimCountDistincSerializer ||
- serializer instanceof RawSerializer) {
- cols[c].reset((byte[]) values[i], 0, ((byte[]) values[i]).length);
- } else {
- info.codeSystem.encodeColumnValue(c, values[i], buf);
- int newPos = buf.position();
- cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
- pos = newPos;
- }
- }
- return this;
- }
-
/** decode and return the values of this record */
public Object[] getValues() {
return getValues(info.colAll, new Object[info.getColumnCount()]);
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
index 49c68c5..8821a8f 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
@@ -456,4 +456,40 @@ public class GTUtil {
}
}
}
+
+ /** set record to the codes of specified values, reuse given space to hold the codes */
+ public static GTRecord setValuesParquet(GTRecord record, ByteBuffer buf, Map<Integer, Integer> dictCols,
+ Map<Integer, Integer> binaryCols, Map<Integer, Integer> otherCols, Object[] values) {
+
+ int pos = buf.position();
+ int i, c;
+ for (Map.Entry<Integer, Integer> entry : dictCols.entrySet()) {
+ i = entry.getKey();
+ c = entry.getValue();
+ DictionaryDimEnc.DictionarySerializer serializer = (DictionaryDimEnc.DictionarySerializer) record.info.codeSystem
+ .getSerializer(c);
+ int len = serializer.peekLength(buf);
+ BytesUtil.writeUnsigned((Integer) values[i], len, buf);
+ int newPos = buf.position();
+ record.cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
+ pos = newPos;
+ }
+
+ for (Map.Entry<Integer, Integer> entry : binaryCols.entrySet()) {
+ i = entry.getKey();
+ c = entry.getValue();
+ record.cols[c].reset((byte[]) values[i], 0, ((byte[]) values[i]).length);
+ }
+
+ for (Map.Entry<Integer, Integer> entry : otherCols.entrySet()) {
+ i = entry.getKey();
+ c = entry.getValue();
+ record.info.codeSystem.encodeColumnValue(c, values[i], buf);
+ int newPos = buf.position();
+ record.cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
+ pos = newPos;
+ }
+
+ return record;
+ }
}
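[Note: Compared with the deleted GTRecord.setValuesParquet, the serializer-type dispatch now happens once on the caller side: each selected column is classified into dictCols (dictionary-encoded), binaryCols (pre-serialized measure bytes), or otherCols, keyed as values-array position -> GT column index. A rough sketch of that classification, assuming the same serializer types the old instanceof chain tested; the classifier class itself is hypothetical:

    import java.util.Map;
    import org.apache.kylin.common.util.ImmutableBitSet;
    import org.apache.kylin.dimension.DictionaryDimEnc;
    import org.apache.kylin.gridtable.GTInfo;
    import org.apache.kylin.measure.hllc.HLLCSerializer;
    import org.apache.kylin.measure.topn.TopNCounterSerializer;
    import org.apache.kylin.metadata.datatype.DataTypeSerializer;

    public class ColumnClassifier {
        // Build the three index maps (values-array position i -> GT column c)
        // consumed by GTUtil.setValuesParquet. The binary branch would list the
        // same serializers as the old GTRecord chain (TopN, HLLC, bitmap,
        // extended-column, percentile, raw); two are shown here.
        public static void classify(GTInfo info, ImmutableBitSet selectedCols,
                Map<Integer, Integer> dictCols, Map<Integer, Integer> binaryCols,
                Map<Integer, Integer> otherCols) {
            for (int i = 0; i < selectedCols.trueBitCount(); i++) {
                int c = selectedCols.trueBitAt(i);
                DataTypeSerializer serializer = info.getCodeSystem().getSerializer(c);
                if (serializer instanceof DictionaryDimEnc.DictionarySerializer) {
                    dictCols.put(i, c);
                } else if (serializer instanceof TopNCounterSerializer
                        || serializer instanceof HLLCSerializer) {
                    binaryCols.put(i, c);
                } else {
                    otherCols.put(i, c);
                }
            }
        }
    }
]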
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
index 38cbd66..36b5369 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
@@ -177,18 +177,18 @@ public class BuiltInFunctionTupleFilter extends FunctionTupleFilter {
}
@Override
- public String toSparkSqlFilter() {
+ public String toSQL() {
List<? extends TupleFilter> childFilter = this.getChildren();
String op = this.getName();
switch (op) {
case "LIKE":
assert childFilter.size() == 2;
- return childFilter.get(0).toSparkSqlFilter() + toSparkFuncMap.get(op) + childFilter.get(1).toSparkSqlFilter();
+ return childFilter.get(0).toSQL() + toSparkFuncMap.get(op) + childFilter.get(1).toSQL();
case "||":
StringBuilder result = new StringBuilder().append(toSparkFuncMap.get(op)).append("(");
int index = 0;
for (TupleFilter filter : childFilter) {
- result.append(filter.toSparkSqlFilter());
+ result.append(filter.toSQL());
if (index < childFilter.size() - 1) {
result.append(",");
}
@@ -200,17 +200,17 @@ public class BuiltInFunctionTupleFilter extends FunctionTupleFilter {
case "UPPER":
case "CHAR_LENGTH":
assert childFilter.size() == 1;
- return toSparkFuncMap.get(op) + "(" + childFilter.get(0).toSparkSqlFilter() + ")";
+ return toSparkFuncMap.get(op) + "(" + childFilter.get(0).toSQL() + ")";
case "SUBSTRING":
assert childFilter.size() == 3;
- return toSparkFuncMap.get(op) + "(" + childFilter.get(0).toSparkSqlFilter() + "," + childFilter.get(1).toSparkSqlFilter() + "," + childFilter.get(2).toSparkSqlFilter() + ")";
+ return toSparkFuncMap.get(op) + "(" + childFilter.get(0).toSQL() + "," + childFilter.get(1).toSQL() + "," + childFilter.get(2).toSQL() + ")";
default:
if (childFilter.size() == 1) {
- return op + "(" + childFilter.get(0).toSparkSqlFilter() + ")";
+ return op + "(" + childFilter.get(0).toSQL() + ")";
} else if (childFilter.size() == 2) {
- return childFilter.get(0).toSparkSqlFilter() + op + childFilter.get(1).toSparkSqlFilter();
+ return childFilter.get(0).toSQL() + op + childFilter.get(1).toSQL();
} else if (childFilter.size() == 3) {
- return op + "(" + childFilter.get(0).toSparkSqlFilter() + "," + childFilter.get(1).toSparkSqlFilter() + "," + childFilter.get(2).toSparkSqlFilter() + ")";
+ return op + "(" + childFilter.get(0).toSQL() + "," + childFilter.get(1).toSQL() + "," + childFilter.get(2).toSQL() + ")";
}
throw new IllegalArgumentException("Operator " + op + " is not supported");
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
index 4305557..16e645b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
@@ -134,16 +134,16 @@ public class CaseTupleFilter extends TupleFilter implements IOptimizeableTupleFi
}
@Override
- public String toSparkSqlFilter() {
+ public String toSQL() {
String result = "(case ";
TupleFilter whenFilter;
TupleFilter thenFilter;
for (int i = 0; i < this.getWhenFilters().size(); i++) {
whenFilter = this.getWhenFilters().get(i);
thenFilter = this.getThenFilters().get(i);
- result += " when " + whenFilter.toSparkSqlFilter() + " then " + thenFilter.toSparkSqlFilter();
+ result += " when " + whenFilter.toSQL() + " then " + thenFilter.toSQL();
}
- result += " else " + this.getElseFilter().toSparkSqlFilter();
+ result += " else " + this.getElseFilter().toSQL();
result += " end)";
return result;
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
index 09a16f5..caab5a7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
@@ -163,7 +163,7 @@ public class ColumnTupleFilter extends TupleFilter {
}
@Override
- public String toSparkSqlFilter() {
+ public String toSQL() {
return this.columnRef.getTableAlias() + "_" + this.columnRef.getName();
}
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
index b63ac0a..33e9819 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
@@ -280,7 +280,7 @@ public class CompareTupleFilter extends TupleFilter implements IOptimizeableTupl
}
@Override
- public String toSparkSqlFilter() {
+ public String toSQL() {
List<? extends TupleFilter> childFilter = this.getChildren();
switch (this.getOperator()) {
case EQ:
@@ -290,15 +290,15 @@ public class CompareTupleFilter extends TupleFilter implements IOptimizeableTupl
case GTE:
case LTE:
assert childFilter.size() == 2;
- return childFilter.get(0).toSparkSqlFilter() + toSparkOpMap.get(this.getOperator()) + childFilter.get(1).toSparkSqlFilter();
+ return childFilter.get(0).toSQL() + toSparkOpMap.get(this.getOperator()) + childFilter.get(1).toSQL();
case IN:
case NOTIN:
assert childFilter.size() == 2;
- return childFilter.get(0).toSparkSqlFilter() + toSparkOpMap.get(this.getOperator()) + "(" + childFilter.get(1).toSparkSqlFilter() + ")";
+ return childFilter.get(0).toSQL() + toSparkOpMap.get(this.getOperator()) + "(" + childFilter.get(1).toSQL() + ")";
case ISNULL:
case ISNOTNULL:
assert childFilter.size() == 1;
- return childFilter.get(0).toSparkSqlFilter() + toSparkOpMap.get(this.getOperator());
+ return childFilter.get(0).toSQL() + toSparkOpMap.get(this.getOperator());
default:
throw new IllegalStateException("operator " + this.getOperator() + " not supported: ");
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
index e9ecf16..61d7fb6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
@@ -114,7 +114,7 @@ public class ConstantTupleFilter extends TupleFilter {
}
@Override
- public String toSparkSqlFilter() {
+ public String toSQL() {
if (this.equals(TRUE)) {
return "true";
} else if (this.equals(FALSE)) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
index c4490e5..ffc1527 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
@@ -79,7 +79,7 @@ public class DynamicTupleFilter extends TupleFilter {
}
@Override
- public String toSparkSqlFilter() {
+ public String toSQL() {
return "1=1";
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
index 36ea021..59c3e5f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
@@ -123,7 +123,7 @@ public class ExtractTupleFilter extends TupleFilter {
}
@Override
- public String toSparkSqlFilter() {
+ public String toSQL() {
return "1=1";
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
index 99cf3f3..ebea184 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
@@ -155,7 +155,7 @@ public class LogicalTupleFilter extends TupleFilter implements IOptimizeableTupl
}
@Override
- public String toSparkSqlFilter() {
+ public String toSQL() {
StringBuilder result = new StringBuilder("");
switch (this.getOperator()) {
case AND:
@@ -164,7 +164,7 @@ public class LogicalTupleFilter extends TupleFilter implements IOptimizeableTupl
String op = toSparkOpMap.get(this.getOperator());
int index = 0;
for (TupleFilter filter : this.getChildren()) {
- result.append(filter.toSparkSqlFilter());
+ result.append(filter.toSQL());
if (index < this.getChildren().size() - 1) {
result.append(op);
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 28a8c6c..0d3fb3e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -421,6 +421,6 @@ public abstract class TupleFilter {
}
}
- public abstract String toSparkSqlFilter();
+ public abstract String toSQL();
}
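[Note: The rename from toSparkSqlFilter to toSQL leaves the recursion unchanged: every filter node renders itself by rendering its children. A self-contained toy model of that pattern (hypothetical classes, not Kylin's API) showing the kind of predicate string CubeScanRangePlanner pushes down:

    import java.util.Arrays;
    import java.util.List;

    // Toy model of the TupleFilter.toSQL() recursion.
    interface Filter { String toSQL(); }

    class Col implements Filter {
        final String alias, name;
        Col(String alias, String name) { this.alias = alias; this.name = name; }
        // Mirrors ColumnTupleFilter: tableAlias + "_" + columnName
        public String toSQL() { return alias + "_" + name; }
    }

    class Const implements Filter {
        final String literal;
        Const(String literal) { this.literal = literal; }
        public String toSQL() { return literal; }
    }

    class Compare implements Filter {
        final Filter left, right; final String op;
        Compare(Filter left, String op, Filter right) {
            this.left = left; this.op = op; this.right = right;
        }
        public String toSQL() { return left.toSQL() + op + right.toSQL(); }
    }

    class And implements Filter {
        final List<Filter> children;
        And(List<Filter> children) { this.children = children; }
        public String toSQL() {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < children.size(); i++) {
                sb.append(children.get(i).toSQL());
                if (i < children.size() - 1) sb.append(" and ");
            }
            return sb.toString();
        }
    }

    public class ToSqlDemo {
        public static void main(String[] args) {
            Filter f = new And(Arrays.<Filter>asList(
                    new Compare(new Col("KYLIN_SALES", "PART_DT"), ">=", new Const("'2012-01-01'")),
                    new Compare(new Col("KYLIN_SALES", "LSTG_SITE_ID"), "=", new Const("0"))));
            System.out.println(f.toSQL());
            // KYLIN_SALES_PART_DT>='2012-01-01' and KYLIN_SALES_LSTG_SITE_ID=0
        }
    }
]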
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
index b153003..ab7f6e0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
@@ -154,7 +154,7 @@ public class MassInTupleFilter extends FunctionTupleFilter {
}
@Override
- public String toSparkSqlFilter() {
+ public String toSQL() {
return "1=1";
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
index 143ee6d..3bd147c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
@@ -58,7 +58,7 @@ public class UnsupportedTupleFilter extends TupleFilter {
}
@Override
- public String toSparkSqlFilter() {
+ public String toSQL() {
return "1=1";
}
}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 60fe33f..9c21d82 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -111,7 +111,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
this.havingFilter = havingFilter;
if (convertedFilter != null) {
- this.filterPushDownSQL = convertedFilter.toSparkSqlFilter();
+ this.filterPushDownSQL = convertedFilter.toSQL();
logger.info("--filterPushDownSQL--: {}", this.filterPushDownSQL);
}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/path/DefaultStoragePathBuilder.java b/core-storage/src/main/java/org/apache/kylin/storage/path/DefaultStoragePathBuilder.java
index 1e50ba0..858a431 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/path/DefaultStoragePathBuilder.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/path/DefaultStoragePathBuilder.java
@@ -34,7 +34,6 @@ public class DefaultStoragePathBuilder implements IStoragePathBuilder {
@Override
public String getJobRealizationRootPath(CubeSegment cubeSegment, String jobId) {
String jobWorkingDir = getJobWorkingDir(cubeSegment.getConfig().getHdfsWorkingDirectory(), jobId);
-
return jobWorkingDir + SLASH + cubeSegment.getRealization().getName();
}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/path/IStoragePathBuilder.java b/core-storage/src/main/java/org/apache/kylin/storage/path/IStoragePathBuilder.java
index 7a70f98..5ad20d1 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/path/IStoragePathBuilder.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/path/IStoragePathBuilder.java
@@ -21,11 +21,11 @@ package org.apache.kylin.storage.path;
import org.apache.kylin.cube.CubeSegment;
public interface IStoragePathBuilder {
- public static final String SLASH = "/";
+ String SLASH = "/";
- public String getJobWorkingDir(String workingDir, String jobId);
+ String getJobWorkingDir(String workingDir, String jobId);
- public String getJobRealizationRootPath(CubeSegment cubeSegment, String jobId);
+ String getJobRealizationRootPath(CubeSegment cubeSegment, String jobId);
- public String getRealizationFinalDataPath(CubeSegment cubeSegment);
+ String getRealizationFinalDataPath(CubeSegment cubeSegment);
}
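[Note: With the redundant public/static/final modifiers dropped, the interface reads as the minimal contract a storage backend must satisfy. A hedged sketch of a custom implementation that could be plugged in via the storage.path.builder property; the class and its "flat" layout are hypothetical (the shipped implementation is DefaultStoragePathBuilder):

    import org.apache.kylin.cube.CubeSegment;
    import org.apache.kylin.storage.path.IStoragePathBuilder;

    // Hypothetical path builder that groups job output under a flat "jobs/" root.
    // Activate with: storage.path.builder=com.example.FlatStoragePathBuilder
    public class FlatStoragePathBuilder implements IStoragePathBuilder {

        @Override
        public String getJobWorkingDir(String workingDir, String jobId) {
            return workingDir + "jobs" + SLASH + jobId;
        }

        @Override
        public String getJobRealizationRootPath(CubeSegment cubeSegment, String jobId) {
            String jobWorkingDir = getJobWorkingDir(
                    cubeSegment.getConfig().getHdfsWorkingDirectory(), jobId);
            return jobWorkingDir + SLASH + cubeSegment.getRealization().getName();
        }

        @Override
        public String getRealizationFinalDataPath(CubeSegment cubeSegment) {
            return cubeSegment.getConfig().getHdfsWorkingDirectory()
                    + "parquet" + SLASH + cubeSegment.getRealization().getName()
                    + SLASH + cubeSegment.getName();
        }
    }
]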
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 8525dcc..934c04e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -77,8 +77,8 @@ public class JobBuilderSupport {
this.config = new JobEngineConfig(seg.getConfig());
this.seg = seg;
this.submitter = submitter;
- String pathBuilderClz = seg.getConfig().getStorageSystemPathBuilderClz();
- this.storagePathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(pathBuilderClz);
+ String pathBuilder = seg.getConfig().getStoragePathBuilder();
+ this.storagePathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(pathBuilder);
}
public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
deleted file mode 100644
index 154c058..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
+++ /dev/null
@@ -1,433 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskID;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowKeyDecoder;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dimension.AbstractDateDimEnc;
-import org.apache.kylin.dimension.DimensionEncoding;
-import org.apache.kylin.dimension.FixedLenDimEnc;
-import org.apache.kylin.dimension.FixedLenHexDimEnc;
-import org.apache.kylin.dimension.IDimensionEncodingMap;
-import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.engine.mr.common.SerializableConfiguration;
-import org.apache.kylin.measure.BufferedMeasureCodec;
-import org.apache.kylin.measure.MeasureIngester;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.basic.BasicMeasureType;
-import org.apache.kylin.measure.basic.BigDecimalIngester;
-import org.apache.kylin.measure.basic.DoubleIngester;
-import org.apache.kylin.measure.basic.LongIngester;
-import org.apache.kylin.measure.percentile.PercentileSerializer;
-import org.apache.kylin.metadata.datatype.DataType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.parquet.example.data.Group;
-import org.apache.parquet.example.data.GroupFactory;
-import org.apache.parquet.example.data.simple.SimpleGroupFactory;
-import org.apache.parquet.hadoop.ParquetOutputFormat;
-import org.apache.parquet.hadoop.example.GroupWriteSupport;
-import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.OriginalType;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Types;
-import org.apache.spark.Partitioner;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.function.PairFunction;
-import scala.Tuple2;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-public class SparkCubingByLayerParquet extends SparkCubingByLayer {
- @Override
- protected void saveToHDFS(JavaPairRDD<ByteArray, Object[]> rdd, String metaUrl, String cubeName, CubeSegment cubeSeg, String hdfsBaseLocation, int level, Job job, KylinConfig kylinConfig) throws Exception {
- final IDimensionEncodingMap dimEncMap = cubeSeg.getDimensionEncodingMap();
-
- Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSeg.getCubeDesc());
-
- final Map<TblColRef, String> colTypeMap = Maps.newHashMap();
- final Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
-
- MessageType schema = cuboidToMessageType(baseCuboid, dimEncMap, cubeSeg.getCubeDesc(), colTypeMap, meaTypeMap);
-
- logger.info("Schema: {}", schema.toString());
-
- final CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSeg, kylinConfig, level);
-
- logger.info("CuboidToPartitionMapping: {}", cuboidToPartitionMapping.toString());
-
- JavaPairRDD<ByteArray, Object[]> repartitionedRDD = rdd.repartitionAndSortWithinPartitions(new CuboidPartitioner(cuboidToPartitionMapping));
-
- String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
-
- job.setOutputFormatClass(CustomParquetOutputFormat.class);
- GroupWriteSupport.setSchema(schema, job.getConfiguration());
- CustomParquetOutputFormat.setOutputPath(job, new Path(output));
- CustomParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
- CustomParquetOutputFormat.setCuboidToPartitionMapping(job, cuboidToPartitionMapping);
-
- JavaPairRDD<Void, Group> groupRDD = repartitionedRDD.mapToPair(new GenerateGroupRDDFunction(cubeName, cubeSeg.getUuid(), metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap));
-
- groupRDD.saveAsNewAPIHadoopDataset(job.getConfiguration());
- }
-
- static class CuboidPartitioner extends Partitioner {
-
- private CuboidToPartitionMapping mapping;
-
- public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping) {
- this.mapping = cuboidToPartitionMapping;
- }
-
- @Override
- public int numPartitions() {
- return mapping.getNumPartitions();
- }
-
- @Override
- public int getPartition(Object key) {
- ByteArray byteArray = (ByteArray) key;
- long cuboidId = Bytes.toLong(byteArray.array(), RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
-
- return mapping.getRandomPartitionForCuboidId(cuboidId);
- }
- }
-
- public static class CuboidToPartitionMapping implements Serializable {
- private Map<Long, List<Integer>> cuboidPartitions;
- private int partitionNum;
-
- public CuboidToPartitionMapping(Map<Long, List<Integer>> cuboidPartitions) {
- this.cuboidPartitions = cuboidPartitions;
- int partitions = 0;
- for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
- partitions = partitions + entry.getValue().size();
- }
- this.partitionNum = partitions;
- }
-
- public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig, int level) throws IOException {
- cuboidPartitions = Maps.newHashMap();
-
- List<Long> layeredCuboids = cubeSeg.getCuboidScheduler().getCuboidsByLayer().get(level);
- CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSeg, kylinConfig);
-
- int position = 0;
- for (Long cuboidId : layeredCuboids) {
- int partition = estimateCuboidPartitionNum(cuboidId, cubeStatsReader, kylinConfig);
- List<Integer> positions = Lists.newArrayListWithCapacity(partition);
-
- for (int i = position; i < position + partition; i++) {
- positions.add(i);
- }
-
- cuboidPartitions.put(cuboidId, positions);
- position = position + partition;
- }
-
- this.partitionNum = position;
- }
-
- public String serialize() throws JsonProcessingException {
- return JsonUtil.writeValueAsString(cuboidPartitions);
- }
-
- public static CuboidToPartitionMapping deserialize(String jsonMapping) throws IOException {
- Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping, new TypeReference<Map<Long, List<Integer>>>() {});
- return new CuboidToPartitionMapping(cuboidPartitions);
- }
-
- public int getNumPartitions() {
- return this.partitionNum;
- }
-
- public long getCuboidIdByPartition(int partition) {
- for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
- if (entry.getValue().contains(partition)) {
- return entry.getKey();
- }
- }
-
- throw new IllegalArgumentException("No cuboidId for partition id: " + partition);
- }
-
- public int getRandomPartitionForCuboidId(long cuboidId) {
- List<Integer> partitions = cuboidPartitions.get(cuboidId);
- return partitions.get(new Random().nextInt(partitions.size()));
- }
-
- public int getPartitionNumForCuboidId(long cuboidId) {
- return cuboidPartitions.get(cuboidId).size();
- }
-
- public String getPartitionFilePrefix(int partition) {
- String prefix = "cuboid_";
- long cuboid = getCuboidIdByPartition(partition);
- int partNum = partition % getPartitionNumForCuboidId(cuboid);
- prefix = prefix + cuboid + "_part" + partNum;
-
- return prefix;
- }
-
- private int estimateCuboidPartitionNum(long cuboidId, CubeStatsReader cubeStatsReader, KylinConfig kylinConfig) {
- double cuboidSize = cubeStatsReader.estimateCuboidSize(cuboidId);
- float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
- int partition = (int) (cuboidSize / rddCut);
- partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
- partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
- return partition;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
- sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ",")).append("]\n");
- }
-
- return sb.toString();
- }
- }
-
- public static class CustomParquetOutputFormat extends ParquetOutputFormat {
- public static final String CUBOID_TO_PARTITION_MAPPING = "cuboidToPartitionMapping";
-
- @Override
- public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
- FileOutputCommitter committer = (FileOutputCommitter)this.getOutputCommitter(context);
- TaskID taskId = context.getTaskAttemptID().getTaskID();
- int partition = taskId.getId();
-
- CuboidToPartitionMapping mapping = CuboidToPartitionMapping.deserialize(context.getConfiguration().get(CUBOID_TO_PARTITION_MAPPING));
-
- return new Path(committer.getWorkPath(), getUniqueFile(context, mapping.getPartitionFilePrefix(partition)+ "-" + getOutputName(context), extension));
- }
-
- public static void setCuboidToPartitionMapping(Job job, CuboidToPartitionMapping cuboidToPartitionMapping) throws IOException {
- String jsonStr = cuboidToPartitionMapping.serialize();
-
- job.getConfiguration().set(CUBOID_TO_PARTITION_MAPPING, jsonStr);
- }
- }
-
- static class GenerateGroupRDDFunction implements PairFunction<Tuple2<ByteArray, Object[]>, Void, Group> {
- private volatile transient boolean initialized = false;
- private String cubeName;
- private String segmentId;
- private String metaUrl;
- private SerializableConfiguration conf;
- private List<MeasureDesc> measureDescs;
- private RowKeyDecoder decoder;
- private Map<TblColRef, String> colTypeMap;
- private Map<MeasureDesc, String> meaTypeMap;
- private GroupFactory factory;
- private BufferedMeasureCodec measureCodec;
- private PercentileSerializer serializer;
- private ByteBuffer byteBuffer;
-
- public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
- this.cubeName = cubeName;
- this.segmentId = segmentId;
- this.metaUrl = metaurl;
- this.conf = conf;
- this.colTypeMap = colTypeMap;
- this.meaTypeMap = meaTypeMap;
- }
-
- private void init() {
- KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, metaUrl);
- KylinConfig.setAndUnsetThreadLocalConfig(kConfig);
- CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
- CubeDesc cubeDesc = cubeInstance.getDescriptor();
- CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
- measureDescs = cubeDesc.getMeasures();
- decoder = new RowKeyDecoder(cubeSegment);
- factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(conf.get()));
- measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
- serializer = new PercentileSerializer(DataType.getType("percentile(100)"));
-
- }
-
- @Override
- public Tuple2<Void, Group> call(Tuple2<ByteArray, Object[]> tuple) throws Exception {
- if (initialized == false) {
- synchronized (SparkCubingByLayer.class) {
- if (initialized == false) {
- init();
- }
- }
- }
-
- long cuboid = decoder.decode(tuple._1.array());
- List<String> values = decoder.getValues();
- List<TblColRef> columns = decoder.getColumns();
-
- Group group = factory.newGroup();
-
- // for check
- group.append("cuboidId", cuboid);
-
- for (int i = 0; i < columns.size(); i++) {
- TblColRef column = columns.get(i);
- parseColValue(group, column, values.get(i));
- }
-
- ByteBuffer valueBuf = measureCodec.encode(tuple._2());
- byte[] encodedBytes = new byte[valueBuf.position()];
- System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, valueBuf.position());
-
- int[] valueLengths = measureCodec.getCodec().getPeekLength(ByteBuffer.wrap(encodedBytes));
-
- int valueOffset = 0;
- for (int i = 0; i < valueLengths.length; ++i) {
- MeasureDesc measureDesc = measureDescs.get(i);
- parseMeaValue(group, measureDesc, encodedBytes, valueOffset, valueLengths[i], tuple._2[i]);
- valueOffset += valueLengths[i];
- }
-
- return new Tuple2<>(null, group);
- }
-
- private void parseColValue(final Group group, final TblColRef colRef, final String value) {
- switch (colTypeMap.get(colRef)) {
- case "int":
- group.append(colRef.getTableAlias() + "_" + colRef.getName(), Integer.valueOf(value));
- break;
- case "long":
- group.append(colRef.getTableAlias() + "_" + colRef.getName(), Long.valueOf(value));
- break;
- default:
- group.append(colRef.getTableAlias() + "_" + colRef.getName(), Binary.fromString(value));
- break;
- }
- }
-
- private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length, final Object d) {
- switch (meaTypeMap.get(measureDesc)) {
- case "long":
- group.append(measureDesc.getName(), BytesUtil.readLong(value, offset, length));
- break;
- case "double":
- group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
- break;
- case "decimal":
- BigDecimal decimal = (BigDecimal)d;
- decimal = decimal.setScale(4);
- group.append(measureDesc.getName(), Binary.fromConstantByteArray(decimal.unscaledValue().toByteArray()));
- break;
- default:
- group.append(measureDesc.getName(), Binary.fromConstantByteArray(value, offset, length));
- break;
- }
- }
- }
-
- private MessageType cuboidToMessageType(Cuboid cuboid, IDimensionEncodingMap dimEncMap, CubeDesc cubeDesc, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
- Types.MessageTypeBuilder builder = Types.buildMessage();
-
- List<TblColRef> colRefs = cuboid.getColumns();
-
- builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named("cuboidId");
-
- for (TblColRef colRef : colRefs) {
- DimensionEncoding dimEnc = dimEncMap.get(colRef);
-
- if (dimEnc instanceof AbstractDateDimEnc) {
- builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
- colTypeMap.put(colRef, "long");
- } else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
- org.apache.kylin.metadata.datatype.DataType colDataType = colRef.getType();
- if (colDataType.isNumberFamily() || colDataType.isDateTimeFamily()){
- builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
- colTypeMap.put(colRef, "long");
- } else {
- // stringFamily && default
- builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
- colTypeMap.put(colRef, "string");
- }
- } else {
- builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(getColName(colRef));
- colTypeMap.put(colRef, "int");
- }
- }
-
- MeasureIngester[] aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
-
- for (int i = 0; i < cubeDesc.getMeasures().size(); i++) {
- MeasureDesc measureDesc = cubeDesc.getMeasures().get(i);
- org.apache.kylin.metadata.datatype.DataType meaDataType = measureDesc.getFunction().getReturnDataType();
- MeasureType measureType = measureDesc.getFunction().getMeasureType();
-
- if (measureType instanceof BasicMeasureType) {
- MeasureIngester measureIngester = aggrIngesters[i];
- if (measureIngester instanceof LongIngester) {
- builder.required(PrimitiveType.PrimitiveTypeName.INT64).named(measureDesc.getName());
- meaTypeMap.put(measureDesc, "long");
- } else if (measureIngester instanceof DoubleIngester) {
- builder.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named(measureDesc.getName());
- meaTypeMap.put(measureDesc, "double");
- } else if (measureIngester instanceof BigDecimalIngester) {
- builder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.DECIMAL).precision(meaDataType.getPrecision()).scale(meaDataType.getScale()).named(measureDesc.getName());
- meaTypeMap.put(measureDesc, "decimal");
- } else {
- builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
- meaTypeMap.put(measureDesc, "binary");
- }
- } else {
- builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(measureDesc.getName());
- meaTypeMap.put(measureDesc, "binary");
- }
- }
-
- return builder.named(String.valueOf(cuboid.getId()));
- }
-
- private String getColName(TblColRef colRef) {
- return colRef.getTableAlias() + "_" + colRef.getName();
- }
-}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index 6900ca7..bfc0976 100755
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -102,7 +102,7 @@ public class StorageCleanupJob extends AbstractApplication {
this.defaultFs = defaultFs;
this.hbaseFs = hbaseFs;
this.executableManager = ExecutableManager.getInstance(config);
- this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStorageSystemPathBuilderClz());
+ this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStoragePathBuilder());
}
public void setDelete(boolean delete) {
diff --git a/server-base/src/test/resources/test_meta/execute/d861b8b7-c773-47ab-bb1e-c8782ae8d930 b/server-base/src/test/resources/test_meta/execute/d861b8b7-c773-47ab-bb1e-c8782ae8d930
index ed6d6fa..208a0c9e 100644
--- a/server-base/src/test/resources/test_meta/execute/d861b8b7-c773-47ab-bb1e-c8782ae8d930
+++ b/server-base/src/test/resources/test_meta/execute/d861b8b7-c773-47ab-bb1e-c8782ae8d930
@@ -21,7 +21,7 @@
"version" : "2.3.0.20500",
"name" : "Redistribute Flat Hive Table",
"tasks" : null,
- "type" : "org.apache.kylin.source.hive.HiveMRInput$RedistributeFlatHiveTableStep",
+ "type" : "zorg.apache.kylin.source.hive.HiveMRInput$RedistributeFlatHiveTableStep",
"params" : {
"HiveInit" : "USE default;\n",
"HiveRedistributeData" : "INSERT OVERWRITE TABLE kylin_intermediate_ss_a110ac52_9a91_49fe_944a_346d61e54115 SELECT * FROM kylin_intermediate_ss_a110ac52_9a91_49fe_944a_346d61e54115 DISTRIBUTE BY RAND();\n",
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 3c9434b..c2687a1 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -115,10 +115,10 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
- final protected IJoinedFlatTableDesc flatDesc;
- final protected String flatTableDatabase;
- final protected String hdfsWorkingDir;
- final protected IStoragePathBuilder pathBuilder;
+ protected final IJoinedFlatTableDesc flatDesc;
+ protected final String flatTableDatabase;
+ protected final String hdfsWorkingDir;
+ protected final IStoragePathBuilder pathBuilder;
List<String> hiveViewIntermediateTables = Lists.newArrayList();
@@ -127,7 +127,7 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
this.flatDesc = flatDesc;
this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
- this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStorageSystemPathBuilderClz());
+ this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStoragePathBuilder());
}
@Override
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
index 031548c8..1364eed 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
@@ -73,7 +73,7 @@ public class HiveSparkInput extends HiveInputBase implements ISparkInput {
this.flatDesc = flatDesc;
this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
- this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStorageSystemPathBuilderClz());
+ this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStoragePathBuilder());
}
@Override
diff --git a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
index 1bcd9ce..574c6f2 100644
--- a/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
+++ b/source-hive/src/test/java/org/apache/kylin/source/hive/HiveMRInputTest.java
@@ -45,11 +45,11 @@ public class HiveMRInputTest {
try (SetAndUnsetThreadLocalConfig autoUnset = KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig)) {
when(kylinConfig.getHiveTableDirCreateFirst()).thenReturn(true);
when(kylinConfig.getHdfsWorkingDirectory()).thenReturn("/tmp/kylin/");
- when(kylinConfig.getStorageSystemPathBuilderClz()).thenReturn("org.apache.kylin.storage.path.DefaultStoragePathBuilder");
+ when(kylinConfig.getStoragePathBuilder()).thenReturn("org.apache.kylin.storage.path.DefaultStoragePathBuilder");
DefaultChainedExecutable defaultChainedExecutable = mock(DefaultChainedExecutable.class);
defaultChainedExecutable.setId(RandomUtil.randomUUID().toString());
- String jobWorkingDir = HiveInputBase.getJobWorkingDir(defaultChainedExecutable, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(), (IStoragePathBuilder) ClassUtil.newInstance(KylinConfig.getInstanceFromEnv().getStorageSystemPathBuilderClz()));
+ String jobWorkingDir = HiveInputBase.getJobWorkingDir(defaultChainedExecutable, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(), (IStoragePathBuilder) ClassUtil.newInstance(KylinConfig.getInstanceFromEnv().getStoragePathBuilder()));
jobWorkDirPath = new Path(jobWorkingDir);
Assert.assertTrue(fileSystem.exists(jobWorkDirPath));
} finally {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 68f87ff..798d099 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -82,7 +82,7 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
public KafkaTableInputFormat(CubeSegment cubeSegment, JobEngineConfig conf) {
this.cubeSegment = cubeSegment;
this.conf = conf;
- this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(conf.getConfig().getStorageSystemPathBuilderClz());
+ this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(conf.getConfig().getStoragePathBuilder());
}
@Override
@@ -134,7 +134,7 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
this.seg = seg;
this.cubeDesc = seg.getCubeDesc();
this.cubeName = seg.getCubeInstance().getName();
- this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStorageSystemPathBuilderClz());
+ this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(config.getStoragePathBuilder());
}
@Override
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
index 9c14fd6..9a84612 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
@@ -73,7 +73,7 @@ public class KafkaSparkInput extends KafkaInputBase implements ISparkInput {
this.seg = seg;
this.cubeDesc = seg.getCubeDesc();
this.cubeName = seg.getCubeInstance().getName();
- this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(this.config.getStorageSystemPathBuilderClz());
+ this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(this.config.getStoragePathBuilder());
}
@Override
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
index f6d11bd..e8f7713 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/lookup/HBaseLookupMRSteps.java
@@ -61,7 +61,7 @@ public class HBaseLookupMRSteps {
public HBaseLookupMRSteps(CubeInstance cube) {
this.cube = cube;
this.config = new JobEngineConfig(cube.getConfig());
- this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(cube.getConfig().getStorageSystemPathBuilderClz());
+ this.pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(cube.getConfig().getStoragePathBuilder());
}
public void addMaterializeLookupTablesSteps(LookupMaterializeContext context) {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
index 05df80d..4c31c45 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HDFSPathGarbageCollectionStep.java
@@ -60,7 +60,7 @@ public class HDFSPathGarbageCollectionStep extends AbstractExecutable {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
try {
config = new JobEngineConfig(context.getConfig());
- pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(context.getConfig().getStorageSystemPathBuilderClz());
+ pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(context.getConfig().getStoragePathBuilder());
List<String> toDeletePaths = getDeletePaths();
dropHdfsPathOnCluster(toDeletePaths, HadoopUtil.getWorkingFileSystem());
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index ce43456..619c32c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -186,8 +186,8 @@ public class CubeMigrationCLI {
}
private static void renameFoldersInHdfs(CubeInstance cube) {
- IStoragePathBuilder srcPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(srcConfig.getStorageSystemPathBuilderClz());
- IStoragePathBuilder dstPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(dstConfig.getStorageSystemPathBuilderClz());
+ IStoragePathBuilder srcPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(srcConfig.getStoragePathBuilder());
+ IStoragePathBuilder dstPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(dstConfig.getStoragePathBuilder());
for (CubeSegment segment : cube.getSegments()) {
String jobUuid = segment.getLastBuildJobID();
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index fd6124b..496b2bb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -187,7 +187,7 @@ public class StorageCleanupJob extends AbstractApplication {
private void cleanUnusedHdfsFiles(Configuration conf) throws IOException {
JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- IStoragePathBuilder pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(engineConfig.getConfig().getStorageSystemPathBuilderClz());
+ IStoragePathBuilder pathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(engineConfig.getConfig().getStoragePathBuilder());
FileSystem fs = HadoopUtil.getWorkingFileSystem(conf);
List<String> allHdfsPathsNeedToBeDeleted = new ArrayList<String>();
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
index e05b433..e878ec4 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/ParquetStorage.java
@@ -47,7 +47,7 @@ public class ParquetStorage implements IStorage {
} else if (engineInterface == IMROutput2.class) {
return (I) new ParquetMROutput();
} else{
- throw new RuntimeException("Cannot adapt to " + engineInterface);
+ throw new UnsupportedOperationException("Cannot adapt to " + engineInterface);
}
}
}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
index a3ce76f..2fb5733 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
@@ -20,10 +20,6 @@ package org.apache.kylin.storage.parquet.cube;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
public class CubeStorageQuery extends GTCubeStorageQueryBase {
@@ -33,11 +29,6 @@ public class CubeStorageQuery extends GTCubeStorageQueryBase {
}
@Override
- public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- return super.search(context, sqlDigest, returnTupleInfo);
- }
-
- @Override
protected String getGTStorage() {
return KylinConfig.getInstanceFromEnv().getSparkCubeGTStorage();
}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
index 3129b8e..2f1aa17 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
@@ -119,7 +119,7 @@ public class ParquetPayload {
return storageType;
}
- static public class ParquetPayloadBuilder {
+ public static class ParquetPayloadBuilder {
private byte[] gtScanRequest;
private String gtScanRequestId;
private String kylinProperties;
@@ -136,9 +136,6 @@ public class ParquetPayload {
private long startTime;
private int storageType;
- public ParquetPayloadBuilder() {
- }
-
public ParquetPayloadBuilder setGtScanRequest(byte[] gtScanRequest) {
this.gtScanRequest = gtScanRequest;
return this;
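The builder above is consumed fluently; a usage sketch, assuming the remaining setters and a terminal build() follow the same shape as setGtScanRequest (only that setter is visible in this hunk):

    // Hypothetical usage; setGtScanRequestId and build() are assumed.
    static ParquetPayload newPayload(byte[] scanRequestBytes, String requestId) {
        return new ParquetPayload.ParquetPayloadBuilder()
                .setGtScanRequest(scanRequestBytes)
                .setGtScanRequestId(requestId)
                .build();
    }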
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java
index 611ee44..4b08c38 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java
@@ -31,7 +31,12 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
+import org.apache.kylin.measure.percentile.PercentileMeasureType;
+import org.apache.kylin.measure.raw.RawMeasureType;
+import org.apache.kylin.measure.topn.TopNMeasureType;
import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.spark.api.java.JavaRDD;
@@ -94,7 +99,7 @@ public class ParquetTask implements Serializable {
String dataFolderName = request.getDataFolderName();
String baseFolder = dataFolderName.substring(0, dataFolderName.lastIndexOf('/'));
- String cuboidId = dataFolderName.substring(dataFolderName.lastIndexOf("/") + 1);
+ String cuboidId = dataFolderName.substring(dataFolderName.lastIndexOf('/') + 1);
String prefix = "cuboid_" + cuboidId + "_";
CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(request.getRealizationId());
@@ -117,11 +122,11 @@ public class ParquetTask implements Serializable {
pathBuilder.append(p.toString()).append(";");
}
- logger.info("Columnar path is " + pathBuilder.toString());
- logger.info("Required Measures: " + StringUtils.join(request.getParquetColumns(), ","));
- logger.info("Max GT length: " + request.getMaxRecordLength());
+ logger.info("Columnar path is {}", pathBuilder);
+ logger.info("Required Measures: {}", StringUtils.join(request.getParquetColumns(), ","));
+ logger.info("Max GT length: {}", request.getMaxRecordLength());
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new IllegalStateException(e);
}
}
@@ -138,14 +143,14 @@ public class ParquetTask implements Serializable {
map.clear();
} catch (IllegalAccessException | NoSuchFieldException e) {
- throw new RuntimeException(e);
+ throw new IllegalStateException(e);
}
}
public Iterator<Object[]> executeTask() {
- logger.info("Start to visit cube data with Spark SQL <<<<<<");
+ logger.debug("Start to visit cube data with Spark <<<<<<");
- SQLContext sqlContext = new SQLContext(SparderEnv.getSparkSession().sparkContext());
+ SQLContext sqlContext = SparderEnv.getSparkSession().sqlContext();
Dataset<Row> dataset = sqlContext.read().parquet(parquetPaths);
ImmutableBitSet dimensions = scanRequest.getDimensions();
@@ -175,21 +180,17 @@ public class ParquetTask implements Serializable {
// sort
dataset = dataset.sort(getSortColumn(groupBy, mapping));
- JavaRDD<Row> rowRDD = dataset.javaRDD();
-
- JavaRDD<Object[]> objRDD = rowRDD.map(new Function<Row, Object[]>() {
- @Override
- public Object[] call(Row row) throws Exception {
- Object[] objects = new Object[row.length()];
- for (int i = 0; i < row.length(); i++) {
- objects[i] = row.get(i);
- }
- return objects;
+ JavaRDD<Object[]> objRDD = dataset.javaRDD().map((Function<Row, Object[]>) row -> {
+ Object[] objects = new Object[row.length()];
+ for (int i = 0; i < row.length(); i++) {
+ objects[i] = row.get(i);
}
+ return objects;
});
- logger.info("partitions: {}", objRDD.getNumPartitions());
+ logger.debug("partitions: {}", objRDD.getNumPartitions());
+ // TODO: optimize the way to collect data.
List<Object[]> result = objRDD.collect();
return result.iterator();
}
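On the TODO above: collect() materializes the full result on the driver in one shot. One candidate, sketched against the objRDD from this method, is toLocalIterator(), which pulls one partition at a time at the cost of a Spark job per partition; whether that trade-off suits the query server is exactly what the TODO leaves open.

    // Sketch only: lazy alternative to collect(), one job per partition.
    Iterator<Object[]> streamed = objRDD.toLocalIterator();
    return streamed;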
@@ -216,23 +217,23 @@ public class ParquetTask implements Serializable {
private Column getAggColumn(String metName, String func, DataType dataType) {
Column column;
switch (func) {
- case "SUM":
+ case FunctionDesc.FUNC_SUM:
column = sum(metName);
break;
- case "MIN":
+ case FunctionDesc.FUNC_MIN:
column = min(metName);
break;
- case "MAX":
+ case FunctionDesc.FUNC_MAX:
column = max(metName);
break;
- case "COUNT":
+ case FunctionDesc.FUNC_COUNT:
column = sum(metName);
break;
- case "TOP_N":
- case "COUNT_DISTINCT":
- case "EXTENDED_COLUMN":
- case "PERCENTILE_APPROX":
- case "RAW":
+ case TopNMeasureType.FUNC_TOP_N:
+ case FunctionDesc.FUNC_COUNT_DISTINCT:
+ case ExtendedColumnMeasureType.FUNC_EXTENDED_COLUMN:
+ case PercentileMeasureType.FUNC_PERCENTILE_APPROX:
+ case RawMeasureType.FUNC_RAW:
String udf = UdfManager.register(dataType, func);
column = callUDF(udf, col(metName));
break;
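For the simple aggregates, the returned Column objects plug straight into Spark's groupBy/agg; the complex measures go through a registered UDF because Spark has no built-in for, e.g., HLL or TopN merging. A compilable sketch of the simple path (dimension and measure names are hypothetical; note COUNT deliberately maps to sum() above because cuboid rows already carry pre-aggregated counts):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import static org.apache.spark.sql.functions.max;
    import static org.apache.spark.sql.functions.min;
    import static org.apache.spark.sql.functions.sum;

    class AggSketch {
        static Dataset<Row> aggregate(Dataset<Row> dataset) {
            return dataset.groupBy("dim0", "dim1")
                    .agg(sum("m_count"), min("m_min"), max("m_max"));
        }
    }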
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java
index 1c8c246..604cb6d 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java
@@ -21,7 +21,6 @@ package org.apache.kylin.storage.parquet.spark;
import org.apache.kylin.ext.ClassLoaderUtils;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.storage.parquet.spark.gtscanner.ParquetRecordGTScanner;
import org.apache.kylin.storage.parquet.spark.gtscanner.ParquetRecordGTScanner4Cube;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,13 +29,9 @@ public class SparkSubmitter {
public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
public static IGTScanner submitParquetTask(GTScanRequest scanRequest, ParquetPayload payload) {
-
Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader());
ParquetTask parquetTask = new ParquetTask(payload);
-
- ParquetRecordGTScanner scanner = new ParquetRecordGTScanner4Cube(scanRequest.getInfo(), parquetTask.executeTask(), scanRequest,
- payload.getMaxScanBytes());
-
- return scanner;
+ return new ParquetRecordGTScanner4Cube(scanRequest.getInfo(),
+ parquetTask.executeTask(), scanRequest, payload.getMaxScanBytes());
}
}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java
index 322aec1..27f005a 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java
@@ -19,18 +19,30 @@
package org.apache.kylin.storage.parquet.spark.gtscanner;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
import org.apache.kylin.common.exceptions.KylinTimeoutException;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
-import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.dimension.DictionaryDimEnc;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTUtil;
import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.measure.bitmap.BitmapSerializer;
+import org.apache.kylin.measure.dim.DimCountDistincSerializer;
+import org.apache.kylin.measure.extendedcolumn.ExtendedColumnSerializer;
+import org.apache.kylin.measure.hllc.HLLCSerializer;
+import org.apache.kylin.measure.percentile.PercentileSerializer;
+import org.apache.kylin.measure.raw.RawSerializer;
+import org.apache.kylin.measure.topn.TopNCounterSerializer;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Iterator;
+import java.util.Map;
/**
* this class tracks resource
@@ -43,20 +55,16 @@ public abstract class ParquetRecordGTScanner implements IGTScanner {
private ImmutableBitSet columns;
private long maxScannedBytes;
-
private long scannedRows;
private long scannedBytes;
- private ImmutableBitSet[] columnBlocks;
-
public ParquetRecordGTScanner(GTInfo info, Iterator<Object[]> iterator, GTScanRequest scanRequest,
- long maxScannedBytes) {
+ long maxScannedBytes) {
this.iterator = iterator;
this.info = info;
this.gtrecord = new GTRecord(info);
this.columns = scanRequest.getColumns();
this.maxScannedBytes = maxScannedBytes;
- this.columnBlocks = getParquetCoveredColumnBlocks(scanRequest);
}
@Override
@@ -79,12 +87,40 @@ public abstract class ParquetRecordGTScanner implements IGTScanner {
@Override
public Iterator<GTRecord> iterator() {
return Iterators.transform(iterator, new com.google.common.base.Function<Object[], GTRecord>() {
+ private int maxColumnLength = -1;
+ private ByteBuffer byteBuf = null;
+ private final Map<Integer, Integer> dictCols = Maps.newHashMap();
+ private final Map<Integer, Integer> binaryCols = Maps.newHashMap();
+ private final Map<Integer, Integer> otherCols = Maps.newHashMap();
@Nullable
@Override
+ @SuppressWarnings("checkstyle:BooleanExpressionComplexity")
public GTRecord apply(@Nullable Object[] input) {
- gtrecord.setValuesParquet(ParquetRecordGTScanner.this.columns, new ByteArray(info.getMaxColumnLength(ParquetRecordGTScanner.this.columns)), input);
+ if (maxColumnLength <= 0) {
+ maxColumnLength = info.getMaxColumnLength(ParquetRecordGTScanner.this.columns);
+ for (int i = 0; i < ParquetRecordGTScanner.this.columns.trueBitCount(); i++) {
+ int c = ParquetRecordGTScanner.this.columns.trueBitAt(i);
+ DataTypeSerializer serializer = info.getCodeSystem().getSerializer(c);
+ if (serializer instanceof DictionaryDimEnc.DictionarySerializer) {
+ dictCols.put(i, c);
+ } else if (serializer instanceof TopNCounterSerializer || serializer instanceof HLLCSerializer
+ || serializer instanceof BitmapSerializer
+ || serializer instanceof ExtendedColumnSerializer
+ || serializer instanceof PercentileSerializer
+ || serializer instanceof DimCountDistincSerializer
+ || serializer instanceof RawSerializer) {
+ binaryCols.put(i, c);
+ } else {
+ otherCols.put(i, c);
+ }
+ }
+
+ byteBuf = ByteBuffer.allocate(maxColumnLength);
+ }
+ byteBuf.clear();
+ GTUtil.setValuesParquet(gtrecord, byteBuf, dictCols, binaryCols, otherCols, input);
- scannedBytes += info.getMaxColumnLength(ParquetRecordGTScanner.this.columns);
+ scannedBytes += maxColumnLength;
if ((++scannedRows % GTScanRequest.terminateCheckInterval == 1) && Thread.interrupted()) {
throw new KylinTimeoutException("Query timeout");
}
@@ -98,8 +134,6 @@ public abstract class ParquetRecordGTScanner implements IGTScanner {
});
}
- abstract protected ImmutableBitSet getParquetCoveredColumns(GTScanRequest scanRequest);
- abstract protected ImmutableBitSet[] getParquetCoveredColumnBlocks(GTScanRequest scanRequest);
}
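The scanner stays lazy end to end: Guava's Iterators.transform wraps the Spark row iterator, so each Object[] is decoded into the shared GTRecord only when the query engine pulls the next element, and the per-row accounting (scannedBytes, the interrupt check) rides along inside apply(). A reduced sketch of that shape, with the decoding replaced by a placeholder:

    import com.google.common.base.Function;
    import com.google.common.collect.Iterators;

    import java.util.Arrays;
    import java.util.Iterator;

    class TransformSketch {
        static Iterator<String> decode(Iterator<Object[]> rows) {
            return Iterators.transform(rows, new Function<Object[], String>() {
                private long scannedRows; // per-row bookkeeping lives in the transform

                @Override
                public String apply(Object[] input) {
                    scannedRows++;
                    return Arrays.toString(input); // placeholder for GTRecord decoding
                }
            });
        }
    }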
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java
index 3bf670e..d97531f 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java
@@ -18,47 +18,15 @@
package org.apache.kylin.storage.parquet.spark.gtscanner;
-import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTScanRequest;
-import java.util.BitSet;
import java.util.Iterator;
public class ParquetRecordGTScanner4Cube extends ParquetRecordGTScanner {
public ParquetRecordGTScanner4Cube(GTInfo info, Iterator<Object[]> iterator, GTScanRequest scanRequest,
- long maxScannedBytes) {
+ long maxScannedBytes) {
super(info, iterator, scanRequest, maxScannedBytes);
}
- protected ImmutableBitSet getParquetCoveredColumns(GTScanRequest scanRequest) {
- BitSet bs = new BitSet();
-
- ImmutableBitSet dimensions = scanRequest.getInfo().getPrimaryKey();
- for (int i = 0; i < dimensions.trueBitCount(); ++i) {
- bs.set(dimensions.trueBitAt(i));
- }
-
- ImmutableBitSet queriedColumns = scanRequest.getColumns();
- for (int i = 0; i < queriedColumns.trueBitCount(); ++i) {
- bs.set(queriedColumns.trueBitAt(i));
- }
- return new ImmutableBitSet(bs);
- }
-
- protected ImmutableBitSet[] getParquetCoveredColumnBlocks(GTScanRequest scanRequest) {
-
- ImmutableBitSet selectedColBlocksBitSet = scanRequest.getSelectedColBlocks();
-
- ImmutableBitSet[] selectedColBlocks = new ImmutableBitSet[selectedColBlocksBitSet.trueBitCount()];
-
- for(int i = 0; i < selectedColBlocksBitSet.trueBitCount(); i++) {
-
- selectedColBlocks[i] = scanRequest.getInfo().getColumnBlock(selectedColBlocksBitSet.trueBitAt(i));
-
- }
-
- return selectedColBlocks;
- }
-
}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java
index c778ee0..a0c1a2d 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ConvertToParquetReducer.java
@@ -43,7 +43,6 @@ import java.io.IOException;
import java.util.Map;
/**
- * Created by Yichen on 11/14/18.
*/
public class ConvertToParquetReducer extends KylinReducer<Text, Text, NullWritable, Group> {
private ParquetConvertor convertor;
@@ -51,7 +50,7 @@ public class ConvertToParquetReducer extends KylinReducer<Text, Text, NullWritab
private CubeSegment cubeSegment;
@Override
- protected void doSetup(Context context) throws IOException, InterruptedException {
+ protected void doSetup(Context context) throws IOException {
Configuration conf = context.getConfiguration();
super.bindCurrentConfiguration(conf);
mos = new MultipleOutputs(context);
@@ -68,9 +67,9 @@ public class ConvertToParquetReducer extends KylinReducer<Text, Text, NullWritab
SerializableConfiguration sConf = new SerializableConfiguration(conf);
Map<TblColRef, String> colTypeMap = Maps.newHashMap();
- Map<MeasureDesc, String> meaTypeMap = Maps.newHashMap();
- ParquetConvertor.generateTypeMap(baseCuboid, dimEncMap, cube.getDescriptor(), colTypeMap, meaTypeMap);
- convertor = new ParquetConvertor(cubeName, segmentId, kylinConfig, sConf, colTypeMap, meaTypeMap);
+ Map<MeasureDesc, String> measureTypeMap = Maps.newHashMap();
+ ParquetConvertor.generateTypeMap(baseCuboid, dimEncMap, cube.getDescriptor(), colTypeMap, measureTypeMap);
+ convertor = new ParquetConvertor(cubeName, segmentId, kylinConfig, sConf, colTypeMap, measureTypeMap);
}
@Override
@@ -80,14 +79,10 @@ public class ConvertToParquetReducer extends KylinReducer<Text, Text, NullWritab
int partitionId = context.getTaskAttemptID().getTaskID().getId();
for (Text value : values) {
- try {
- Group group = convertor.parseValueToGroup(key, value);
- String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel("", layerNumber)
- + "/" + ParquetJobSteps.getCuboidOutputFileName(cuboidId, partitionId);
- mos.write(MRCubeParquetJob.BY_LAYER_OUTPUT, null, group, output);
- } catch (IOException e){
- throw new IOException(e);
- }
+ Group group = convertor.parseValueToGroup(key, value);
+ String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel("", layerNumber) + "/"
+ + ParquetJobSteps.getCuboidOutputFileName(cuboidId, partitionId);
+ mos.write(MRCubeParquetJob.BY_LAYER_OUTPUT, null, group, output);
}
}
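The deleted try/catch was a no-op wrapper: catching IOException only to rethrow new IOException(e) adds a frame and hides nothing, and reduce() already declares IOException. The anti-pattern and its fix in isolation (method names hypothetical):

    import java.io.IOException;
    import java.io.Writer;

    class RethrowSketch {
        // Anti-pattern: wrapping in the same exception type adds no information.
        void writeWrapped(Writer w) throws IOException {
            try {
                w.write("row");
            } catch (IOException e) {
                throw new IOException(e);
            }
        }

        // Fix: let the declared checked exception propagate.
        void write(Writer w) throws IOException {
            w.write("row");
        }
    }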
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java
index 7fcf95d..97309f1 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/CuboidToPartitionMapping.java
@@ -39,7 +39,6 @@ import java.util.Map;
import java.util.Set;
/**
- * Created by Yichen on 11/12/18.
*/
public class CuboidToPartitionMapping implements Serializable {
private static final Logger logger = LoggerFactory.getLogger(CuboidToPartitionMapping.class);
@@ -58,31 +57,26 @@ public class CuboidToPartitionMapping implements Serializable {
public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig) throws IOException {
cuboidPartitions = Maps.newHashMap();
-
Set<Long> allCuboidIds = cubeSeg.getCuboidScheduler().getAllCuboidIds();
-
- CalculatePartitionId(cubeSeg, kylinConfig, allCuboidIds);
+ calculatePartitionId(cubeSeg, kylinConfig, allCuboidIds);
}
public CuboidToPartitionMapping(CubeSegment cubeSeg, KylinConfig kylinConfig, int level) throws IOException {
cuboidPartitions = Maps.newHashMap();
-
List<Long> layeredCuboids = cubeSeg.getCuboidScheduler().getCuboidsByLayer().get(level);
-
- CalculatePartitionId(cubeSeg, kylinConfig, layeredCuboids);
+ calculatePartitionId(cubeSeg, kylinConfig, layeredCuboids);
}
- private void CalculatePartitionId(CubeSegment cubeSeg, KylinConfig kylinConfig, Collection<Long> cuboidIds) throws IOException {
+ private void calculatePartitionId(CubeSegment cubeSeg, KylinConfig kylinConfig, Collection<Long> cuboidIds)
+ throws IOException {
int position = 0;
CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSeg, kylinConfig);
for (Long cuboidId : cuboidIds) {
int partition = estimateCuboidPartitionNum(cuboidId, cubeStatsReader, kylinConfig);
List<Integer> positions = Lists.newArrayListWithCapacity(partition);
-
for (int i = position; i < position + partition; i++) {
positions.add(i);
}
-
cuboidPartitions.put(cuboidId, positions);
position = position + partition;
}
@@ -95,7 +89,9 @@ public class CuboidToPartitionMapping implements Serializable {
}
public static CuboidToPartitionMapping deserialize(String jsonMapping) throws IOException {
- Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping, new TypeReference<Map<Long, List<Integer>>>() {});
+ Map<Long, List<Integer>> cuboidPartitions = JsonUtil.readValue(jsonMapping,
+ new TypeReference<Map<Long, List<Integer>>>() {
+ });
return new CuboidToPartitionMapping(cuboidPartitions);
}
@@ -137,9 +133,7 @@ public class CuboidToPartitionMapping implements Serializable {
public String getPartitionFilePrefix(int partition) {
long cuboid = getCuboidIdByPartition(partition);
int partNum = partition % getPartitionNumForCuboidId(cuboid);
- String prefix = ParquetJobSteps.getCuboidOutputFileName(cuboid, partNum);
-
- return prefix;
+ return ParquetJobSteps.getCuboidOutputFileName(cuboid, partNum);
}
private int estimateCuboidPartitionNum(long cuboidId, CubeStatsReader cubeStatsReader, KylinConfig kylinConfig) {
@@ -157,7 +151,8 @@ public class CuboidToPartitionMapping implements Serializable {
public String toString() {
StringBuilder sb = new StringBuilder();
for (Map.Entry<Long, List<Integer>> entry : cuboidPartitions.entrySet()) {
- sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ",")).append("]\n");
+ sb.append("cuboidId:").append(entry.getKey()).append(" [").append(StringUtils.join(entry.getValue(), ","))
+ .append("]\n");
}
return sb.toString();
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java
index 7113a7a..a1d74fe 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/MRCubeParquetJob.java
@@ -48,13 +48,12 @@ import java.io.IOException;
/**
- * Created by Yichen on 11/9/18.
*/
public class MRCubeParquetJob extends AbstractHadoopJob {
protected static final Logger logger = LoggerFactory.getLogger(MRCubeParquetJob.class);
- final static String BY_LAYER_OUTPUT = "ByLayer";
+ public static final String BY_LAYER_OUTPUT = "ByLayer";
private Options options;
public MRCubeParquetJob(){
@@ -73,7 +72,7 @@ public class MRCubeParquetJob extends AbstractHadoopJob {
final Path inputPath = new Path(getOptionValue(OPTION_INPUT_PATH));
final Path outputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
final String cubeName = getOptionValue(OPTION_CUBE_NAME);
- logger.info("CubeName: ", cubeName);
+ logger.info("CubeName: {}", cubeName);
final String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
@@ -94,13 +93,10 @@ public class MRCubeParquetJob extends AbstractHadoopJob {
Cuboid baseCuboid = Cuboid.getBaseCuboid(cubeSegment.getCubeDesc());
MessageType schema = ParquetConvertor.cuboidToMessageType(baseCuboid, dimEncMap, cubeSegment.getCubeDesc());
- logger.info("Schema: {}", schema.toString());
+ logger.info("Schema: {}", schema);
try {
-
job.getConfiguration().set(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING, jsonStr);
-
-
addInputDirs(inputPath.toString(), job);
FileOutputFormat.setOutputPath(job, outputPath);
@@ -145,7 +141,7 @@ public class MRCubeParquetJob extends AbstractHadoopJob {
try {
mapping = CuboidToPartitionMapping.deserialize(conf.get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new IllegalArgumentException(e);
}
}
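The CubeName fix above is more than style: logger.info("CubeName: ", cubeName) has no {} placeholder, so SLF4J silently drops the argument and the cube name never reaches the log. Parameterized logging also skips argument formatting when the level is disabled. In isolation:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class LoggingSketch {
        private static final Logger logger = LoggerFactory.getLogger(LoggingSketch.class);

        public static void main(String[] args) {
            String cubeName = "sales_cube";
            logger.info("CubeName: ", cubeName);   // bug: argument silently ignored
            logger.info("CubeName: {}", cubeName); // placeholder binds the argument
        }
    }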
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java
index 9b9578d..b3be22d 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetConvertor.java
@@ -64,7 +64,6 @@ import java.util.List;
import java.util.Map;
/**
- * Created by Yichen on 11/9/18.
*/
public class ParquetConvertor {
private static final Logger logger = LoggerFactory.getLogger(ParquetConvertor.class);
@@ -121,7 +120,7 @@ public class ParquetConvertor {
int valueOffset = 0;
for (int i = 0; i < valueLengths.length; ++i) {
MeasureDesc measureDesc = measureDescs.get(i);
- parseMeaValue(group, measureDesc, rawValue.getBytes(), valueOffset, valueLengths[i]);
+ parseMeasureValue(group, measureDesc, rawValue.getBytes(), valueOffset, valueLengths[i]);
valueOffset += valueLengths[i];
}
@@ -146,7 +145,7 @@ public class ParquetConvertor {
}
}
- private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) throws IOException {
+ private void parseMeasureValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) {
if (value==null) {
logger.error("value is null");
return;
@@ -161,10 +160,10 @@ public class ParquetConvertor {
case DATATYPE_DECIMAL:
BigDecimal decimal = serializer.deserialize(ByteBuffer.wrap(value, offset, length));
decimal = decimal.setScale(4);
- group.append(measureDesc.getName(), Binary.fromByteArray(decimal.unscaledValue().toByteArray()));
+ group.append(measureDesc.getName(), Binary.fromReusedByteArray(decimal.unscaledValue().toByteArray()));
break;
default:
- group.append(measureDesc.getName(), Binary.fromByteArray(value, offset, length));
+ group.append(measureDesc.getName(), Binary.fromReusedByteArray(value, offset, length));
break;
}
}
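On fromByteArray vs fromReusedByteArray: parquet-column (1.8+, which this is assumed to build against) deprecated fromByteArray in favor of two factories that state buffer ownership explicitly. fromReusedByteArray declares the backing array may be overwritten later, so Parquet copies whenever it must retain the bytes; fromConstantByteArray promises immutability and permits zero-copy retention. A minimal illustration:

    import org.apache.parquet.io.api.Binary;

    class BinarySketch {
        public static void main(String[] args) {
            byte[] buf = new byte[] { 1, 2, 3, 4 };
            // Caller may reuse buf afterwards; Parquet copies when retaining.
            Binary reused = Binary.fromReusedByteArray(buf, 1, 2);
            // Caller promises buf never changes; Parquet may keep a reference.
            Binary constant = Binary.fromConstantByteArray(buf);
            System.out.println(reused.length() + " " + constant.length());
        }
    }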
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
index 625737a..f097a6a 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetJobSteps.java
@@ -22,7 +22,6 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.job.execution.AbstractExecutable;
-
/**
* Common steps for building cube into Parquet
*/
@@ -33,8 +32,7 @@ public abstract class ParquetJobSteps extends JobBuilderSupport {
}
public static String getCuboidOutputFileName(long cuboid, int partNum) {
- String fileName = "cuboid_" + cuboid + "_part" + partNum;
- return fileName;
+ return "cuboid_" + cuboid + "_part" + partNum;
}
- abstract public AbstractExecutable convertToParquetStep(String jobId);
+ public abstract AbstractExecutable convertToParquetStep(String jobId);
}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
index f54a7a0..00f8e36 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMROutput.java
@@ -32,20 +32,14 @@ import org.apache.kylin.engine.mr.steps.HiveToBaseCuboidMapper;
import org.apache.kylin.engine.mr.steps.InMemCuboidMapper;
import org.apache.kylin.engine.mr.steps.NDCuboidMapper;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
-
/**
- * Created by Yichen on 10/16/18.
*/
public class ParquetMROutput implements IMROutput2 {
- private static final Logger logger = LoggerFactory.getLogger(ParquetMROutput.class);
-
@Override
public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg) {
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java
index e1d0b0f..bfa5ce6 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/ParquetMRSteps.java
@@ -25,7 +25,6 @@ import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.execution.AbstractExecutable;
/**
- * Created by Yichen on 11/8/18.
*/
public class ParquetMRSteps extends ParquetJobSteps{
public ParquetMRSteps(CubeSegment seg) {
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
index 4ad7cb3..c6ea2ac 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
@@ -63,8 +63,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
-
-public class SparkCubeParquet extends AbstractApplication implements Serializable{
+public class SparkCubeParquet extends AbstractApplication implements Serializable {
protected static final Logger logger = LoggerFactory.getLogger(SparkCubeParquet.class);
@@ -78,12 +77,12 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
.isRequired(true).withDescription("Parquet output path").create(BatchConstants.ARG_OUTPUT);
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
- public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
- .isRequired(true).withDescription("Counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
+ public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT)
+ .hasArg().isRequired(true).withDescription("Counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
private Options options;
- public SparkCubeParquet(){
+ public SparkCubeParquet() {
options = new Options();
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_CUBE_NAME);
@@ -107,17 +106,18 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
final String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
- Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Text.class, Group.class};
+ Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Text.class,
+ Group.class };
- SparkConf conf = new SparkConf().setAppName("Converting Parquet File for: " + cubeName + " segment " + segmentId);
+ SparkConf conf = new SparkConf()
+ .setAppName("Converting Parquet File for: " + cubeName + " segment " + segmentId);
//serialization conf
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "org.apache.kylin.engine.spark.KylinKryoRegistrator");
conf.set("spark.kryo.registrationRequired", "true").registerKryoClasses(kryoClassArray);
-
KylinSparkJobListener jobListener = new KylinSparkJobListener();
- try (JavaSparkContext sc = new JavaSparkContext(conf)){
+ try (JavaSparkContext sc = new JavaSparkContext(conf)) {
sc.sc().addSparkListener(jobListener);
HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
@@ -147,10 +147,10 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
ParquetConvertor.generateTypeMap(baseCuboid, dimEncMap, cubeSegment.getCubeDesc(), colTypeMap, meaTypeMap);
GroupWriteSupport.setSchema(schema, job.getConfiguration());
- GenerateGroupRDDFunction groupPairFunction = new GenerateGroupRDDFunction(cubeName, cubeSegment.getUuid(), metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap);
-
+ GenerateGroupRDDFunction groupPairFunction = new GenerateGroupRDDFunction(cubeName, cubeSegment.getUuid(),
+ metaUrl, new SerializableConfiguration(job.getConfiguration()), colTypeMap, meaTypeMap);
- logger.info("Schema: {}", schema.toString());
+ logger.info("Schema: {}", schema);
// Read from cuboid and save to parquet
for (int level = 0; level <= totalLevels; level++) {
@@ -162,7 +162,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
Map<String, String> counterMap = Maps.newHashMap();
- counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+ counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN,
+ String.valueOf(jobListener.metrics.getBytesWritten()));
// save counter to hdfs
HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
@@ -170,10 +171,12 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
}
- protected void saveToParquet(JavaPairRDD<Text, Text> rdd, GenerateGroupRDDFunction groupRDDFunction, CubeSegment cubeSeg, String parquetOutput, int level, Job job, KylinConfig kylinConfig) throws IOException {
- final CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSeg, kylinConfig, level);
+ protected void saveToParquet(JavaPairRDD<Text, Text> rdd, GenerateGroupRDDFunction groupRDDFunction,
+ CubeSegment cubeSeg, String parquetOutput, int level, Job job, KylinConfig kylinConfig) throws IOException {
+ final CuboidToPartitionMapping cuboidToPartitionMapping = new CuboidToPartitionMapping(cubeSeg, kylinConfig,
+ level);
- logger.info("CuboidToPartitionMapping: {}", cuboidToPartitionMapping.toString());
+ logger.info("CuboidToPartitionMapping: {}", cuboidToPartitionMapping);
String output = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(parquetOutput, level);
@@ -182,7 +185,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
CustomParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
CustomParquetOutputFormat.setCuboidToPartitionMapping(job, cuboidToPartitionMapping);
- JavaPairRDD<Void, Group> groupRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping)).mapToPair(groupRDDFunction);
+ JavaPairRDD<Void, Group> groupRDD = rdd.partitionBy(new CuboidPartitioner(cuboidToPartitionMapping))
+ .mapToPair(groupRDDFunction);
groupRDD.saveAsNewAPIHadoopDataset(job.getConfiguration());
}
@@ -201,7 +205,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
@Override
public int getPartition(Object key) {
- Text textKey = (Text)key;
+ Text textKey = (Text) key;
return mapping.getPartitionByKey(textKey.getBytes());
}
}
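CuboidPartitioner is a standard custom Spark Partitioner: numPartitions comes from the cuboid-to-partition mapping, and getPartition routes each row key by its cuboid id so that every cuboid lands in a predictable set of output files. The same shape in a self-contained form (the routing rule here is a stand-in for getPartitionByKey):

    import org.apache.spark.Partitioner;

    class ModuloPartitioner extends Partitioner {
        private final int numPartitions;

        ModuloPartitioner(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        @Override
        public int numPartitions() {
            return numPartitions;
        }

        @Override
        public int getPartition(Object key) {
            // Stand-in rule; the real code consults CuboidToPartitionMapping.
            return Math.floorMod(key.hashCode(), numPartitions);
        }
    }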
@@ -210,16 +214,19 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
@Override
public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException {
- FileOutputCommitter committer = (FileOutputCommitter)this.getOutputCommitter(context);
+ FileOutputCommitter committer = (FileOutputCommitter) this.getOutputCommitter(context);
TaskID taskId = context.getTaskAttemptID().getTaskID();
int partition = taskId.getId();
- CuboidToPartitionMapping mapping = CuboidToPartitionMapping.deserialize(context.getConfiguration().get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
+ CuboidToPartitionMapping mapping = CuboidToPartitionMapping
+ .deserialize(context.getConfiguration().get(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING));
- return new Path(committer.getWorkPath(), getUniqueFile(context, mapping.getPartitionFilePrefix(partition)+ "-" + getOutputName(context), extension));
+ return new Path(committer.getWorkPath(), getUniqueFile(context,
+ mapping.getPartitionFilePrefix(partition) + "-" + getOutputName(context), extension));
}
- public static void setCuboidToPartitionMapping(Job job, CuboidToPartitionMapping cuboidToPartitionMapping) throws IOException {
+ public static void setCuboidToPartitionMapping(Job job, CuboidToPartitionMapping cuboidToPartitionMapping)
+ throws IOException {
String jsonStr = cuboidToPartitionMapping.serialize();
job.getConfiguration().set(BatchConstants.ARG_CUBOID_TO_PARTITION_MAPPING, jsonStr);
@@ -227,7 +234,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
}
static class GenerateGroupRDDFunction implements PairFunction<Tuple2<Text, Text>, Void, Group> {
- private volatile transient boolean initialized = false;
+ private transient volatile boolean initialized = false;
private String cubeName;
private String segmentId;
private String metaUrl;
@@ -237,7 +244,9 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
private transient ParquetConvertor convertor;
- public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
+ public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl,
+ SerializableConfiguration conf, Map<TblColRef, String> colTypeMap,
+ Map<MeasureDesc, String> meaTypeMap) {
this.cubeName = cubeName;
this.segmentId = segmentId;
this.metaUrl = metaurl;
@@ -253,9 +262,9 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
@Override
public Tuple2<Void, Group> call(Tuple2<Text, Text> tuple) throws Exception {
- if (initialized == false) {
+ if (!initialized) {
synchronized (SparkCubeParquet.class) {
- if (initialized == false) {
+ if (!initialized) {
init();
initialized = true;
}
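The guard above is classic double-checked locking, and it is only safe because initialized is volatile; transient additionally keeps the flag out of the serialized closure, so each Spark executor re-runs init() for its own non-serializable state. The pattern in isolation:

    class LazyInit {
        private volatile boolean initialized = false;

        void ensureInitialized() {
            if (!initialized) {                 // fast path, no lock
                synchronized (LazyInit.class) {
                    if (!initialized) {         // re-check under the lock
                        init();
                        initialized = true;     // volatile write publishes init()
                    }
                }
            }
        }

        private void init() {
            // one-time setup, e.g. rebuilding transient state on an executor
        }
    }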
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java
index a0c212c..88012c8 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java
@@ -113,7 +113,7 @@ public class DebugTomcatClassLoader extends ParallelWebappClassLoader {
return sparkClassLoader.loadClass(name);
}
if (isParentCLPrecedent(name)) {
- logger.debug("Skipping exempt class " + name + " - delegating directly to parent");
+ logger.trace("Skipping exempt class " + name + " - delegating directly to parent");
return parent.loadClass(name);
}
return super.loadClass(name, resolve);
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
index 8fe211e..1cdfbc6 100644
--- a/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
@@ -159,11 +159,11 @@ public class SparkClassLoader extends URLClassLoader {
// Check whether the class has already been loaded:
Class<?> clasz = findLoadedClass(name);
if (clasz != null) {
- logger.debug("Class " + name + " already loaded");
+ logger.trace("Class " + name + " already loaded");
} else {
try {
// Try to find this class using the URLs passed to this ClassLoader
- logger.debug("Finding class: " + name);
+ logger.trace("Finding class: " + name);
clasz = super.findClass(name);
if (clasz == null) {
logger.debug("cannot find class" + name);
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index 5c2a839..e01c3c7 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -230,8 +230,8 @@ public class CubeMigrationCLI extends AbstractApplication {
}
protected void renameFoldersInHdfs(CubeInstance cube) throws IOException {
- IStoragePathBuilder srcPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(srcConfig.getStorageSystemPathBuilderClz());
- IStoragePathBuilder dstPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(dstConfig.getStorageSystemPathBuilderClz());
+ IStoragePathBuilder srcPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(srcConfig.getStoragePathBuilder());
+ IStoragePathBuilder dstPathBuilder = (IStoragePathBuilder)ClassUtil.newInstance(dstConfig.getStoragePathBuilder());
for (CubeSegment segment : cube.getSegments()) {
diff --git a/webapp/app/js/model/cubeConfig.js b/webapp/app/js/model/cubeConfig.js
index 42e4c34..34358d7 100644
--- a/webapp/app/js/model/cubeConfig.js
+++ b/webapp/app/js/model/cubeConfig.js
@@ -29,7 +29,7 @@ KylinApp.constant('cubeConfig', {
],
storageTypes: [
{name: 'HBase', value: 2},
- {name: 'Parquet', value: 4}
+ {name: 'Parquet (alpha)', value: 4}
],
joinTypes: [
{name: 'Left', value: 'left'},
@@ -204,4 +204,4 @@ KylinApp.constant('cubeConfig', {
'left': '-12px'
}
}
-});
\ No newline at end of file
+});