Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/05 10:21:09 UTC
[kylin] 17/22: KYLIN-5322 fix select count when out of segment range
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 6cba5e63396e1e2e56703b4c334e7a4df7a23dda
Author: fanshu.kong <17...@qq.com>
AuthorDate: Tue Sep 27 17:09:08 2022 +0800
KYLIN-5322 fix select count when out of segment range
---
.../engine/exec/sparder/SparderQueryPlanExec.java | 10 ++---
.../kylin/query/runtime/plan/TableScanPlan.scala | 43 ++++++++++++++-----
.../query/runtime/plan/SegmentEmptyTest.scala | 50 ++++++++++++++++++++++
.../org/apache/spark/sql/SparderTypeUtil.scala | 13 +++---
4 files changed, 94 insertions(+), 22 deletions(-)
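The symptom behind the subject line, with illustrative identifiers (table, column, and date are hypothetical, not taken from this commit):

    // SELECT COUNT(*) FROM TEST_KYLIN_FACT WHERE CAL_DT < DATE '1970-01-01'
    //
    // When the filter prunes every segment, the expected answer is a single
    // row containing 0; the hunks below address the paths that previously
    // produced an empty result set (zero rows) instead.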
diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java b/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
index a9a9963ae4..bbe7d25192 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
@@ -32,8 +32,6 @@ import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.QueryErrorCode;
import org.apache.kylin.common.msg.MsgPicker;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.relnode.OLAPRel;
import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
import org.apache.kylin.metadata.cube.model.IndexEntity;
import org.apache.kylin.query.engine.exec.ExecuteResult;
@@ -43,6 +41,8 @@ import org.apache.kylin.query.engine.meta.SimpleDataContext;
import org.apache.kylin.query.relnode.ContextUtil;
import org.apache.kylin.query.relnode.KapContext;
import org.apache.kylin.query.relnode.KapRel;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.relnode.OLAPRel;
import org.apache.kylin.query.runtime.SparkEngine;
import org.apache.kylin.query.util.QueryContextCutter;
import org.apache.spark.SparkException;
@@ -77,7 +77,7 @@ public class SparderQueryPlanExec implements QueryPlanExec {
|| KapConfig.wrap(((SimpleDataContext) dataContext).getKylinConfig()).runConstantQueryLocally()) {
val contexts = ContextUtil.listContexts();
for (OLAPContext context : contexts) {
- if (context.olapSchema != null && context.storageContext.isEmptyLayout()) {
+ if (context.olapSchema != null && context.storageContext.isEmptyLayout() && !context.isHasAgg()) {
QueryContext.fillEmptyResultSetMetrics();
return new ExecuteResult(Lists.newArrayList(), 0);
}
@@ -134,7 +134,7 @@ public class SparderQueryPlanExec implements QueryPlanExec {
QueryContext.current().getSecondStorageUsageMap().clear();
} else if (e instanceof SQLException) {
handleForceToTieredStorage(e);
- }else {
+ } else {
return ExceptionUtils.rethrow(e);
}
}
@@ -186,7 +186,7 @@ public class SparderQueryPlanExec implements QueryPlanExec {
}
private void handleForceToTieredStorage(final Exception e) {
- if (e.getMessage().equals(QueryContext.ROUTE_USE_FORCEDTOTIEREDSTORAGE)){
+ if (e.getMessage().equals(QueryContext.ROUTE_USE_FORCEDTOTIEREDSTORAGE)) {
ForceToTieredStorage forcedToTieredStorage = QueryContext.current().getForcedToTieredStorage();
boolean forceTableIndex = QueryContext.current().isForceTableIndex();
QueryContext.current().setLastFailed(true);
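The first hunk above narrows the constant-query shortcut: an empty layout now only short-circuits to an empty result set when the context carries no aggregation, because an aggregate over an empty relation must still produce one row (COUNT yields 0). A minimal, self-contained sketch of that semantic in plain Spark, with illustrative names:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.functions.count
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    object EmptyAggSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("empty-agg").getOrCreate()
        // A relation with a schema but zero rows, standing in for a fully pruned scan.
        val empty = spark.createDataFrame(
          spark.sparkContext.emptyRDD[Row],
          StructType(Seq(StructField("id", LongType))))
        // COUNT over an empty relation: one row holding 0, not an empty result set.
        empty.agg(count("*")).show()
        // +--------+
        // |count(1)|
        // +--------+
        // |       0|
        // +--------+
        spark.stop()
      }
    }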
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
index a10d0a16f3..5c79e0f113 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
@@ -25,23 +25,21 @@ import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate
import org.apache.kylin.metadata.cube.gridtable.NLayoutToGridTableMapping
import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, NDataflow}
import org.apache.kylin.metadata.cube.realization.HybridRealization
-import org.apache.kylin.metadata.model.NTableMetadataManager
-import org.apache.kylin.query.util.{RuntimeHelper, SparderDerivedUtil}
import org.apache.kylin.metadata.model._
+import org.apache.kylin.metadata.realization.IRealization
import org.apache.kylin.metadata.tuple.TupleInfo
+import org.apache.kylin.query.implicits.sessionToQueryContext
+import org.apache.kylin.query.relnode.{KapRel, OLAPContext}
+import org.apache.kylin.query.util.{RuntimeHelper, SparderDerivedUtil}
import org.apache.spark.sql.execution.utils.SchemaProcessor
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.manager.SparderLookupManager
import org.apache.spark.sql.types.{ArrayType, DoubleType, StructField, StructType}
import org.apache.spark.sql.util.SparderTypeUtil
-import org.apache.spark.sql.{DataFrame, _}
+import org.apache.spark.sql._
import java.util.concurrent.ConcurrentHashMap
import java.{lang, util}
-import org.apache.kylin.metadata.realization.IRealization
-import org.apache.kylin.query.implicits.sessionToQueryContext
-import org.apache.kylin.query.relnode.{KapRel, OLAPContext}
-
import scala.collection.JavaConverters._
// scalastyle:off
@@ -71,7 +69,8 @@ object TableScanPlan extends LogEx {
val realizations = olapContext.realization.getRealizations.asScala.toList
realizations.map(_.asInstanceOf[NDataflow])
.filter(dataflow => (!dataflow.isStreaming && !context.isBatchCandidateEmpty) ||
- (dataflow.isStreaming && !context.isStreamCandidateEmpty))
+ (dataflow.isStreaming && !context.isStreamCandidateEmpty) ||
+ isSegmentsEmpty(prunedSegments, prunedStreamingSegments))
.map(dataflow => {
if (dataflow.isStreaming) {
tableScan(rel, dataflow, olapContext, session, prunedStreamingSegments, context.getStreamingCandidate)
@@ -81,13 +80,33 @@ object TableScanPlan extends LogEx {
}).reduce(_.union(_))
}
+ // prunedSegments is null
+ def tableScanEmptySegment(rel: KapRel): DataFrame = {
+ logInfo("prunedSegments is null")
+ val df = SparkOperation.createEmptyDataFrame(
+ StructType(
+ rel.getColumnRowType.getAllColumns.asScala
+ .map(column =>
+ StructField(column.toString.replaceAll("\\.", "_"), SparderTypeUtil.toSparkType(column.getType)))))
+ val cols = df.schema.map(structField => {
+ col(structField.name)
+ })
+ df.select(cols: _*)
+ }
+
+ def isSegmentsEmpty(prunedSegments: util.List[NDataSegment], prunedStreamingSegments: util.List[NDataSegment]): Boolean = {
+ val isPrunedSegmentsEmpty = prunedSegments == null || prunedSegments.size() == 0
+ val isPrunedStreamingSegmentsEmpty = prunedStreamingSegments == null || prunedStreamingSegments.size() == 0
+ isPrunedSegmentsEmpty && isPrunedStreamingSegmentsEmpty
+ }
+
def tableScan(rel: KapRel, dataflow: NDataflow, olapContext: OLAPContext,
session: SparkSession, prunedSegments: util.List[NDataSegment], candidate: NLayoutCandidate): DataFrame = {
val prunedPartitionMap = olapContext.storageContext.getPrunedPartitions
olapContext.resetSQLDigest()
//TODO: refactor
val cuboidLayout = candidate.getLayoutEntity
- if (cuboidLayout.getIndex.isTableIndex) {
+ if (cuboidLayout.getIndex != null && cuboidLayout.getIndex.isTableIndex) {
QueryContext.current().getQueryTagInfo.setTableIndex(true)
}
val tableName = olapContext.firstTableScan.getBackupAlias
@@ -97,6 +116,9 @@ object TableScanPlan extends LogEx {
/////////////////////////////////////////////
val kapConfig = KapConfig.wrap(dataflow.getConfig)
val basePath = kapConfig.getReadParquetStoragePath(dataflow.getProject)
+ if (prunedSegments == null || prunedSegments.size() == 0) {
+ return tableScanEmptySegment(rel: KapRel)
+ }
val fileList = prunedSegments.asScala.map(
seg => toLayoutPath(dataflow, cuboidLayout.getId, basePath, seg, prunedPartitionMap)
)
@@ -366,8 +388,7 @@ object TableScanPlan extends LogEx {
val session = SparderEnv.getSparkSession
val olapContext = rel.getContext
var instance: IRealization = null
- if (olapContext.realization.isInstanceOf[NDataflow])
- {
+ if (olapContext.realization.isInstanceOf[NDataflow]) {
instance = olapContext.realization.asInstanceOf[NDataflow]
} else {
instance = olapContext.realization.asInstanceOf[HybridRealization]
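tableScanEmptySegment above returns a zero-row DataFrame that still carries the full column schema of the relation, so projections and aggregations composed on top of it keep working. A sketch of the same shape in plain Spark (SparkOperation and SparderTypeUtil are Kylin internals; column names are illustrative), e.g. pasted into spark-shell where `spark` is predefined:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    // Mirrors the replaceAll("\\.", "_") above: "T.TRANS_ID" becomes "T_TRANS_ID".
    val schema = StructType(Seq(
      StructField("TEST_KYLIN_FACT_TRANS_ID", LongType),
      StructField("TEST_KYLIN_FACT_LSTG_FORMAT_NAME", StringType)))
    val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    // Zero rows, full schema; an aggregate over it still returns a row.
    df.select(schema.map(f => col(f.name)): _*).show()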
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala
new file mode 100644
index 0000000000..03e0577013
--- /dev/null
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.runtime.plan
+
+import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite}
+import org.junit.Assert
+
+import java.util
+
+class SegmentEmptyTest extends SparderBaseFunSuite with SharedSparkSession with LocalMetadata {
+
+ val prunedSegment1 = null
+ val prunedSegment2 = new util.LinkedList[NDataSegment]
+ val prunedSegment3 = new util.LinkedList[NDataSegment]
+ prunedSegment3.add(new NDataSegment())
+
+ val prunedStreamingSegment1 = null
+ val prunedStreamingSegment2 = new util.LinkedList[NDataSegment]
+ val prunedStreamingSegment3 = new util.LinkedList[NDataSegment]
+ prunedStreamingSegment3.add(new NDataSegment())
+
+ Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment1))
+ Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment2))
+ Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment3))
+
+ Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment1))
+ Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment2))
+ Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment3))
+
+ Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment1))
+ Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment2))
+ Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment3))
+}
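The assertions above run in the class constructor and encode the truth table for isSegmentsEmpty: true exactly when both the batch list and the streaming list are null or empty. Assuming SparderBaseFunSuite extends ScalaTest's FunSuite, a more conventional arrangement would wrap them in a named test block, e.g.:

    test("isSegmentsEmpty is true only when both segment lists are null or empty") {
      // Same fixtures as above; any one non-empty list makes the result false.
      Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment2))
      Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment1))
    }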
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
index 2a2d1d02ab..6791718885 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
@@ -18,11 +18,6 @@
package org.apache.spark.sql.util
-import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
-import java.math.BigDecimal
-import java.sql.{Date, Timestamp, Types}
-import java.time.ZoneId
-import java.util.{GregorianCalendar, Locale, TimeZone}
import org.apache.calcite.avatica.util.TimeUnitRange
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex.RexLiteral
@@ -33,7 +28,7 @@ import org.apache.kylin.common.util.DateFormat
import org.apache.kylin.metadata.datatype.DataType
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.expressions.{Base64, Cast}
+import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.parser.ParserUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.functions._
@@ -41,6 +36,11 @@ import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.springframework.util.Base64Utils
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
+import java.math.BigDecimal
+import java.sql.{Date, Timestamp, Types}
+import java.time.ZoneId
+import java.util.{GregorianCalendar, Locale, TimeZone}
import scala.collection.{immutable, mutable}
object SparderTypeUtil extends Logging {
@@ -117,6 +117,7 @@ object SparderTypeUtil extends Logging {
case tp if tp.startsWith("extendedcolumn") => BinaryType
case tp if tp.startsWith("percentile") => BinaryType
case tp if tp.startsWith("raw") => BinaryType
+ case "any" => StringType
case _ => throw new IllegalArgumentException(dataTp.toString)
}
}
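The added case maps Kylin's "any" type to StringType rather than letting it fall through to the IllegalArgumentException, presumably so the empty-segment scan above can still build a Spark schema when a column's type resolves to ANY. A reduced sketch of the name-based dispatch this hunk extends (illustrative subset; the real mapping in SparderTypeUtil covers far more types):

    import org.apache.spark.sql.types._

    def toSparkTypeSketch(kylinTypeName: String): DataType = kylinTypeName match {
      case "bigint" | "long"                 => LongType
      case tp if tp.startsWith("percentile") => BinaryType
      case "any"                             => StringType  // degrade ANY to string instead of throwing
      case other => throw new IllegalArgumentException(other)
    }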