You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/10 03:41:27 UTC
[kylin] 03/06: KYLIN-3625 Query engine for Parquet and apply CI for Parquet
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch kylin-on-parquet
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c464a38d0a7864abf11e03e55d8b13038b451a12
Author: chao long <wa...@qq.com>
AuthorDate: Sat Sep 29 16:17:58 2018 +0800
KYLIN-3625 Query engine for Parquet and apply CI for Parquet
---
build/script/prepare-libs.sh | 2 +
.../java/org/apache/kylin/common/KylinConfig.java | 60 ++-
.../java/org/apache/kylin/common/QueryContext.java | 50 +++
.../org/apache/kylin/common/util/FileUtils.java | 31 +-
.../src/main/resources/kylin-defaults.properties | 6 +-
.../cube/gridtable/CuboidToGridTableMapping.java | 7 +
.../org/apache/kylin/cube/kv/RowKeyColumnIO.java | 6 +-
.../org/apache/kylin/cube/kv/RowKeyDecoder.java | 12 +-
.../apache/kylin/cube/kv/RowKeyDecoderParquet.java | 27 +-
.../java/org/apache/kylin/gridtable/GTRecord.java | 53 ++-
.../org/apache/kylin/gridtable/GTScanRequest.java | 33 +-
.../kylin/gridtable/GTScanRequestBuilder.java | 8 +-
.../java/org/apache/kylin/gridtable/GTUtil.java | 100 ++++-
.../apache/kylin/measure/MeasureTypeFactory.java | 11 +-
.../kylin/measure/bitmap/BitmapAggregator.java | 10 +-
.../apache/kylin/measure/hllc/HLLCAggregator.java | 10 +-
.../measure/percentile/PercentileAggregator.java | 10 +-
.../measure/percentile/PercentileMeasureType.java | 2 +
.../apache/kylin/measure/raw/RawAggregator.java | 11 +-
.../apache/kylin/measure/topn/TopNAggregator.java | 13 +-
.../org/apache/kylin/measure/topn/TopNCounter.java | 5 +
.../filter/BuiltInFunctionTupleFilter.java | 40 ++
.../kylin/metadata/filter/CaseTupleFilter.java | 15 +
.../kylin/metadata/filter/ColumnTupleFilter.java | 11 +
.../kylin/metadata/filter/CompareTupleFilter.java | 26 ++
.../kylin/metadata/filter/ConstantTupleFilter.java | 37 ++
.../kylin/metadata/filter/DynamicTupleFilter.java | 5 +
.../kylin/metadata/filter/ExtractTupleFilter.java | 5 +
.../kylin/metadata/filter/LogicalTupleFilter.java | 25 ++
.../apache/kylin/metadata/filter/TupleFilter.java | 45 ++
.../metadata/filter/UDF/MassInTupleFilter.java | 5 +
.../metadata/filter/UnsupportedTupleFilter.java | 5 +
.../apache/kylin/metadata/model/ParameterDesc.java | 2 +-
.../storage/gtrecord/CubeScanRangePlanner.java | 15 +-
.../apache/kylin/engine/mr/JobBuilderSupport.java | 4 +
.../kylin/engine/mr/steps/CuboidReducer.java | 6 +-
engine-spark/pom.xml | 5 +
.../engine/spark/SparkBatchCubingJobBuilder2.java | 10 +-
.../kylin/engine/spark/SparkCubingByLayer.java | 18 +-
.../engine/spark/SparkCubingByLayerParquet.java | 16 +-
.../java/org/apache/spark/sql/KylinSession.scala | 231 ++++++++++
.../java/org/apache/spark/sql/SparderEnv.scala | 114 +++++
.../sql/hive/KylinHiveSessionStateBuilder.scala | 64 +++
.../org/apache/spark/sql/manager/UdfManager.scala | 98 +++++
.../org/apache/spark/sql/udf/SparderAggFun.scala | 152 +++++++
.../apache/spark/sql/util/SparderTypeUtil.scala | 473 +++++++++++++++++++++
.../org/apache/spark/util/KylinReflectUtils.scala | 78 ++++
.../main/java/org/apache/spark/util/XmlUtils.scala | 111 +++++
.../test_case_data/webapps/META-INF/context.xml | 38 ++
kylin-it/jacoco-it.exec | Bin 0 -> 2432563 bytes
kylin-it/pom.xml | 246 ++++++-----
.../org/apache/kylin/jdbc/ITJDBCDriver2Test.java | 30 +-
.../kylin/provision/BuildCubeWithEngine.java | 83 ++--
.../org/apache/kylin/query/ITCombination2Test.java | 80 ++++
.../apache/kylin/query/ITFailfastQuery2Test.java | 84 ++++
.../org/apache/kylin/query/ITKylinQuery2Test.java | 115 +++++
.../org/apache/kylin/query/ITKylinQueryTest.java | 8 +-
kylin-test/pom.xml | 32 ++
.../main/java/org/apache/kylin/junit/EnvUtils.java | 64 +++
.../org/apache/kylin/junit/SparkTestRunner.java | 78 ++++
.../junit/SparkTestRunnerRunnerWithParameters.java | 147 +++++++
.../SparkTestRunnerWithParametersFactory.java | 35 +-
pom.xml | 18 +
.../apache/kylin/rest/init/InitialTaskManager.java | 26 ++
server/pom.xml | 7 +
.../java/org/apache/kylin/rest/DebugTomcat.java | 10 +
server/src/main/webapp/META-INF/context.xml | 38 ++
storage-parquet/pom.xml | 159 +++++++
.../kylin/storage/parquet/cube/CubeSparkRPC.java | 9 +
.../storage/parquet/cube/CubeStorageQuery.java | 3 +-
.../storage/parquet/spark/ParquetPayload.java | 222 ++++++++++
.../kylin/storage/parquet/spark/ParquetTask.java | 308 ++++++++++++++
.../storage/parquet/spark/SparkSubmitter.java | 42 ++
.../spark/gtscanner/ParquetRecordGTScanner.java | 105 +++++
.../gtscanner/ParquetRecordGTScanner4Cube.java | 64 +++
.../storage/parquet/steps/SparkCubeParquet.java | 46 +-
.../org/apache/kylin/ext/ClassLoaderUtils.java | 89 ++++
.../apache/kylin/ext/DebugTomcatClassLoader.java | 152 +++++++
.../java/org/apache/kylin/ext/ItClassLoader.java | 175 ++++++++
.../org/apache/kylin/ext/ItSparkClassLoader.java | 189 ++++++++
.../org/apache/kylin/ext/SparkClassLoader.java | 236 ++++++++++
.../org/apache/kylin/ext/TomcatClassLoader.java | 190 +++++++++
tool/pom.xml | 4 +
webapp/app/META-INF/context.xml | 38 ++
84 files changed, 4910 insertions(+), 343 deletions(-)
diff --git a/build/script/prepare-libs.sh b/build/script/prepare-libs.sh
index 789a120..45c8150 100644
--- a/build/script/prepare-libs.sh
+++ b/build/script/prepare-libs.sh
@@ -32,6 +32,7 @@ rm -rf build/lib build/tool
mkdir build/lib build/tool
cp assembly/target/kylin-assembly-${version}-job.jar build/lib/kylin-job-${version}.jar
cp storage-hbase/target/kylin-storage-hbase-${version}-coprocessor.jar build/lib/kylin-coprocessor-${version}.jar
+cp storage-parquet/target/kylin-storage-parquet-${version}-spark.jar build/lib/kylin-storage-parquet-${version}.jar
cp jdbc/target/kylin-jdbc-${version}.jar build/lib/kylin-jdbc-${version}.jar
cp tool-assembly/target/kylin-tool-assembly-${version}-assembly.jar build/tool/kylin-tool-${version}.jar
cp datasource-sdk/target/kylin-datasource-sdk-${version}-lib.jar build/lib/kylin-datasource-sdk-${version}.jar
@@ -39,6 +40,7 @@ cp datasource-sdk/target/kylin-datasource-sdk-${version}-lib.jar build/lib/kylin
# Copied file becomes 000 for some env (e.g. my Cygwin)
chmod 644 build/lib/kylin-job-${version}.jar
chmod 644 build/lib/kylin-coprocessor-${version}.jar
+chmod 644 build/lib/kylin-storage-parquet-${version}.jar
chmod 644 build/lib/kylin-jdbc-${version}.jar
chmod 644 build/tool/kylin-tool-${version}.jar
chmod 644 build/lib/kylin-datasource-sdk-${version}.jar
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 4a86b76..f3d3a29 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -18,6 +18,16 @@
package org.apache.kylin.common;
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.FileUtils;
+import org.apache.kylin.common.util.OrderedProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
@@ -35,16 +45,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.restclient.RestClient;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.OrderedProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
/**
*/
public class KylinConfig extends KylinConfigBase {
@@ -555,4 +555,44 @@ public class KylinConfig extends KylinConfigBase {
return this.base() == ((KylinConfig) another).base();
}
+ public Map<String, String> getSparkConf() {
+ return getPropertiesByPrefix("kylin.storage.columnar.spark-conf.");
+ }
+
+ public String getColumnarSparkEnv(String conf) {
+ return getPropertiesByPrefix("kylin.storage.columnar.spark-env.").get(conf);
+ }
+
+ public boolean isParquetSeparateFsEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.storage.columnar.separate-fs-enable", "false"));
+ }
+
+ public String getParquetSeparateOverrideFiles() {
+ return getOptional("kylin.storage.columnar.separate-override-files",
+ "core-site.xml,hdfs-site.xml,yarn-site.xml");
+ }
+
+ public String getSparkCubeGTStorage() {
+ return getOptional("kylin.storage.parquet.gtstorage",
+ "org.apache.kylin.storage.parquet.cube.CubeSparkRPC");
+ }
+
+ public boolean isParquetSparkCleanCachedRDDAfterUse() {
+ return Boolean.parseBoolean(getOptional("kylin.storage.parquet.clean-cached-rdd-after-use", "false"));
+ }
+
+ public String sparderJars() {
+ try {
+ File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/lib",
+ "kylin-storage-parquet-.*.jar");
+ String path1 = "";
+ if (storageFile != null) {
+ path1 = storageFile.getCanonicalPath();
+ }
+
+ return getOptional("kylin.query.parquet-additional-jars", path1);
+ } catch (IOException e) {
+ return "";
+ }
+ }
}
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index a065a13..49ffab1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -19,10 +19,12 @@
package org.apache.kylin.common;
import java.io.Serializable;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -45,6 +47,21 @@ public class QueryContext {
void stop(QueryContext query);
}
+ private static final ThreadLocal<QueryContext> contexts = new ThreadLocal<QueryContext>() {
+ @Override
+ protected QueryContext initialValue() {
+ return new QueryContext();
+ }
+ };
+
+ public static QueryContext current() {
+ return contexts.get();
+ }
+
+ public static void reset() {
+ contexts.remove();
+ }
+
private long queryStartMillis;
private final String queryId;
@@ -55,6 +72,10 @@ public class QueryContext {
private AtomicLong scannedBytes = new AtomicLong();
private Object calcitePlan;
+ private boolean isHighPriorityQuery = false;
+ private Set<Future> allRunningTasks = new HashSet<>();
+ private boolean isTimeout;
+
private AtomicBoolean isRunning = new AtomicBoolean(true);
private volatile Throwable throwable;
private String stopReason;
@@ -82,6 +103,35 @@ public class QueryContext {
}
}
+ public boolean isHighPriorityQuery() {
+ return isHighPriorityQuery;
+ }
+
+ public void markHighPriorityQuery() {
+ isHighPriorityQuery = true;
+ }
+
+ public void addRunningTasks(Future task) {
+ this.allRunningTasks.add(task);
+ }
+
+ public Set<Future> getAllRunningTasks() {
+ return allRunningTasks;
+ }
+
+ public void removeRunningTask(Future task) {
+ this.allRunningTasks.remove(task);
+ }
+
+ public boolean isTimeout() {
+ return isTimeout;
+ }
+
+ public void setTimeout(boolean timeout) {
+ isTimeout = timeout;
+ }
+
+
public String getQueryId() {
return queryId == null ? "" : queryId;
}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java
similarity index 52%
copy from storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
copy to core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java
index 6a3ad59..dd9b9b5 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/FileUtils.java
@@ -16,28 +16,19 @@
* limitations under the License.
*/
-package org.apache.kylin.storage.parquet.cube;
+package org.apache.kylin.common.util;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
+import java.io.File;
-public class CubeStorageQuery extends GTCubeStorageQueryBase {
-
- public CubeStorageQuery(CubeInstance cube) {
- super(cube);
- }
-
- @Override
- public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- return super.search(context, sqlDigest, returnTupleInfo);
- }
-
- @Override
- protected String getGTStorage() {
+public final class FileUtils {
+ public static File findFile(String dir, String ptn) {
+ File[] files = new File(dir).listFiles();
+ if (files != null) {
+ for (File f : files) {
+ if (f.getName().matches(ptn))
+ return f;
+ }
+ }
return null;
}
}
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 6238e44..8115a50 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -146,7 +146,7 @@ kylin.storage.partition.aggr-spill-enabled=true
# The maximum number of bytes each coprocessor is allowed to scan.
# To allow arbitrary large scan, you can set it to 0.
-kylin.storage.partition.max-scan-bytes=3221225472
+kylin.storage.partition.max-scan-bytes=0
# The default coprocessor timeout is (hbase.rpc.timeout * 0.9) / 1000 seconds,
# You can set it to a smaller value. 0 means use default.
@@ -361,8 +361,8 @@ kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=
kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
#kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
-kylin.storage.columnar.spark-conf.spark.driver.memory=512m
-kylin.storage.columnar.spark-conf.spark.executor.memory=512m
+kylin.storage.columnar.spark-conf.spark.driver.memory=1g
+kylin.storage.columnar.spark-conf.spark.executor.memory=1g
kylin.storage.columnar.spark-conf.spark.yarn.executor.memoryOverhead=512
kylin.storage.columnar.spark-conf.yarn.am.memory=512m
kylin.storage.columnar.spark-conf.spark.executor.cores=1
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index 05256cc..9052e50 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -51,6 +51,7 @@ public class CuboidToGridTableMapping {
private int nDimensions;
private Map<TblColRef, Integer> dim2gt;
+ private Map<MeasureDesc, Integer> met2gt;
private ImmutableBitSet gtPrimaryKey;
private int nMetrics;
@@ -68,6 +69,7 @@ public class CuboidToGridTableMapping {
// dimensions
dim2gt = Maps.newHashMap();
+ met2gt = Maps.newHashMap();
BitSet pk = new BitSet();
for (TblColRef dimension : cuboid.getColumns()) {
gtDataTypes.add(dimension.getType());
@@ -96,6 +98,7 @@ public class CuboidToGridTableMapping {
// Ensure the holistic version if exists is always the first.
FunctionDesc func = measure.getFunction();
metrics2gt.put(func, gtColIdx);
+ met2gt.put(measure, gtColIdx);
gtDataTypes.add(func.getReturnDataType());
// map to column block
@@ -245,4 +248,8 @@ public class CuboidToGridTableMapping {
public Map<TblColRef, Integer> getDim2gt() {
return ImmutableMap.copyOf(dim2gt);
}
+
+ public Map<MeasureDesc, Integer> getMet2gt() {
+ return ImmutableMap.copyOf(met2gt);
+ }
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
index b0efc91..79cc233 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
@@ -56,11 +56,15 @@ public class RowKeyColumnIO implements java.io.Serializable {
dimEnc.encode(value, output, outputOffset);
}
- public String readColumnString(TblColRef col, byte[] bytes, int offset, int length) {
+ public String readColumnStringKeepDicValue(TblColRef col, byte[] bytes, int offset, int length) {
DimensionEncoding dimEnc = dimEncMap.get(col);
if (dimEnc instanceof DictionaryDimEnc)
return String.valueOf(BytesUtil.readUnsigned(bytes, offset, length));
return dimEnc.decode(bytes, offset, length);
}
+ public String readColumnString(TblColRef col, byte[] bytes, int offset, int length) {
+ DimensionEncoding dimEnc = dimEncMap.get(col);
+ return dimEnc.decode(bytes, offset, length);
+ }
}
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
index 71ad4bf..fde7f33 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoder.java
@@ -36,12 +36,12 @@ import org.apache.kylin.metadata.model.TblColRef;
*/
public class RowKeyDecoder {
- private final CubeDesc cubeDesc;
- private final RowKeyColumnIO colIO;
- private final RowKeySplitter rowKeySplitter;
+ protected final CubeDesc cubeDesc;
+ protected final RowKeyColumnIO colIO;
+ protected final RowKeySplitter rowKeySplitter;
- private Cuboid cuboid;
- private List<String> values;
+ protected Cuboid cuboid;
+ protected List<String> values;
public RowKeyDecoder(CubeSegment cubeSegment) {
this.cubeDesc = cubeSegment.getCubeDesc();
@@ -76,7 +76,7 @@ public class RowKeyDecoder {
this.cuboid = Cuboid.findForMandatory(cubeDesc, cuboidID);
}
- private void collectValue(TblColRef col, byte[] valueBytes, int offset, int length) throws IOException {
+ protected void collectValue(TblColRef col, byte[] valueBytes, int offset, int length) throws IOException {
String strValue = colIO.readColumnString(col, valueBytes, offset, length);
values.add(strValue);
}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoderParquet.java
similarity index 52%
copy from storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
copy to core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoderParquet.java
index 6a3ad59..9b1f1a5 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyDecoderParquet.java
@@ -16,28 +16,21 @@
* limitations under the License.
*/
-package org.apache.kylin.storage.parquet.cube;
+package org.apache.kylin.cube.kv;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.TblColRef;
-public class CubeStorageQuery extends GTCubeStorageQueryBase {
+import java.io.IOException;
- public CubeStorageQuery(CubeInstance cube) {
- super(cube);
+public class RowKeyDecoderParquet extends RowKeyDecoder {
+ public RowKeyDecoderParquet(CubeSegment cubeSegment) {
+ super(cubeSegment);
}
@Override
- public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- return super.search(context, sqlDigest, returnTupleInfo);
- }
-
- @Override
- protected String getGTStorage() {
- return null;
+ protected void collectValue(TblColRef col, byte[] valueBytes, int offset, int length) throws IOException {
+ String strValue = colIO.readColumnStringKeepDicValue(col, valueBytes, offset, length);
+ values.add(strValue);
}
}
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index b4a57c7..d7be088 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -18,14 +18,23 @@
package org.apache.kylin.gridtable;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Comparator;
-
+import com.google.common.base.Preconditions;
import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.dimension.DictionaryDimEnc;
+import org.apache.kylin.measure.bitmap.BitmapSerializer;
+import org.apache.kylin.measure.dim.DimCountDistincSerializer;
+import org.apache.kylin.measure.extendedcolumn.ExtendedColumnSerializer;
+import org.apache.kylin.measure.hllc.HLLCSerializer;
+import org.apache.kylin.measure.percentile.PercentileSerializer;
+import org.apache.kylin.measure.raw.RawSerializer;
+import org.apache.kylin.measure.topn.TopNCounterSerializer;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
-import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
public class GTRecord implements Comparable<GTRecord> {
@@ -103,6 +112,40 @@ public class GTRecord implements Comparable<GTRecord> {
return this;
}
+ /** set record to the codes of specified values, reuse given space to hold the codes */
+ public GTRecord setValuesParquet(ImmutableBitSet selectedCols, ByteArray space, Object... values) {
+ assert selectedCols.cardinality() == values.length;
+
+ ByteBuffer buf = space.asBuffer();
+ int pos = buf.position();
+ for (int i = 0; i < selectedCols.trueBitCount(); i++) {
+ int c = selectedCols.trueBitAt(i);
+
+ DataTypeSerializer serializer = info.codeSystem.getSerializer(c);
+ if (serializer instanceof DictionaryDimEnc.DictionarySerializer) {
+ int len = serializer.peekLength(buf);
+ BytesUtil.writeUnsigned((Integer) values[i], len, buf);
+ int newPos = buf.position();
+ cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
+ pos = newPos;
+ } else if (serializer instanceof TopNCounterSerializer ||
+ serializer instanceof HLLCSerializer ||
+ serializer instanceof BitmapSerializer ||
+ serializer instanceof ExtendedColumnSerializer ||
+ serializer instanceof PercentileSerializer ||
+ serializer instanceof DimCountDistincSerializer ||
+ serializer instanceof RawSerializer) {
+ cols[c].reset((byte[]) values[i], 0, ((byte[]) values[i]).length);
+ } else {
+ info.codeSystem.encodeColumnValue(c, values[i], buf);
+ int newPos = buf.position();
+ cols[c].reset(buf.array(), buf.arrayOffset() + pos, newPos - pos);
+ pos = newPos;
+ }
+ }
+ return this;
+ }
+
/** decode and return the values of this record */
public Object[] getValues() {
return getValues(info.colAll, new Object[info.getColumnCount()]);
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index e3a2234..b6cf250 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -18,14 +18,9 @@
package org.apache.kylin.gridtable;
-import java.io.IOException;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ByteArray;
@@ -45,9 +40,13 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class GTScanRequest {
@@ -71,6 +70,7 @@ public class GTScanRequest {
// optional filtering
private TupleFilter filterPushDown;
+ private String filterPushDownSQL;
private TupleFilter havingFilterPushDown;
// optional aggregation
@@ -95,7 +95,7 @@ public class GTScanRequest {
GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, ImmutableBitSet rtAggrMetrics, //
ImmutableBitSet dynamicCols, Map<Integer, TupleExpression> tupleExpressionMap, //
- TupleFilter filterPushDown, TupleFilter havingFilterPushDown, //
+ TupleFilter filterPushDown, String filterPushDownSQL, TupleFilter havingFilterPushDown, //
boolean allowStorageAggregation, double aggCacheMemThreshold, int storageScanRowNumThreshold, //
int storagePushDownLimit, StorageLimitLevel storageLimitLevel, String storageBehavior, long startTime,
long timeout) {
@@ -107,6 +107,7 @@ public class GTScanRequest {
}
this.columns = dimensions;
this.filterPushDown = filterPushDown;
+ this.filterPushDownSQL = filterPushDownSQL;
this.havingFilterPushDown = havingFilterPushDown;
this.aggrGroupBy = aggrGroupBy;
@@ -318,6 +319,10 @@ public class GTScanRequest {
return filterPushDown;
}
+ public String getFilterPushDownSQL() {
+ return filterPushDownSQL;
+ }
+
public TupleFilter getHavingFilterPushDown() {
return havingFilterPushDown;
}
@@ -445,6 +450,7 @@ public class GTScanRequest {
BytesUtil.writeVLong(value.startTime, out);
BytesUtil.writeVLong(value.timeout, out);
BytesUtil.writeUTFString(value.storageBehavior, out);
+ BytesUtil.writeUTFString(value.filterPushDownSQL, out);
// for dynamic related info
ImmutableBitSet.serializer.serialize(value.dynamicCols, out);
@@ -499,6 +505,7 @@ public class GTScanRequest {
long startTime = BytesUtil.readVLong(in);
long timeout = BytesUtil.readVLong(in);
String storageBehavior = BytesUtil.readUTFString(in);
+ String filterPushDownSQL = BytesUtil.readUTFString(in);
ImmutableBitSet aDynCols = ImmutableBitSet.serializer.deserialize(in);
@@ -516,7 +523,7 @@ public class GTScanRequest {
.setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs)
.setRtAggrMetrics(aRuntimeAggrMetrics).setDynamicColumns(aDynCols)
.setExprsPushDown(sTupleExpressionMap)
- .setFilterPushDown(sGTFilter).setHavingFilterPushDown(sGTHavingFilter)
+ .setFilterPushDown(sGTFilter).setFilterPushDownSQL(filterPushDownSQL).setHavingFilterPushDown(sGTHavingFilter)
.setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB)
.setStorageScanRowNumThreshold(storageScanRowNumThreshold)
.setStoragePushDownLimit(storagePushDownLimit).setStorageLimitLevel(storageLimitLevel)
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
index 94a89e6..daf766d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
@@ -33,6 +33,7 @@ public class GTScanRequestBuilder {
private GTInfo info;
private List<GTScanRange> ranges;
private TupleFilter filterPushDown;
+ private String filterPushDownSQL;
private TupleFilter havingFilterPushDown;
private ImmutableBitSet dimensions;
private ImmutableBitSet aggrGroupBy = null;
@@ -80,6 +81,11 @@ public class GTScanRequestBuilder {
return this;
}
+ public GTScanRequestBuilder setFilterPushDownSQL(String filterPushDownSQL) {
+ this.filterPushDownSQL = filterPushDownSQL;
+ return this;
+ }
+
public GTScanRequestBuilder setHavingFilterPushDown(TupleFilter havingFilterPushDown) {
this.havingFilterPushDown = havingFilterPushDown;
return this;
@@ -180,7 +186,7 @@ public class GTScanRequestBuilder {
this.timeout = timeout == -1 ? 300000 : timeout;
return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, rtAggrMetrics,
- dynamicColumns, exprsPushDown, filterPushDown, havingFilterPushDown, allowStorageAggregation,
+ dynamicColumns, exprsPushDown, filterPushDown, filterPushDownSQL, havingFilterPushDown, allowStorageAggregation,
aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit, storageLimitLevel,
storageBehavior, startTime, timeout);
}
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
index 298225f..49c68c5 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTUtil.java
@@ -24,9 +24,17 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.collect.Lists;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.dimension.AbstractDateDimEnc;
+import org.apache.kylin.dimension.DictionaryDimEnc;
+import org.apache.kylin.dimension.DimensionEncoding;
+import org.apache.kylin.dimension.FixedLenDimEnc;
+import org.apache.kylin.dimension.IntDimEnc;
+import org.apache.kylin.dimension.IntegerDimEnc;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
import org.apache.kylin.metadata.expression.TupleExpression;
import org.apache.kylin.metadata.expression.TupleExpressionSerializer;
import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -44,6 +52,8 @@ import org.apache.kylin.metadata.model.TblColRef;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import static org.apache.kylin.metadata.filter.ConstantTupleFilter.TRUE;
+
public class GTUtil {
private GTUtil(){}
@@ -76,8 +86,22 @@ public class GTUtil {
}
public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, //
- Map<TblColRef, Integer> colMapping, Set<TblColRef> unevaluatableColumnCollector) {
- TupleFilter filter = convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector);
+ Map<TblColRef, Integer> colMapping, Set<TblColRef> unevaluatableColumnCollector) {
+ return convertFilterColumnsAndConstants(rootFilter, info, colMapping, unevaluatableColumnCollector, false);
+ }
+
+ public static TupleFilter convertFilterColumnsAndConstants(TupleFilter rootFilter, GTInfo info, //
+ Map<TblColRef, Integer> colMapping, Set<TblColRef> unevaluatableColumnCollector, boolean isParquet) {
+ if (rootFilter == null) {
+ return null;
+ }
+
+ TupleFilter filter;
+ if (isParquet) {
+ filter = convertFilter(rootFilter, new GTConvertDecoratorParquet(unevaluatableColumnCollector, colMapping, info, true));
+ } else {
+ filter = convertFilter(rootFilter, info, colMapping, true, unevaluatableColumnCollector);
+ }
// optimize the filter: after translating with dictionary, some filters become determined
// e.g.
@@ -110,6 +134,20 @@ public class GTUtil {
return TupleFilterSerializer.deserialize(bytes, filterCodeSystem);
}
+    /**
+     * Recursively rewrites a filter tree with the given decorator.
+     * <p>
+     * The decorator is applied to {@code rootFilter} first. The children of the node it
+     * returns are then converted one by one and swapped in for the originals, so the
+     * returned node is modified in place.
+     *
+     * @param rootFilter the root of the (sub)tree to convert
+     * @param decorator  the decorator applied to every node of the tree
+     * @return the decorated root, holding the converted children
+     */
+    private static TupleFilter convertFilter(TupleFilter rootFilter, GTConvertDecorator decorator) {
+        TupleFilter converted = decorator.onSerialize(rootFilter);
+        // Leaf nodes need no replacement list; checking first also keeps us from
+        // dereferencing getChildren() on a node that reports no children.
+        if (!converted.hasChildren()) {
+            return converted;
+        }
+
+        List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(converted.getChildren().size());
+        for (TupleFilter childFilter : converted.getChildren()) {
+            newChildren.add(convertFilter(childFilter, decorator));
+        }
+        // Replace the children only after all of them were converted, so the list is
+        // never modified while it is being iterated.
+        converted.removeAllChildren();
+        converted.addChildren(newChildren);
+        return converted;
+    }
+
+
public static TupleExpression convertFilterColumnsAndConstants(TupleExpression rootExpression, GTInfo info,
CuboidToGridTableMapping mapping, Set<TblColRef> unevaluatableColumnCollector) {
Map<TblColRef, FunctionDesc> innerFuncMap = Maps.newHashMap();
@@ -183,6 +221,43 @@ public class GTUtil {
};
}
+    /**
+     * Decorator variant used when a filter is converted for Parquet storage.
+     * Column filters are passed through unchanged, and constants are translated into
+     * plain Java values rather than encoded byte arrays.
+     */
+    private static class GTConvertDecoratorParquet extends GTConvertDecorator {
+        public GTConvertDecoratorParquet(Set<TblColRef> unevaluatableColumnCollector, Map<TblColRef, Integer> colMapping,
+                GTInfo info, boolean encodeConstants) {
+            super(unevaluatableColumnCollector, colMapping, info, encodeConstants);
+        }
+
+        /** Keeps the column filter as it is; no mapping onto a grid table column is done. */
+        @Override
+        protected TupleFilter convertColumnFilter(ColumnTupleFilter columnFilter) {
+            return columnFilter;
+        }
+
+        /**
+         * Encodes {@code value} with the column's dimension encoding and reads it back in a
+         * form that depends on the encoding type.
+         *
+         * @return the translated constant, the original {@code value} for any other encoding,
+         *         or {@code null} when an {@link IllegalArgumentException} is raised on the way
+         */
+        @Override
+        protected Object translate(int col, Object value, int roundingFlag) {
+            try {
+                buf.clear();
+                DimensionEncoding encoding = info.codeSystem.getDimEnc(col);
+                info.codeSystem.encodeColumnValue(col, value, roundingFlag, buf);
+                DataTypeSerializer decoder = encoding.asDataTypeSerializer();
+                buf.flip();
+
+                if (encoding instanceof DictionaryDimEnc) {
+                    // dictionary-encoded columns are compared by their dictionary id
+                    return BytesUtil.readUnsigned(buf, encoding.getLengthOfEncoding());
+                }
+                if (encoding instanceof AbstractDateDimEnc) {
+                    return Long.valueOf((String) decoder.deserialize(buf));
+                }
+                if (encoding instanceof FixedLenDimEnc) {
+                    return decoder.deserialize(buf);
+                }
+                if (encoding instanceof IntegerDimEnc || encoding instanceof IntDimEnc) {
+                    return Integer.valueOf((String) decoder.deserialize(buf));
+                }
+                return value;
+            } catch (IllegalArgumentException ex) {
+                // also covers NumberFormatException raised by the valueOf calls above
+                return null;
+            }
+        }
+    }
+
+
protected static class GTConvertDecorator implements TupleFilterSerializer.Decorator {
protected final Set<TblColRef> unevaluatableColumnCollector;
protected final Map<TblColRef, Integer> colMapping;
@@ -190,7 +265,7 @@ public class GTUtil {
protected final boolean useEncodeConstants;
public GTConvertDecorator(Set<TblColRef> unevaluatableColumnCollector, Map<TblColRef, Integer> colMapping,
- GTInfo info, boolean encodeConstants) {
+ GTInfo info, boolean encodeConstants) {
this.unevaluatableColumnCollector = unevaluatableColumnCollector;
this.colMapping = colMapping;
this.info = info;
@@ -213,20 +288,18 @@ public class GTUtil {
// will always return FALSE.
if (filter.getOperator() == FilterOperatorEnum.NOT && !TupleFilter.isEvaluableRecursively(filter)) {
TupleFilter.collectColumns(filter, unevaluatableColumnCollector);
- return ConstantTupleFilter.TRUE;
+ return TRUE;
}
// shortcut for unEvaluatable filter
if (!filter.isEvaluable()) {
TupleFilter.collectColumns(filter, unevaluatableColumnCollector);
- return ConstantTupleFilter.TRUE;
+ return TRUE;
}
// map to column onto grid table
if (colMapping != null && filter instanceof ColumnTupleFilter) {
- ColumnTupleFilter colFilter = (ColumnTupleFilter) filter;
- int gtColIdx = mapCol(colFilter.getColumn());
- return new ColumnTupleFilter(info.colRef(gtColIdx));
+ return convertColumnFilter((ColumnTupleFilter) filter);
}
// encode constants
@@ -237,6 +310,11 @@ public class GTUtil {
return filter;
}
+ protected TupleFilter convertColumnFilter(ColumnTupleFilter columnFilter) {
+ int gtColIdx = mapCol(columnFilter.getColumn());
+ return new ColumnTupleFilter(info.colRef(gtColIdx));
+ }
+
protected TupleFilter encodeConstants(CompareTupleFilter oldCompareFilter) {
// extract ColumnFilter & ConstantFilter
TblColRef externalCol = oldCompareFilter.getColumn();
@@ -261,7 +339,7 @@ public class GTUtil {
int col = colMapping == null ? externalCol.getColumnDesc().getZeroBasedIndex() : mapCol(externalCol);
TupleFilter result;
- ByteArray code;
+ Object code;
// translate constant into code
switch (newCompareFilter.getOperator()) {
@@ -353,7 +431,7 @@ public class GTUtil {
return result;
}
- private TupleFilter newCompareFilter(FilterOperatorEnum op, TblColRef col, ByteArray code) {
+ private TupleFilter newCompareFilter(FilterOperatorEnum op, TblColRef col, Object code) {
CompareTupleFilter r = new CompareTupleFilter(op);
r.addChild(new ColumnTupleFilter(col));
r.addChild(new ConstantTupleFilter(code));
@@ -368,7 +446,7 @@ public class GTUtil {
transient ByteBuffer buf;
- protected ByteArray translate(int col, Object value, int roundingFlag) {
+ protected Object translate(int col, Object value, int roundingFlag) {
try {
buf.clear();
info.codeSystem.encodeColumnValue(col, value, roundingFlag, buf);
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
index d16a705..621a13e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -18,10 +18,8 @@
package org.apache.kylin.measure;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigCannotInitException;
import org.apache.kylin.measure.basic.BasicMeasureType;
@@ -38,8 +36,9 @@ import org.apache.kylin.metadata.model.FunctionDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
/**
* Factory for MeasureType.
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
index 19fa49e..d57af48 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapAggregator.java
@@ -45,8 +45,14 @@ public class BitmapAggregator extends MeasureAggregator<BitmapCounter> {
@Override
public BitmapCounter aggregate(BitmapCounter value1, BitmapCounter value2) {
- value1.orWith(value2);
- return value1;
+ // Accumulate into a bitmap obtained from bitmapFactory instead of OR-ing into value1;
+ // null arguments are skipped, so two nulls give the bitmap exactly as the factory returned it.
+ BitmapCounter merged = bitmapFactory.newBitmap();
+ if (value1 != null) {
+ merged.orWith(value1);
+ }
+ if (value2 != null) {
+ merged.orWith(value2);
+ }
+ return merged;
}
@Override
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
index 4e09265..a134f92 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
@@ -47,8 +47,14 @@ public class HLLCAggregator extends MeasureAggregator<HLLCounter> {
@Override
public HLLCounter aggregate(HLLCounter value1, HLLCounter value2) {
- value1.merge(value2);
- return value1;
+ // Always return a new counter built from a copy, rather than merging into value1.
+ // NOTE(review): when both arguments are null, null is handed to the HLLCounter
+ // copy constructor - confirm that case cannot occur or is handled there.
+ if (value1 == null) {
+ return new HLLCounter(value2);
+ } else if (value2 == null) {
+ return new HLLCounter(value1);
+ }
+ HLLCounter result = new HLLCounter(value1);
+ result.merge(value2);
+ return result;
}
@Override
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java
index ef8896a..b7185e3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileAggregator.java
@@ -43,8 +43,14 @@ public class PercentileAggregator extends MeasureAggregator<PercentileCounter> {
@Override
public PercentileCounter aggregate(PercentileCounter value1, PercentileCounter value2) {
- value1.merge(value2);
- return value1;
+ // Always return a new counter built from a copy, rather than merging into value1.
+ // NOTE(review): when both arguments are null, null is handed to the PercentileCounter
+ // copy constructor - confirm that case cannot occur or is handled there.
+ if (value1 == null) {
+ return new PercentileCounter(value2);
+ } else if (value2 == null) {
+ return new PercentileCounter(value1);
+ }
+ PercentileCounter merged = new PercentileCounter(value1);
+ merged.merge(value2);
+ return merged;
}
@Override
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java
index 60b3282..9534ecf 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileMeasureType.java
@@ -108,4 +108,6 @@ public class PercentileMeasureType extends MeasureType<PercentileCounter> {
public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
return UDAF_MAP;
}
+
+
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
index 2ee36e3..9cc539a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawAggregator.java
@@ -21,6 +21,7 @@ package org.apache.kylin.measure.raw;
import java.util.ArrayList;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.measure.MeasureAggregator;
@@ -49,13 +50,15 @@ public class RawAggregator extends MeasureAggregator<List<ByteArray>> {
@Override
public List<ByteArray> aggregate(List<ByteArray> value1, List<ByteArray> value2) {
if (value1 == null) {
- return value2;
+ // NOTE(review): if value2 is null as well, Lists.newArrayList receives null here,
+ // whereas the removed line simply returned null - confirm callers never pass two nulls.
+ return Lists.newArrayList(value2);
} else if (value2 == null) {
- return value1;
+ return Lists.newArrayList(value1);
}
- value1.addAll(value2);
- return value1;
+ // Concatenate into a new, pre-sized list instead of appending to value1.
+ List<ByteArray> result = new ArrayList<>(value1.size() + value2.size());
+ result.addAll(value1);
+ result.addAll(value2);
+ return result;
}
@Override
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
index 34ceb9c..ec8bca7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
@@ -46,8 +46,17 @@ public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
@Override
public TopNCounter<ByteArray> aggregate(TopNCounter<ByteArray> value1, TopNCounter<ByteArray> value2) {
- value1.merge(value2);
- return value1;
+ // NOTE(review): when both arguments are null, null is handed to the TopNCounter
+ // copy constructor, which reads another.capacity - confirm that case cannot occur.
+ if (value1 == null) {
+ return new TopNCounter<>(value2);
+ } else if (value2 == null) {
+ return new TopNCounter<>(value1);
+ }
+ // Merge both inputs into a new counter with twice value1's capacity, then cut the
+ // result back to value1's capacity, rather than merging into value1 itself.
+ int thisCapacity = value1.getCapacity();
+ TopNCounter<ByteArray> aggregated = new TopNCounter<>(thisCapacity * 2);
+ aggregated.merge(value1);
+ aggregated.merge(value2);
+ aggregated.retain(thisCapacity);
+ return aggregated;
}
@Override
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
index 932248d..84debeb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -59,6 +59,11 @@ public class TopNCounter<T> implements Iterable<Counter<T>>, java.io.Serializabl
counterList = Lists.newLinkedList();
}
+    /**
+     * Copy constructor: creates a counter with the same capacity as {@code another}
+     * and merges the content of {@code another} into it.
+     *
+     * @param another the counter to copy; must not be {@code null}
+     */
+    public TopNCounter(TopNCounter<T> another) {
+        // The parameter is typed with T (not raw) so that the merge below is type-checked.
+        this(another.capacity);
+        merge(another);
+    }
+
+
public int getCapacity() {
return capacity;
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
index 9082c1f..38cbd66 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/BuiltInFunctionTupleFilter.java
@@ -177,6 +177,46 @@ public class BuiltInFunctionTupleFilter extends FunctionTupleFilter {
}
+    /**
+     * Renders this function call as a Spark SQL expression.
+     * <p>
+     * Functions listed in {@code toSparkFuncMap} are written with their Spark name; any other
+     * function is written with its own name, as a prefix call for one or three arguments
+     * and as an infix operator for two arguments.
+     *
+     * @throws IllegalArgumentException for an unmapped function with any other argument count
+     */
@Override
+    public String toSparkSqlFilter() {
+        List<? extends TupleFilter> args = this.getChildren();
+        String op = this.getName();
+        switch (op) {
+        case "LIKE":
+            assert args.size() == 2;
+            return args.get(0).toSparkSqlFilter() + toSparkFuncMap.get(op) + args.get(1).toSparkSqlFilter();
+        case "||": {
+            // string concatenation takes any number of arguments
+            StringBuilder call = new StringBuilder().append(toSparkFuncMap.get(op)).append("(");
+            String separator = "";
+            for (TupleFilter arg : args) {
+                call.append(separator).append(arg.toSparkSqlFilter());
+                separator = ",";
+            }
+            return call.append(")").toString();
+        }
+        case "LOWER":
+        case "UPPER":
+        case "CHAR_LENGTH":
+            assert args.size() == 1;
+            return toSparkFuncMap.get(op) + "(" + args.get(0).toSparkSqlFilter() + ")";
+        case "SUBSTRING":
+            assert args.size() == 3;
+            return toSparkFuncMap.get(op) + "(" + args.get(0).toSparkSqlFilter() + ","
+                    + args.get(1).toSparkSqlFilter() + "," + args.get(2).toSparkSqlFilter() + ")";
+        default:
+            // no Spark mapping known: fall back to the function's own name
+            switch (args.size()) {
+            case 1:
+                return op + "(" + args.get(0).toSparkSqlFilter() + ")";
+            case 2:
+                return args.get(0).toSparkSqlFilter() + op + args.get(1).toSparkSqlFilter();
+            case 3:
+                return op + "(" + args.get(0).toSparkSqlFilter() + "," + args.get(1).toSparkSqlFilter() + ","
+                        + args.get(2).toSparkSqlFilter() + ")";
+            default:
+                throw new IllegalArgumentException("Operator " + op + " is not supported");
+            }
+        }
+    }
+
+ @Override
public String toString() {
StringBuilder sb = new StringBuilder();
if (isReversed)
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
index 9083212..4305557 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CaseTupleFilter.java
@@ -134,6 +134,21 @@ public class CaseTupleFilter extends TupleFilter implements IOptimizeableTupleFi
}
+    /**
+     * Renders this filter as a parenthesised Spark SQL {@code case ... end} expression,
+     * with one {@code when/then} pair per when-filter followed by the {@code else} branch.
+     */
@Override
+    public String toSparkSqlFilter() {
+        StringBuilder sql = new StringBuilder("(case ");
+        for (int i = 0; i < this.getWhenFilters().size(); i++) {
+            // fetch both sides of the pair before rendering either of them
+            TupleFilter condition = this.getWhenFilters().get(i);
+            TupleFilter outcome = this.getThenFilters().get(i);
+            sql.append(" when ").append(condition.toSparkSqlFilter());
+            sql.append(" then ").append(outcome.toSparkSqlFilter());
+        }
+        sql.append(" else ").append(this.getElseFilter().toSparkSqlFilter());
+        sql.append(" end)");
+        return sql.toString();
+    }
+
+ @Override
public TupleFilter acceptOptimizeTransformer(FilterOptimizeTransformer transformer) {
List<TupleFilter> newChildren = Lists.newArrayList();
for (TupleFilter child : this.getChildren()) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
index 6d3d541..09a16f5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ColumnTupleFilter.java
@@ -46,12 +46,18 @@ public class ColumnTupleFilter extends TupleFilter {
private TblColRef columnRef;
private Object tupleValue;
private List<Object> values;
+ private String colName;
public ColumnTupleFilter(TblColRef column) {
+ this(column, null);
+ }
+
+ public ColumnTupleFilter(TblColRef column, String colName) {
super(Collections.<TupleFilter> emptyList(), FilterOperatorEnum.COLUMN);
this.columnRef = column;
this.values = new ArrayList<Object>(1);
this.values.add(null);
+ this.colName = colName;
}
public TblColRef getColumn() {
@@ -155,4 +161,9 @@ public class ColumnTupleFilter extends TupleFilter {
this.columnRef = column.getRef();
}
}
+
+ @Override
+ public String toSparkSqlFilter() {
+ return this.columnRef.getTableAlias() + "_" + this.columnRef.getName();
+ }
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
index 1c1c409..b63ac0a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/CompareTupleFilter.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -279,6 +280,31 @@ public class CompareTupleFilter extends TupleFilter implements IOptimizeableTupl
}
+    /**
+     * Renders this comparison as a Spark SQL predicate, using the operator text from
+     * {@code toSparkOpMap}: binary comparisons as {@code left op right}, IN / NOT IN with the
+     * right side in parentheses, and the null checks as a suffix on the single operand.
+     *
+     * @throws IllegalStateException if the operator is none of the comparisons handled here
+     */
@Override
+    public String toSparkSqlFilter() {
+        List<? extends TupleFilter> operands = this.getChildren();
+        switch (this.getOperator()) {
+        case EQ:
+        case NEQ:
+        case LT:
+        case GT:
+        case GTE:
+        case LTE: {
+            assert operands.size() == 2;
+            String left = operands.get(0).toSparkSqlFilter();
+            String symbol = toSparkOpMap.get(this.getOperator());
+            String right = operands.get(1).toSparkSqlFilter();
+            return left + symbol + right;
+        }
+        case IN:
+        case NOTIN: {
+            assert operands.size() == 2;
+            String column = operands.get(0).toSparkSqlFilter();
+            String keyword = toSparkOpMap.get(this.getOperator());
+            String candidates = operands.get(1).toSparkSqlFilter();
+            return column + keyword + "(" + candidates + ")";
+        }
+        case ISNULL:
+        case ISNOTNULL: {
+            assert operands.size() == 1;
+            String operand = operands.get(0).toSparkSqlFilter();
+            return operand + toSparkOpMap.get(this.getOperator());
+        }
+        default:
+            throw new IllegalStateException("operator " + this.getOperator() + " not supported: ");
+        }
+    }
+
+ @Override
public TupleFilter acceptOptimizeTransformer(FilterOptimizeTransformer transformer) {
return transformer.visit(this);
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
index e4f8b2e..e9ecf16 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ConstantTupleFilter.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.filter;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
@@ -88,6 +89,10 @@ public class ConstantTupleFilter extends TupleFilter {
return this.constantValues;
}
+ /**
+ * Replaces the constant values held by this filter.
+ * The given list is stored as is; no copy is made.
+ *
+ * @param constantValues the new values
+ */
+ public void setValues(List<Object> constantValues) {
+ this.constantValues = constantValues;
+ }
+
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
public void serialize(IFilterCodeSystem cs, ByteBuffer buffer) {
@@ -108,6 +113,38 @@ public class ConstantTupleFilter extends TupleFilter {
}
}
+    /**
+     * Renders the constant value(s) as Spark SQL literal text.
+     * <p>
+     * A filter equal to {@code TRUE} or {@code FALSE} becomes {@code true} or {@code false},
+     * and an empty value collection becomes {@code null}. Otherwise the values are joined
+     * with commas (without surrounding parentheses): strings are wrapped in single quotes,
+     * {@code null} values are written as {@code null}, and everything else is written with
+     * its string representation.
+     *
+     * @return the literal text for this constant
+     */
+    @Override
+    public String toSparkSqlFilter() {
+        if (this.equals(TRUE)) {
+            return "true";
+        } else if (this.equals(FALSE)) {
+            return "false";
+        }
+
+        if (constantValues.isEmpty()) {
+            return "null";
+        }
+
+        StringBuilder sb = new StringBuilder();
+        String separator = "";
+        for (Object value : constantValues) {
+            sb.append(separator);
+            separator = ",";
+            if (value == null) {
+                // the three branches are exclusive: a null value is written exactly once
+                sb.append("null");
+            } else if (value instanceof String) {
+                // NOTE(review): quote characters inside the value are not escaped here;
+                // confirm how such constants should be written for Spark SQL.
+                sb.append("'").append(value).append("'");
+            } else {
+                sb.append(value);
+            }
+        }
+        return sb.toString();
+    }
+
+
@Override public boolean equals(Object o) {
if (this == o)
return true;
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
index d9dc52a..c4490e5 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/DynamicTupleFilter.java
@@ -78,4 +78,9 @@ public class DynamicTupleFilter extends TupleFilter {
this.variableName = BytesUtil.readUTFString(buffer);
}
+ /**
+ * Returns the constant predicate {@code 1=1}; the variable of this filter is not rendered.
+ * NOTE(review): presumably dynamic filters cannot be expressed in Spark SQL yet - confirm.
+ */
+ @Override
+ public String toSparkSqlFilter() {
+ return "1=1";
+ }
+
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
index 8c2ba94..36ea021 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
@@ -122,4 +122,9 @@ public class ExtractTupleFilter extends TupleFilter {
public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
}
+ /**
+ * Returns the constant predicate {@code 1=1}; the extract expression itself is not rendered.
+ * NOTE(review): presumably EXTRACT is not translated to Spark SQL yet - confirm.
+ */
+ @Override
+ public String toSparkSqlFilter() {
+ return "1=1";
+ }
+
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
index f0c825f..99cf3f3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
@@ -155,6 +155,31 @@ public class LogicalTupleFilter extends TupleFilter implements IOptimizeableTupl
}
+    /**
+     * Renders an AND / OR filter as its children joined by the operator text from
+     * {@code toSparkOpMap}, wrapped in parentheses.
+     *
+     * @throws IllegalArgumentException for any other operator
+     */
@Override
+    public String toSparkSqlFilter() {
+        switch (this.getOperator()) {
+        case AND:
+        case OR: {
+            StringBuilder sql = new StringBuilder("(");
+            String connective = toSparkOpMap.get(this.getOperator());
+            String separator = "";
+            for (TupleFilter child : this.getChildren()) {
+                // the connective goes between children only, never after the last one
+                sql.append(separator).append(child.toSparkSqlFilter());
+                separator = connective;
+            }
+            return sql.append(")").toString();
+        }
+        default:
+            throw new IllegalArgumentException("Operator " + this.getOperator() + " is not supported");
+        }
+    }
+
+ @Override
public TupleFilter acceptOptimizeTransformer(FilterOptimizeTransformer transformer) {
List<TupleFilter> newChildren = Lists.newArrayList();
for (TupleFilter child : this.getChildren()) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
index 672aba0..28a8c6c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import com.google.common.collect.ImmutableMap;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
import org.slf4j.Logger;
@@ -34,6 +35,19 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.AND;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.EQ;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.GT;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.GTE;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.IN;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.ISNOTNULL;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.ISNULL;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.LT;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.LTE;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.NEQ;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.NOTIN;
+import static org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum.OR;
+
/**
*
* @author xjiang
@@ -43,6 +57,35 @@ public abstract class TupleFilter {
static final Logger logger = LoggerFactory.getLogger(TupleFilter.class);
+ /**
+ * Spark SQL text for the comparison and logical operators, looked up when a filter is
+ * rendered as a Spark SQL string. The texts already carry the blanks needed around them.
+ * Operators without an entry (for example NOT) have no Spark SQL text here.
+ */
+ public static final Map<TupleFilter.FilterOperatorEnum, String> toSparkOpMap = ImmutableMap.<TupleFilter.FilterOperatorEnum, String>builder()
+ .put(EQ, " = ")
+ .put(NEQ, " != ")
+ .put(LT, " < ")
+ .put(GT, " > ")
+ .put(GTE, " >= ")
+ .put(LTE, " <= ")
+ .put(ISNULL, " is null")
+ .put(ISNOTNULL, " is not null")
+ .put(IN, " in ")
+ .put(NOTIN, " not in ")
+ .put(AND, " and ")
+ .put(OR, " or ")
+ .build();
+
+ /**
+ * Spark SQL names for the built-in functions, keyed by the function name used in the filter.
+ * LIKE is an infix keyword and therefore carries surrounding blanks; the string
+ * concatenation operator {@code ||} is mapped to the CONCAT function.
+ */
+ //TODO all function mapping
+ public static final Map<String, String> toSparkFuncMap = ImmutableMap.<String, String>builder()
+ .put("LOWER", "LOWER")
+ .put("UPPER", "UPPER")
+ .put("CHAR_LENGTH", "LENGTH")
+ .put("SUBSTRING", "SUBSTRING")
+ .put("LIKE", " LIKE ")
+ .put("||", "CONCAT")
+ .build();
+
+ /**
+ * Removes all children of this filter by clearing the children list in place.
+ * NOTE(review): relies on the children list supporting clear() - confirm for filters
+ * that were built with an unmodifiable list.
+ */
+ public void removeAllChildren() {
+ this.children.clear();
+ }
+
public enum FilterOperatorEnum {
EQ(1), NEQ(2), GT(3), LT(4), GTE(5), LTE(6), ISNULL(7), ISNOTNULL(8), IN(9), NOTIN(10), AND(20), OR(21), NOT(22), COLUMN(30), CONSTANT(31), DYNAMIC(32), EXTRACT(33), CASE(34), FUNCTION(35), MASSIN(36), EVAL_FUNC(37), UNSUPPORTED(38);
@@ -378,4 +421,6 @@ public abstract class TupleFilter {
}
}
+ /**
+ * Renders this filter, including its children, as a Spark SQL expression string.
+ *
+ * @return the Spark SQL text for this filter
+ */
+ public abstract String toSparkSqlFilter();
+
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
index d0ff92e..b153003 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UDF/MassInTupleFilter.java
@@ -153,6 +153,11 @@ public class MassInTupleFilter extends FunctionTupleFilter {
reverse = Boolean.parseBoolean(BytesUtil.readUTFString(buffer));
}
+ /**
+ * Returns the constant predicate {@code 1=1}; the mass-in condition itself is not rendered.
+ * NOTE(review): presumably MASSIN is not translated to Spark SQL yet - confirm.
+ */
+ @Override
+ public String toSparkSqlFilter() {
+ return "1=1";
+ }
+
public static boolean containsMassInTupleFilter(TupleFilter filter) {
if (filter == null)
return false;
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
index 85605d4..143ee6d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/UnsupportedTupleFilter.java
@@ -56,4 +56,9 @@ public class UnsupportedTupleFilter extends TupleFilter {
@Override
public void deserialize(IFilterCodeSystem<?> cs, ByteBuffer buffer) {
}
+
+ /**
+ * Returns the constant predicate {@code 1=1}, so an unsupported filter never restricts
+ * the generated Spark SQL.
+ */
+ @Override
+ public String toSparkSqlFilter() {
+ return "1=1";
+ }
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index f757503..c3bb5a3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -195,7 +195,7 @@ public class ParameterDesc implements Serializable {
* 1. easy to compare without considering order
* 2. easy to compare one by one
*/
- private static class PlainParameter {
+ private static class PlainParameter implements Serializable{
private String type;
private String value;
private TblColRef colRef = null;
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
index 229ef01..60fe33f 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java
@@ -72,6 +72,8 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
protected CubeSegment cubeSegment;
protected CubeDesc cubeDesc;
protected Cuboid cuboid;
+ protected String filterPushDownSQL;
+ protected CuboidToGridTableMapping mapping;
public CubeScanRangePlanner(CubeSegment cubeSegment, Cuboid cuboid, TupleFilter filter, Set<TblColRef> dimensions, //
Set<TblColRef> groupByDims, List<TblColRef> dynGroupsDims, List<TupleExpression> dynGroupExprs, //
@@ -87,7 +89,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
this.cubeDesc = cubeSegment.getCubeDesc();
this.cuboid = cuboid;
- final CuboidToGridTableMapping mapping = context.getMapping();
+ mapping = context.getMapping();
this.gtInfo = CubeGridTable.newGTInfo(cuboid, new CubeDimEncMap(cubeSegment), mapping);
@@ -98,13 +100,21 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
this.rangeEndComparator = RecordComparators.getRangeEndComparator(comp);
//start key GTRecord compare to stop key GTRecord
this.rangeStartEndComparator = RecordComparators.getRangeStartEndComparator(comp);
-
//replace the constant values in filter to dictionary codes
Set<TblColRef> groupByPushDown = Sets.newHashSet(groupByDims);
groupByPushDown.addAll(dynGroupsDims);
+
this.gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getDim2gt(), groupByPushDown);
+
+ TupleFilter convertedFilter = GTUtil.convertFilterColumnsAndConstants(filter, gtInfo, mapping.getDim2gt(), groupByPushDown, true);
+
this.havingFilter = havingFilter;
+ if (convertedFilter != null) {
+ this.filterPushDownSQL = convertedFilter.toSparkSqlFilter();
+ logger.info("--filterPushDownSQL--: {}", this.filterPushDownSQL);
+ }
+
this.gtDimensions = mapping.makeGridTableColumns(dimensions);
this.gtAggrGroups = mapping.makeGridTableColumns(replaceDerivedColumns(groupByPushDown, cubeSegment.getCubeDesc()));
this.gtAggrMetrics = mapping.makeGridTableColumns(metrics);
@@ -171,6 +181,7 @@ public class CubeScanRangePlanner extends ScanRangePlannerBase {
scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions)
.setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs)
.setFilterPushDown(gtFilter)//
+ .setFilterPushDownSQL(filterPushDownSQL)
.setRtAggrMetrics(gtRtAggrMetrics).setDynamicColumns(gtDynColumns)
.setExprsPushDown(tupleExpressionMap)//
.setAllowStorageAggregation(context.isNeedStorageAggregation())
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index d21638a..fd11c99 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -287,6 +287,10 @@ public class JobBuilderSupport {
return getCuboidRootPath(seg.getLastBuildJobID());
}
+ /**
+ * Returns the cuboid root path for the last build job of {@code seg}, resolved through
+ * {@code getCuboidRootPath} with {@code seg.getLastBuildJobID()}.
+ */
+ public String getCuboidRootPath() {
+ return getCuboidRootPath(seg.getLastBuildJobID());
+ }
+
public void appendMapReduceParameters(StringBuilder buf) {
appendMapReduceParameters(buf, JobEngineConfig.DEFAULT_JOB_CONF_SUFFIX);
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index a7fa2cd..e0652ca 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -39,7 +39,7 @@ import org.slf4j.LoggerFactory;
/**
* @author George Song (ysong1)
- *
+ *
*/
public class CuboidReducer extends KylinReducer<Text, Text, Text, Text> {
diff --git a/engine-spark/pom.xml b/engine-spark/pom.xml
index 26a3ad7..7bb4a52 100644
--- a/engine-spark/pom.xml
+++ b/engine-spark/pom.xml
@@ -83,6 +83,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-tomcat-ext</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 62ccf03..94333a2 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -18,9 +18,6 @@
package org.apache.kylin.engine.spark;
-import java.util.HashMap;
-import java.util.Map;
-
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.util.StringUtil;
@@ -35,7 +32,8 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.kylin.metadata.model.IStorageAware.ID_PARQUET;
+import java.util.HashMap;
+import java.util.Map;
/**
*/
@@ -144,10 +142,6 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
StringUtil.appendWithSeparator(jars, seg.getConfig().getSparkAdditionalJars());
sparkExecutable.setJars(jars.toString());
sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
-
- if (seg.getStorageType() == ID_PARQUET) {
- sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOutputPath(jobId));
- }
}
public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) {
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index 99e6a67..0d5ee37 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,14 +17,6 @@
*/
package org.apache.kylin.engine.spark;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
import com.google.common.collect.Maps;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
@@ -69,13 +61,19 @@ import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.sql.SQLContext;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.Tuple2;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
/**
* Spark application to build cube with the "by-layer" algorithm. Only support source data from Hive; Metadata in HBase.
*/
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
index d8fccf9..154c058 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayerParquet.java
@@ -56,6 +56,8 @@ import org.apache.kylin.measure.basic.BasicMeasureType;
import org.apache.kylin.measure.basic.BigDecimalIngester;
import org.apache.kylin.measure.basic.DoubleIngester;
import org.apache.kylin.measure.basic.LongIngester;
+import org.apache.kylin.measure.percentile.PercentileSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.parquet.example.data.Group;
@@ -75,6 +77,7 @@ import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
@@ -264,6 +267,8 @@ public class SparkCubingByLayerParquet extends SparkCubingByLayer {
private Map<MeasureDesc, String> meaTypeMap;
private GroupFactory factory;
private BufferedMeasureCodec measureCodec;
+ private PercentileSerializer serializer;
+ private ByteBuffer byteBuffer;
public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
this.cubeName = cubeName;
@@ -284,6 +289,8 @@ public class SparkCubingByLayerParquet extends SparkCubingByLayer {
decoder = new RowKeyDecoder(cubeSegment);
factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(conf.get()));
measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+ serializer = new PercentileSerializer(DataType.getType("percentile(100)"));
+
}
@Override
@@ -319,7 +326,7 @@ public class SparkCubingByLayerParquet extends SparkCubingByLayer {
int valueOffset = 0;
for (int i = 0; i < valueLengths.length; ++i) {
MeasureDesc measureDesc = measureDescs.get(i);
- parseMeaValue(group, measureDesc, encodedBytes, valueOffset, valueLengths[i]);
+ parseMeaValue(group, measureDesc, encodedBytes, valueOffset, valueLengths[i], tuple._2[i]);
valueOffset += valueLengths[i];
}
@@ -340,7 +347,7 @@ public class SparkCubingByLayerParquet extends SparkCubingByLayer {
}
}
- private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length) {
+ private void parseMeaValue(final Group group, final MeasureDesc measureDesc, final byte[] value, final int offset, final int length, final Object d) {
switch (meaTypeMap.get(measureDesc)) {
case "long":
group.append(measureDesc.getName(), BytesUtil.readLong(value, offset, length));
@@ -348,6 +355,11 @@ public class SparkCubingByLayerParquet extends SparkCubingByLayer {
case "double":
group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
break;
+ case "decimal":
+ BigDecimal decimal = (BigDecimal)d;
+ decimal = decimal.setScale(4);
+ group.append(measureDesc.getName(), Binary.fromConstantByteArray(decimal.unscaledValue().toByteArray()));
+ break;
default:
group.append(measureDesc.getName(), Binary.fromConstantByteArray(value, offset, length));
break;
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala b/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala
new file mode 100644
index 0000000..20f86e5
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/KylinSession.scala
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import java.io.File
+import java.nio.file.Paths
+import java.util.Properties
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
+import org.apache.spark.sql.SparkSession.Builder
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.sql.manager.UdfManager
+import org.apache.spark.util.{KylinReflectUtils, XmlUtils}
+import org.apache.spark.{SparkConf, SparkContext}
+
+import scala.collection.JavaConverters._
+
+/**
+ * SparkSession subclass used by Kylin.
+ *
+ * @param sc                  the Spark context backing this session
+ * @param existingSharedState shared state to reuse; when empty, a new SharedState is created
+ *                            lazily on first access
+ */
+class KylinSession(
+    @transient val sc: SparkContext,
+    @transient private val existingSharedState: Option[SharedState])
+  extends SparkSession(sc) {
+
+  def this(sc: SparkContext) = this(sc, None)
+
+  // The parent constructor is only given `sc`, so without this override the shared state
+  // passed in by newSession() was never used and every derived session built its own.
+  @transient
+  override lazy val sharedState: SharedState =
+    existingSharedState.getOrElse(new SharedState(sc))
+
+//  @transient
+//  override lazy val sessionState: SessionState =
+//    KylinReflectUtils.getSessionState(sc, this).asInstanceOf[SessionState]
+
+  /** Creates another KylinSession on the same SparkContext, handing over this shared state. */
+  override def newSession(): SparkSession = {
+    new KylinSession(sparkContext, Some(sharedState))
+  }
+}
+
+object KylinSession extends Logging {
+
+ implicit class KylinBuilder(builder: Builder) {
+ // Returns the KylinSession to use for this builder. Lookup order: the active session of the
+ // current thread, then the global default session, and finally a newly created session.
+ // An existing session is only reused when it is a KylinSession whose SparkContext is not
+ // stopped. Builder options are applied to a reused active session and to the SparkConf of a
+ // newly created context; they are not applied when the default session is reused.
+ // NOTE(review): this `synchronized` locks the KylinBuilder wrapper, which is presumably a
+ // fresh object per implicit conversion; the SparkSession.synchronized block below looks like
+ // the effective guard - confirm.
+ def getOrCreateKylinSession(): SparkSession = synchronized {
+ // Both values are read from the builder reflectively, see getValue.
+ val options =
+ getValue("options", builder)
+ .asInstanceOf[scala.collection.mutable.HashMap[String, String]]
+ val userSuppliedContext: Option[SparkContext] =
+ getValue("userSuppliedContext", builder)
+ .asInstanceOf[Option[SparkContext]]
+ var session: SparkSession = SparkSession.getActiveSession match {
+ case Some(sparkSession: KylinSession) =>
+ if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
+ options.foreach {
+ case (k, v) => sparkSession.sessionState.conf.setConfString(k, v)
+ }
+ sparkSession
+ } else {
+ null
+ }
+ case _ => null
+ }
+ if (session ne null) {
+ return session
+ }
+ // Global synchronization so we will only set the default session once.
+ SparkSession.synchronized {
+ // If the current thread does not have an active session, get it from the global session.
+ session = SparkSession.getDefaultSession match {
+ case Some(sparkSession: KylinSession) =>
+ if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
+ sparkSession
+ } else {
+ null
+ }
+ case _ => null
+ }
+ if (session ne null) {
+ return session
+ }
+ val sparkContext = userSuppliedContext.getOrElse {
+ // Start from the Kylin-derived conf, then let the builder options override it.
+ val sparkConf = initSparkConf()
+ options.foreach { case (k, v) => sparkConf.set(k, v) }
+ val sc = SparkContext.getOrCreate(sparkConf)
+ // NOTE(review): getOrCreate may hand back an already existing SparkContext; its SparkConf
+ // is not updated with the options here.
+ sc
+ }
+ session = new KylinSession(sparkContext)
+ SparkSession.setDefaultSession(session)
+ // Clear the global references again once the Spark application ends.
+ sparkContext.addSparkListener(new SparkListener {
+ override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
+ SparkSession.setDefaultSession(null)
+ SparkSession.sqlListener.set(null)
+ }
+ })
+ // Only a newly created session gets a UdfManager installed.
+ UdfManager.create(session)
+ session
+ }
+ }
+
+    /**
+     * Reads a member of the given builder through Scala runtime reflection.
+     *
+     * @param name    exact name of the member to read
+     * @param builder the builder instance to read from
+     * @return the current value of that member
+     * @throws NoSuchElementException if the builder's class has no member called `name`
+     */
+    def getValue(name: String, builder: Builder): Any = {
+      val currentMirror = scala.reflect.runtime.currentMirror
+      val instanceMirror = currentMirror.reflect(builder)
+      val member = currentMirror
+        .classSymbol(builder.getClass)
+        .toType
+        .members
+        .find(_.name.toString == name)
+        .getOrElse(
+          // Same exception type as the former bare Option.get, but it now names the culprit.
+          throw new NoSuchElementException(
+            s"No member named '$name' found on ${builder.getClass.getName}"))
+        .asTerm
+      instanceMirror.reflectField(member).get
+    }
+
+ private lazy val kylinConfig: KylinConfig = KylinConfig.getInstanceFromEnv
+
+    /**
+     * Builds the SparkConf for a new SparkContext.
+     *
+     * Steps: copy every entry of `kylinConfig.getSparkConf`; overlay the read cluster's
+     * settings when ParquetSeparateFs is enabled; default `spark.sql.shuffle.partitions` to
+     * executor instances * executor cores when it is not set; force UTC session time zone and
+     * FAIR scheduling (using conf/fairscheduler.xml if present); and, unless the system
+     * property `spark.local` is "true", ship the sparder jars and put the storage-parquet jar
+     * on the executor class path.
+     *
+     * @return the populated SparkConf
+     * @throws IllegalStateException if no storage-parquet jar is listed in the sparder jars
+     */
+    def initSparkConf(): SparkConf = {
+      val sparkConf = new SparkConf()
+
+      kylinConfig.getSparkConf.asScala.foreach {
+        case (k, v) =>
+          sparkConf.set(k, v)
+      }
+      if (KylinConfig.getInstanceFromEnv.isParquetSeparateFsEnabled) {
+        logInfo("ParquetSeparateFs is enabled : begin override read cluster conf to sparkConf")
+        addReadConfToSparkConf(sparkConf)
+      }
+
+      // The executor settings are only needed to derive a default shuffle parallelism, so they
+      // are read only in that case, and a missing key no longer aborts the start-up.
+      if (sparkConf.get("spark.sql.shuffle.partitions", "").isEmpty) {
+        val sparkCores = for {
+          instances <- sparkConf.getOption("spark.executor.instances")
+          cores <- sparkConf.getOption("spark.executor.cores")
+        } yield instances.toInt * cores.toInt
+        sparkCores match {
+          case Some(total) =>
+            sparkConf.set("spark.sql.shuffle.partitions", total.toString)
+          case None =>
+            logInfo("spark.executor.instances or spark.executor.cores is not set, " +
+              "leaving spark.sql.shuffle.partitions at Spark's default")
+        }
+      }
+      sparkConf.set("spark.sql.session.timeZone", "UTC")
+      sparkConf.set("spark.debug.maxToStringFields", "1000")
+      sparkConf.set("spark.scheduler.mode", "FAIR")
+      val fairScheduler = KylinConfig.getKylinConfDir.getCanonicalPath + "/fairscheduler.xml"
+      if (new File(fairScheduler).exists()) {
+        sparkConf.set("spark.scheduler.allocation.file", fairScheduler)
+      }
+
+      if (!"true".equalsIgnoreCase(System.getProperty("spark.local"))) {
+        if ("yarn-client".equalsIgnoreCase(sparkConf.get("spark.master"))) {
+          sparkConf.set("spark.yarn.dist.jars", kylinConfig.sparderJars)
+        } else {
+          sparkConf.set("spark.jars", kylinConfig.sparderJars)
+        }
+
+        val sparderJars = KylinConfig.getInstanceFromEnv.sparderJars
+        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+        val filePath = sparderJars
+          .split(",")
+          .find(_.contains("storage-parquet"))
+          .getOrElse(throw new IllegalStateException(
+            "No storage-parquet jar found in sparder jars: " + sparderJars))
+        // Only the file name is used for the executor class path entry.
+        val fileName = filePath.substring(filePath.lastIndexOf('/') + 1)
+        sparkConf.set("spark.executor.extraClassPath", fileName)
+      }
+
+      sparkConf
+    }
+
+
+    /**
+     * For R/W splitting: overlays the Hadoop settings of the read cluster onto the given
+     * SparkConf and points the YARN staging directory at that cluster.
+     *
+     * @param sparkConf the conf to modify in place
+     * @return the same `sparkConf` instance
+     * @throws IllegalArgumentException if the configured HADOOP_CONF_DIR is unset or does not
+     *                                  exist
+     */
+    def addReadConfToSparkConf(sparkConf: SparkConf): SparkConf = {
+      val readHadoopConfDir = kylinConfig
+        .getColumnarSparkEnv("HADOOP_CONF_DIR")
+      // Guard against an unset value as well: new File(null) would fail with a bare
+      // NullPointerException that does not mention the property to fix.
+      if (readHadoopConfDir == null || !new File(readHadoopConfDir).exists()) {
+        throw new IllegalArgumentException(
+          "kylin.storage.columnar.spark-env.HADOOP_CONF_DIR not found: " + readHadoopConfDir)
+      }
+      overrideHadoopConfigToSparkConf(readHadoopConfDir, sparkConf)
+      changeStagingDir(sparkConf, readHadoopConfDir)
+      sparkConf
+    }
+
+    /**
+     * Sets spark.yarn.stagingDir to the home directory reported by the file system that
+     * `fs.defaultFS` in `readHadoopConf`/core-site.xml refers to.
+     */
+    private def changeStagingDir(sparkConf: SparkConf, readHadoopConf: String) = {
+      val coreSite: Properties = XmlUtils.loadProp(readHadoopConf + "/core-site.xml")
+      val defaultFs = new Path(coreSite.getProperty("fs.defaultFS"))
+      val readFileSystem = defaultFs.getFileSystem(new Configuration())
+      sparkConf.set("spark.yarn.stagingDir", readFileSystem.getHomeDirectory.toString)
+    }
+
+    /**
+     * Copies every non-empty property of the configured override files, looked up below
+     * `readHadoopConf`, into `sparkConf` under the "spark.hadoop." prefix.
+     */
+    private def overrideHadoopConfigToSparkConf(
+        readHadoopConf: String,
+        sparkConf: SparkConf): Unit = {
+      for (configFile <- kylinConfig.getParquetSeparateOverrideFiles.split(",")) {
+        logInfo("find config file : " + configFile)
+        val configPath = Paths.get(readHadoopConf, configFile).toString
+        val properties: Properties = XmlUtils.loadProp(configPath)
+        val nonEmptyEntries = properties
+          .entrySet()
+          .asScala
+          .filter(e => e.getValue.asInstanceOf[String].nonEmpty)
+        nonEmptyEntries.foreach { entry =>
+          logInfo("override : " + entry)
+          val hadoopKey = "spark.hadoop." + entry.getKey.asInstanceOf[String]
+          sparkConf.set(hadoopKey, entry.getValue.asInstanceOf[String])
+        }
+      }
+    }
+ }
+
+}
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala b/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala
new file mode 100644
index 0000000..ecaa5c8
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/SparderEnv.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import org.apache.spark.internal.Logging;
+import org.apache.spark.sql.KylinSession._
+import org.apache.kylin.ext.ClassLoaderUtils;
+
+/**
+ * Holds the single SparkSession of this process and offers helpers to start, restart and
+ * inspect it. Session start-up runs inside [[withClassLoad]].
+ */
+object SparderEnv extends Logging {
+  @volatile
+  private var spark: SparkSession = _
+
+  /** Returns the session, initializing it first if it is missing or its context is stopped. */
+  def getSparkSession: SparkSession = withClassLoad {
+    if (spark == null || spark.sparkContext.isStopped) {
+      logInfo("Init spark.")
+      initSpark()
+    }
+    spark
+  }
+
+  /** True when a session exists and its SparkContext has not been stopped. */
+  def isSparkAvailable: Boolean = {
+    spark != null && !spark.sparkContext.isStopped
+  }
+
+  /** Stops the current session, if it is running, and initializes a new one. */
+  def restartSpark(): Unit = withClassLoad {
+    this.synchronized {
+      if (spark != null && !spark.sparkContext.isStopped) {
+        spark.stop()
+      }
+
+      logInfo("Restart Spark")
+      init()
+    }
+  }
+
+  /** Makes sure the session is started. */
+  def init(): Unit = withClassLoad {
+    getSparkSession
+  }
+
+  /** Returns the value of `key` from the conf of the session's SparkContext. */
+  def getSparkConf(key: String): String = {
+    getSparkSession.sparkContext.conf.get(key)
+  }
+
+  /** Number of active jobs as tracked by the context's job progress listener. */
+  def getActiveJobs(): Int = {
+    SparderEnv.getSparkSession.sparkContext.jobProgressListener.activeJobs.size
+  }
+
+  /** Number of failed jobs as tracked by the context's job progress listener. */
+  def getFailedJobs(): Int = {
+    SparderEnv.getSparkSession.sparkContext.jobProgressListener.failedJobs.size
+  }
+
+  /**
+   * One third of the total executor cores (instances * cores per executor), rounded to the
+   * nearest integer.
+   */
+  def getAsyncResultCore: Int = {
+    val sparkConf = getSparkSession.sparkContext.getConf
+    val instances = sparkConf.get("spark.executor.instances").toInt
+    val cores = sparkConf.get("spark.executor.cores").toInt
+    // Divide in floating point: the former integer division truncated the value before
+    // Math.round was applied, which made the rounding a no-op.
+    Math.round(instances * cores / 3.0f)
+  }
+
+  /**
+   * Creates the session if there is none or its context is stopped. A local-master session is
+   * built when the system property `spark.local` is "true", a Hive-enabled one otherwise.
+   */
+  def initSpark(): Unit = withClassLoad {
+    this.synchronized {
+      // Checked again under the lock so that concurrent callers create only one session.
+      if (spark == null || spark.sparkContext.isStopped) {
+        val sparkSession = System.getProperty("spark.local") match {
+          case "true" =>
+            SparkSession.builder
+              .master("local")
+              .appName("test-local-sql-context")
+//              .enableHiveSupport()
+              .getOrCreateKylinSession()
+          case _ =>
+            SparkSession.builder
+              .appName("test-sql-context")
+              .enableHiveSupport()
+              .getOrCreateKylinSession()
+        }
+        spark = sparkSession
+
+        logInfo("Spark context started successfully with stack trace:")
+        logInfo(Thread.currentThread().getStackTrace.mkString("\n"))
+        logInfo("Class loader: " + Thread.currentThread().getContextClassLoader.toString)
+      }
+    }
+  }
+
+  /**
+   * Runs `body` with the Spark class loader installed as the thread's context class loader,
+   * so that Spark is not affected by the surrounding environment. The previous context class
+   * loader is restored afterwards, also when `body` throws.
+   *
+   * @param body the code that uses Spark
+   * @tparam T result type of `body`
+   * @return whatever `body` returns
+   */
+  def withClassLoad[T](body: => T): T = {
+    val originClassLoad = Thread.currentThread().getContextClassLoader
+    Thread.currentThread().setContextClassLoader(ClassLoaderUtils.getSparkClassLoader)
+    try {
+      body
+    } finally {
+      // Without the finally a failing body left the Spark class loader on the calling thread.
+      Thread.currentThread().setContextClassLoader(originClassLoad)
+    }
+  }
+}
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala b/engine-spark/src/main/java/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
new file mode 100644
index 0000000..c399381
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/hive/KylinHiveSessionStateBuilder.scala
@@ -0,0 +1,64 @@
+/*
+ * Copyright (C) 2016 Kyligence Inc. All rights reserved.
+ *
+ * http://kyligence.io
+ *
+ * This software is the confidential and proprietary information of
+ * Kyligence Inc. ("Confidential Information"). You shall not disclose
+ * such Confidential Information and shall use it only in accordance
+ * with the terms of the license agreement you entered into with
+ * Kyligence Inc.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.{BaseSessionStateBuilder, SessionState}
+
+/**
+ * Session state builder on top of Spark's HiveSessionStateBuilder. The Hive session state
+ * has some extra rules, e.g. the rule for finding data source tables.
+ *
+ * @param sparkSession the session the state is built for
+ * @param parentState  the parent session's state, if any
+ */
+class KylinHiveSessionStateBuilder(sparkSession: SparkSession, parentState: Option[SessionState] = None)
+  extends HiveSessionStateBuilder(sparkSession, parentState) {
+  // Start with an empty list of extra optimizer rules.
+  experimentalMethods.extraOptimizations = {
+    Seq()
+  }
+
+  override protected def newBuilder: NewBuilder =
+    new KylinHiveSessionStateBuilder(_, _)
+
+}
+
+/**
+ * Session state builder for running without Hive.
+ *
+ * @param sparkSession the session the state is built for
+ * @param parentState  the parent session's state, if any
+ */
+class KylinSessionStateBuilder(sparkSession: SparkSession, parentState: Option[SessionState] = None)
+  extends BaseSessionStateBuilder(sparkSession, parentState) {
+  // Start with an empty list of extra optimizer rules.
+  experimentalMethods.extraOptimizations = Seq()
+
+  override protected def newBuilder: NewBuilder =
+    (session, state) => new KylinSessionStateBuilder(session, state)
+
+}
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/manager/UdfManager.scala b/engine-spark/src/main/java/org/apache/spark/sql/manager/UdfManager.scala
new file mode 100644
index 0000000..499cbcd
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/manager/UdfManager.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql.manager
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}
+import org.apache.kylin.metadata.datatype.DataType
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.udf.SparderAggFun
+
+/**
+ * Registers SparderAggFun aggregate functions on one SparkSession and remembers which
+ * names were registered so that the same function is not registered over and over.
+ *
+ * @param sparkSession the session the functions are registered on
+ */
+class UdfManager(sparkSession: SparkSession) extends Logging {
+  // Keys are the registered function names; the values carry no information.
+  private val udfCache: Cache[String, String] = CacheBuilder.newBuilder
+    .maximumSize(100)
+    .expireAfterWrite(1, TimeUnit.HOURS)
+    .removalListener(new RemovalListener[String, String]() {
+      override def onRemoval(notification: RemovalNotification[String, String]): Unit = {
+        val func = notification.getKey
+        logInfo(s"remove function $func")
+      }
+    })
+    .build[String, String]()
+
+  /** Forgets all remembered registrations. (The misspelled name is kept for callers.) */
+  def destory(): Unit = {
+    // cleanUp() alone only runs pending maintenance and leaves live entries in place.
+    udfCache.invalidateAll()
+    udfCache.cleanUp()
+  }
+
+  /**
+   * Registers the aggregate function for the given data type unless it is already known.
+   *
+   * @param dataType Kylin data type the function works on
+   * @param funcName Kylin aggregation function name
+   * @return the name under which the function is registered in Spark
+   */
+  def doRegister(dataType: DataType, funcName: String): String = {
+    val name = genKey(dataType, funcName)
+    if (udfCache.getIfPresent(name) == null) {
+      // Register first: a failed registration must not be remembered as done.
+      sparkSession.udf.register(name, new SparderAggFun(funcName, dataType))
+      udfCache.put(name, "")
+    }
+    name
+  }
+
+  /** Builds the registration name: the data type with "(", ")" and "," replaced by "_", followed by the function name. */
+  def genKey(dataType: DataType, funcName: String): String = {
+    dataType.toString
+      .replace("(", "_")
+      .replace(")", "_")
+      .replace(",", "_") + funcName
+  }
+
+}
+
+/** Process-wide access point to the current UdfManager. */
+object UdfManager {
+
+  private val defaultManager = new AtomicReference[UdfManager]
+  private val defaultSparkSession: AtomicReference[SparkSession] =
+    new AtomicReference[SparkSession]
+
+  /** Replaces the current manager with a new one for the session belonging to `sc`. */
+  def refresh(sc: JavaSparkContext): Unit = {
+    val sparkSession = SparkSession.builder.config(sc.getConf).getOrCreate
+
+    // There is nothing to tear down when refresh() is called before any create().
+    Option(defaultManager.get()).foreach(_.destory())
+    create(sparkSession)
+  }
+
+  /** Installs a new manager bound to `sparkSession` as the default. */
+  def create(sparkSession: SparkSession): Unit = {
+    val manager = new UdfManager(sparkSession)
+    defaultManager.set(manager)
+    defaultSparkSession.set(sparkSession)
+  }
+
+  /** Installs a new manager bound to the session obtained for `sc`. */
+  def create(sc: JavaSparkContext): Unit = {
+    val sparkSession = SparkSession.builder.config(sc.getConf).getOrCreate
+    create(sparkSession)
+
+  }
+
+  /**
+   * Registers the aggregate function on the default manager.
+   *
+   * @return the name under which the function is registered
+   * @throws IllegalStateException if no manager has been created yet
+   */
+  def register(dataType: DataType, func: String): String = {
+    val manager = defaultManager.get()
+    if (manager == null) {
+      // Previously this surfaced as a bare NullPointerException.
+      throw new IllegalStateException(
+        "UdfManager is not initialized, call UdfManager.create first")
+    }
+    manager.doRegister(dataType, func)
+  }
+
+}
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/udf/SparderAggFun.scala b/engine-spark/src/main/java/org/apache/spark/sql/udf/SparderAggFun.scala
new file mode 100644
index 0000000..d84b31d
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/udf/SparderAggFun.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql.udf
+
+import java.nio.ByteBuffer
+
+import org.apache.kylin.gridtable.GTInfo
+import org.apache.kylin.measure.MeasureAggregator
+import org.apache.kylin.metadata.datatype.DataTypeSerializer
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SparderTypeUtil
+
+/**
+ * Spark UDAF that aggregates Kylin measures in their serialized form: the input column and
+ * the aggregation buffer are byte arrays that are decoded with the DataTypeSerializer of
+ * `dataTp`, combined by the matching MeasureAggregator and encoded again.
+ *
+ * @param funcName Kylin aggregation function name; "$SUM0" is aggregated as "COUNT"
+ * @param dataTp   Kylin data type of the measure
+ */
+class SparderAggFun(funcName: String, dataTp: org.apache.kylin.metadata.datatype.DataType)
+  extends UserDefinedAggregateFunction
+  with Logging {
+
+  protected val _inputDataType = StructType(Seq(StructField("inputBinary", BinaryType)))
+
+  protected val _bufferSchema: StructType =
+    StructType(Seq(StructField("bufferBinary", BinaryType)))
+
+  protected val _returnDataType: DataType =
+    SparderTypeUtil.kylinTypeToSparkResultType(dataTp)
+
+  protected var byteBuffer: ByteBuffer = null
+  protected var init = false
+  protected var gtInfo: GTInfo = _
+  protected var measureAggregator: MeasureAggregator[Any] = _
+  protected var colId: Int = _
+  protected var serializer: DataTypeSerializer[Any] = _
+  protected var aggregator: MeasureAggregator[Any] = _
+
+  var time = System.currentTimeMillis()
+
+  val MAX_BUFFER_CAP: Int = 50 * 1024 * 1024
+
+  override def bufferSchema: StructType = _bufferSchema
+
+  override def inputSchema: StructType = _inputDataType
+
+  override def deterministic: Boolean = true
+
+  override def dataType: DataType = _returnDataType
+
+  /** Serializes `value` through the shared scratch buffer and returns a copy of the bytes. */
+  private def encode(value: Any): Array[Byte] = {
+    byteBuffer.clear()
+    serializer.serialize(value, byteBuffer)
+    byteBuffer.array().slice(0, byteBuffer.position())
+  }
+
+  /**
+   * Creates the serializer and scratch buffer on first use, creates and resets the
+   * aggregator, and seeds the aggregation buffer.
+   */
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    val isSum0 = funcName == "$SUM0"
+    if (byteBuffer == null) {
+      serializer = DataTypeSerializer.create(dataTp).asInstanceOf[DataTypeSerializer[Any]]
+      byteBuffer = ByteBuffer.allocate(serializer.maxLength)
+    }
+
+    val aggregatorName = if (isSum0) "COUNT" else funcName
+    aggregator = MeasureAggregator
+      .create(aggregatorName, dataTp)
+      .asInstanceOf[MeasureAggregator[Any]]
+    aggregator.reset()
+
+    // $SUM0 is the rewritten form of COUNT, which should return 0 instead of null in case of
+    // no input, so its buffer starts with the aggregator's initial state rather than null.
+    val initVal: Array[Byte] = if (isSum0) encode(aggregator.getState) else null
+    buffer.update(0, initVal)
+  }
+
+  /** Handles an input row exactly like a partial result, see merge. */
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    merge(buffer, input)
+  }
+
+  /**
+   * Folds the serialized value in column 0 of `input` into the buffer. Null and empty inputs
+   * are ignored; the buffer is only rewritten when the aggregator returns a non-null result.
+   */
+  override def merge(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    if (input.isNullAt(0)) {
+      return
+    }
+    try {
+      val incoming = input.apply(0).asInstanceOf[Array[Byte]]
+      if (incoming.length != 0) {
+        val current =
+          if (buffer.isNullAt(0)) null
+          else serializer.deserialize(ByteBuffer.wrap(buffer.apply(0).asInstanceOf[Array[Byte]]))
+        val addition = serializer.deserialize(ByteBuffer.wrap(incoming))
+
+        val merged = aggregator.aggregate(current, addition)
+        if (merged != null) {
+          buffer.update(0, encode(merged))
+        }
+      }
+    } catch {
+      case e: Exception =>
+        // Attach the offending input bytes to whatever went wrong.
+        throw new Exception(
+          "error data is: " + input
+            .apply(0)
+            .asInstanceOf[Array[Byte]]
+            .mkString(","),
+          e)
+    }
+  }
+
+  /**
+   * Returns the buffer's bytes unchanged for the supported measure types and null for an
+   * empty buffer; any other type name raises IllegalArgumentException.
+   */
+  override def evaluate(buffer: Row): Any = {
+    if (buffer.isNullAt(0)) {
+      null
+    } else {
+      val typeName = dataTp.getName
+      val supported =
+        typeName.startsWith("percentile") ||
+          typeName == "hllc" ||
+          typeName == "bitmap" ||
+          typeName == "dim_dc" ||
+          typeName == "extendedcolumn" ||
+          typeName == "raw" ||
+          typeName.startsWith("top")
+      if (!supported) {
+        throw new IllegalArgumentException("unsupported function")
+      }
+      buffer.apply(0).asInstanceOf[Array[Byte]]
+    }
+  }
+
+  override def toString: String = {
+    s"SparderAggFun@$funcName${dataType.toString}"
+  }
+}
diff --git a/engine-spark/src/main/java/org/apache/spark/sql/util/SparderTypeUtil.scala b/engine-spark/src/main/java/org/apache/spark/sql/util/SparderTypeUtil.scala
new file mode 100644
index 0000000..f2adbcc
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/sql/util/SparderTypeUtil.scala
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.spark.sql.util
+
+import java.math.BigDecimal
+import java.sql.Timestamp
+import java.util.Locale
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.kylin.common.util.DateFormat
+import org.apache.kylin.metadata.datatype.DataType
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types._
+
+object SparderTypeUtil extends Logging {
+ val DATETIME_FAMILY = List("time", "date", "timestamp", "datetime")
+
+  /**
+   * Tells whether the given type name, compared case-insensitively, is one of
+   * the names listed in `DATETIME_FAMILY`.
+   */
+  def isDateTimeFamilyType(dataType: String): Boolean = {
+    val normalized = dataType.toLowerCase(Locale.ROOT)
+    DATETIME_FAMILY.contains(normalized)
+  }
+
+  /** Tells whether the type name is exactly "date", ignoring case; a null name yields false. */
+  def isDateType(dataType: String): Boolean = {
+    dataType != null && dataType.equalsIgnoreCase("date")
+  }
+
+  /** Tells whether the Calcite type name belongs to `SqlTypeName.DATETIME_TYPES`. */
+  def isDateTime(sqlTypeName: SqlTypeName): Boolean = {
+    val dateTimeTypes = SqlTypeName.DATETIME_TYPES
+    dateTimeTypes.contains(sqlTypeName)
+  }
+
+ // scalastyle:off
+  /**
+   * Maps a Kylin data type to the Spark SQL type used for result columns.
+   *
+   * Measure types whose name starts with `hllc`, `top`, `percentile` or `bitmap`
+   * are carried as `BinaryType`; date/time family types are carried as numbers
+   * (`date` as `IntegerType`, the rest as `LongType`).
+   *
+   * The former `case "bitmap" => LongType` branch was unreachable: the
+   * `startsWith("bitmap")` guard above it always matched first, so it has been
+   * removed without changing behavior.
+   *
+   * @param dataTp Kylin data type; precision and scale are read for `decimal`
+   * @return the matching Spark SQL data type
+   * @throws IllegalArgumentException when the type name is not handled
+   */
+  def kylinTypeToSparkResultType(dataTp: DataType): org.apache.spark.sql.types.DataType = {
+    dataTp.getName match {
+      case tp if tp.startsWith("hllc") => BinaryType
+      case tp if tp.startsWith("top") => BinaryType
+      case tp if tp.startsWith("percentile") => BinaryType
+      case tp if tp.startsWith("bitmap") => BinaryType
+      case "decimal" => DecimalType(dataTp.getPrecision, dataTp.getScale)
+      case "date" => IntegerType
+      case "time" => LongType
+      case "timestamp" => LongType
+      case "datetime" => LongType
+      case "tinyint" => ByteType
+      case "smallint" => ShortType
+      case "integer" => IntegerType
+      case "int4" => IntegerType
+      case "bigint" => LongType
+      case "long8" => LongType
+      case "float" => FloatType
+      case "double" => DoubleType
+      case "real" => DoubleType
+      case tp if tp.startsWith("varchar") => StringType
+      case tp if tp.startsWith("char") => StringType
+      case "dim_dc" => BinaryType
+      case "boolean" => BooleanType
+      case "extendedcolumn" => BinaryType
+      case "raw" => BinaryType
+      case noSupport => throw new IllegalArgumentException(s"No supported data type: $noSupport")
+    }
+  }
+
+ // scalastyle:off
+  /**
+   * Maps a Kylin SQL data type to the corresponding Spark SQL type.
+   *
+   * @param dataTp Kylin data type; precision and scale are read for `decimal`
+   * @return the matching Spark SQL data type
+   * @throws IllegalArgumentException when the type name is not handled
+   */
+  def kylinSQLTypeToSparkType(dataTp: DataType): org.apache.spark.sql.types.DataType = {
+    dataTp.getName match {
+      case "decimal" => DecimalType(dataTp.getPrecision, dataTp.getScale)
+      case "timestamp" => TimestampType
+      case "date" | "time" | "datetime" => DateType
+      case "tinyint" => ByteType
+      case "smallint" => ShortType
+      case "integer" | "int4" => IntegerType
+      case "bigint" | "long8" | "bitmap" | "dim_dc" => LongType
+      case "float" => FloatType
+      case "double" | "real" => DoubleType
+      case "boolean" => BooleanType
+      case text if text.startsWith("varchar") || text.startsWith("char") => StringType
+      case other => throw new IllegalArgumentException(s"No supported data type: $other")
+    }
+  }
+
+ // scalastyle:off
+  /**
+   * Translates a Calcite SQL type name into a Spark type name string.
+   *
+   * @param sqlTypeName Calcite type name
+   * @return the Spark type name, e.g. "string", "int", "long"
+   * @throws IllegalArgumentException when the type name is not handled
+   */
+  def convertSqlTypeNameToSparkType(sqlTypeName: SqlTypeName): String = {
+    sqlTypeName match {
+      case SqlTypeName.CHAR | SqlTypeName.VARCHAR => "string"
+      case SqlTypeName.DECIMAL => "decimal"
+      case SqlTypeName.TINYINT => "byte"
+      case SqlTypeName.SMALLINT => "short"
+      case SqlTypeName.INTEGER => "int"
+      case SqlTypeName.BIGINT => "long"
+      case SqlTypeName.FLOAT => "float"
+      case SqlTypeName.DOUBLE => "double"
+      case SqlTypeName.DATE => "date"
+      case SqlTypeName.TIMESTAMP => "timestamp"
+      case SqlTypeName.BOOLEAN => "boolean"
+      case other => throw new IllegalArgumentException(s"No supported data type: $other")
+    }
+  }
+
+ // scalastyle:off
+  /**
+   * Maps a Kylin cube data type to the corresponding Spark SQL type.
+   *
+   * @param dataTp Kylin data type; precision and scale are read for `decimal`
+   * @return the matching Spark SQL data type
+   * @throws IllegalArgumentException when the type name is not handled
+   */
+  def kylinCubeDataTypeToSparkType(dataTp: DataType): org.apache.spark.sql.types.DataType = {
+    val typeName = dataTp.getName
+    typeName match {
+      case "decimal" => DecimalType(dataTp.getPrecision, dataTp.getScale)
+      case "timestamp" => TimestampType
+      case "date" | "time" | "datetime" => DateType
+      case "tinyint" => ByteType
+      case "smallint" => ShortType
+      case "integer" | "int4" => IntegerType
+      case "bigint" | "long8" | "bitmap" | "dim_dc" => LongType
+      case "float" => FloatType
+      case "double" | "real" => DoubleType
+      case "boolean" => BooleanType
+      case text if text.startsWith("varchar") || text.startsWith("char") => StringType
+      case other => throw new IllegalArgumentException(s"No supported data type: $other")
+    }
+  }
+
+ // for reader
+ // scalastyle:off
+  /**
+   * Maps a dimension type name to the Spark SQL type used when reading.
+   *
+   * Date/time family names map to `LongType`. A bare `decimal` maps to
+   * `DecimalType(19, 4)`; `decimal(p,s)` is parsed for its precision and scale.
+   *
+   * @param dataTp dimension type name
+   * @return the matching Spark SQL data type
+   * @throws IllegalArgumentException when the name is not handled or a
+   *                                  `decimal(...)` specification cannot be parsed
+   */
+  def kylinDimensionDataTypeToSparkType(dataTp: String): org.apache.spark.sql.types.DataType = {
+    dataTp match {
+      case "string" => StringType
+      case "date" | "time" | "timestamp" | "datetime" => LongType
+      case "tinyint" => ByteType
+      case "smallint" => ShortType
+      case "integer" | "int4" => IntegerType
+      case "bigint" | "long8" | "bitmap" | "dim_dc" => LongType
+      case "float" => FloatType
+      case "double" | "real" => DoubleType
+      case "boolean" => BooleanType
+      case "decimal" => DecimalType(19, 4)
+      case text if text.startsWith("varchar") || text.startsWith("char") => StringType
+      case tp if tp.startsWith("decimal") && tp.contains("(") =>
+        try {
+          // Strip the keyword and parentheses, leaving "precision,scale".
+          val stripped = tp.replace("decimal", "").replace("(", "").replace(")", "")
+          val parts = stripped.split(",")
+          DataTypes.createDecimalType(parts(0).toInt, parts(1).toInt)
+        } catch {
+          case e: Exception =>
+            throw new IllegalArgumentException(s"Unsupported data type : $tp", e)
+        }
+      case other => throw new IllegalArgumentException(s"No supported data type: $other")
+    }
+  }
+
+ // scalastyle:off
+  /**
+   * Maps a Kylin raw-table SQL data type to the corresponding Spark SQL type.
+   *
+   * @param dataTp Kylin data type; precision and scale are read for `decimal`
+   * @return the matching Spark SQL data type
+   * @throws IllegalArgumentException when the type name is not handled
+   */
+  def kylinRawTableSQLTypeToSparkType(dataTp: DataType): org.apache.spark.sql.types.DataType = {
+    val typeName = dataTp.getName
+    typeName match {
+      case "decimal" => DecimalType(dataTp.getPrecision, dataTp.getScale)
+      case "timestamp" => TimestampType
+      case "date" | "time" | "datetime" => DateType
+      case "tinyint" => ByteType
+      case "smallint" => ShortType
+      case "integer" | "int4" => IntegerType
+      case "bigint" | "long8" | "bitmap" | "dim_dc" => LongType
+      case "float" => FloatType
+      case "double" | "real" => DoubleType
+      case "boolean" => BooleanType
+      case text if text.startsWith("char") || text.startsWith("varchar") => StringType
+      case other => throw new IllegalArgumentException(s"No supported data type: $other")
+    }
+  }
+
+
+  /**
+   * Maps a Calcite relational data type to the corresponding Spark SQL type.
+   *
+   * @param dt Calcite type; precision and scale are read for `DECIMAL`
+   * @return the matching Spark SQL data type
+   * @throws IllegalArgumentException when the SQL type name is not handled
+   */
+  def convertSqlTypeToSparkType(dt: RelDataType): org.apache.spark.sql.types.DataType = {
+    val sqlType = dt.getSqlTypeName
+    sqlType match {
+      case SqlTypeName.CHAR | SqlTypeName.VARCHAR => StringType
+      case SqlTypeName.DECIMAL => DecimalType(dt.getPrecision, dt.getScale)
+      case SqlTypeName.TINYINT => ByteType
+      case SqlTypeName.SMALLINT => ShortType
+      case SqlTypeName.INTEGER => IntegerType
+      case SqlTypeName.BIGINT => LongType
+      case SqlTypeName.FLOAT => FloatType
+      case SqlTypeName.DOUBLE => DoubleType
+      case SqlTypeName.DATE => DateType
+      case SqlTypeName.TIMESTAMP => TimestampType
+      case SqlTypeName.BOOLEAN => BooleanType
+      case other => throw new IllegalArgumentException(s"No supported data type: $other")
+    }
+  }
+
+ // scalastyle:off
+  /**
+   * Converts a raw cell value into the representation expected for the given
+   * Calcite type.
+   *
+   *  - `null` stays `null`.
+   *  - A value whose string form is empty yields the type's zero default
+   *    (0, 0L, 0f, ...; the unchanged empty string for CHAR/VARCHAR; `null`
+   *    for BOOLEAN and unknown types). Previously this branch computed the
+   *    default but never returned it, so callers received Unit.
+   *  - Any other value is parsed according to the SQL type. If that fails, the
+   *    error is logged and `safetyConvertStringToValue` is tried as a fallback.
+   *
+   * @param s         raw value; TIMESTAMP/TIME expect a `java.sql.Timestamp`
+   * @param rowType   target Calcite type (scale is read for DECIMAL)
+   * @param toCalcite selects between the two date/time encodings computed below
+   * @return the converted value, or null
+   */
+  def convertStringToValue(s: Any, rowType: RelDataType, toCalcite: Boolean): Any = {
+    val sqlTypeName = rowType.getSqlTypeName
+    if (s == null) {
+      null
+    } else if (s.toString.isEmpty) {
+      val a: Any = sqlTypeName match {
+        case SqlTypeName.DECIMAL => new java.math.BigDecimal(0)
+        case SqlTypeName.CHAR => s.toString
+        case SqlTypeName.VARCHAR => s.toString
+        case SqlTypeName.INTEGER => 0
+        case SqlTypeName.TINYINT => 0.toByte
+        case SqlTypeName.SMALLINT => 0.toShort
+        case SqlTypeName.BIGINT => 0L
+        case SqlTypeName.FLOAT => 0f
+        case SqlTypeName.DOUBLE => 0d
+        case SqlTypeName.DATE => 0
+        case SqlTypeName.TIMESTAMP => 0L
+        case SqlTypeName.TIME => 0L
+        case SqlTypeName.BOOLEAN => null
+        case null => null
+        case _ => null
+      }
+      // Yield the default; without this line the branch evaluates to Unit.
+      a
+    } else {
+      try {
+        val a: Any = sqlTypeName match {
+          case SqlTypeName.DECIMAL =>
+            if (s.isInstanceOf[java.lang.Double] || s
+              .isInstanceOf[java.lang.Float] || s.toString.contains(".")) {
+              new java.math.BigDecimal(s.toString)
+                .setScale(rowType.getScale, BigDecimal.ROUND_HALF_EVEN)
+            } else {
+              new java.math.BigDecimal(s.toString)
+            }
+          case SqlTypeName.CHAR => s.toString
+          case SqlTypeName.VARCHAR => s.toString
+          case SqlTypeName.INTEGER => s.toString.toInt
+          case SqlTypeName.TINYINT => s.toString.toByte
+          case SqlTypeName.SMALLINT => s.toString.toShort
+          case SqlTypeName.BIGINT => s.toString.toLong
+          case SqlTypeName.FLOAT => java.lang.Float.parseFloat(s.toString)
+          case SqlTypeName.DOUBLE => java.lang.Double.parseDouble(s.toString)
+          case SqlTypeName.DATE => {
+            // time over here is with timezone.
+            val string = s.toString
+            if (string.contains("-")) {
+              val time = DateFormat.stringToDate(string).getTime
+              if (toCalcite) {
+                // ms to whole days
+                (time / (3600 * 24 * 1000)).toInt
+              } else {
+                // ms to s
+                time / 1000
+              }
+            } else {
+              // should not come to here?
+              if (toCalcite) {
+                (toCalciteTimestamp(DateFormat.stringToMillis(string)) / (3600 * 24 * 1000)).toInt
+              } else {
+                DateFormat.stringToMillis(string)
+              }
+            }
+          }
+          case SqlTypeName.TIMESTAMP | SqlTypeName.TIME => {
+            val ts = s.asInstanceOf[Timestamp].getTime
+            if (toCalcite) {
+              ts
+            } else {
+              // ms to s
+              ts / 1000
+            }
+          }
+          case SqlTypeName.BOOLEAN => s
+          case _ => s.toString
+        }
+        a
+      } catch {
+        case th: Throwable =>
+          logError(s"Error for convert value : $s , class: ${s.getClass}", th)
+          safetyConvertStringToValue(s, rowType, toCalcite)
+      }
+    }
+  }
+
+  /**
+   * Lenient variant of `convertStringToValue`: integral types are parsed via
+   * `toDouble` first, so inputs such as "1.0" still convert.
+   *
+   * @param s         raw value; TIMESTAMP/TIME expect a `java.sql.Timestamp`
+   * @param rowType   target Calcite type (scale is read for DECIMAL)
+   * @param toCalcite selects between the two date/time encodings computed below
+   * @return the converted value
+   * @throws RuntimeException wrapping any failure raised during conversion
+   */
+  def safetyConvertStringToValue(s: Any, rowType: RelDataType, toCalcite: Boolean): Any = {
+    // Evaluated on demand so types that never need the string form do not compute it.
+    def text: String = s.toString
+
+    try {
+      rowType.getSqlTypeName match {
+        case SqlTypeName.DECIMAL =>
+          val fractional =
+            s.isInstanceOf[java.lang.Double] || s.isInstanceOf[java.lang.Float] || text.contains(".")
+          val parsed = new java.math.BigDecimal(text)
+          if (fractional) {
+            parsed.setScale(rowType.getScale, BigDecimal.ROUND_HALF_EVEN)
+          } else {
+            parsed
+          }
+        case SqlTypeName.CHAR | SqlTypeName.VARCHAR => text
+        case SqlTypeName.INTEGER => text.toDouble.toInt
+        case SqlTypeName.TINYINT => text.toDouble.toByte
+        case SqlTypeName.SMALLINT => text.toDouble.toShort
+        case SqlTypeName.BIGINT => text.toDouble.toLong
+        case SqlTypeName.FLOAT => java.lang.Float.parseFloat(text)
+        case SqlTypeName.DOUBLE => java.lang.Double.parseDouble(text)
+        case SqlTypeName.DATE =>
+          // time over here is with timezone.
+          val dateText = text
+          if (dateText.contains("-")) {
+            val millis = DateFormat.stringToDate(dateText).getTime
+            if (toCalcite) {
+              // ms to whole days
+              (millis / (3600 * 24 * 1000)).toInt
+            } else {
+              // ms to s
+              millis / 1000
+            }
+          } else {
+            // should not come to here?
+            if (toCalcite) {
+              (toCalciteTimestamp(DateFormat.stringToMillis(dateText)) / (3600 * 24 * 1000)).toInt
+            } else {
+              DateFormat.stringToMillis(dateText)
+            }
+          }
+        case SqlTypeName.TIMESTAMP | SqlTypeName.TIME =>
+          val millis = s.asInstanceOf[Timestamp].getTime
+          if (toCalcite) {
+            millis
+          } else {
+            // ms to s
+            millis / 1000
+          }
+        case SqlTypeName.BOOLEAN => s
+        case _ => text
+      }
+    } catch {
+      case th: Throwable =>
+        throw new RuntimeException(s"Error for convert value : $s , class: ${s.getClass}", th)
+    }
+  }
+
+ // scalastyle:off
+  /**
+   * Converts a result cell into the value emitted for the given SQL type name.
+   *
+   * A null cell becomes the type's zero default (null for CHAR, VARCHAR,
+   * BOOLEAN and unknown types). Non-null cells are parsed per type; DATE is
+   * rendered as a date string when `toCalcite` is set and as epoch millis
+   * otherwise, TIMESTAMP/TIME are rendered as time strings.
+   *
+   * @param s         raw cell; TIMESTAMP/TIME expect a `java.sql.Timestamp`
+   * @param rowType   upper-case SQL type name such as "INTEGER"
+   * @param toCalcite selects the DATE rendering
+   * @return the converted value, or null
+   */
+  def convertStringToResultValue(s: Any, rowType: String, toCalcite: Boolean): Any = {
+    if (s != null) {
+      try {
+        val converted: Any = rowType match {
+          case "DECIMAL" => new java.math.BigDecimal(s.toString)
+          case "CHAR" | "VARCHAR" => s.toString
+          case "INTEGER" => s.toString.toInt
+          case "TINYINT" => s.toString.toByte
+          case "SMALLINT" => s.toString.toShort
+          case "BIGINT" => s.toString.toLong
+          case "FLOAT" => java.lang.Float.parseFloat(s.toString)
+          case "DOUBLE" => java.lang.Double.parseDouble(s.toString)
+          case "DATE" =>
+            val millis = DateFormat.stringToMillis(s.toString)
+            if (toCalcite) DateFormat.formatToDateStr(millis) else millis
+          case "TIMESTAMP" | "TIME" =>
+            DateFormat.formatToTimeStr(s.asInstanceOf[Timestamp].getTime)
+          case "BOOLEAN" => s
+          case _ => s.toString
+        }
+        converted
+      } catch {
+        case th: Throwable =>
+          logError(s"Error for convert value : $s , class: ${s.getClass}", th)
+          throw th
+      }
+    } else {
+      // Null cell: substitute the per-type default.
+      val fallback: Any = rowType match {
+        case "DECIMAL" => new java.math.BigDecimal(0)
+        case "INTEGER" | "DATE" => 0
+        case "TINYINT" => 0.toByte
+        case "SMALLINT" => 0.toShort
+        case "BIGINT" | "TIMESTAMP" | "TIME" => 0L
+        case "FLOAT" => 0f
+        case "DOUBLE" => 0d
+        case _ => null
+      }
+      fallback
+    }
+  }
+
+  /**
+   * Renders each row as a single delimited string.
+   *
+   * Every cell is converted with `convertStringToResultValue` using the type
+   * name registered for its column index; a null conversion result is written
+   * as an empty string.
+   *
+   * @param rows      rows to render
+   * @param typeMap   column index to SQL type name
+   * @param separator delimiter placed between cells
+   * @return an iterator of rendered rows, produced as the input is consumed
+   */
+  def convertRowToRow(
+      rows: Iterator[Row],
+      typeMap: Map[Int, String],
+      separator: String): Iterator[String] = {
+    rows.map { row =>
+      val rendered = row.toSeq.zipWithIndex.map {
+        case (cell, columnIndex) =>
+          val columnType = typeMap.apply(columnIndex)
+          val converted =
+            SparderTypeUtil.convertStringToResultValue(cell, columnType, toCalcite = true)
+          if (converted == null) "" else converted
+      }
+      rendered.mkString(separator)
+    }
+  }
+
+ // ms to second
+  /** Divides a millisecond value by 1000 (integer division), yielding seconds. */
+  def toSparkTimestamp(calciteTimestamp: Long): java.lang.Long = {
+    val seconds = calciteTimestamp / 1000
+    seconds
+  }
+
+ // ms to microsecond, spark need micro sec.
+  /** Multiplies a millisecond value by 1000, yielding microseconds. */
+  def toSparkMicrosecond(calciteTimestamp: Long): java.lang.Long = {
+    val micros = calciteTimestamp * 1000
+    micros
+  }
+
+ // ms to day
+  /** Converts a millisecond value to whole days via successive integer divisions. */
+  def toSparkDate(calciteTimestamp: Long): java.lang.Integer = {
+    val seconds = calciteTimestamp / 1000
+    val days = seconds / 3600 / 24
+    days.toInt
+  }
+
+  /** Multiplies the given value by 1000. */
+  def toCalciteTimestamp(sparkTimestamp: Long): Long = {
+    val scaled = sparkTimestamp * 1000
+    scaled
+  }
+
+}
diff --git a/engine-spark/src/main/java/org/apache/spark/util/KylinReflectUtils.scala b/engine-spark/src/main/java/org/apache/spark/util/KylinReflectUtils.scala
new file mode 100644
index 0000000..a139273
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/util/KylinReflectUtils.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.lang.reflect.Field
+
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
+import org.apache.spark.{SPARK_VERSION, SparkContext}
+
+import scala.reflect.runtime._
+import scala.reflect.runtime.universe._
+
+/**
+ * Reflection helpers used to instantiate Kylin's Spark session-state builders
+ * and to reach fields that are not otherwise accessible.
+ */
+object KylinReflectUtils {
+  private val rm = universe.runtimeMirror(getClass.getClassLoader)
+
+  /**
+   * Builds the session state for a Kylin Spark session by reflectively
+   * instantiating a session-state builder and invoking its `build` method.
+   *
+   * The Hive-backed builder is chosen when the catalog implementation
+   * configured on the context is "hive"; otherwise the plain builder is used.
+   *
+   * @param sparkContext context whose configuration selects the builder
+   * @param kylinSession session object passed to the builder's constructor
+   * @return whatever the builder's `build` method returns
+   * @throws UnsupportedOperationException when the Spark version does not start with "2.2"
+   */
+  def getSessionState(sparkContext: SparkContext, kylinSession: Object): Any = {
+    if (!SPARK_VERSION.startsWith("2.2")) {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+    val catalogImpl = sparkContext.getConf.get(CATALOG_IMPLEMENTATION.key, "in-memory")
+    val className: String =
+      if ("hive".equals(catalogImpl)) {
+        "org.apache.spark.sql.hive.KylinHiveSessionStateBuilder"
+      } else {
+        "org.apache.spark.sql.hive.KylinSessionStateBuilder"
+      }
+    val (builder, builderClass) = createObject(className, kylinSession, None)
+    builderClass.getMethod("build").invoke(builder)
+  }
+
+  /**
+   * Returns the field val from a object through reflection.
+   *
+   * @param name - name of the field being retrieved.
+   * @param obj  - Object from which the field has to be retrieved.
+   * @tparam T static type of `obj`
+   * @return the field's value, or null when no member with that name exists
+   */
+  def getField[T: TypeTag : reflect.ClassTag](name: String, obj: T): Any = {
+    val instanceMirror = rm.reflect(obj)
+    val members = instanceMirror.symbol.typeSignature.members
+
+    members.find(_.name.toString.equals(name)) match {
+      case Some(symbol) => instanceMirror.reflectField(symbol.asTerm).get
+      case None => null
+    }
+  }
+
+  /**
+   * Instantiates the named class through the first constructor reported by
+   * `getConstructors`.
+   *
+   * @param className fully qualified class name
+   * @param conArgs   arguments handed to the constructor
+   * @return the new instance paired with its class
+   */
+  def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
+    val clazz = Utils.classForName(className)
+    val constructor = clazz.getConstructors.head
+    constructor.setAccessible(true)
+    val instance = constructor.newInstance(conArgs: _*)
+    (instance, clazz)
+  }
+
+  /**
+   * Looks up a declared field on the class and makes it accessible.
+   *
+   * @param clazz     class declaring the field
+   * @param filedName name of the declared field
+   * @return the accessible field handle
+   */
+  def getObjectField(clazz: Class[_], filedName: String): Field = {
+    val declared = clazz.getDeclaredField(filedName)
+    declared.setAccessible(true)
+    declared
+  }
+}
diff --git a/engine-spark/src/main/java/org/apache/spark/util/XmlUtils.scala b/engine-spark/src/main/java/org/apache/spark/util/XmlUtils.scala
new file mode 100644
index 0000000..6d3f0cd
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/spark/util/XmlUtils.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.{BufferedInputStream, File, FileInputStream, InputStream}
+import java.util.Properties
+
+import javax.xml.parsers.{DocumentBuilder, DocumentBuilderFactory}
+import org.apache.hadoop.util.StringInterner
+import org.slf4j.LoggerFactory
+import org.w3c.dom.{Document, Element, Text}
+
+import scala.collection.immutable.Range
+
+/**
+ * Loads Hadoop-style `<configuration>` XML files into `java.util.Properties`.
+ */
+object XmlUtils {
+
+  private val logger = LoggerFactory.getLogger(XmlUtils.getClass)
+
+  /**
+   * Reads every `<property>` element's `<name>`/`<value>` pair from the XML
+   * file at `path`.
+   *
+   * Names are trimmed; values are kept verbatim. A top-level element other than
+   * `<configuration>`, or a child element other than `<property>`, is logged
+   * but does not abort loading.
+   *
+   * @param path location of the XML file
+   * @return the properties found in the file
+   * @throws RuntimeException when no file exists at `path`
+   */
+  def loadProp(path: String): Properties = {
+    val docBuilderFactory = DocumentBuilderFactory.newInstance
+    docBuilderFactory.setIgnoringComments(true)
+
+    // allow includes in the xml file
+    docBuilderFactory.setNamespaceAware(true)
+    try docBuilderFactory.setXIncludeAware(true)
+    catch {
+      case e: UnsupportedOperationException =>
+        // Best effort: a parser without XInclude support can still load plain files.
+        logger.warn("Failed to set setXIncludeAware(true) for parser", e)
+    }
+    val builder = docBuilderFactory.newDocumentBuilder
+    val properties = new Properties()
+    val file = new File(path).getAbsoluteFile
+    if (!file.exists) {
+      throw new RuntimeException("File not found in path: " + path)
+    }
+    val doc: Document = parse(builder, new BufferedInputStream(new FileInputStream(file)), path)
+    val root: Element = doc.getDocumentElement
+    if (!("configuration" == root.getTagName)) {
+      logger.error("bad conf file: top-level element not <configuration>")
+    }
+
+    val props = root.getChildNodes
+    for (i <- Range(0, props.getLength)) {
+      props.item(i) match {
+        case prop: Element =>
+          if (!"property".equals(prop.getTagName())) {
+            logger.warn("bad conf file: element not <property>")
+          }
+
+          val fields = prop.getChildNodes
+          var attr: String = null
+          var value: String = null
+          for (j <- Range(0, fields.getLength)) {
+            fields.item(j) match {
+              case field: Element =>
+                if ("name" == field.getTagName && field.hasChildNodes) {
+                  attr =
+                    StringInterner.weakIntern(field.getFirstChild.asInstanceOf[Text].getData.trim)
+                }
+                if ("value" == field.getTagName && field.hasChildNodes) {
+                  value = StringInterner.weakIntern(field.getFirstChild.asInstanceOf[Text].getData)
+                }
+              case _ =>
+            }
+            // Store as soon as both halves of the pair are known.
+            if (attr != null && value != null) {
+              properties.setProperty(attr, value)
+            }
+          }
+        case _ =>
+      }
+    }
+    properties
+  }
+
+  /**
+   * Parses the stream into a DOM document and always closes the stream.
+   *
+   * @param builder  parser to use
+   * @param is       input to parse; null yields a null result
+   * @param systemId base for resolving relative URIs inside the document
+   * @return the parsed document, or null when `is` is null
+   */
+  def parse(builder: DocumentBuilder, is: InputStream, systemId: String): Document = {
+    if (is == null) {
+      return null
+    }
+    try {
+      builder.parse(is, systemId)
+    } finally {
+      is.close()
+    }
+  }
+}
diff --git a/examples/test_case_data/webapps/META-INF/context.xml b/examples/test_case_data/webapps/META-INF/context.xml
new file mode 100644
index 0000000..0ad90dc
--- /dev/null
+++ b/examples/test_case_data/webapps/META-INF/context.xml
@@ -0,0 +1,38 @@
+<?xml version='1.0' encoding='utf-8'?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!-- The contents of this file will be loaded for each web application -->
+<Context allowLinking="true">
+
+ <!-- Default set of monitored resources -->
+ <WatchedResource>WEB-INF/web.xml</WatchedResource>
+
+ <!-- Uncomment this to disable session persistence across Tomcat restarts -->
+ <!--
+ <Manager pathname="" />
+ -->
+
+ <!-- Uncomment this to enable Comet connection tracking (provides events
+ on session expiration as well as webapp lifecycle) -->
+ <!--
+ <Valve className="org.apache.catalina.valves.CometConnectionManagerValve" />
+ -->
+
+ <Loader loaderClass="org.apache.kylin.ext.DebugTomcatClassLoader"/>
+
+</Context>
diff --git a/kylin-it/jacoco-it.exec b/kylin-it/jacoco-it.exec
new file mode 100644
index 0000000..5af2bbf
Binary files /dev/null and b/kylin-it/jacoco-it.exec differ
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 18bb1d9..36cbc33 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -96,6 +96,10 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-query</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-test</artifactId>
+ </dependency>
<!-- Env & Test -->
@@ -338,6 +342,27 @@
</dependency>
</dependencies>
+ <profiles>
+ <profile>
+ <id>hbaseStorage</id>
+ <properties>
+ <storageType>2</storageType>
+ <excludeTest>**/ITCombination2Test.java,**/ITFailfastQuery2Test.java,**/ITJDBCDriver2Test.java</excludeTest>
+ </properties>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ </profile>
+
+ <profile>
+ <id>parquetStorage</id>
+ <properties>
+ <storageType>4</storageType>
+ <excludeTest>**/ITCombinationTest.java,**/ITFailfastQueryTest.java,**/ITJDBCDriverTest.java,**/ITAclTableMigrationToolTest.java</excludeTest>
+ </properties>
+ </profile>
+ </profiles>
+
<build>
<plugins>
<plugin>
@@ -361,123 +386,114 @@
</execution>
</executions>
</plugin>
- </plugins>
- </build>
- <profiles>
- <profile>
- <id>sandbox</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-failsafe-plugin</artifactId>
- <executions>
- <execution>
- <id>integration-tests</id>
- <goals>
- <goal>integration-test</goal>
- </goals>
- </execution>
- <execution>
- <id>verify</id>
- <goals>
- <goal>verify</goal>
- </goals>
- </execution>
- </executions>
+ <!--CI plugins-->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>integration-tests</id>
+ <goals>
+ <goal>integration-test</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>verify</id>
+ <goals>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <excludes>
+ <exclude>**/*$*</exclude>
+ <exclude>${excludeTest}</exclude>
+ </excludes>
+ <systemProperties>
+ <property>
+ <name>log4j.configuration</name>
+ <value>
+ file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
+ </value>
+ </property>
+ </systemProperties>
+ <argLine>-Xms1G -Xmx2G -XX:PermSize=128M -XX:MaxPermSize=512M
+ -Dkylin.server.cluster-servers=localhost:7070
+ -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it.exec
+ </argLine>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>build_cube_with_engine</id>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <phase>pre-integration-test</phase>
<configuration>
- <excludes>
- <exclude>**/*$*</exclude>
- </excludes>
- <systemProperties>
- <property>
- <name>log4j.configuration</name>
- <value>
- file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
- </value>
- </property>
- </systemProperties>
- <argLine>-Xms1G -Xmx2G -XX:PermSize=128M -XX:MaxPermSize=512M
- -Dkylin.server.cluster-servers=localhost:7070
- -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it.exec
- </argLine>
+ <skip>${skipTests}</skip>
+ <classpathScope>test</classpathScope>
+ <executable>java</executable>
+ <arguments>
+ <argument>-Dhdp.version=${hdp.version}</argument>
+ <argument>-DfastBuildMode=${fastBuildMode}</argument>
+ <argument>
+ -DbuildCubeUsingProvidedData=${buildCubeUsingProvidedData}
+ </argument>
+ <argument>-DengineType=${engineType}</argument>
+ <argument>-DstorageType=${storageType}</argument>
+ <argument>
+ -Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
+ </argument>
+ <argument>
+ -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it-engine.exec
+ </argument>
+ <argument>-classpath</argument>
+ <classpath />
+ <argument>org.apache.kylin.provision.BuildCubeWithEngine
+ </argument>
+ </arguments>
+ <workingDirectory>${project.basedir}</workingDirectory>
</configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>exec-maven-plugin</artifactId>
- <executions>
- <execution>
- <id>build_cube_with_engine</id>
- <goals>
- <goal>exec</goal>
- </goals>
- <phase>pre-integration-test</phase>
- <configuration>
- <skip>${skipTests}</skip>
- <classpathScope>test</classpathScope>
- <executable>java</executable>
- <arguments>
- <argument>-Dhdp.version=${hdp.version}</argument>
- <argument>-DfastBuildMode=${fastBuildMode}</argument>
- <argument>
- -DbuildCubeUsingProvidedData=${buildCubeUsingProvidedData}
- </argument>
- <argument>-DengineType=${engineType}</argument>
- <argument>
- -Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
- </argument>
- <argument>
- -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it-engine.exec
- </argument>
- <argument>-classpath</argument>
- <classpath />
- <argument>org.apache.kylin.provision.BuildCubeWithEngine
- </argument>
- </arguments>
- <workingDirectory>${project.basedir}</workingDirectory>
- </configuration>
- </execution>
- <execution>
- <id>build_cube_with_stream</id>
- <goals>
- <goal>exec</goal>
- </goals>
- <phase>pre-integration-test</phase>
- <configuration>
- <skip>${skipTests}</skip>
- <classpathScope>test</classpathScope>
- <executable>java</executable>
- <arguments>
- <argument>-Dhdp.version=${hdp.version}</argument>
- <argument>-DfastBuildMode=${fastBuildMode}</argument>
- <argument>
- -DbuildCubeUsingProvidedData=${buildCubeUsingProvidedData}
- </argument>
- <argument>
- -Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
- </argument>
- <argument>
- -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it-stream.exec
- </argument>
- <argument>-classpath</argument>
- <classpath />
- <argument>org.apache.kylin.provision.BuildCubeWithStream
- </argument>
- </arguments>
- <workingDirectory>${project.basedir}</workingDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
-
- </plugins>
- </build>
- </profile>
- </profiles>
+ </execution>
+ <execution>
+ <id>build_cube_with_stream</id>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <phase>pre-integration-test</phase>
+ <configuration>
+ <skip>${skipTests}</skip>
+ <classpathScope>test</classpathScope>
+ <executable>java</executable>
+ <arguments>
+ <argument>-Dhdp.version=${hdp.version}</argument>
+ <argument>-DfastBuildMode=${fastBuildMode}</argument>
+ <argument>
+ -DbuildCubeUsingProvidedData=${buildCubeUsingProvidedData}
+ </argument>
+ <argument>
+ -Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties
+ </argument>
+ <argument>
+ -javaagent:${project.basedir}/..//dev-support/jacocoagent.jar=includes=org.apache.kylin.*,output=file,destfile=jacoco-it-stream.exec
+ </argument>
+ <argument>-classpath</argument>
+ <classpath />
+ <argument>org.apache.kylin.provision.BuildCubeWithStream
+ </argument>
+ </arguments>
+ <workingDirectory>${project.basedir}</workingDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriver2Test.java
similarity index 51%
copy from storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
copy to kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriver2Test.java
index 6a3ad59..558e046 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/kylin-it/src/test/java/org/apache/kylin/jdbc/ITJDBCDriver2Test.java
@@ -14,30 +14,16 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
- */
-
-package org.apache.kylin.storage.parquet.cube;
+*/
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
+package org.apache.kylin.jdbc;
-public class CubeStorageQuery extends GTCubeStorageQueryBase {
+import org.apache.kylin.junit.SparkTestRunner;
+import org.junit.runner.RunWith;
- public CubeStorageQuery(CubeInstance cube) {
- super(cube);
- }
-
- @Override
- public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- return super.search(context, sqlDigest, returnTupleInfo);
- }
+/**
+ */
+@RunWith(SparkTestRunner.class)
+public class ITJDBCDriver2Test extends ITJDBCDriverTest {
- @Override
- protected String getGTStorage() {
- return null;
- }
}
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index ec5bc35..9bd741c 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -18,25 +18,9 @@
package org.apache.kylin.provision;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.TimeZone;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -74,9 +58,24 @@ import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
public class BuildCubeWithEngine {
@@ -86,6 +85,7 @@ public class BuildCubeWithEngine {
protected ExecutableManager jobService;
private static boolean fastBuildMode = false;
private static int engineType;
+ private static int storageType;
private static final Logger logger = LoggerFactory.getLogger(BuildCubeWithEngine.class);
@@ -131,10 +131,14 @@ public class BuildCubeWithEngine {
String specifiedEngineType = System.getProperty("engineType");
if (StringUtils.isNotEmpty(specifiedEngineType)) {
engineType = Integer.parseInt(specifiedEngineType);
- } else {
- engineType = 2;
}
+        // Optional override of the cube storage type; storageType stays 0 (no override) when
+        // the system property is absent.
+        String specifiedStorageType = System.getProperty("storageType");
+        // Guard on the storage property itself. Testing specifiedEngineType here would call
+        // Integer.parseInt(null) when only -DengineType is supplied, and would silently skip
+        // the override when only -DstorageType is supplied.
+        if (StringUtils.isNotEmpty(specifiedStorageType)) {
+            storageType = Integer.parseInt(specifiedStorageType);
+        }
+        logger.info("==storageType: " + specifiedStorageType);
+
System.setProperty(KylinConfig.KYLIN_CONF, confDir);
System.setProperty("SPARK_HOME", "/usr/local/spark"); // need manually create and put spark to this folder on Jenkins
System.setProperty("kylin.hadoop.conf.dir", confDir);
@@ -195,6 +199,9 @@ public class BuildCubeWithEngine {
}
cubeDescManager = CubeDescManager.getInstance(kylinConfig);
+
+ // update engineType and storageType
+ updateCubeDesc("ci_inner_join_cube", "ci_left_join_cube");
}
public void after() {
@@ -353,12 +360,30 @@ public class BuildCubeWithEngine {
return doBuildAndMergeOnCube(cubeName);
}
- @SuppressWarnings("unused")
private void updateCubeEngineType(String cubeName) throws IOException {
- CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cubeName);
- if (cubeDesc.getEngineType() != engineType) {
- cubeDesc.setEngineType(engineType);
- cubeDescManager.updateCubeDesc(cubeDesc);
+ if (engineType != 0) {
+ CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cubeName);
+ if (cubeDesc.getEngineType() != engineType) {
+ cubeDesc.setEngineType(engineType);
+ cubeDescManager.updateCubeDesc(cubeDesc);
+ }
+ }
+ }
+
+ private void updateCubeStorageType(String cubeName) throws IOException {
+ if (storageType != 0) {
+ CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cubeName);
+ if (cubeDesc.getStorageType() != storageType) {
+ cubeDesc.setStorageType(storageType);
+ cubeDescManager.updateCubeDesc(cubeDesc);
+ }
+ }
+ }
+
+ private void updateCubeDesc(String... cubeNames) throws IOException {
+ for (String cubeName : cubeNames) {
+ updateCubeEngineType(cubeName);
+ updateCubeStorageType(cubeName);
}
}
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITCombination2Test.java b/kylin-it/src/test/java/org/apache/kylin/query/ITCombination2Test.java
new file mode 100644
index 0000000..f266c34
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITCombination2Test.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kylin.junit.SparkTestRunnerWithParametersFactory;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.routing.Candidate;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Parameterized variant of {@link ITKylinQuery2Test}: each parameter set from
+ * {@link #configs()} is run through a runner created by
+ * {@link SparkTestRunnerWithParametersFactory}.
+ */
+@RunWith(Parameterized.class)
+@Parameterized.UseParametersRunnerFactory(SparkTestRunnerWithParametersFactory.class)
+public class ITCombination2Test extends ITKylinQuery2Test {
+
+ private static final Logger logger = LoggerFactory.getLogger(ITCombination2Test.class);
+
+ // Only logs; the real environment setup happens in the constructor via setupAll().
+ // NOTE(review): this static method has the same name as ITKylinQuery2Test.setUp(),
+ // which presumably keeps the superclass @BeforeClass from running - confirm.
+ @BeforeClass
+ public static void setUp() {
+ logger.info("setUp in ITCombination2Test");
+ }
+
+ // Releases the test environment through clean() and then restores the routing
+ // priorities that the constructor replaced.
+ @AfterClass
+ public static void tearDown() {
+ logger.info("tearDown in ITCombination2Test");
+ clean();
+ Candidate.restorePriorities();
+ }
+
+ /**
+ * Returns all config combinations. Each entry has three settings: the join
+ * type (inner or left), a coprocessor toggle, and a query engine label.
+ * Only the join type is applied by the constructor; the other two values
+ * are just logged.
+ */
+ @Parameterized.Parameters
+ public static Collection<Object[]> configs() {
+ return Arrays.asList(new Object[][] { //
+ { "inner", "on", "v2" }, //
+ { "left", "on", "v2" }, //
+ });
+ }
+
+ /**
+ * Gives all realization types the same priority (0), stores the join type
+ * in the static {@code ITKylinQuery2Test.joinType} field and runs
+ * {@code setupAll()}.
+ *
+ * @param joinType join type applied to the inherited query tests
+ * @param coprocessorToggle logged only
+ * @param queryEngine logged only
+ */
+ public ITCombination2Test(String joinType, String coprocessorToggle, String queryEngine) throws Exception {
+ logger.info("Into combination join type: " + joinType + ", coprocessor toggle: " + coprocessorToggle + ", query engine: " + queryEngine);
+ Map<RealizationType, Integer> priorities = Maps.newHashMap();
+ priorities.put(RealizationType.HYBRID, 0);
+ priorities.put(RealizationType.CUBE, 0);
+ priorities.put(RealizationType.INVERTED_INDEX, 0);
+ Candidate.setPriorities(priorities);
+ ITKylinQuery2Test.joinType = joinType;
+ setupAll();
+ }
+}
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQuery2Test.java b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQuery2Test.java
new file mode 100644
index 0000000..228a09b
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITFailfastQuery2Test.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.query;
+
+import java.util.Map;
+
+import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.ext.ClassLoaderUtils;
+import org.apache.kylin.junit.SparkTestRunner;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.spark.sql.SparderEnv;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Runs the {@link ITFailfastQueryTest} cases under {@link SparkTestRunner} after
+ * initializing Sparder. The two max-scan-bytes cases are overridden with no-op bodies.
+ */
+@RunWith(SparkTestRunner.class)
+public class ITFailfastQuery2Test extends ITFailfastQueryTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(ITFailfastQuery2Test.class);
+
+    /**
+     * Gives all realization types the same priority, prepares the test environment with a
+     * left join type and starts Sparder under the Spark class loader.
+     */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        logger.info("setUp in ITFailfastQuery2Test");
+        Map<RealizationType, Integer> priorities = Maps.newHashMap();
+        priorities.put(RealizationType.HYBRID, 0);
+        priorities.put(RealizationType.CUBE, 0);
+        priorities.put(RealizationType.INVERTED_INDEX, 0);
+        Candidate.setPriorities(priorities);
+        joinType = "left";
+        setupAll();
+
+        // init spark
+        Thread current = Thread.currentThread();
+        ClassLoader originClassLoader = current.getContextClassLoader();
+        current.setContextClassLoader(ClassLoaderUtils.getSparkClassLoader());
+        try {
+            SparderEnv.init();
+        } finally {
+            // Restore even when Sparder fails to start, so the runner thread does not keep
+            // the Spark class loader as its context loader.
+            current.setContextClassLoader(originClassLoader);
+        }
+    }
+
+    /** Resets the per-thread query context after every test. */
+    @After
+    public void cleanUp() {
+        QueryContextFacade.resetCurrent();
+    }
+
+    /** Restores the routing priorities replaced in {@link #setUp()} and releases the environment. */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        logger.info("tearDown in ITFailfastQuery2Test");
+        Candidate.restorePriorities();
+        clean();
+    }
+
+    // Overridden with a no-op body: this case is not exercised by this variant.
+    @Override
+    @Test
+    public void testQueryExceedMaxScanBytes() throws Exception {
+        logger.info("testQueryExceedMaxScanBytes ignored");
+    }
+
+    // Overridden with a no-op body: this case is not exercised by this variant.
+    @Override
+    @Test
+    public void testQueryNotExceedMaxScanBytes() throws Exception {
+        logger.info("testQueryNotExceedMaxScanBytes ignored");
+    }
+}
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQuery2Test.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQuery2Test.java
new file mode 100644
index 0000000..942e256
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQuery2Test.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query;
+
+import java.sql.DriverManager;
+import java.util.Map;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
+import org.apache.kylin.ext.ClassLoaderUtils;
+import org.apache.kylin.junit.SparkTestRunner;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.spark.sql.SparderEnv;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Variant of {@link ITKylinQueryTest} whose environment setup also starts Sparder.
+ * Ignored as a standalone class; {@code ITCombination2Test} extends it and runs the cases.
+ */
+@Ignore("@RunWith(SparkTestRunner.class) is contained by ITCombination2Test")
+@RunWith(SparkTestRunner.class)
+public class ITKylinQuery2Test extends ITKylinQueryTest {
+
+    private static final Logger logger = LoggerFactory.getLogger(ITKylinQuery2Test.class);
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    /** Gives all realization types the same priority, selects the left join type and runs {@link #setupAll()}. */
+    @BeforeClass
+    public static void setUp() throws Exception {
+        logger.info("setUp in ITKylinQuery2Test");
+        Map<RealizationType, Integer> priorities = Maps.newHashMap();
+        priorities.put(RealizationType.HYBRID, 0);
+        priorities.put(RealizationType.CUBE, 0);
+        priorities.put(RealizationType.INVERTED_INDEX, 0);
+        Candidate.setPriorities(priorities);
+
+        joinType = "left";
+
+        setupAll();
+    }
+
+    /**
+     * Creates the test metadata, opens the cube and H2 connections, loads the H2 tables and
+     * starts Sparder with the Spark class loader as the thread context class loader.
+     */
+    protected static void setupAll() throws Exception {
+        //setup env
+        HBaseMetadataTestCase.staticCreateTestMetadata();
+        config = KylinConfig.getInstanceFromEnv();
+
+        //setup cube conn
+        String project = ProjectInstance.DEFAULT_PROJECT_NAME;
+        cubeConnection = QueryConnection.getConnection(project);
+
+        //setup h2
+        h2Connection = DriverManager.getConnection("jdbc:h2:mem:db" + (h2InstanceCount++) + ";CACHE_SIZE=32072", "sa",
+                "");
+        // Load H2 Tables (inner join)
+        H2Database h2DB = new H2Database(h2Connection, config, project);
+        h2DB.loadAllTables();
+
+        // init spark
+        Thread current = Thread.currentThread();
+        ClassLoader originClassLoader = current.getContextClassLoader();
+        current.setContextClassLoader(ClassLoaderUtils.getSparkClassLoader());
+        try {
+            SparderEnv.init();
+        } finally {
+            // Restore even when Sparder fails to start, so the runner thread does not keep
+            // the Spark class loader as its context loader.
+            current.setContextClassLoader(originClassLoader);
+        }
+    }
+
+    /** Restores the routing priorities replaced in {@link #setUp()} and releases the environment. */
+    @AfterClass
+    public static void tearDown() throws Exception {
+        logger.info("tearDown in ITKylinQuery2Test");
+        Candidate.restorePriorities();
+        clean();
+    }
+
+    // Overridden with a no-op body: this case is not exercised by this variant.
+    @Override
+    @Test
+    public void testTimeoutQuery() throws Exception {
+        logger.info("TimeoutQuery ignored.");
+    }
+
+    // Overridden with a no-op body: this case is not exercised by this variant.
+    @Override
+    @Test
+    public void testExpressionQuery() throws Exception {
+        logger.info("ExpressionQuery ignored.");
+    }
+
+    // Overridden with a no-op body: this case is not exercised by this variant.
+    @Override
+    @Test
+    public void testStreamingTableQuery() throws Exception {
+        logger.info("StreamingTableQuery ignored.");
+    }
+}
+
\ No newline at end of file
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index c6d1f62..bb35322 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.query;
@@ -461,8 +461,6 @@ public class ITKylinQueryTest extends KylinTestBase {
execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_values", null, true);
}
-
-
@Test
public void testPlan() throws Exception {
String originProp = System.getProperty("calcite.debug");
diff --git a/kylin-test/pom.xml b/kylin-test/pom.xml
new file mode 100644
index 0000000..ad1437f
--- /dev/null
+++ b/kylin-test/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>kylin</artifactId>
+ <groupId>org.apache.kylin</groupId>
+ <version>2.6.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <name>Apache Kylin - Test</name>
+ <artifactId>kylin-test</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-tomcat-ext</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-test</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/kylin-test/src/main/java/org/apache/kylin/junit/EnvUtils.java b/kylin-test/src/main/java/org/apache/kylin/junit/EnvUtils.java
new file mode 100644
index 0000000..19a5b50
--- /dev/null
+++ b/kylin-test/src/main/java/org/apache/kylin/junit/EnvUtils.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.junit;
+
+import com.google.common.collect.Maps;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Test-only helpers for inspecting the process environment and injecting default values
+ * into it.
+ */
+public final class EnvUtils {
+
+    /** Returns {@code true} when the environment variable {@code env} is set. */
+    public static boolean checkEnv(String env) {
+        return System.getenv(env) != null;
+    }
+
+    /**
+     * Adds a default value for each of the variables listed below that is not already set,
+     * then writes the collected defaults into the live process environment.
+     */
+    public static void setNormalEnv() throws Exception {
+
+        Map<String, String> newenv = Maps.newHashMap();
+        setDefaultEnv("SPARK_HOME", "../../build/spark", newenv);
+        //setDefaultEnv("hdp.version", "2.4.0.0-169", newenv);
+        setDefaultEnv("ZIPKIN_HOSTNAME", "sandbox", newenv);
+        setDefaultEnv("ZIPKIN_SCRIBE_PORT", "9410", newenv);
+        setDefaultEnv("KAP_HDFS_WORKING_DIR", "/kylin", newenv);
+        changeEnv(newenv);
+
+    }
+
+    /**
+     * Merges {@code newenv} into the map returned by {@link System#getenv()} by reflectively
+     * reaching the backing field {@code m} of {@code java.util.Collections$UnmodifiableMap}.
+     * NOTE(review): this depends on JDK internals and presumably fails on runtimes that
+     * forbid reflective access to java.util - confirm before running on a newer JDK.
+     */
+    protected static void changeEnv(Map<String, String> newenv) throws Exception {
+        Class<?>[] classes = Collections.class.getDeclaredClasses();
+        Map<String, String> env = System.getenv();
+        for (Class<?> cl : classes) {
+            if ("java.util.Collections$UnmodifiableMap".equals(cl.getName())) {
+                Field field = cl.getDeclaredField("m");
+                field.setAccessible(true);
+                // The backing map of the environment view holds String keys and values.
+                @SuppressWarnings("unchecked")
+                Map<String, String> map = (Map<String, String>) field.get(env);
+                map.putAll(newenv);
+            }
+        }
+    }
+
+    /** Records {@code defaultValue} for {@code env} in {@code newenv} unless the variable is already set. */
+    private static void setDefaultEnv(String env, String defaultValue, Map<String, String> newenv) {
+        if (System.getenv(env) == null) {
+            newenv.put(env, defaultValue);
+        }
+    }
+}
diff --git a/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunner.java b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunner.java
new file mode 100644
index 0000000..bcc8c17
--- /dev/null
+++ b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunner.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.junit;
+
+import org.apache.kylin.ext.ItClassLoader;
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.model.InitializationError;
+
+import java.io.IOException;
+
+/**
+ * JUnit runner that reloads the test class through a shared {@link ItClassLoader} and runs
+ * it on a dedicated thread whose context class loader is that loader.
+ */
+public class SparkTestRunner extends BlockJUnit4ClassRunner {
+
+    /** Shared loader, created on first use by {@link #get()}. */
+    public static ItClassLoader customClassLoader;
+
+    /**
+     * @param clazz test class as loaded by the default loader; it is re-resolved by name
+     *              through the shared {@link ItClassLoader} before being handed to JUnit
+     */
+    public SparkTestRunner(Class<?> clazz) throws Exception {
+        super(loadFromCustomClassloader(clazz));
+    }
+
+    // Loads a class in the custom classloader
+    private static Class<?> loadFromCustomClassloader(Class<?> clazz) throws Exception {
+        if (!EnvUtils.checkEnv("SPARK_HOME")) {
+            EnvUtils.setNormalEnv();
+        }
+        try {
+            // get() creates the loader only once, which is what parallel tests rely on
+            return Class.forName(clazz.getName(), false, get());
+        } catch (ClassNotFoundException | IOException e) {
+            throw new InitializationError(e);
+        }
+    }
+
+    /**
+     * Returns the shared loader, creating it on first use. Synchronized so that concurrent
+     * runners cannot each create their own loader.
+     */
+    public static synchronized ItClassLoader get() throws IOException {
+        if (customClassLoader == null) {
+            customClassLoader = new ItClassLoader(Thread.currentThread().getContextClassLoader());
+        }
+        return customClassLoader;
+    }
+
+    // Runs junit tests in a separate thread using the custom class loader
+    @Override
+    public void run(final RunNotifier notifier) {
+        Runnable runnable = new Runnable() {
+            @Override
+            public void run() {
+                SparkTestRunner.super.run(notifier);
+            }
+        };
+        Thread thread = new Thread(runnable);
+        thread.setContextClassLoader(customClassLoader);
+        thread.start();
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller instead of swallowing it
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerRunnerWithParameters.java b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerRunnerWithParameters.java
new file mode 100644
index 0000000..3df9831
--- /dev/null
+++ b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerRunnerWithParameters.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.junit;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.util.List;
+
+import org.junit.runner.notification.RunNotifier;
+import org.junit.runners.BlockJUnit4ClassRunner;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.model.FrameworkField;
+import org.junit.runners.model.FrameworkMethod;
+import org.junit.runners.model.Statement;
+import org.junit.runners.parameterized.TestWithParameters;
+
+/**
+ * A {@link SparkTestRunner} with parameters support. Parameters can be
+ * injected via constructor or into annotated fields.
+ */
+public class SparkTestRunnerRunnerWithParameters extends SparkTestRunner {
+ // Values of one parameter set, in declaration order.
+ private final Object[] parameters;
+
+ // Display name of the parameter set; appended to every test method name.
+ private final String name;
+
+ /**
+ * @param test the test class plus one parameter set and its display name
+ */
+ public SparkTestRunnerRunnerWithParameters(TestWithParameters test) throws Exception {
+ super(test.getTestClass().getJavaClass());
+ parameters = test.getParameters().toArray(new Object[test.getParameters().size()]);
+ name = test.getName();
+ }
+
+ // Uses field injection when any field carries @Parameter, constructor injection otherwise.
+ @Override
+ public Object createTest() throws Exception {
+ if (fieldsAreAnnotated()) {
+ return createTestUsingFieldInjection();
+ } else {
+ return createTestUsingConstructorInjection();
+ }
+ }
+
+ // Passes the whole parameter array to the single constructor of the test class.
+ private Object createTestUsingConstructorInjection() throws Exception {
+ return getTestClass().getOnlyConstructor().newInstance(parameters);
+ }
+
+ // Instantiates the test class with its no-arg constructor and assigns
+ // parameters[i] to the field annotated with @Parameter(i).
+ private Object createTestUsingFieldInjection() throws Exception {
+ List<FrameworkField> annotatedFieldsByParameter = getAnnotatedFieldsByParameter();
+ if (annotatedFieldsByParameter.size() != parameters.length) {
+ throw new Exception("Wrong number of parameters and @Parameter fields." + " @Parameter fields counted: "
+ + annotatedFieldsByParameter.size() + ", available parameters: " + parameters.length + ".");
+ }
+ Object testClassInstance = getTestClass().getJavaClass().newInstance();
+ for (FrameworkField each : annotatedFieldsByParameter) {
+ Field field = each.getField();
+ Parameter annotation = field.getAnnotation(Parameter.class);
+ int index = annotation.value();
+ try {
+ field.set(testClassInstance, parameters[index]);
+ } catch (IllegalArgumentException iare) {
+ // Rethrown with the field name and both types to make a type mismatch easy to diagnose.
+ throw new Exception(getTestClass().getName() + ": Trying to set " + field.getName() + " with the value "
+ + parameters[index] + " that is not the right type ("
+ + parameters[index].getClass().getSimpleName() + " instead of "
+ + field.getType().getSimpleName() + ").", iare);
+ }
+ }
+ return testClassInstance;
+ }
+
+ @Override
+ protected String getName() {
+ return name;
+ }
+
+ // Method name followed by the parameter-set name, so each combination is reported separately.
+ @Override
+ protected String testName(FrameworkMethod method) {
+ return method.getName() + getName();
+ }
+
+ // Exactly one constructor is required; with field injection it must also take no arguments.
+ @Override
+ protected void validateConstructor(List<Throwable> errors) {
+ validateOnlyOneConstructor(errors);
+ if (fieldsAreAnnotated()) {
+ validateZeroArgConstructor(errors);
+ }
+ }
+
+ // With field injection, checks that the @Parameter indices are within
+ // 0..(fieldCount - 1) and that each index is used exactly once.
+ @Override
+ protected void validateFields(List<Throwable> errors) {
+ super.validateFields(errors);
+ if (fieldsAreAnnotated()) {
+ List<FrameworkField> annotatedFieldsByParameter = getAnnotatedFieldsByParameter();
+ int[] usedIndices = new int[annotatedFieldsByParameter.size()];
+ for (FrameworkField each : annotatedFieldsByParameter) {
+ int index = each.getField().getAnnotation(Parameter.class).value();
+ if (index < 0 || index > annotatedFieldsByParameter.size() - 1) {
+ errors.add(new Exception("Invalid @Parameter value: " + index + ". @Parameter fields counted: "
+ + annotatedFieldsByParameter.size() + ". Please use an index between 0 and "
+ + (annotatedFieldsByParameter.size() - 1) + "."));
+ } else {
+ usedIndices[index]++;
+ }
+ }
+ for (int index = 0; index < usedIndices.length; index++) {
+ int numberOfUse = usedIndices[index];
+ if (numberOfUse == 0) {
+ errors.add(new Exception("@Parameter(" + index + ") is never used."));
+ } else if (numberOfUse > 1) {
+ errors.add(
+ new Exception("@Parameter(" + index + ") is used more than once (" + numberOfUse + ")."));
+ }
+ }
+ }
+ }
+
+ // Returns only the children invoker instead of the superclass class block.
+ // NOTE(review): presumably so class-level fixtures are not repeated for every
+ // parameter set - confirm against the enclosing Parameterized runner.
+ @Override
+ protected Statement classBlock(RunNotifier notifier) {
+ return childrenInvoker(notifier);
+ }
+
+ // Reports no runner-level annotations for this per-parameter-set runner.
+ @Override
+ protected Annotation[] getRunnerAnnotations() {
+ return new Annotation[0];
+ }
+
+ private List<FrameworkField> getAnnotatedFieldsByParameter() {
+ return getTestClass().getAnnotatedFields(Parameter.class);
+ }
+
+ private boolean fieldsAreAnnotated() {
+ return !getAnnotatedFieldsByParameter().isEmpty();
+ }
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerWithParametersFactory.java
similarity index 51%
copy from storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
copy to kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerWithParametersFactory.java
index 6a3ad59..68ec888 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/kylin-test/src/main/java/org/apache/kylin/junit/SparkTestRunnerWithParametersFactory.java
@@ -16,28 +16,19 @@
* limitations under the License.
*/
-package org.apache.kylin.storage.parquet.cube;
+package org.apache.kylin.junit;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
+import org.junit.runner.Runner;
+import org.junit.runners.model.InitializationError;
+import org.junit.runners.parameterized.ParametersRunnerFactory;
+import org.junit.runners.parameterized.TestWithParameters;
-public class CubeStorageQuery extends GTCubeStorageQueryBase {
-
- public CubeStorageQuery(CubeInstance cube) {
- super(cube);
- }
-
- @Override
- public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
- return super.search(context, sqlDigest, returnTupleInfo);
- }
-
- @Override
- protected String getGTStorage() {
- return null;
+/**
+ * Factory that makes the Parameterized runner create a
+ * {@link SparkTestRunnerRunnerWithParameters} for every parameter set.
+ */
+public class SparkTestRunnerWithParametersFactory implements ParametersRunnerFactory {
+    /**
+     * @param test the test class together with one parameter set
+     * @return a runner for that parameter set
+     * @throws InitializationError when the runner cannot be created; the original failure is kept as the cause
+     */
+    @Override
+    public Runner createRunnerForTestWithParameters(TestWithParameters test) throws InitializationError {
+        try {
+            return new SparkTestRunnerRunnerWithParameters(test);
+        } catch (InitializationError e) {
+            // Already the declared failure type; pass it through unwrapped.
+            throw e;
+        } catch (Exception e) {
+            // Report through the declared checked type instead of an unchecked wrapper.
+            throw new InitializationError(e);
+        }
}
-}
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 3585927..2325fc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,6 +61,7 @@
<!-- Spark versions -->
<spark.version>2.3.2</spark.version>
+
<kryo.version>4.0.0</kryo.version>
<!-- mysql versions -->
@@ -297,6 +298,11 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-storage-parquet</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-query</artifactId>
<version>${project.version}</version>
</dependency>
@@ -337,6 +343,16 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-tomcat-ext</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-test</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-core-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
@@ -1283,6 +1299,7 @@
<module>source-jdbc</module>
<module>source-kafka</module>
<module>storage-hbase</module>
+ <module>storage-parquet</module>
<module>query</module>
<module>server-base</module>
<module>server</module>
@@ -1297,6 +1314,7 @@
<module>metrics-reporter-kafka</module>
<module>cache</module>
<module>datasource-sdk</module>
+ <module>kylin-test</module>
</modules>
<reporting>
diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
index 467ef82..4fdcf5d 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
@@ -20,8 +20,10 @@ package org.apache.kylin.rest.init;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.ext.ClassLoaderUtils;
import org.apache.kylin.rest.metrics.QueryMetrics2Facade;
import org.apache.kylin.rest.metrics.QueryMetricsFacade;
+import org.apache.spark.sql.SparderEnv;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
@@ -37,6 +39,8 @@ public class InitialTaskManager implements InitializingBean {
public void afterPropertiesSet() throws Exception {
logger.info("Kylin service is starting.....");
+ checkAndInitSpark();
+
runInitialTasks();
}
@@ -62,4 +66,26 @@ public class InitialTaskManager implements InitializingBean {
logger.info("All initial tasks finished.");
}
}
+
+ /**
+  * Initializes the Sparder (Spark) environment during service start-up when Spark is available.
+  * <p>
+  * The presence of {@code org.apache.spark.sql.SparkSession} is probed first; when the class
+  * cannot be loaded (e.g. in unit tests, which run without the Spark jar) initialization is
+  * skipped. A failure inside {@code SparderEnv.init()} is logged and does not propagate.
+  * The calling thread's context class loader is switched to the Spark class loader only for
+  * the duration of the initialization and is always restored afterwards.
+  */
+ private void checkAndInitSpark() {
+     // if in ut, has not spark jar.
+     try {
+         Class.forName("org.apache.spark.sql.SparkSession");
+     } catch (ClassNotFoundException e) {
+         logger.info("Can not find org.apache.spark.sql.SparkSession.Spark has not started.");
+         return;
+     }
+
+     Thread currentThread = Thread.currentThread();
+     ClassLoader originClassLoader = currentThread.getContextClassLoader();
+     currentThread.setContextClassLoader(ClassLoaderUtils.getSparkClassLoader());
+     try {
+         SparderEnv.init();
+     } catch (Throwable ex) {
+         // Best effort: a broken Spark setup must not prevent the service from starting.
+         logger.error("Initial Spark Context at starting failed", ex);
+     } finally {
+         // Restore in finally so the original loader comes back on every exit path.
+         currentThread.setContextClassLoader(originClassLoader);
+     }
+ }
}
diff --git a/server/pom.xml b/server/pom.xml
index a898eff..d4427c2 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -305,6 +305,13 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>${mysql-connector.version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
index db28595..c0519e6 100644
--- a/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
+++ b/server/src/main/java/org/apache/kylin/rest/DebugTomcat.java
@@ -19,6 +19,7 @@
package org.apache.kylin.rest;
import java.io.File;
+import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
@@ -80,6 +81,12 @@ public class DebugTomcat {
private static void overrideDevJobJarLocations() {
KylinConfig conf = KylinConfig.getInstanceFromEnv();
File devJobJar = findFile("../assembly/target", "kylin-assembly-.*-SNAPSHOT-job.jar");
+ File sparkJar = findFile("../storage-parquet/target", "kylin-storage-parquet-.*-SNAPSHOT-spark.jar");
+ try {
+ System.setProperty("kylin.query.parquet-additional-jars", sparkJar.getCanonicalPath());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
if (devJobJar != null) {
conf.overrideMRJobJarPath(devJobJar.getAbsolutePath());
}
@@ -110,8 +117,11 @@ public class DebugTomcat {
File webBase = new File("../webapp/app");
File webInfDir = new File(webBase, "WEB-INF");
+ File metaInfDir = new File(webBase, "META-INF");
FileUtils.deleteDirectory(webInfDir);
+ FileUtils.deleteDirectory(metaInfDir);
FileUtils.copyDirectoryToDirectory(new File("../server/src/main/webapp/WEB-INF"), webBase);
+ FileUtils.copyDirectoryToDirectory(new File("../examples/test_case_data/webapps/META-INF"), webBase);
Tomcat tomcat = new Tomcat();
tomcat.setPort(port);
diff --git a/server/src/main/webapp/META-INF/context.xml b/server/src/main/webapp/META-INF/context.xml
new file mode 100644
index 0000000..d988ae1
--- /dev/null
+++ b/server/src/main/webapp/META-INF/context.xml
@@ -0,0 +1,38 @@
+<?xml version='1.0' encoding='utf-8'?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!-- The contents of this file will be loaded for each web application -->
+<Context allowLinking="true">
+
+ <!-- Default set of monitored resources -->
+ <WatchedResource>WEB-INF/web.xml</WatchedResource>
+
+ <!-- Uncomment this to disable session persistence across Tomcat restarts -->
+ <!--
+ <Manager pathname="" />
+ -->
+
+ <!-- Uncomment this to enable Comet connection tracking (provides events
+ on session expiration as well as webapp lifecycle) -->
+ <!--
+ <Valve className="org.apache.catalina.valves.CometConnectionManagerValve" />
+ -->
+
+ <Loader loaderClass="org.apache.kylin.ext.TomcatClassLoader"/>
+
+</Context>
diff --git a/storage-parquet/pom.xml b/storage-parquet/pom.xml
new file mode 100644
index 0000000..3778031
--- /dev/null
+++ b/storage-parquet/pom.xml
@@ -0,0 +1,159 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>kylin-storage-parquet</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache Kylin - Parquet Storage</name>
+ <description>Apache Kylin - Parquet Storage</description>
+
+ <parent>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin</artifactId>
+ <version>2.6.0-SNAPSHOT</version>
+ </parent>
+
+ <properties>
+ <shadeBase>org.apache.kylin.coprocessor.shaded</shadeBase>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-metadata</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-engine-mr</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-engine-spark</artifactId>
+ </dependency>
+
+ <!-- Env & Test -->
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-storage</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mrunit</groupId>
+ <artifactId>mrunit</artifactId>
+ <classifier>hadoop2</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4-rule-agent</artifactId>
+ <version>${powermock.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Spark dependency -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_2.11</artifactId>
+ <version>${spark.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>spark</shadedClassifierName>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ </transformers>
+ <artifactSet>
+ <includes>
+ <include>org.apache.kylin:kylin-core-common</include>
+ <include>org.apache.kylin:kylin-core-metadata</include>
+ <include>org.apache.kylin:kylin-core-dictionary</include>
+ <include>org.apache.kylin:kylin-core-cube</include>
+ <include>org.apache.kylin:kylin-engine-spark</include>
+ <include>com.tdunning:t-digest</include>
+ </includes>
+ </artifactSet>
+ <relocations>
+ <relocation>
+ <pattern>com.tdunning</pattern>
+ <shadedPattern>${shadeBase}.com.tdunning</shadedPattern>
+ </relocation>
+ </relocations>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java
index 1322da8..5009a51 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeSparkRPC.java
@@ -73,6 +73,11 @@ public class CubeSparkRPC implements IGTStorage {
JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment, "");
+<<<<<<< HEAD
+=======
+ String cubooidRootPath = jobBuilderSupport.getCuboidRootPath();
+
+>>>>>>> 198041d63... KYLIN-3625 Init query
List<List<Long>> layeredCuboids = cubeSegment.getCuboidScheduler().getCuboidsByLayer();
int level = 0;
for (List<Long> levelCuboids : layeredCuboids) {
@@ -82,9 +87,13 @@ public class CubeSparkRPC implements IGTStorage {
level++;
}
+<<<<<<< HEAD
String dataFolderName;
String parquetRootPath = jobBuilderSupport.getParquetOutputPath();
dataFolderName = JobBuilderSupport.getCuboidOutputPathsByLevel(parquetRootPath, level) + "/" + cuboid.getId();
+=======
+ String dataFolderName = JobBuilderSupport.getCuboidOutputPathsByLevel(cubooidRootPath, level) + "/" + cuboid.getId();
+>>>>>>> 198041d63... KYLIN-3625 Init query
builder.setGtScanRequest(scanRequest.toByteArray()).setGtScanRequestId(scanReqId)
.setKylinProperties(KylinConfig.getInstanceFromEnv().exportAllToString())
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
index 6a3ad59..a3ce76f 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/cube/CubeStorageQuery.java
@@ -18,6 +18,7 @@
package org.apache.kylin.storage.parquet.cube;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.metadata.realization.SQLDigest;
import org.apache.kylin.metadata.tuple.ITupleIterator;
@@ -38,6 +39,6 @@ public class CubeStorageQuery extends GTCubeStorageQueryBase {
@Override
protected String getGTStorage() {
- return null;
+ return KylinConfig.getInstanceFromEnv().getSparkCubeGTStorage();
}
}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
new file mode 100644
index 0000000..9096679
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetPayload.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.spark;
+
+import java.util.List;
+
+/**
+ * Read-only holder for the parameters of one Parquet scan submitted to Spark.
+ * <p>
+ * Instances are created through {@link ParquetPayloadBuilder}. All fields are assigned once at
+ * construction time; note that the byte array and the column list are stored and returned by
+ * reference (no defensive copies are made).
+ */
+public class ParquetPayload {
+    private final byte[] gtScanRequest;
+    private final String gtScanRequestId;
+    private final String kylinProperties;
+    private final String realizationId;
+    private final String segmentId;
+    private final String dataFolderName;
+    private final int maxRecordLength;
+    private final List<Integer> parquetColumns;
+    private final boolean isUseII;
+    private final String realizationType;
+    private final String queryId;
+    private final boolean spillEnabled;
+    private final long maxScanBytes;
+    private final long startTime;
+    private final int storageType;
+
+    /**
+     * Copies the current state of the builder. Taking the builder itself (instead of fifteen
+     * positional arguments) rules out accidentally transposing same-typed values.
+     */
+    private ParquetPayload(ParquetPayloadBuilder builder) {
+        this.gtScanRequest = builder.gtScanRequest;
+        this.gtScanRequestId = builder.gtScanRequestId;
+        this.kylinProperties = builder.kylinProperties;
+        this.realizationId = builder.realizationId;
+        this.segmentId = builder.segmentId;
+        this.dataFolderName = builder.dataFolderName;
+        this.maxRecordLength = builder.maxRecordLength;
+        this.parquetColumns = builder.parquetColumns;
+        this.isUseII = builder.isUseII;
+        this.realizationType = builder.realizationType;
+        this.queryId = builder.queryId;
+        this.spillEnabled = builder.spillEnabled;
+        this.maxScanBytes = builder.maxScanBytes;
+        this.startTime = builder.startTime;
+        this.storageType = builder.storageType;
+    }
+
+    /** @return the serialized scan request bytes (shared, not copied) */
+    public byte[] getGtScanRequest() {
+        return gtScanRequest;
+    }
+
+    /** @return the identifier of the scan request */
+    public String getGtScanRequestId() {
+        return gtScanRequestId;
+    }
+
+    /** @return the Kylin properties in their string form */
+    public String getKylinProperties() {
+        return kylinProperties;
+    }
+
+    /** @return the identifier of the realization being queried */
+    public String getRealizationId() {
+        return realizationId;
+    }
+
+    /** @return the identifier of the segment being queried */
+    public String getSegmentId() {
+        return segmentId;
+    }
+
+    /** @return the name of the data folder to scan */
+    public String getDataFolderName() {
+        return dataFolderName;
+    }
+
+    /** @return the maximum record length */
+    public int getMaxRecordLength() {
+        return maxRecordLength;
+    }
+
+    /** @return the Parquet columns requested (shared, not copied) */
+    public List<Integer> getParquetColumns() {
+        return parquetColumns;
+    }
+
+    /** @return the value of the {@code isUseII} flag */
+    public boolean isUseII() {
+        return isUseII;
+    }
+
+    /** @return the realization type */
+    public String getRealizationType() {
+        return realizationType;
+    }
+
+    /** @return the identifier of the query this payload belongs to */
+    public String getQueryId() {
+        return queryId;
+    }
+
+    /** @return whether spilling is enabled */
+    public boolean isSpillEnabled() {
+        return spillEnabled;
+    }
+
+    /** @return the maximum number of bytes the scan may read */
+    public long getMaxScanBytes() {
+        return maxScanBytes;
+    }
+
+    /** @return the start time recorded for this payload */
+    public long getStartTime() {
+        return startTime;
+    }
+
+    /** @return the storage type code */
+    public int getStorageType() {
+        return storageType;
+    }
+
+    /**
+     * Fluent builder for {@link ParquetPayload}. Values that are never set keep the Java
+     * defaults ({@code null}, {@code 0}, {@code false}).
+     */
+    public static class ParquetPayloadBuilder {
+        private byte[] gtScanRequest;
+        private String gtScanRequestId;
+        private String kylinProperties;
+        private String realizationId;
+        private String segmentId;
+        private String dataFolderName;
+        private int maxRecordLength;
+        private List<Integer> parquetColumns;
+        private boolean isUseII;
+        private String realizationType;
+        private String queryId;
+        private boolean spillEnabled;
+        private long maxScanBytes;
+        private long startTime;
+        private int storageType;
+
+        public ParquetPayloadBuilder() {
+        }
+
+        public ParquetPayloadBuilder setGtScanRequest(byte[] gtScanRequest) {
+            this.gtScanRequest = gtScanRequest;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setGtScanRequestId(String gtScanRequestId) {
+            this.gtScanRequestId = gtScanRequestId;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setKylinProperties(String kylinProperties) {
+            this.kylinProperties = kylinProperties;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setRealizationId(String realizationId) {
+            this.realizationId = realizationId;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setSegmentId(String segmentId) {
+            this.segmentId = segmentId;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setDataFolderName(String dataFolderName) {
+            this.dataFolderName = dataFolderName;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setMaxRecordLength(int maxRecordLength) {
+            this.maxRecordLength = maxRecordLength;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setParquetColumns(List<Integer> parquetColumns) {
+            this.parquetColumns = parquetColumns;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setIsUseII(boolean isUseII) {
+            this.isUseII = isUseII;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setRealizationType(String realizationType) {
+            this.realizationType = realizationType;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setQueryId(String queryId) {
+            this.queryId = queryId;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setSpillEnabled(boolean spillEnabled) {
+            this.spillEnabled = spillEnabled;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setMaxScanBytes(long maxScanBytes) {
+            this.maxScanBytes = maxScanBytes;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setStartTime(long startTime) {
+            this.startTime = startTime;
+            return this;
+        }
+
+        public ParquetPayloadBuilder setStorageType(int storageType) {
+            this.storageType = storageType;
+            return this;
+        }
+
+        /**
+         * Creates a payload from the values set so far.
+         *
+         * @return a new {@link ParquetPayload} snapshot of this builder
+         */
+        public ParquetPayload createParquetPayload() {
+            return new ParquetPayload(this);
+        }
+    }
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java
new file mode 100644
index 0000000..611ee44
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/ParquetTask.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.spark;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.QueryContext;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparderEnv$;
+import org.apache.spark.sql.manager.UdfManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.spark.sql.functions.asc;
+import static org.apache.spark.sql.functions.callUDF;
+import static org.apache.spark.sql.functions.col;
+import static org.apache.spark.sql.functions.max;
+import static org.apache.spark.sql.functions.min;
+import static org.apache.spark.sql.functions.sum;
+
+/**
+ * Runs one cuboid scan over Parquet files with Spark SQL.
+ * <p>
+ * The constructor prepares everything on the calling side (Spark context, deserialized scan
+ * request, cuboid-to-grid-table mapping and the list of Parquet files). {@link #executeTask()}
+ * then builds the select / filter / group-by / sort pipeline and collects the resulting rows.
+ */
+@SuppressWarnings("serial")
+public class ParquetTask implements Serializable {
+    public static final Logger logger = LoggerFactory.getLogger(ParquetTask.class);
+
+    private final transient JavaSparkContext sc;
+    private final KylinConfig kylinConfig;
+    private final transient Configuration conf;
+    private final transient String[] parquetPaths;
+    private final transient GTScanRequest scanRequest;
+    private final transient CuboidToGridTableMapping mapping;
+
+    /**
+     * Prepares a scan described by {@code request}.
+     * <p>
+     * The data folder name of the payload is split at its last {@code '/'} into a base folder and
+     * a cuboid id; files are then looked up under the base folder with the prefix
+     * {@code cuboid_<id>_}.
+     *
+     * @param request payload carrying the serialized scan request, realization / segment ids and
+     *                the data folder name
+     * @throws RuntimeException wrapping any {@link IOException} raised while preparing the task
+     */
+    ParquetTask(ParquetPayload request) {
+        try {
+            this.sc = JavaSparkContext.fromSparkContext(SparderEnv$.MODULE$.getSparkSession().sparkContext());
+            this.kylinConfig = KylinConfig.getInstanceFromEnv();
+            this.conf = HadoopUtil.getCurrentConfiguration();
+
+            scanRequest = GTScanRequest.serializer
+                    .deserialize(ByteBuffer.wrap(request.getGtScanRequest()));
+
+            long startTime = System.currentTimeMillis();
+            sc.setLocalProperty("spark.job.description", Thread.currentThread().getName());
+
+            if (QueryContext.current().isHighPriorityQuery()) {
+                sc.setLocalProperty("spark.scheduler.pool", "vip_tasks");
+            } else {
+                sc.setLocalProperty("spark.scheduler.pool", "lightweight_tasks");
+            }
+
+            // dataFolderName has the form <baseFolder>/<cuboidId>; split once at the last slash.
+            String dataFolderName = request.getDataFolderName();
+            int lastSlash = dataFolderName.lastIndexOf('/');
+            String baseFolder = dataFolderName.substring(0, lastSlash);
+            String cuboidId = dataFolderName.substring(lastSlash + 1);
+            String prefix = "cuboid_" + cuboidId + "_";
+
+            CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(request.getRealizationId());
+            CubeSegment cubeSegment = cubeInstance.getSegmentById(request.getSegmentId());
+            mapping = new CuboidToGridTableMapping(Cuboid.findById(cubeSegment.getCuboidScheduler(), Long.valueOf(cuboidId)));
+
+            Path[] filePaths = HadoopUtil.getFilteredPath(HadoopUtil.getWorkingFileSystem(conf), new Path(baseFolder), prefix);
+            parquetPaths = new String[filePaths.length];
+
+            for (int i = 0; i < filePaths.length; i++) {
+                parquetPaths[i] = filePaths[i].toString();
+            }
+
+            cleanHadoopConf(conf);
+
+            logger.info("SparkVisit Init takes {} ms", System.currentTimeMillis() - startTime);
+
+            StringBuilder pathBuilder = new StringBuilder();
+            for (String path : parquetPaths) {
+                pathBuilder.append(path).append(";");
+            }
+
+            logger.info("Columnar path is {}", pathBuilder);
+            logger.info("Required Measures: {}", StringUtils.join(request.getParquetColumns(), ","));
+            logger.info("Max GT length: {}", request.getMaxRecordLength());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * the updatingResource part of Configuration will incur gzip compression in Configuration.write
+     * we cleaned them out to improve qps
+     */
+    private void cleanHadoopConf(Configuration c) {
+        try {
+            //updatingResource will get compressed by gzip, which is costly
+            Field updatingResourceField = Configuration.class.getDeclaredField("updatingResource");
+            updatingResourceField.setAccessible(true);
+            // The field is accessed reflectively, so the cast cannot be checked by the compiler.
+            @SuppressWarnings("unchecked")
+            Map<String, String[]> map = (Map<String, String[]>) updatingResourceField.get(c);
+            map.clear();
+
+        } catch (IllegalAccessException | NoSuchFieldException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Reads the Parquet files, applies select, optional filter, group-by aggregation and sort,
+     * and collects all resulting rows.
+     *
+     * @return an iterator over the collected rows, each row converted to an {@code Object[]}
+     */
+    public Iterator<Object[]> executeTask() {
+        logger.info("Start to visit cube data with Spark SQL <<<<<<");
+
+        SQLContext sqlContext = new SQLContext(SparderEnv.getSparkSession().sparkContext());
+
+        Dataset<Row> dataset = sqlContext.read().parquet(parquetPaths);
+        ImmutableBitSet dimensions = scanRequest.getDimensions();
+        ImmutableBitSet metrics = scanRequest.getAggrMetrics();
+        ImmutableBitSet groupBy = scanRequest.getAggrGroupBy();
+
+        // select
+        Column[] selectColumn = getSelectColumn(dimensions, metrics, mapping);
+        dataset = dataset.select(selectColumn);
+
+        // where
+        String where = scanRequest.getFilterPushDownSQL();
+        if (where != null) {
+            dataset = dataset.filter(where);
+        }
+
+        //groupby agg
+        Column[] aggCols = getAggColumns(metrics, mapping);
+
+        if (aggCols.length >= 1) {
+            // agg(Column, Column...) wants the first aggregate separately from the rest.
+            Column[] tailCols = new Column[aggCols.length - 1];
+            System.arraycopy(aggCols, 1, tailCols, 0, tailCols.length);
+            dataset = dataset.groupBy(getGroupByColumn(dimensions, mapping)).agg(aggCols[0], tailCols);
+        }
+
+        // sort
+        dataset = dataset.sort(getSortColumn(groupBy, mapping));
+
+        JavaRDD<Row> rowRDD = dataset.javaRDD();
+
+        JavaRDD<Object[]> objRDD = rowRDD.map(new RowToArrayFunction());
+
+        logger.info("partitions: {}", objRDD.getNumPartitions());
+
+        List<Object[]> result = objRDD.collect();
+        return result.iterator();
+    }
+
+    /**
+     * Converts a {@link Row} into a plain object array. Declared as a static nested class so the
+     * function does not hold a reference to the enclosing {@link ParquetTask} instance.
+     */
+    private static final class RowToArrayFunction implements Function<Row, Object[]> {
+        @Override
+        public Object[] call(Row row) throws Exception {
+            int length = row.length();
+            Object[] objects = new Object[length];
+            for (int i = 0; i < length; i++) {
+                objects[i] = row.get(i);
+            }
+            return objects;
+        }
+    }
+
+    /**
+     * Returns the measure mapped to the given grid-table index, or {@code null} if none is.
+     */
+    private static MeasureDesc findMeasure(Map<MeasureDesc, Integer> met2gt, int gtIndex) {
+        for (Map.Entry<MeasureDesc, Integer> entry : met2gt.entrySet()) {
+            if (entry.getValue() == gtIndex) {
+                return entry.getKey();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Returns the column name ({@code <tableAlias>_<columnName>}) of the dimension mapped to the
+     * given grid-table index, or {@code null} if none is.
+     */
+    private static String findDimensionColumnName(Map<TblColRef, Integer> dim2gt, int gtIndex) {
+        for (Map.Entry<TblColRef, Integer> entry : dim2gt.entrySet()) {
+            if (entry.getValue() == gtIndex) {
+                return entry.getKey().getTableAlias() + "_" + entry.getKey().getName();
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Builds one aggregate column per requested metric; a slot stays {@code null} when no measure
+     * maps to the metric's index.
+     */
+    private Column[] getAggColumns(ImmutableBitSet metrics, CuboidToGridTableMapping mapping) {
+        int count = metrics.trueBitCount();
+        Column[] columns = new Column[count];
+        Map<MeasureDesc, Integer> met2gt = mapping.getMet2gt();
+
+        for (int i = 0; i < count; i++) {
+            MeasureDesc measureDesc = findMeasure(met2gt, metrics.trueBitAt(i));
+            if (measureDesc != null) {
+                String func = measureDesc.getFunction().getExpression();
+                columns[i] = getAggColumn(measureDesc.getName(), func, measureDesc.getFunction().getReturnDataType());
+            }
+        }
+
+        return columns;
+    }
+
+    /**
+     * Maps a measure function onto the Spark aggregate applied to column {@code metName}.
+     *
+     * @throws IllegalArgumentException if {@code func} is not one of the handled functions
+     */
+    private Column getAggColumn(String metName, String func, DataType dataType) {
+        Column column;
+        switch (func) {
+            case "SUM":
+                column = sum(metName);
+                break;
+            case "MIN":
+                column = min(metName);
+                break;
+            case "MAX":
+                column = max(metName);
+                break;
+            case "COUNT":
+                // COUNT is combined with sum() as well.
+                column = sum(metName);
+                break;
+            case "TOP_N":
+            case "COUNT_DISTINCT":
+            case "EXTENDED_COLUMN":
+            case "PERCENTILE_APPROX":
+            case "RAW":
+                String udf = UdfManager.register(dataType, func);
+                column = callUDF(udf, col(metName));
+                break;
+            default:
+                throw new IllegalArgumentException("Function " + func + " is not supported");
+
+        }
+        return column.alias(metName);
+    }
+
+    /** Fills {@code columns}, starting at {@code from}, with one column per requested dimension. */
+    private void getDimColumn(ImmutableBitSet dimensions, Column[] columns, int from, CuboidToGridTableMapping mapping) {
+        Map<TblColRef, Integer> dim2gt = mapping.getDim2gt();
+        int count = dimensions.trueBitCount();
+
+        for (int i = 0; i < count; i++) {
+            String name = findDimensionColumnName(dim2gt, dimensions.trueBitAt(i));
+            if (name != null) {
+                columns[i + from] = col(name);
+            }
+        }
+    }
+
+    /** Fills {@code columns}, starting at {@code from}, with one column per requested metric. */
+    private void getMetColumn(ImmutableBitSet metrics, Column[] columns, int from, CuboidToGridTableMapping mapping) {
+        Map<MeasureDesc, Integer> met2gt = mapping.getMet2gt();
+        int count = metrics.trueBitCount();
+
+        for (int i = 0; i < count; i++) {
+            MeasureDesc measureDesc = findMeasure(met2gt, metrics.trueBitAt(i));
+            if (measureDesc != null) {
+                columns[i + from] = col(measureDesc.getName());
+            }
+        }
+    }
+
+    /** Returns the dimension columns followed by the metric columns. */
+    private Column[] getSelectColumn(ImmutableBitSet dimensions, ImmutableBitSet metrics, CuboidToGridTableMapping mapping) {
+        Column[] columns = new Column[dimensions.trueBitCount() + metrics.trueBitCount()];
+
+        getDimColumn(dimensions, columns, 0, mapping);
+
+        getMetColumn(metrics, columns, dimensions.trueBitCount(), mapping);
+
+        return columns;
+    }
+
+    /** Returns one group-by column per requested dimension. */
+    private Column[] getGroupByColumn(ImmutableBitSet dimensions, CuboidToGridTableMapping mapping) {
+        Column[] columns = new Column[dimensions.trueBitCount()];
+
+        getDimColumn(dimensions, columns, 0, mapping);
+
+        return columns;
+    }
+
+    /** Returns one ascending sort column per given dimension. */
+    private Column[] getSortColumn(ImmutableBitSet dimensions, CuboidToGridTableMapping mapping) {
+        int count = dimensions.trueBitCount();
+        Column[] columns = new Column[count];
+        Map<TblColRef, Integer> dim2gt = mapping.getDim2gt();
+
+        for (int i = 0; i < count; i++) {
+            String name = findDimensionColumnName(dim2gt, dimensions.trueBitAt(i));
+            if (name != null) {
+                columns[i] = asc(name);
+            }
+        }
+
+        return columns;
+    }
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java
new file mode 100644
index 0000000..1c8c246
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/SparkSubmitter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.spark;
+
+import org.apache.kylin.ext.ClassLoaderUtils;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.storage.parquet.spark.gtscanner.ParquetRecordGTScanner;
+import org.apache.kylin.storage.parquet.spark.gtscanner.ParquetRecordGTScanner4Cube;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Entry point that runs a {@link ParquetTask} for a scan request and wraps the result in an
+ * {@link IGTScanner}.
+ */
+public class SparkSubmitter {
+    public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
+
+    /**
+     * Executes the Parquet scan described by {@code payload} and returns a scanner over its rows.
+     * <p>
+     * The calling thread's context class loader is switched to the Spark class loader while the
+     * task is created and executed, and is restored before this method returns (normally or
+     * exceptionally) so the change does not leak to later work on the same thread.
+     *
+     * @param scanRequest the scan request; supplies the {@code GTInfo} for the scanner
+     * @param payload     the scan parameters handed to the {@link ParquetTask}
+     * @return a scanner over the rows produced by the task
+     */
+    public static IGTScanner submitParquetTask(GTScanRequest scanRequest, ParquetPayload payload) {
+        Thread currentThread = Thread.currentThread();
+        ClassLoader originClassLoader = currentThread.getContextClassLoader();
+        currentThread.setContextClassLoader(ClassLoaderUtils.getSparkClassLoader());
+        try {
+            ParquetTask parquetTask = new ParquetTask(payload);
+
+            ParquetRecordGTScanner scanner = new ParquetRecordGTScanner4Cube(scanRequest.getInfo(),
+                    parquetTask.executeTask(), scanRequest, payload.getMaxScanBytes());
+
+            return scanner;
+        } finally {
+            currentThread.setContextClassLoader(originClassLoader);
+        }
+    }
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java
new file mode 100644
index 0000000..322aec1
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.spark.gtscanner;
+
+import com.google.common.collect.Iterators;
+import org.apache.kylin.common.exceptions.KylinTimeoutException;
+import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * Adapts an iterator of raw rows ({@code Object[]}) to an {@link IGTScanner}.
+ *
+ * While rows are pulled through {@link #iterator()} the scanner counts them, accumulates a byte estimate
+ * (the maximum encoded length of the requested columns, added once per row) and aborts the scan when that
+ * estimate exceeds {@code maxScannedBytes} or when the scanning thread is found to be interrupted.
+ */
+public abstract class ParquetRecordGTScanner implements IGTScanner {
+
+    private final Iterator<Object[]> iterator;
+    private final GTInfo info;
+    // One record instance is refilled and returned for every row; callers must not hold on to it across rows.
+    private final GTRecord gtrecord;
+    private final ImmutableBitSet columns;
+
+    private final long maxScannedBytes;
+
+    private long scannedRows;
+    private long scannedBytes;
+
+    // Computed by the subclass at construction time; not read anywhere inside this class.
+    private final ImmutableBitSet[] columnBlocks;
+
+    /**
+     * @param info            grid table description, also returned by {@link #getInfo()}
+     * @param iterator        source of raw rows
+     * @param scanRequest     supplies the requested columns and is passed to
+     *                        {@link #getParquetCoveredColumnBlocks(GTScanRequest)}
+     * @param maxScannedBytes upper bound for the accumulated byte estimate
+     */
+    public ParquetRecordGTScanner(GTInfo info, Iterator<Object[]> iterator, GTScanRequest scanRequest,
+            long maxScannedBytes) {
+        this.iterator = iterator;
+        this.info = info;
+        this.gtrecord = new GTRecord(info);
+        this.columns = scanRequest.getColumns();
+        this.maxScannedBytes = maxScannedBytes;
+        // NOTE: abstract method invoked from the constructor, so implementations must not rely on their own fields.
+        this.columnBlocks = getParquetCoveredColumnBlocks(scanRequest);
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    /** Nothing to release here; the underlying iterator is not closed by this class. */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /** @return number of rows handed out by {@link #iterator()} so far */
+    public long getTotalScannedRowCount() {
+        return scannedRows;
+    }
+
+    /** @return accumulated byte estimate for the rows handed out so far */
+    public long getTotalScannedRowBytes() {
+        return scannedBytes;
+    }
+
+    /**
+     * Returns a view over the raw rows that fills the shared {@link GTRecord} for each row.
+     *
+     * @throws KylinTimeoutException          (from {@code next()}) when the thread is interrupted; checked on
+     *                                        the rows whose 1-based index modulo
+     *                                        {@code GTScanRequest.terminateCheckInterval} equals 1
+     * @throws ResourceLimitExceededException (from {@code next()}) when the byte estimate exceeds the limit
+     */
+    @Override
+    public Iterator<GTRecord> iterator() {
+        // Hoisted out of the per-row function: it was evaluated twice for every row although its inputs
+        // (info and the requested columns) are fixed at construction.
+        // Assumes GTInfo.getMaxColumnLength is a pure function of its argument.
+        final int maxRowLength = info.getMaxColumnLength(columns);
+
+        return Iterators.transform(iterator, new com.google.common.base.Function<Object[], GTRecord>() {
+            @Nullable
+            @Override
+            public GTRecord apply(@Nullable Object[] input) {
+                // A fresh buffer per row, exactly as before.
+                gtrecord.setValuesParquet(columns, new ByteArray(maxRowLength), input);
+
+                scannedBytes += maxRowLength;
+                // Thread.interrupted() clears the flag, so it is only consulted on the check rows.
+                if ((++scannedRows % GTScanRequest.terminateCheckInterval == 1) && Thread.interrupted()) {
+                    throw new KylinTimeoutException("Query timeout");
+                }
+                if (scannedBytes > maxScannedBytes) {
+                    throw new ResourceLimitExceededException(
+                            "Partition scanned bytes " + scannedBytes + " exceeds threshold " + maxScannedBytes
+                                    + ", consider increase kylin.storage.partition.max-scan-bytes");
+                }
+                return gtrecord;
+            }
+        });
+    }
+
+    /** @return the columns covered for the given request; not called from within this class */
+    abstract protected ImmutableBitSet getParquetCoveredColumns(GTScanRequest scanRequest);
+
+    /** @return the column blocks covered for the given request; called once from the constructor */
+    abstract protected ImmutableBitSet[] getParquetCoveredColumnBlocks(GTScanRequest scanRequest);
+
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java
new file mode 100644
index 0000000..3bf670e
--- /dev/null
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/spark/gtscanner/ParquetRecordGTScanner4Cube.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.parquet.spark.gtscanner;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTScanRequest;
+
+import java.util.BitSet;
+import java.util.Iterator;
+
+/**
+ * {@link ParquetRecordGTScanner} variant that derives its covered columns and column blocks
+ * from the scan request.
+ */
+public class ParquetRecordGTScanner4Cube extends ParquetRecordGTScanner {
+    public ParquetRecordGTScanner4Cube(GTInfo info, Iterator<Object[]> iterator, GTScanRequest scanRequest,
+            long maxScannedBytes) {
+        super(info, iterator, scanRequest, maxScannedBytes);
+    }
+
+    /**
+     * @return the union of the primary-key columns of the request's {@code GTInfo} and the requested columns
+     */
+    @Override
+    protected ImmutableBitSet getParquetCoveredColumns(GTScanRequest scanRequest) {
+        final BitSet covered = new BitSet();
+        markAll(covered, scanRequest.getInfo().getPrimaryKey());
+        markAll(covered, scanRequest.getColumns());
+        return new ImmutableBitSet(covered);
+    }
+
+    /**
+     * @return one entry per selected column block of the request, in the order of the selected-block bit set
+     */
+    @Override
+    protected ImmutableBitSet[] getParquetCoveredColumnBlocks(GTScanRequest scanRequest) {
+        final ImmutableBitSet selected = scanRequest.getSelectedColBlocks();
+        final int blockCount = selected.trueBitCount();
+        final ImmutableBitSet[] blocks = new ImmutableBitSet[blockCount];
+
+        int slot = 0;
+        while (slot < blockCount) {
+            blocks[slot] = scanRequest.getInfo().getColumnBlock(selected.trueBitAt(slot));
+            slot++;
+        }
+        return blocks;
+    }
+
+    /** Sets in {@code target} every bit that is set in {@code source}. */
+    private static void markAll(BitSet target, ImmutableBitSet source) {
+        final int count = source.trueBitCount();
+        for (int idx = 0; idx < count; idx++) {
+            target.set(source.trueBitAt(idx));
+        }
+    }
+
+}
diff --git a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
index 2a7c1ee..def4d8d 100644
--- a/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
+++ b/storage-parquet/src/main/java/org/apache/kylin/storage/parquet/steps/SparkCubeParquet.java
@@ -46,6 +46,7 @@ import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.cube.kv.RowKeyDecoderParquet;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.dimension.AbstractDateDimEnc;
import org.apache.kylin.dimension.DimensionEncoding;
@@ -67,6 +68,8 @@ import org.apache.kylin.measure.basic.BasicMeasureType;
import org.apache.kylin.measure.basic.BigDecimalIngester;
import org.apache.kylin.measure.basic.DoubleIngester;
import org.apache.kylin.measure.basic.LongIngester;
+import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.parquet.example.data.Group;
@@ -90,10 +93,10 @@ import scala.Tuple2;
import java.io.IOException;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
-import java.util.Random;
public class SparkCubeParquet extends AbstractApplication implements Serializable{
@@ -151,6 +154,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
KylinSparkJobListener jobListener = new KylinSparkJobListener();
try (JavaSparkContext sc = new JavaSparkContext(conf)){
sc.sc().addSparkListener(jobListener);
+
+ HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
final SerializableConfiguration sConf = new SerializableConfiguration(sc.hadoopConfiguration());
final KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
@@ -158,7 +163,6 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
-
final FileSystem fs = new Path(inputPath).getFileSystem(sc.hadoopConfiguration());
final int totalLevels = cubeSegment.getCuboidScheduler().getBuildLevel();
@@ -175,6 +179,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
saveToParquet(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job, envConfig);
}
+ logger.info("HDFS: Number of bytes written={}", jobListener.metrics.getBytesWritten());
+
Map<String, String> counterMap = Maps.newHashMap();
counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
@@ -217,11 +223,12 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
}
static class CuboidPartitioner extends Partitioner {
-
private CuboidToPartitionMapping mapping;
+ private boolean enableSharding;
- public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping) {
+ public CuboidPartitioner(CuboidToPartitionMapping cuboidToPartitionMapping, boolean enableSharding) {
this.mapping = cuboidToPartitionMapping;
+ this.enableSharding =enableSharding;
}
@Override
@@ -330,6 +337,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
int partition = (int) (cuboidSize / (rddCut * 10));
partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
partition = Math.min(kylinConfig.getSparkMaxPartition(), partition);
+ logger.info("cuboid:{}, est_size:{}, partitions:{}", cuboidId, cuboidSize, partition);
return partition;
}
@@ -377,6 +385,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
private Map<MeasureDesc, String> meaTypeMap;
private GroupFactory factory;
private BufferedMeasureCodec measureCodec;
+ private BigDecimalSerializer serializer;
+ private int count = 0;
public GenerateGroupRDDFunction(String cubeName, String segmentId, String metaurl, SerializableConfiguration conf, Map<TblColRef, String> colTypeMap, Map<MeasureDesc, String> meaTypeMap) {
this.cubeName = cubeName;
@@ -394,15 +404,15 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
CubeDesc cubeDesc = cubeInstance.getDescriptor();
CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
measureDescs = cubeDesc.getMeasures();
- decoder = new RowKeyDecoder(cubeSegment);
+ decoder = new RowKeyDecoderParquet(cubeSegment);
factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(conf.get()));
measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
+ serializer = new BigDecimalSerializer(DataType.getType("decimal"));
+ initialized = true;
}
@Override
public Tuple2<Void, Group> call(Tuple2<Text, Text> tuple) throws Exception {
-
- logger.debug("call: transfer Text to byte[]");
if (initialized == false) {
synchronized (SparkCubeParquet.class) {
if (initialized == false) {
@@ -412,7 +422,7 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
}
}
- long cuboid = decoder.decode(tuple._1.getBytes());
+ long cuboid = decoder.decode4Parquet(tuple._1.getBytes());
List<String> values = decoder.getValues();
List<TblColRef> columns = decoder.getColumns();
@@ -426,8 +436,9 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
parseColValue(group, column, values.get(i));
}
+ count ++;
- byte[] encodedBytes = tuple._2().copyBytes();
+ byte[] encodedBytes = tuple._2().getBytes();
int[] valueLengths = measureCodec.getCodec().getPeekLength(ByteBuffer.wrap(encodedBytes));
int valueOffset = 0;
@@ -465,11 +476,16 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
}
switch (meaTypeMap.get(measureDesc)) {
case "long":
- group.append(measureDesc.getName(), BytesUtil.readLong(value, offset, length));
+ group.append(measureDesc.getName(), BytesUtil.readVLong(ByteBuffer.wrap(value, offset, length)));
break;
case "double":
group.append(measureDesc.getName(), ByteBuffer.wrap(value, offset, length).getDouble());
break;
+ case "decimal":
+ BigDecimal decimal = serializer.deserialize(ByteBuffer.wrap(value, offset, length));
+ decimal = decimal.setScale(4);
+ group.append(measureDesc.getName(), Binary.fromConstantByteArray(decimal.unscaledValue().toByteArray()));
+ break;
default:
group.append(measureDesc.getName(), Binary.fromConstantByteArray(value, offset, length));
break;
@@ -492,14 +508,8 @@ public class SparkCubeParquet extends AbstractApplication implements Serializabl
colTypeMap.put(colRef, "long");
} else if (dimEnc instanceof FixedLenDimEnc || dimEnc instanceof FixedLenHexDimEnc) {
org.apache.kylin.metadata.datatype.DataType colDataType = colRef.getType();
- if (colDataType.isNumberFamily() || colDataType.isDateTimeFamily()){
- builder.optional(PrimitiveType.PrimitiveTypeName.INT64).named(getColName(colRef));
- colTypeMap.put(colRef, "long");
- } else {
- // stringFamily && default
- builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
- colTypeMap.put(colRef, "string");
- }
+ builder.optional(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(getColName(colRef));
+ colTypeMap.put(colRef, "string");
} else {
builder.optional(PrimitiveType.PrimitiveTypeName.INT32).named(getColName(colRef));
colTypeMap.put(colRef, "int");
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/ClassLoaderUtils.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/ClassLoaderUtils.java
new file mode 100644
index 0000000..3888883
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/ClassLoaderUtils.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URLClassLoader;
+
+/**
+ * Static holder for two class loaders, referred to as the "spark" and the "origin" class loader,
+ * plus a small file lookup helper.
+ *
+ * Both getters fall back to the calling thread's context class loader (and log an error) while
+ * nothing has been registered yet.
+ */
+public final class ClassLoaderUtils {
+    static URLClassLoader sparkClassLoader = null;
+    static URLClassLoader originClassLoader = null;
+    private static final Logger logger = LoggerFactory.getLogger(ClassLoaderUtils.class);
+    // Was a boxed Boolean assigned in a static initializer; the value has always been true.
+    private static final boolean isPlus = true;
+
+    /**
+     * Finds a direct child of {@code dir} whose file name fully matches the regular expression {@code ptn}.
+     *
+     * @param dir directory to list
+     * @param ptn regular expression the whole file name has to match
+     * @return the first matching entry in listing order, or {@code null} when {@code dir} cannot be listed
+     *         or nothing matches
+     */
+    public static File findFile(String dir, String ptn) {
+        File[] files = new File(dir).listFiles();
+        if (files == null || files.length == 0) {
+            return null;
+        }
+        // Compile once; String.matches would recompile the expression for every directory entry.
+        final java.util.regex.Pattern namePattern = java.util.regex.Pattern.compile(ptn);
+        for (File f : files) {
+            if (namePattern.matcher(f.getName()).matches()) {
+                return f;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return the registered spark class loader, or the current thread's context class loader
+     *         (after logging an error) when none has been registered
+     */
+    public static ClassLoader getSparkClassLoader() {
+        if (!isPlus) {
+            return Thread.currentThread().getContextClassLoader();
+        }
+        if (sparkClassLoader == null) {
+            logger.error("sparkClassLoader not init");
+            return Thread.currentThread().getContextClassLoader();
+        } else {
+            return sparkClassLoader;
+        }
+    }
+
+    /**
+     * Registers the spark class loader. A second registration is logged as an error but still applied.
+     * When the environment variable {@code DEBUG_SPARK_CLASSLOADER} is set, nothing is registered.
+     */
+    public static void setSparkClassLoader(URLClassLoader classLoader) {
+        if (sparkClassLoader != null) {
+            logger.error("sparkClassLoader already initialized");
+        }
+        // Check the debug switch first: previously "set sparkClassLoader" was logged even though
+        // the loader was then not set.
+        if (System.getenv("DEBUG_SPARK_CLASSLOADER") != null) {
+            logger.info("DEBUG_SPARK_CLASSLOADER is set, sparkClassLoader not set, ignoring :{}", classLoader);
+            return;
+        }
+        logger.info("set sparkClassLoader :{}", classLoader);
+        sparkClassLoader = classLoader;
+    }
+
+    /**
+     * @return the registered origin class loader, or the current thread's context class loader
+     *         (after logging an error) when none has been registered
+     */
+    public static ClassLoader getOriginClassLoader() {
+        if (!isPlus) {
+            return Thread.currentThread().getContextClassLoader();
+        }
+        if (originClassLoader == null) {
+            logger.error("originClassLoader not init");
+            return Thread.currentThread().getContextClassLoader();
+        } else {
+            return originClassLoader;
+        }
+    }
+
+    /** Registers the origin class loader. A second registration is logged as an error but still applied. */
+    public static void setOriginClassLoader(URLClassLoader classLoader) {
+        if (originClassLoader != null) {
+            logger.error("originClassLoader already initialized");
+        }
+        logger.info("set originClassLoader :{}", classLoader);
+        originClassLoader = classLoader;
+    }
+}
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java
new file mode 100644
index 0000000..a0c212c
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/DebugTomcatClassLoader.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import org.apache.catalina.loader.ParallelWebappClassLoader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Tomcat web-app class loader that routes class and resource lookups between itself, its parent
+ * and a {@link SparkClassLoader} it creates for itself.
+ */
+public class DebugTomcatClassLoader extends ParallelWebappClassLoader {
+    private static final String[] PARENT_CL_PRECEDENT_CLASSES = new String[] {
+            // Java standard library:
+            "com.sun.", "launcher.", "javax.", "org.ietf", "java", "org.omg", "org.w3c", "org.xml", "sunw.",
+            // logging
+            "org.slf4j", "org.apache.commons.logging", "org.apache.log4j", "org.apache.catalina", "org.apache.tomcat" };
+    // Not consulted by loadClass in this class.
+    private static final String[] THIS_CL_PRECEDENT_CLASSES = new String[] { "org.apache.kylin",
+            "org.apache.calcite" };
+    private static final String[] CODE_GEN_CLASS = new String[] { "org.apache.spark.sql.catalyst.expressions.Object",
+            "Baz" };
+
+    // Exact names for which loadClass fails immediately without searching anywhere.
+    private static final Set<String> wontFindClasses = new HashSet<>();
+
+    static {
+        wontFindClasses.add("Class");
+        wontFindClasses.add("Object");
+        wontFindClasses.add("org");
+        wontFindClasses.add("java.lang.org");
+        wontFindClasses.add("java.lang$org");
+        wontFindClasses.add("java$lang$org");
+        wontFindClasses.add("org.apache");
+        wontFindClasses.add("org.apache.calcite");
+        wontFindClasses.add("org.apache.calcite.runtime");
+        wontFindClasses.add("org.apache.calcite.linq4j");
+        wontFindClasses.add("Long");
+        wontFindClasses.add("String");
+    }
+
+    private static final Logger logger = LoggerFactory.getLogger(DebugTomcatClassLoader.class);
+    private SparkClassLoader sparkClassLoader;
+
+    /**
+     * Creates the loader, creates its companion {@link SparkClassLoader}, registers both with
+     * {@link ClassLoaderUtils} and adds every {@code java.class.path} entry to this loader.
+     *
+     * @param parent the parent ClassLoader to set.
+     */
+    public DebugTomcatClassLoader(ClassLoader parent) throws IOException {
+        super(parent);
+        sparkClassLoader = new SparkClassLoader(this);
+        ClassLoaderUtils.setSparkClassLoader(sparkClassLoader);
+        ClassLoaderUtils.setOriginClassLoader(this);
+        init();
+    }
+
+    /**
+     * Adds every entry of the {@code java.class.path} system property as a URL of this loader.
+     *
+     * @throws IllegalStateException when the system property is not set
+     */
+    public void init() {
+        String classPath = System.getProperty("java.class.path");
+        if (classPath == null) {
+            throw new IllegalStateException("System property java.class.path is not set");
+        }
+        for (String jar : classPath.split(File.pathSeparator)) {
+            try {
+                addURL(new File(jar).toURI().toURL());
+            } catch (MalformedURLException e) {
+                // Best effort as before: skip the entry, but report it through the logger.
+                logger.error("Cannot add class path entry {} to the class loader", jar, e);
+            }
+        }
+    }
+
+    /**
+     * Resolution order: refuse black-listed and code-gen names, hand {@code org.apache.kylin.ext} to the
+     * parent, keep {@code SparderBatchFileFormat} on the default path, then ask the spark class loader
+     * whether it wants the class, then the parent-precedence prefixes, and finally the default lookup.
+     */
+    @Override
+    public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        if (isWontFind(name) || isCodeGen(name)) {
+            throw new ClassNotFoundException(name);
+        }
+
+        if (name.startsWith("org.apache.kylin.ext")) {
+            return parent.loadClass(name);
+        }
+
+        if (name.startsWith("org.apache.spark.sql.execution.datasources.sparder.batch.SparderBatchFileFormat")) {
+            return super.loadClass(name, resolve);
+        }
+
+        if (sparkClassLoader.classNeedPreempt(name)) {
+            return sparkClassLoader.loadClass(name);
+        }
+        if (isParentCLPrecedent(name)) {
+            logger.debug("Skipping exempt class {} - delegating directly to parent", name);
+            return parent.loadClass(name);
+        }
+        return super.loadClass(name, resolve);
+    }
+
+    /** Serves the resource from the spark class loader when it claims the name, otherwise from this loader. */
+    @Override
+    public InputStream getResourceAsStream(String name) {
+        if (sparkClassLoader.fileNeedPreempt(name)) {
+            return sparkClassLoader.getResourceAsStream(name);
+        }
+        return super.getResourceAsStream(name);
+    }
+
+    private boolean isParentCLPrecedent(String name) {
+        return startsWithAny(name, PARENT_CL_PRECEDENT_CLASSES);
+    }
+
+    private boolean isWontFind(String name) {
+        return wontFindClasses.contains(name);
+    }
+
+    private boolean isCodeGen(String name) {
+        return startsWithAny(name, CODE_GEN_CLASS);
+    }
+
+    /** @return true when {@code name} starts with at least one of {@code prefixes} */
+    private static boolean startsWithAny(String name, String[] prefixes) {
+        for (String prefix : prefixes) {
+            if (name.startsWith(prefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java
new file mode 100644
index 0000000..0590999
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItClassLoader.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.apache.kylin.ext.ClassLoaderUtils.findFile;
+
+public class ItClassLoader extends URLClassLoader {
+ private static final String[] PARENT_CL_PRECEDENT_CLASS = new String[] {
+ // Java standard library:
+ "com.sun.", "launcher.", "javax.", "org.ietf", "java", "org.omg", "org.w3c", "org.xml", "sunw.",
+ // logging
+ "org.slf4j", "org.apache.commons.logging", "org.apache.log4j", "sun", "org.apache.catalina",
+ "org.apache.tomcat", };
+ private static final String[] THIS_CL_PRECEDENT_CLASS = new String[] { "org.apache.kylin",
+ "org.apache.calcite" };
+ private static final String[] CODE_GEN_CLASS = new String[] { "org.apache.spark.sql.catalyst.expressions.Object" };
+ public static ItClassLoader defaultClassLoad = null;
+ private static Logger logger = LoggerFactory.getLogger(ItClassLoader.class);
+ public ItSparkClassLoader sparkClassLoader;
+ ClassLoader parent;
+
+ /**
+ * Creates a DynamicClassLoader that can load classes dynamically
+ * from jar files under a specific folder.
+ *
+ * @param parent the parent ClassLoader to set.
+ */
+ public ItClassLoader(ClassLoader parent) throws IOException {
+ super(((URLClassLoader) getSystemClassLoader()).getURLs());
+ this.parent = parent;
+ sparkClassLoader = new ItSparkClassLoader(this);
+ ClassLoaderUtils.setSparkClassLoader(sparkClassLoader);
+ ClassLoaderUtils.setOriginClassLoader(this);
+ defaultClassLoad = this;
+ init();
+ }
+
+ public void init() {
+
+ String classPath = System.getProperty("java.class.path");
+ if (classPath == null) {
+ throw new RuntimeException("");
+ }
+ String[] jars = classPath.split(File.pathSeparator);
+ for (String jar : jars) {
+ if (jar.contains("spark-")) {
+ continue;
+ }
+ try {
+ URL url = new File(jar).toURI().toURL();
+ addURL(url);
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+ }
+ String spark_home = System.getenv("SPARK_HOME");
+ try {
+ File sparkJar = findFile(spark_home + "/jars", "spark-yarn_.*.jar");
+ addURL(sparkJar.toURI().toURL());
+ addURL(new File("../examples/test_case_data/sandbox").toURI().toURL());
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ if (isCodeGen(name)) {
+ throw new ClassNotFoundException();
+ }
+ if (name.startsWith("org.apache.kylin.ext")) {
+ return parent.loadClass(name);
+ }
+ if (isThisCLPrecedent(name)) {
+ synchronized (getClassLoadingLock(name)) {
+ // Check whether the class has already been loaded:
+ Class<?> clasz = findLoadedClass(name);
+ if (clasz != null) {
+ logger.debug("Class " + name + " already loaded");
+ } else {
+ try {
+ // Try to find this class using the URLs passed to this ClassLoader
+ logger.debug("Finding class: " + name);
+ clasz = super.findClass(name);
+ } catch (ClassNotFoundException e) {
+ // Class not found using this ClassLoader, so delegate to parent
+ logger.debug("Class " + name + " not found - delegating to parent");
+ try {
+ clasz = parent.loadClass(name);
+ } catch (ClassNotFoundException e2) {
+ // Class not found in this ClassLoader or in the parent ClassLoader
+ // Log some debug output before re-throwing ClassNotFoundException
+ logger.debug("Class " + name + " not found in parent loader");
+ throw e2;
+ }
+ }
+ }
+ return clasz;
+ }
+ }
+ //交换位置 为了让codehua 被父类加载
+ if (isParentCLPrecedent(name)) {
+ logger.debug("Skipping exempt class " + name + " - delegating directly to parent");
+ return parent.loadClass(name);
+ }
+ if (sparkClassLoader.classNeedPreempt(name)) {
+ return sparkClassLoader.loadClass(name);
+ }
+ return super.loadClass(name, resolve);
+ }
+
+ @Override
+ public InputStream getResourceAsStream(String name) {
+ if (sparkClassLoader.fileNeedPreempt(name)) {
+ return sparkClassLoader.getResourceAsStream(name);
+ }
+ return super.getResourceAsStream(name);
+
+ }
+
+ private boolean isParentCLPrecedent(String name) {
+ for (String exemptPrefix : PARENT_CL_PRECEDENT_CLASS) {
+ if (name.startsWith(exemptPrefix)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isThisCLPrecedent(String name) {
+ for (String exemptPrefix : THIS_CL_PRECEDENT_CLASS) {
+ if (name.startsWith(exemptPrefix)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isCodeGen(String name) {
+ for (String exemptPrefix : CODE_GEN_CLASS) {
+ if (name.startsWith(exemptPrefix)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java
new file mode 100644
index 0000000..c69ef9c
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/ItSparkClassLoader.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import com.google.common.collect.Sets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Set;
+
+import static org.apache.kylin.ext.ClassLoaderUtils.findFile;
+
+public class ItSparkClassLoader extends URLClassLoader {
+ // Class-name prefixes that classNeedPreempt() accepts, i.e. names this loader tries to serve itself.
+ private static final String[] SPARK_CL_PREEMPT_CLASSES = new String[] { "org.apache.spark", "scala.",
+ "org.spark_project"
+ // "javax.ws.rs.core.Application",
+ // "javax.ws.rs.core.UriBuilder", "org.glassfish.jersey", "javax.ws.rs.ext"
+ //user javax.ws.rs.api 2.01 not jersey-core-1.9.jar
+ };
+ // Resource-name fragments that fileNeedPreempt() accepts (substring match).
+ private static final String[] SPARK_CL_PREEMPT_FILES = new String[] { "spark-version-info.properties",
+ "HiveClientImpl", "org/apache/spark" };
+
+ // Prefixes that are never sent straight to the parent, even when they also match a parent-first prefix.
+ private static final String[] THIS_CL_PRECEDENT_CLASSES = new String[] { "javax.ws.rs", "org.apache.hadoop.hive" };
+
+ // Prefixes that are requested from the parent first (see needToUseGlobal()).
+ private static final String[] PARENT_CL_PRECEDENT_CLASSES = new String[] {
+ // // Java standard library:
+ "com.sun.", "launcher.", "java.", "javax.", "org.ietf", "org.omg", "org.w3c", "org.xml", "sunw.", "sun.",
+ // logging
+ "org.apache.commons.logging", "org.apache.log4j", "com.hadoop", "org.slf4j",
+ // Hadoop/HBase/ZK:
+ "org.apache.hadoop", "org.apache.zookeeper", "org.apache.kylin", "com.intellij",
+ "org.apache.calcite", "org.roaringbitmap", "org.apache.parquet" };
+ // Names this loader failed to find locally; classNeedPreempt() stops preempting them.
+ // The set is static and classNeedPreempt() reads it without holding the class-loading lock,
+ // so it is wrapped in a synchronized view instead of being a bare HashSet.
+ private static final Set<String> classNotFoundCache = java.util.Collections
+ .synchronizedSet(Sets.<String> newHashSet());
+ private static final Logger logger = LoggerFactory.getLogger(ItSparkClassLoader.class);
+
+ /**
+ * Creates the class loader with an initially empty URL list and then
+ * fills the search path by calling {@link #init()}.
+ *
+ * @param parent the parent ClassLoader to set.
+ * @throws IOException declared for callers; {@link #init()} itself declares MalformedURLException
+ */
+ protected ItSparkClassLoader(ClassLoader parent) throws IOException {
+ super(new URL[] {}, parent);
+ init();
+ }
+
+ public void init() throws MalformedURLException {
+ String spark_home = System.getenv("SPARK_HOME");
+ if (spark_home == null) {
+ spark_home = System.getProperty("SPARK_HOME");
+ if (spark_home == null) {
+ throw new RuntimeException(
+ "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
+ }
+ }
+ File file = new File(spark_home + "/jars");
+ File[] jars = file.listFiles();
+ for (File jar : jars) {
+ addURL(jar.toURI().toURL());
+ }
+ File sparkJar = findFile("../storage-parquet/target", "kylin-storage-parquet-.*-SNAPSHOT-spark.jar");
+
+ try {
+ // sparder and query module has org.apache.spark class ,if not add,
+ // that will be load by system classloader
+ // (find class api will be find the parent classloader first,
+ // so ,parent classloader can not load it ,spark class will not found)
+ // why SparkClassLoader is unnecessary?
+ // DebugTomcatClassLoader and TomcatClassLoader find class api will be find itself first
+ // so, parent classloader can load it , spark class will be found
+ addURL(new File("../engine-spark/target/classes").toURI().toURL());
+ addURL(new File("../engine-spark/target/test-classes").toURI().toURL());
+ addURL(new File("../storage-parquet/target/classes").toURI().toURL());
+ addURL(new File("../query/target/classes").toURI().toURL());
+ addURL(new File("../query/target/test-classes").toURI().toURL());
+ addURL(new File("../udf/target/classes").toURI().toURL());
+ System.setProperty("kylin.query.parquet-additional-jars", sparkJar.getCanonicalPath());
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * Loads a class, looking in this loader's own URLs first unless the name must come from the parent.
+ * <p>
+ * Names for which {@code needToUseGlobal(name)} holds are requested from the parent first and
+ * only looked up locally when the parent fails. All other names are looked up locally first and
+ * delegated to the parent on failure; such local misses are recorded in classNotFoundCache.
+ *
+ * @param name the binary name of the class
+ * @param resolve if true, the class is resolved before it is returned
+ * @return the loaded class
+ * @throws ClassNotFoundException if neither this loader nor the parent can supply the class
+ */
+ @Override
+ public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ if (needToUseGlobal(name)) {
+ logger.debug("Skipping exempt class " + name + " - delegating directly to parent");
+ try {
+ return getParent().loadClass(name);
+ } catch (ClassNotFoundException e) {
+ // Fall back to our own URLs. Take the class-loading lock and reuse a class this
+ // loader has already defined: calling findClass() a second time for the same
+ // name would attempt a duplicate definition and fail with a LinkageError.
+ synchronized (getClassLoadingLock(name)) {
+ Class<?> local = findLoadedClass(name);
+ if (local == null) {
+ local = super.findClass(name);
+ }
+ if (resolve) {
+ resolveClass(local);
+ }
+ return local;
+ }
+ }
+ }
+
+ synchronized (getClassLoadingLock(name)) {
+ // Check whether the class has already been loaded:
+ Class<?> clasz = findLoadedClass(name);
+ if (clasz != null) {
+ logger.debug("Class " + name + " already loaded");
+ } else {
+ try {
+ // Try to find this class using the URLs passed to this ClassLoader.
+ // findClass() never returns null; it throws when the class is absent.
+ logger.debug("Finding class: " + name);
+ clasz = super.findClass(name);
+ } catch (ClassNotFoundException e) {
+ classNotFoundCache.add(name);
+ // Class not found using this ClassLoader, so delegate to parent
+ logger.debug("Class " + name + " not found - delegating to parent");
+ try {
+ clasz = getParent().loadClass(name);
+ } catch (ClassNotFoundException e2) {
+ // Class not found in this ClassLoader or in the parent ClassLoader
+ // Log some debug output before re-throwing ClassNotFoundException
+ logger.debug("Class " + name + " not found in parent loader");
+ throw e2;
+ }
+ }
+ }
+ if (resolve) {
+ // honor the loadClass(String, boolean) contract
+ resolveClass(clasz);
+ }
+ return clasz;
+ }
+ }
+
+ /**
+ * Reports whether {@code name} starts with one of the THIS_CL_PRECEDENT_CLASSES prefixes.
+ *
+ * @param name the class name to test
+ * @return true if any prefix matches, false otherwise
+ */
+ private boolean isThisCLPrecedent(String name) {
+ for (int i = 0; i < THIS_CL_PRECEDENT_CLASSES.length; i++) {
+ if (name.startsWith(THIS_CL_PRECEDENT_CLASSES[i])) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Reports whether {@code name} starts with one of the PARENT_CL_PRECEDENT_CLASSES prefixes.
+ *
+ * @param name the class name to test
+ * @return true if any prefix matches, false otherwise
+ */
+ private boolean isParentCLPrecedent(String name) {
+ for (int i = 0; i < PARENT_CL_PRECEDENT_CLASSES.length; i++) {
+ if (name.startsWith(PARENT_CL_PRECEDENT_CLASSES[i])) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Decides whether a class must be requested from the parent loader first.
+ *
+ * @param name the class name to test
+ * @return true only for names that match a parent-first prefix and are neither
+ * this-loader-first nor preempted by {@code classNeedPreempt(name)}
+ */
+ private boolean needToUseGlobal(String name) {
+ if (isThisCLPrecedent(name) || classNeedPreempt(name)) {
+ return false;
+ }
+ return isParentCLPrecedent(name);
+ }
+
+ /**
+ * Tells whether this loader should be asked for {@code name} ahead of other loaders.
+ *
+ * @param name the class name to test
+ * @return false for names recorded in classNotFoundCache; otherwise true when the
+ * name starts with one of the SPARK_CL_PREEMPT_CLASSES prefixes
+ */
+ boolean classNeedPreempt(String name) {
+ boolean preempt = false;
+ if (!classNotFoundCache.contains(name)) {
+ for (int i = 0; i < SPARK_CL_PREEMPT_CLASSES.length; i++) {
+ if (name.startsWith(SPARK_CL_PREEMPT_CLASSES[i])) {
+ preempt = true;
+ break;
+ }
+ }
+ }
+ return preempt;
+ }
+
+ /**
+ * Tells whether a resource name contains one of the SPARK_CL_PREEMPT_FILES fragments.
+ *
+ * @param name the resource name to test
+ * @return true if any fragment occurs anywhere in {@code name}, false otherwise
+ */
+ boolean fileNeedPreempt(String name) {
+ boolean preempt = false;
+ for (int i = 0; i < SPARK_CL_PREEMPT_FILES.length && !preempt; i++) {
+ preempt = name.contains(SPARK_CL_PREEMPT_FILES[i]);
+ }
+ return preempt;
+ }
+}
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
new file mode 100644
index 0000000..dba782b
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/SparkClassLoader.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kylin.ext.ClassLoaderUtils.findFile;
+
+public class SparkClassLoader extends URLClassLoader {
+ //preempt these classes from parent
+ private static String[] SPARK_CL_PREEMPT_CLASSES = new String[] { "org.apache.spark", "scala.",
+ "org.spark_project" };
+
+ //preempt these files from parent
+ private static String[] SPARK_CL_PREEMPT_FILES = new String[] { "spark-version-info.properties", "HiveClientImpl",
+ "org/apache/spark" };
+
+ //when loading class (indirectly used by SPARK_CL_PREEMPT_CLASSES), some of them should NOT use parent's first
+ private static String[] THIS_CL_PRECEDENT_CLASSES = new String[] { "javax.ws.rs", "org.apache.hadoop.hive" };
+
+ //when loading class (indirectly used by SPARK_CL_PREEMPT_CLASSES), some of them should use parent's first
+ private static String[] PARENT_CL_PRECEDENT_CLASSES = new String[] {
+ // // Java standard library:
+ "com.sun.", "launcher.", "java.", "javax.", "org.ietf", "org.omg", "org.w3c", "org.xml", "sunw.", "sun.",
+ // logging
+ "org.apache.commons.logging", "org.apache.log4j", "org.slf4j", "org.apache.hadoop",
+ // Hadoop/HBase/ZK:
+ "org.apache.kylin", "com.intellij", "org.apache.calcite" };
+
+ // Names this loader failed to find locally; classNeedPreempt() stops preempting them.
+ // The static initializer below registers this loader as parallel capable, so the set can be
+ // touched by several threads at once and is wrapped in a synchronized view.
+ private static final Set<String> classNotFoundCache = java.util.Collections
+ .synchronizedSet(new HashSet<String>());
+ private static final Logger logger = LoggerFactory.getLogger(SparkClassLoader.class);
+
+ static {
+ // Each prefix list above can be replaced by a comma-separated environment variable.
+ String sparkclassloader_spark_cl_preempt_classes = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_CLASSES");
+ if (!StringUtils.isEmpty(sparkclassloader_spark_cl_preempt_classes)) {
+ SPARK_CL_PREEMPT_CLASSES = StringUtils.split(sparkclassloader_spark_cl_preempt_classes, ",");
+ }
+
+ String sparkclassloader_spark_cl_preempt_files = System.getenv("SPARKCLASSLOADER_SPARK_CL_PREEMPT_FILES");
+ if (!StringUtils.isEmpty(sparkclassloader_spark_cl_preempt_files)) {
+ SPARK_CL_PREEMPT_FILES = StringUtils.split(sparkclassloader_spark_cl_preempt_files, ",");
+ }
+
+ String sparkclassloader_this_cl_precedent_classes = System.getenv("SPARKCLASSLOADER_THIS_CL_PRECEDENT_CLASSES");
+ if (!StringUtils.isEmpty(sparkclassloader_this_cl_precedent_classes)) {
+ THIS_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloader_this_cl_precedent_classes, ",");
+ }
+
+ String sparkclassloader_parent_cl_precedent_classes = System
+ .getenv("SPARKCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES");
+ if (!StringUtils.isEmpty(sparkclassloader_parent_cl_precedent_classes)) {
+ PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(sparkclassloader_parent_cl_precedent_classes, ",");
+ }
+
+ try {
+ // ClassLoader.registerAsParallelCapable() is looked up and invoked reflectively
+ final Method registerParallel = ClassLoader.class.getDeclaredMethod("registerAsParallelCapable");
+ AccessController.doPrivileged(new PrivilegedAction<Object>() {
+ @Override
+ public Object run() {
+ registerParallel.setAccessible(true);
+ return null;
+ }
+ });
+ Boolean result = (Boolean) registerParallel.invoke(null);
+ if (!result) {
+ logger.warn("registrationFailed");
+ }
+ } catch (Exception e) {
+ // Best effort: do not abort class initialization, but record why registration did not happen
+ logger.warn("Could not register SparkClassLoader as parallel capable", e);
+ }
+ }
+
+ /**
+ * Creates the class loader with an initially empty URL list and then
+ * fills the search path by calling {@link #init()}.
+ *
+ * @param parent the parent ClassLoader to set.
+ * @throws IOException declared for callers; {@link #init()} itself declares MalformedURLException
+ */
+ protected SparkClassLoader(ClassLoader parent) throws IOException {
+ super(new URL[] {}, parent);
+ init();
+ }
+
+ /**
+ * Builds this loader's search path: every file under {@code SPARK_HOME/jars}, plus the Kylin
+ * UDF code. The UDF code comes from {@code $KYLIN_HOME/lib/kylin-udf-*.jar} when KYLIN_HOME is
+ * set, otherwise from {@code ../udf/target/classes} when that directory exists.
+ *
+ * @throws MalformedURLException if a located file cannot be converted to a URL
+ * @throws RuntimeException if SPARK_HOME is set neither as environment variable nor as
+ * system property, or if its jars directory cannot be listed
+ */
+ public void init() throws MalformedURLException {
+ String spark_home = System.getenv("SPARK_HOME");
+ if (spark_home == null) {
+ spark_home = System.getProperty("SPARK_HOME");
+ if (spark_home == null) {
+ throw new RuntimeException(
+ "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
+ }
+ }
+ File file = new File(spark_home + "/jars");
+ File[] jars = file.listFiles();
+ if (jars == null) {
+ // File.listFiles() returns null when the path is not a directory or cannot be read
+ throw new RuntimeException("Cannot list Spark jars under " + file.getAbsolutePath());
+ }
+ for (File jar : jars) {
+ addURL(jar.toURI().toURL());
+ }
+ String kylin_home = System.getenv("KYLIN_HOME");
+ if (kylin_home != null) {
+ // for prod
+ File sparkJar = findFile(kylin_home + "/lib", "kylin-udf-.*.jar");
+ if (sparkJar != null) {
+ logger.info("Add kylin UDF jar to spark classloader : " + sparkJar.getName());
+ addURL(sparkJar.toURI().toURL());
+ } else {
+ logger.warn("Can not found kylin UDF jar, please set KYLIN_HOME and make sure the kylin-udf-*.jar exists in $KYLIN_HOME/lib");
+ }
+ } else if (Files.exists(Paths.get("../udf/target/classes"))) {
+ // for debugtomcat
+ logger.info("Add kylin UDF classes to spark classloader");
+ addURL(new File("../udf/target/classes").toURI().toURL());
+ }
+
+ }
+
+ @Override
+ public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+
+ if (needToUseGlobal(name)) {
+ logger.debug("delegate " + name + " directly to parent");
+ return super.loadClass(name, resolve);
+ }
+ return doLoadclass(name);
+ }
+
+ /**
+ * Looks a class up in this loader first and falls back to the parent loader.
+ * A local miss is recorded in classNotFoundCache before the parent is asked.
+ *
+ * @param name the binary name of the class
+ * @return the loaded class
+ * @throws ClassNotFoundException if neither this loader nor the parent can supply the class
+ */
+ private Class<?> doLoadclass(String name) throws ClassNotFoundException {
+ synchronized (getClassLoadingLock(name)) {
+ // Check whether the class has already been loaded:
+ Class<?> clasz = findLoadedClass(name);
+ if (clasz != null) {
+ logger.debug("Class " + name + " already loaded");
+ return clasz;
+ }
+ try {
+ // Try to find this class using the URLs passed to this ClassLoader.
+ // findClass() never returns null; it throws when the class is absent.
+ logger.debug("Finding class: " + name);
+ return super.findClass(name);
+ } catch (ClassNotFoundException e) {
+ classNotFoundCache.add(name);
+ // Class not found using this ClassLoader, so delegate to parent
+ logger.debug("Class " + name + " not found - delegating to parent");
+ try {
+ // sparder and query module has some class start with org.apache.spark,
+ // We need to use some lib that does not exist in spark/jars
+ return getParent().loadClass(name);
+ } catch (ClassNotFoundException e2) {
+ // Class not found in this ClassLoader or in the parent ClassLoader
+ // Log some debug output before re-throwing ClassNotFoundException
+ logger.debug("Class " + name + " not found in parent loader");
+ throw e2;
+ }
+ }
+ }
+ }
+
+ /**
+ * Reports whether {@code name} starts with one of the THIS_CL_PRECEDENT_CLASSES prefixes.
+ *
+ * @param name the class name to test
+ * @return true if any prefix matches, false otherwise
+ */
+ private boolean isThisCLPrecedent(String name) {
+ for (int i = 0; i < THIS_CL_PRECEDENT_CLASSES.length; i++) {
+ if (name.startsWith(THIS_CL_PRECEDENT_CLASSES[i])) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Reports whether {@code name} starts with one of the PARENT_CL_PRECEDENT_CLASSES prefixes.
+ *
+ * @param name the class name to test
+ * @return true if any prefix matches, false otherwise
+ */
+ private boolean isParentCLPrecedent(String name) {
+ for (int i = 0; i < PARENT_CL_PRECEDENT_CLASSES.length; i++) {
+ if (name.startsWith(PARENT_CL_PRECEDENT_CLASSES[i])) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean needToUseGlobal(String name) {
+ if (name.startsWith("org.apache.spark.sql.execution.datasources.sparder.batch.SparderBatchFileFormat")) {
+ return true;
+ }
+
+ return !isThisCLPrecedent(name) && !classNeedPreempt(name) && isParentCLPrecedent(name);
+ }
+
+ /**
+ * Tells whether this loader should be asked for {@code name} ahead of other loaders.
+ *
+ * @param name the class name to test
+ * @return false for names recorded in classNotFoundCache; otherwise true when the
+ * name starts with one of the SPARK_CL_PREEMPT_CLASSES prefixes
+ */
+ boolean classNeedPreempt(String name) {
+ boolean preempt = false;
+ if (!classNotFoundCache.contains(name)) {
+ for (int i = 0; i < SPARK_CL_PREEMPT_CLASSES.length; i++) {
+ if (name.startsWith(SPARK_CL_PREEMPT_CLASSES[i])) {
+ preempt = true;
+ break;
+ }
+ }
+ }
+ return preempt;
+ }
+
+ /**
+ * Tells whether a resource name contains one of the SPARK_CL_PREEMPT_FILES fragments.
+ *
+ * @param name the resource name to test
+ * @return true if any fragment occurs anywhere in {@code name}, false otherwise
+ */
+ boolean fileNeedPreempt(String name) {
+ boolean preempt = false;
+ for (int i = 0; i < SPARK_CL_PREEMPT_FILES.length && !preempt; i++) {
+ preempt = name.contains(SPARK_CL_PREEMPT_FILES[i]);
+ }
+ return preempt;
+ }
+}
diff --git a/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java b/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java
new file mode 100644
index 0000000..89717ec
--- /dev/null
+++ b/tomcat-ext/src/main/java/org/apache/kylin/ext/TomcatClassLoader.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.ext;
+
+import org.apache.catalina.loader.ParallelWebappClassLoader;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kylin.ext.ClassLoaderUtils.findFile;
+
+public class TomcatClassLoader extends ParallelWebappClassLoader {
+ // Prefixes that loadClass() sends straight to the parent loader (unless also listed as this-loader-first).
+ // NOTE(review): "java" has no trailing dot, so it also matches non-JDK packages such as "javassist" - confirm intended.
+ private static String[] PARENT_CL_PRECEDENT_CLASSES = new String[] {
+ // Java standard library:
+ "com.sun.", "launcher.", "javax.", "org.ietf", "java", "org.omg", "org.w3c", "org.xml", "sunw.",
+ // logging
+ "org.slf4j", "org.apache.commons.logging", "org.apache.log4j", "org.apache.catalina", "org.apache.tomcat" };
+
+ // Prefixes that stay with this loader even when they also match a parent-first prefix.
+ private static String[] THIS_CL_PRECEDENT_CLASSES = new String[] { "org.apache.kylin",
+ "org.apache.calcite" };
+
+ // Prefixes of generated classes; loadClass() rejects them with ClassNotFoundException.
+ private static String[] CODEGEN_CLASSES = new String[] { "org.apache.spark.sql.catalyst.expressions.Object",
+ "Baz" };
+
+ // Exact names that loadClass() rejects immediately without any lookup.
+ private static final Set<String> wontFindClasses = new HashSet<>();
+
+ static {
+ // Each prefix list above can be replaced by a comma-separated environment variable.
+ final String parentFirstOverride = System.getenv("TOMCATCLASSLOADER_PARENT_CL_PRECEDENT_CLASSES");
+ if (!StringUtils.isEmpty(parentFirstOverride)) {
+ PARENT_CL_PRECEDENT_CLASSES = StringUtils.split(parentFirstOverride, ",");
+ }
+
+ final String thisFirstOverride = System.getenv("TOMCATCLASSLOADER_THIS_CL_PRECEDENT_CLASSES");
+ if (!StringUtils.isEmpty(thisFirstOverride)) {
+ THIS_CL_PRECEDENT_CLASSES = StringUtils.split(thisFirstOverride, ",");
+ }
+
+ final String codegenOverride = System.getenv("TOMCATCLASSLOADER_CODEGEN_CLASSES");
+ if (!StringUtils.isEmpty(codegenOverride)) {
+ CODEGEN_CLASSES = StringUtils.split(codegenOverride, ",");
+ }
+
+ final String[] bogusNames = { "Class", "Object", "org", "java.lang.org", "java.lang$org", "java$lang$org",
+ "org.apache", "org.apache.calcite", "org.apache.calcite.runtime", "org.apache.calcite.linq4j", "Long",
+ "String" };
+ for (String bogus : bogusNames) {
+ wontFindClasses.add(bogus);
+ }
+ }
+
+ // Set by the constructor to the most recently created instance.
+ public static TomcatClassLoader defaultClassLoad = null;
+ private static Logger logger = LoggerFactory.getLogger(TomcatClassLoader.class);
+ // Spark-side loader created by the constructor with this loader as its parent.
+ public SparkClassLoader sparkClassLoader;
+
+ /**
+ * Creates the web-app class loader together with its companion SparkClassLoader,
+ * registers both with ClassLoaderUtils, records this instance in
+ * {@code defaultClassLoad} and finally calls {@link #init()}.
+ *
+ * @param parent the parent ClassLoader to set.
+ * @throws IOException if the SparkClassLoader cannot be created
+ */
+ public TomcatClassLoader(ClassLoader parent) throws IOException {
+ super(parent);
+ // the Spark loader uses this loader as its parent
+ sparkClassLoader = new SparkClassLoader(this);
+ ClassLoaderUtils.setSparkClassLoader(sparkClassLoader);
+ ClassLoaderUtils.setOriginClassLoader(this);
+ defaultClassLoad = this;
+ init();
+ }
+
+ public void init() {
+ String spark_home = System.getenv("SPARK_HOME");
+ try {
+ // SparkContext use spi to match deploy mode
+ // otherwise SparkContext init fail ,can not find yarn deploy mode
+ File yarnJar = findFile(spark_home + "/jars", "spark-yarn.*.jar");
+ addURL(yarnJar.toURI().toURL());
+ // jersey in spark will attempt find @Path class file in current classloader.
+ // Not possible to delegate to spark loader
+ // otherwise spark web ui executors tab can not render
+ File coreJar = findFile(spark_home + "/jars", "spark-core.*.jar");
+ addURL(coreJar.toURI().toURL());
+ } catch (MalformedURLException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ /**
+ * Routes a class-loading request to the appropriate loader. The checks run in this order:
+ * names in wontFindClasses and code-generated names are rejected; {@code org.apache.kylin.ext}
+ * goes to the parent; SparderBatchFileFormat goes to the superclass lookup; names preempted by
+ * the Spark loader go to it; parent-first names that are not this-loader-first go to the
+ * parent; everything else goes to the superclass lookup.
+ *
+ * @param name the binary name of the class
+ * @param resolve passed through to {@code super.loadClass}
+ * @return the loaded class
+ * @throws ClassNotFoundException if the name is rejected or no loader consulted finds it
+ */
+ @Override
+ public Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+ // when calcite compile class, some stupid class name will be proposed, not worth to actually lookup
+ if (isWontFind(name)) {
+ // carry the name so that the failure is traceable in logs and stack traces
+ throw new ClassNotFoundException(name);
+ }
+ // spark codegen classload parent is Thread.currentThread().getContextClassLoader()
+ // and calcite baz classloader is EnumerableInterpretable.class's classloader
+ if (isCodeGen(name)) {
+ throw new ClassNotFoundException(name);
+ }
+ // class loaders should conform to global's
+ if (name.startsWith("org.apache.kylin.ext")) {
+ return parent.loadClass(name);
+ }
+
+ if (name.startsWith("org.apache.spark.sql.execution.datasources.sparder.batch.SparderBatchFileFormat")) {
+ return super.loadClass(name, resolve);
+ }
+
+ // if spark CL needs preempt
+ if (sparkClassLoader.classNeedPreempt(name)) {
+ return sparkClassLoader.loadClass(name);
+ }
+ // tomcat classpath include KAP_HOME/lib , ensure this classload can load kap class
+ if (isParentCLPrecedent(name) && !isThisCLPrecedent(name)) {
+ logger.debug("delegate " + name + " directly to parent");
+ return parent.loadClass(name);
+ }
+ return super.loadClass(name, resolve);
+ }
+
+ /**
+ * Opens a resource, letting the Spark class loader answer first for the
+ * names that {@code sparkClassLoader.fileNeedPreempt(name)} accepts.
+ *
+ * @param name the resource name
+ * @return the stream from the Spark class loader for preempted names,
+ * otherwise whatever the superclass lookup returns
+ */
+ @Override
+ public InputStream getResourceAsStream(String name) {
+ final boolean preempted = sparkClassLoader.fileNeedPreempt(name);
+ return preempted ? sparkClassLoader.getResourceAsStream(name) : super.getResourceAsStream(name);
+ }
+
+ /**
+ * Reports whether {@code name} starts with one of the PARENT_CL_PRECEDENT_CLASSES prefixes.
+ *
+ * @param name the class name to test
+ * @return true if any prefix matches, false otherwise
+ */
+ private boolean isParentCLPrecedent(String name) {
+ for (int i = 0; i < PARENT_CL_PRECEDENT_CLASSES.length; i++) {
+ if (name.startsWith(PARENT_CL_PRECEDENT_CLASSES[i])) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Reports whether {@code name} starts with one of the THIS_CL_PRECEDENT_CLASSES prefixes.
+ *
+ * @param name the class name to test
+ * @return true if any prefix matches, false otherwise
+ */
+ private boolean isThisCLPrecedent(String name) {
+ for (int i = 0; i < THIS_CL_PRECEDENT_CLASSES.length; i++) {
+ if (name.startsWith(THIS_CL_PRECEDENT_CLASSES[i])) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Reports whether {@code name} is one of the exact names held in wontFindClasses.
+ *
+ * @param name the class name to test
+ * @return true if the set contains the name, false otherwise
+ */
+ private boolean isWontFind(String name) {
+ return wontFindClasses.contains(name);
+ }
+
+ /**
+ * Reports whether {@code name} starts with one of the CODEGEN_CLASSES prefixes.
+ *
+ * @param name the class name to test
+ * @return true if any prefix matches, false otherwise
+ */
+ private boolean isCodeGen(String name) {
+ for (int i = 0; i < CODEGEN_CLASSES.length; i++) {
+ if (name.startsWith(CODEGEN_CLASSES[i])) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/tool/pom.xml b/tool/pom.xml
index 958bd55..1b3e305 100644
--- a/tool/pom.xml
+++ b/tool/pom.xml
@@ -47,6 +47,10 @@
</dependency>
<dependency>
<groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-storage-parquet</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
<artifactId>kylin-engine-mr</artifactId>
</dependency>
diff --git a/webapp/app/META-INF/context.xml b/webapp/app/META-INF/context.xml
new file mode 100644
index 0000000..0ad90dc
--- /dev/null
+++ b/webapp/app/META-INF/context.xml
@@ -0,0 +1,38 @@
+<?xml version='1.0' encoding='utf-8'?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!-- The contents of this file will be loaded for each web application -->
+<Context allowLinking="true">
+
+ <!-- Default set of monitored resources -->
+ <WatchedResource>WEB-INF/web.xml</WatchedResource>
+
+ <!-- Uncomment this to disable session persistence across Tomcat restarts -->
+ <!--
+ <Manager pathname="" />
+ -->
+
+ <!-- Uncomment this to enable Comet connection tracking (provides events
+ on session expiration as well as webapp lifecycle) -->
+ <!--
+ <Valve className="org.apache.catalina.valves.CometConnectionManagerValve" />
+ -->
+
+ <Loader loaderClass="org.apache.kylin.ext.DebugTomcatClassLoader"/>
+
+</Context>