You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/01/04 10:23:46 UTC

[kylin] branch master updated: Fix ci test

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new c4a3b49  Fix ci test
c4a3b49 is described below

commit c4a3b4948c589b06cb6a483535eebaed0377c7de
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Fri Jan 3 14:25:19 2020 +0800

    Fix ci test
---
 .../kylin/engine/mr/streaming/RowRecordReader.java | 30 ++++++--------
 .../apache/kylin/query/util/PushDownExecutor.java  | 47 +++++++++++++++++++++-
 .../org/apache/kylin/query/util/PushDownUtil.java  | 29 -------------
 3 files changed, 58 insertions(+), 48 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
index b4ab6a9..a8808b3 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/RowRecordReader.java
@@ -113,24 +113,18 @@ public class RowRecordReader extends ColumnarFilesReader {
         metricsDataTransformers = Lists.newArrayList();
         for (MetricMetaInfo metricMetaInfo : basicCuboidMetaInfo.getMetricsInfo()) {
             FSDataInputStream metricsInputStream = fs.open(dataFilePath);
-            try {
-                MeasureDesc measure = findMeasure(metricMetaInfo.getName());
-                DataType metricsDataType = measure.getFunction().getReturnDataType();
-                ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory.create(metricsDataType);
-                ColumnarStoreMetricsDesc metricsDesc = new ColumnarStoreMetricsDesc(metricsEncoding,
-                        metricMetaInfo.getCompressionType());
-                ColumnDataReader metricsDataReader = metricsDesc.getMetricsReaderFromFSInput(metricsInputStream,
-                        metricMetaInfo.getStartOffset(), metricMetaInfo.getMetricLength(),
-                        (int) basicCuboidMetaInfo.getNumberOfRows());
-                metricsColumnReaders.add(metricsDataReader);
-                metricsColumnReaderItrs.add(metricsDataReader.iterator());
-                metricsDataTransformers.add(new MetricsDataTransformer(metricsEncoding.asDataTypeSerializer(),
-                        DataTypeSerializer.create(metricsDataType)));
-            } finally {
-                if (null != metricsInputStream) {
-                    metricsInputStream.close();
-                }
-            }
+            MeasureDesc measure = findMeasure(metricMetaInfo.getName());
+            DataType metricsDataType = measure.getFunction().getReturnDataType();
+            ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory.create(metricsDataType);
+            ColumnarStoreMetricsDesc metricsDesc = new ColumnarStoreMetricsDesc(metricsEncoding,
+                    metricMetaInfo.getCompressionType());
+            ColumnDataReader metricsDataReader = metricsDesc.getMetricsReaderFromFSInput(metricsInputStream,
+                    metricMetaInfo.getStartOffset(), metricMetaInfo.getMetricLength(),
+                    (int) basicCuboidMetaInfo.getNumberOfRows());
+            metricsColumnReaders.add(metricsDataReader);
+            metricsColumnReaderItrs.add(metricsDataReader.iterator());
+            metricsDataTransformers.add(new MetricsDataTransformer(metricsEncoding.asDataTypeSerializer(),
+                    DataTypeSerializer.create(metricsDataType)));
         }
         rowMetricsValues = new byte[metricsColumnReaders.size()][];
     }
diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java b/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java
index 2db3aea..b4b08cd 100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownExecutor.java
@@ -18,16 +18,22 @@
 
 package org.apache.kylin.query.util;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 import org.apache.calcite.sql.parser.SqlParseException;
 
+import org.apache.calcite.sql.validate.SqlValidatorException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.exception.QueryOnCubeException;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
+import org.apache.kylin.metadata.realization.NoRealizationFoundException;
+import org.apache.kylin.metadata.realization.RoutingIndicatorException;
 import org.apache.kylin.query.adhoc.PushDownRunnerJdbcImpl;
 import org.apache.kylin.source.adhocquery.IPushDownRunner;
 
@@ -53,7 +59,23 @@ public class PushDownExecutor {
             String sql, String defaultSchema, SQLException sqlException, boolean isSelect,
             boolean isPrepare) throws Exception {
         List<String> ids = kylinConfig.getPushDownRunnerIds();
-        if (ids.isEmpty() && kylinConfig.getPushDownRunnerClassName() != null) {
+
+        if (kylinConfig.isPushDownEnabled()) {
+            return null;
+        }
+
+        if (isSelect) {
+            logger.info("Query failed to utilize pre-calculation, routing to other engines", sqlException);
+            if (!isExpectedCause(sqlException)) {
+                logger.info("quit doPushDownQuery because prior exception thrown is unexpected");
+                return null;
+            }
+        } else {
+            Preconditions.checkState(sqlException == null);
+            logger.info("Kylin cannot support non-select queries, routing to other engines");
+        }
+
+        if (ids.isEmpty() && StringUtils.isNotEmpty(kylinConfig.getPushDownRunnerClassName())) {
             IPushDownRunner runner = (IPushDownRunner) ClassUtil.newInstance(
                     kylinConfig.getPushDownRunnerClassName()
             );
@@ -66,6 +88,29 @@ public class PushDownExecutor {
         }
     }
 
+    private static boolean isExpectedCause(SQLException sqlException) {
+        Preconditions.checkArgument(sqlException != null);
+        Throwable rootCause = ExceptionUtils.getRootCause(sqlException);
+
+        // SqlValidatorException is not an expected exception in the original design. But in the multi-pass scenario,
+        // query pushdown may create tables that are not in the model, which causes a SqlValidatorException to be thrown.
+        boolean isPushDownUpdateEnabled = KylinConfig.getInstanceFromEnv().isPushDownUpdateEnabled();
+
+        if (!isPushDownUpdateEnabled) {
+            return rootCause != null //
+                    && (rootCause instanceof NoRealizationFoundException //
+                    || rootCause instanceof RoutingIndicatorException
+                    || rootCause instanceof QueryOnCubeException);
+        } else {
+            return (rootCause != null //
+                    && (rootCause instanceof NoRealizationFoundException //
+                    || rootCause instanceof SqlValidatorException //
+                    || rootCause instanceof RoutingIndicatorException //
+                    || rootCause instanceof QueryOnCubeException)); //
+        }
+    }
+
+
     private Pair<List<List<String>>, List<SelectedColumnMeta>> queryBySingleRunner(IPushDownRunner runner,
             String project, String sql, String defaultSchema, SQLException sqlException,
             boolean isSelect, boolean isPrepare) throws Exception {
diff --git a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
index 01b6611..4d69272 100644
--- a/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/util/PushDownUtil.java
@@ -24,7 +24,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 
-import com.google.common.base.Preconditions;
 
 import org.apache.calcite.sql.SqlBasicCall;
 import org.apache.calcite.sql.SqlCall;
@@ -42,18 +41,12 @@ import org.apache.calcite.sql.SqlWith;
 import org.apache.calcite.sql.SqlWithItem;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.util.SqlVisitor;
-import org.apache.calcite.sql.validate.SqlValidatorException;
 
 import org.apache.commons.lang.text.StrBuilder;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.exception.QueryOnCubeException;
 import org.apache.kylin.metadata.model.tool.CalciteParser;
 import org.apache.kylin.metadata.querymeta.SelectedColumnMeta;
-import org.apache.kylin.metadata.realization.NoRealizationFoundException;
-import org.apache.kylin.metadata.realization.RoutingIndicatorException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,28 +71,6 @@ public class PushDownUtil {
         return executor.pushDownQuery(project, sql, defaultSchema, null, true, isPrepare);
     }
 
-    private static boolean isExpectedCause(SQLException sqlException) {
-        Preconditions.checkArgument(sqlException != null);
-        Throwable rootCause = ExceptionUtils.getRootCause(sqlException);
-
-        //SqlValidatorException is not an excepted exception in the origin design.But in the multi pass scene,
-        //query pushdown may create tables, and the tables are not in the model, so will throw SqlValidatorException.
-        boolean isPushDownUpdateEnabled = KylinConfig.getInstanceFromEnv().isPushDownUpdateEnabled();
-
-        if (!isPushDownUpdateEnabled) {
-            return rootCause != null //
-                    && (rootCause instanceof NoRealizationFoundException //
-                            || rootCause instanceof RoutingIndicatorException
-                            || rootCause instanceof QueryOnCubeException);
-        } else {
-            return (rootCause != null //
-                    && (rootCause instanceof NoRealizationFoundException //
-                            || rootCause instanceof SqlValidatorException //
-                            || rootCause instanceof RoutingIndicatorException //
-                            || rootCause instanceof QueryOnCubeException)); //
-        }
-    }
-
     static String schemaCompletion(String inputSql, String schema) throws SqlParseException {
         if (inputSql == null || inputSql.equals("")) {
             return "";