You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/01/04 10:23:46 UTC
[kylin] branch master updated: Fix ci test
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new c4a3b49 Fix ci test
c4a3b49 is described below
commit c4a3b4948c589b06cb6a483535eebaed0377c7de
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Fri Jan 3 14:25:19 2020 +0800
Fix ci test
---
.../kylin/engine/mr/streaming/RowRecordReader.java | 30 ++++++--------
.../apache/kylin/query/util/PushDownExecutor.java | 47 +++++++++++++++++++++-
.../org/apache/kylin/query/util/PushDownUtil.java | 29 -------------
3 files changed, 58 insertions(+), 48 deletions(-)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
index b4ab6a9..a8808b3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
@@ -113,24 +113,18 @@ public class RowRecordReader extends ColumnarFilesReader {
metricsDataTransformers = Lists.newArrayList();
for (MetricMetaInfo metricMetaInfo : basicCuboidMetaInfo.getMetricsInfo()) {
FSDataInputStream metricsInputStream = fs.open(dataFilePath);
- try {
- MeasureDesc measure = findMeasure(metricMetaInfo.getName());
- DataType metricsDataType = measure.getFunction().getReturnDataType();
- ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory.create(metricsDataType);
- ColumnarStoreMetricsDesc metricsDesc = new ColumnarStoreMetricsDesc(metricsEncoding,
- metricMetaInfo.getCompressionType());
- ColumnDataReader metricsDataReader = metricsDesc.getMetricsReaderFromFSInput(metricsInputStream,
- metricMetaInfo.getStartOffset(), metricMetaInfo.getMetricLength(),
- (int) basicCuboidMetaInfo.getNumberOfRows());
- metricsColumnReaders.add(metricsDataReader);
- metricsColumnReaderItrs.add(metricsDataReader.iterator());
- metricsDataTransformers.add(new MetricsDataTransformer(metricsEncoding.asDataTypeSerializer(),
- DataTypeSerializer.create(metricsDataType)));
- } finally {
- if (null != metricsInputStream) {
- metricsInputStream.close();
- }
- }
+ MeasureDesc measure = findMeasure(metricMetaInfo.getName());
+ DataType metricsDataType = measure.getFunction().getReturnDataType();
+ ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory.create(metricsDataType);
+ ColumnarStoreMetricsDesc metricsDesc = new ColumnarStoreMetricsDesc(metricsEncoding,
+ metricMetaInfo.getCompressionType());
+ ColumnDataReader metricsDataReader = metricsDesc.getMetricsReaderFromFSInput(metricsInputStream,
+ metricMetaInfo.getStartOffset(), metricMetaInfo.getMetricLength(),
+ (int) basicCuboidMetaInfo.getNumberOfRows());
+ metricsColumnReaders.add(metricsDataReader);
+ metricsColumnReaderItrs.add(metricsDataReader.iterator());
+ metricsDataTransformers.add(new MetricsDataTransformer(metricsEncoding.asDataTypeSerializer(),
+ DataTypeSerializer.create(metricsDataType)));
}
rowMetricsValues = new byte[metricsColumnReaders.size()][];
}
diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java b/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java
index 2db3aea..b4b08cd 100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java
@@ -18,16 +18,22 @@
package org.apache.kylin.query.util;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.exception.QueryOnCubeException;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
+import org.apache.kylin.metadata.realization.NoRealizationFoundException;
+import org.apache.kylin.metadata.realization.RoutingIndicatorException;
import org.apache.kylin.query.adhoc.PushDownRunnerJdbcImpl;
import org.apache.kylin.source.adhocquery.IPushDownRunner;
@@ -53,7 +59,23 @@ public class PushDownExecutor {
String sql, String defaultSchema, SQLException sqlException, boolean isSelect,
boolean isPrepare) throws Exception {
List<String> ids = kylinConfig.getPushDownRunnerIds();
- if (ids.isEmpty() && kylinConfig.getPushDownRunnerClassName() != null) {
+
+ if (kylinConfig.isPushDownEnabled()) {
+ return null;
+ }
+
+ if (isSelect) {
+ logger.info("Query failed to utilize pre-calculation, routing to other engines", sqlException);
+ if (!isExpectedCause(sqlException)) {
+ logger.info("quit doPushDownQuery because prior exception thrown is unexpected");
+ return null;
+ }
+ } else {
+ Preconditions.checkState(sqlException == null);
+ logger.info("Kylin cannot support non-select queries, routing to other engines");
+ }
+
+ if (ids.isEmpty() && StringUtils.isNotEmpty(kylinConfig.getPushDownRunnerClassName())) {
IPushDownRunner runner = (IPushDownRunner) ClassUtil.newInstance(
kylinConfig.getPushDownRunnerClassName()
);
@@ -66,6 +88,29 @@ public class PushDownExecutor {
}
}
+ private static boolean isExpectedCause(SQLException sqlException) {
+ Preconditions.checkArgument(sqlException != null); // a null exception here is a caller error
+ Throwable rootCause = ExceptionUtils.getRootCause(sqlException);
+ // Decide from the type of the root cause whether the failed query may be routed to push-down.
+ //SqlValidatorException is not an expected exception in the original design. But in the multi-pass scenario,
+ //query pushdown may create tables that are not in the model, so a SqlValidatorException will be thrown.
+ boolean isPushDownUpdateEnabled = KylinConfig.getInstanceFromEnv().isPushDownUpdateEnabled();
+ // Both branches accept the same root-cause types; SqlValidatorException is added only when push-down update is enabled.
+ if (!isPushDownUpdateEnabled) {
+ return rootCause != null //
+ && (rootCause instanceof NoRealizationFoundException //
+ || rootCause instanceof RoutingIndicatorException
+ || rootCause instanceof QueryOnCubeException);
+ } else {
+ return (rootCause != null //
+ && (rootCause instanceof NoRealizationFoundException //
+ || rootCause instanceof SqlValidatorException //
+ || rootCause instanceof RoutingIndicatorException //
+ || rootCause instanceof QueryOnCubeException)); //
+ }
+ }
+
+
private Pair<List<List<String>>, List<SelectedColumnMeta>> queryBySingleRunner(IPushDownRunner runner,
String project, String sql, String defaultSchema, SQLException sqlException,
boolean isSelect, boolean isPrepare) throws Exception {
diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index 01b6611..4d69272 100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -24,7 +24,6 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import com.google.common.base.Preconditions;
import org.apache.calcite.sql.SqlBasicCall;
import org.apache.calcite.sql.SqlCall;
@@ -42,18 +41,12 @@ import org.apache.calcite.sql.SqlWith;
import org.apache.calcite.sql.SqlWithItem;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.util.SqlVisitor;
-import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.commons.lang.text.StrBuilder;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.exception.QueryOnCubeException;
import org.apache.kylin.metadata.model.tool.CalciteParser;
import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
-import org.apache.kylin.metadata.realization.NoRealizationFoundException;
-import org.apache.kylin.metadata.realization.RoutingIndicatorException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,28 +71,6 @@ public class PushDownUtil {
return executor.pushDownQuery(project, sql, defaultSchema, null, true, isPrepare);
}
- private static boolean isExpectedCause(SQLException sqlException) {
- Preconditions.checkArgument(sqlException != null);
- Throwable rootCause = ExceptionUtils.getRootCause(sqlException);
-
- //SqlValidatorException is not an excepted exception in the origin design.But in the multi pass scene,
- //query pushdown may create tables, and the tables are not in the model, so will throw SqlValidatorException.
- boolean isPushDownUpdateEnabled = KylinConfig.getInstanceFromEnv().isPushDownUpdateEnabled();
-
- if (!isPushDownUpdateEnabled) {
- return rootCause != null //
- && (rootCause instanceof NoRealizationFoundException //
- || rootCause instanceof RoutingIndicatorException
- || rootCause instanceof QueryOnCubeException);
- } else {
- return (rootCause != null //
- && (rootCause instanceof NoRealizationFoundException //
- || rootCause instanceof SqlValidatorException //
- || rootCause instanceof RoutingIndicatorException //
- || rootCause instanceof QueryOnCubeException)); //
- }
- }
-
static String schemaCompletion(String inputSql, String schema) throws SqlParseException {
if (inputSql == null || inputSql.equals("")) {
return "";