Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/05 10:21:09 UTC

[kylin] 17/22: KYLIN-5322 fix select count when out of segment range

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 6cba5e63396e1e2e56703b4c334e7a4df7a23dda
Author: fanshu.kong <17...@qq.com>
AuthorDate: Tue Sep 27 17:09:08 2022 +0800

    KYLIN-5322 fix select count when out of segment range
---
 .../engine/exec/sparder/SparderQueryPlanExec.java  | 10 ++---
 .../kylin/query/runtime/plan/TableScanPlan.scala   | 43 ++++++++++++++-----
 .../query/runtime/plan/SegmentEmptyTest.scala      | 50 ++++++++++++++++++++++
 .../org/apache/spark/sql/SparderTypeUtil.scala     | 13 +++---
 4 files changed, 94 insertions(+), 22 deletions(-)
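
In short: when every segment is pruned away (the query's filter range falls outside all segment ranges), the engine previously short-circuited to an empty result set even for aggregate queries, so SELECT COUNT(*) returned zero rows instead of a single row containing 0. This commit skips that shortcut when the query aggregates, returns an empty DataFrame with the expected schema from TableScanPlan, and maps Kylin's "any" type to Spark's StringType so schema conversion does not throw. A minimal, self-contained Spark sketch of the intended semantics (illustration only, not part of the patch):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // COUNT over an empty relation yields one row containing 0, not an empty
    // result set -- which is why aggregate queries must still reach Spark.
    val spark = SparkSession.builder().master("local[1]").appName("kylin-5322-sketch").getOrCreate()
    val schema = StructType(Seq(StructField("PRICE", LongType)))
    val empty = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    empty.createOrReplaceTempView("T")
    spark.sql("SELECT COUNT(*) FROM T").show() // prints a single row: count(1) = 0
    spark.stop()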

diff --git a/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java b/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
index a9a9963ae4..bbe7d25192 100644
--- a/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
+++ b/src/query/src/main/java/org/apache/kylin/query/engine/exec/sparder/SparderQueryPlanExec.java
@@ -32,8 +32,6 @@ import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.exception.QueryErrorCode;
 import org.apache.kylin.common.msg.MsgPicker;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.relnode.OLAPRel;
 import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.query.engine.exec.ExecuteResult;
@@ -43,6 +41,8 @@ import org.apache.kylin.query.engine.meta.SimpleDataContext;
 import org.apache.kylin.query.relnode.ContextUtil;
 import org.apache.kylin.query.relnode.KapContext;
 import org.apache.kylin.query.relnode.KapRel;
+import org.apache.kylin.query.relnode.OLAPContext;
+import org.apache.kylin.query.relnode.OLAPRel;
 import org.apache.kylin.query.runtime.SparkEngine;
 import org.apache.kylin.query.util.QueryContextCutter;
 import org.apache.spark.SparkException;
@@ -77,7 +77,7 @@ public class SparderQueryPlanExec implements QueryPlanExec {
                 || KapConfig.wrap(((SimpleDataContext) dataContext).getKylinConfig()).runConstantQueryLocally()) {
             val contexts = ContextUtil.listContexts();
             for (OLAPContext context : contexts) {
-                if (context.olapSchema != null && context.storageContext.isEmptyLayout()) {
+                if (context.olapSchema != null && context.storageContext.isEmptyLayout() && !context.isHasAgg()) {
                     QueryContext.fillEmptyResultSetMetrics();
                     return new ExecuteResult(Lists.newArrayList(), 0);
                 }
@@ -134,7 +134,7 @@ public class SparderQueryPlanExec implements QueryPlanExec {
                 QueryContext.current().getSecondStorageUsageMap().clear();
             } else if (e instanceof SQLException) {
                 handleForceToTieredStorage(e);
-            }else {
+            } else {
                 return ExceptionUtils.rethrow(e);
             }
         }
@@ -186,7 +186,7 @@ public class SparderQueryPlanExec implements QueryPlanExec {
     }
 
     private void handleForceToTieredStorage(final Exception e) {
-        if (e.getMessage().equals(QueryContext.ROUTE_USE_FORCEDTOTIEREDSTORAGE)){
+        if (e.getMessage().equals(QueryContext.ROUTE_USE_FORCEDTOTIEREDSTORAGE)) {
             ForceToTieredStorage forcedToTieredStorage = QueryContext.current().getForcedToTieredStorage();
             boolean forceTableIndex = QueryContext.current().isForceTableIndex();
             QueryContext.current().setLastFailed(true);
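
The key change in this file is the added !context.isHasAgg() guard: the "empty layout, return an empty result" shortcut now applies only to non-aggregate queries, while aggregate queries fall through to Spark so that COUNT and friends are computed over the (empty) scan. A hedged restatement of the condition in plain Scala (simplified stand-ins, not the actual Kylin classes):

    // Sketch of the new shortcut predicate; parameter names mirror the Java change.
    def canReturnEmptyResult(hasOlapSchema: Boolean, emptyLayout: Boolean, hasAgg: Boolean): Boolean =
      hasOlapSchema && emptyLayout && !hasAgg

    assert(!canReturnEmptyResult(hasOlapSchema = true, emptyLayout = true, hasAgg = true))  // COUNT(*) must reach Spark
    assert(canReturnEmptyResult(hasOlapSchema = true, emptyLayout = true, hasAgg = false))  // plain scan may short-circuit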
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
index a10d0a16f3..5c79e0f113 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/TableScanPlan.scala
@@ -25,23 +25,21 @@ import org.apache.kylin.metadata.cube.cuboid.NLayoutCandidate
 import org.apache.kylin.metadata.cube.gridtable.NLayoutToGridTableMapping
 import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataSegment, NDataflow}
 import org.apache.kylin.metadata.cube.realization.HybridRealization
-import org.apache.kylin.metadata.model.NTableMetadataManager
-import org.apache.kylin.query.util.{RuntimeHelper, SparderDerivedUtil}
 import org.apache.kylin.metadata.model._
+import org.apache.kylin.metadata.realization.IRealization
 import org.apache.kylin.metadata.tuple.TupleInfo
+import org.apache.kylin.query.implicits.sessionToQueryContext
+import org.apache.kylin.query.relnode.{KapRel, OLAPContext}
+import org.apache.kylin.query.util.{RuntimeHelper, SparderDerivedUtil}
 import org.apache.spark.sql.execution.utils.SchemaProcessor
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.manager.SparderLookupManager
 import org.apache.spark.sql.types.{ArrayType, DoubleType, StructField, StructType}
 import org.apache.spark.sql.util.SparderTypeUtil
-import org.apache.spark.sql.{DataFrame, _}
+import org.apache.spark.sql._
 
 import java.util.concurrent.ConcurrentHashMap
 import java.{lang, util}
-import org.apache.kylin.metadata.realization.IRealization
-import org.apache.kylin.query.implicits.sessionToQueryContext
-import org.apache.kylin.query.relnode.{KapRel, OLAPContext}
-
 import scala.collection.JavaConverters._
 
 // scalastyle:off
@@ -71,7 +69,8 @@ object TableScanPlan extends LogEx {
     val realizations = olapContext.realization.getRealizations.asScala.toList
     realizations.map(_.asInstanceOf[NDataflow])
       .filter(dataflow => (!dataflow.isStreaming && !context.isBatchCandidateEmpty) ||
-        (dataflow.isStreaming && !context.isStreamCandidateEmpty))
+        (dataflow.isStreaming && !context.isStreamCandidateEmpty) ||
+        isSegmentsEmpty(prunedSegments, prunedStreamingSegments))
       .map(dataflow => {
         if (dataflow.isStreaming) {
           tableScan(rel, dataflow, olapContext, session, prunedStreamingSegments, context.getStreamingCandidate)
@@ -81,13 +80,33 @@ object TableScanPlan extends LogEx {
       }).reduce(_.union(_))
   }
 
+  // Invoked when the pruned segments are null or empty
+  def tableScanEmptySegment(rel: KapRel): DataFrame = {
+    logInfo("Pruned segments are null or empty; returning an empty DataFrame with the expected schema")
+    val df = SparkOperation.createEmptyDataFrame(
+      StructType(
+        rel.getColumnRowType.getAllColumns.asScala
+          .map(column =>
+            StructField(column.toString.replaceAll("\\.", "_"), SparderTypeUtil.toSparkType(column.getType)))))
+    val cols = df.schema.map(structField => {
+      col(structField.name)
+    })
+    df.select(cols: _*)
+  }
+
+  def isSegmentsEmpty(prunedSegments: util.List[NDataSegment], prunedStreamingSegments: util.List[NDataSegment]): Boolean = {
+    val isPrunedSegmentsEmpty = prunedSegments == null || prunedSegments.size() == 0
+    val isPrunedStreamingSegmentsEmpty = prunedStreamingSegments == null || prunedStreamingSegments.size() == 0
+    isPrunedSegmentsEmpty && isPrunedStreamingSegmentsEmpty
+  }
+
   def tableScan(rel: KapRel, dataflow: NDataflow, olapContext: OLAPContext,
                 session: SparkSession, prunedSegments: util.List[NDataSegment], candidate: NLayoutCandidate): DataFrame = {
     val prunedPartitionMap = olapContext.storageContext.getPrunedPartitions
     olapContext.resetSQLDigest()
     //TODO: refactor
     val cuboidLayout = candidate.getLayoutEntity
-    if (cuboidLayout.getIndex.isTableIndex) {
+    if (cuboidLayout.getIndex != null && cuboidLayout.getIndex.isTableIndex) {
       QueryContext.current().getQueryTagInfo.setTableIndex(true)
     }
     val tableName = olapContext.firstTableScan.getBackupAlias
@@ -97,6 +116,9 @@ object TableScanPlan extends LogEx {
     /////////////////////////////////////////////
     val kapConfig = KapConfig.wrap(dataflow.getConfig)
     val basePath = kapConfig.getReadParquetStoragePath(dataflow.getProject)
+    if (prunedSegments == null || prunedSegments.size() == 0) {
+      return tableScanEmptySegment(rel)
+    }
     val fileList = prunedSegments.asScala.map(
       seg => toLayoutPath(dataflow, cuboidLayout.getId, basePath, seg, prunedPartitionMap)
     )
@@ -366,8 +388,7 @@ object TableScanPlan extends LogEx {
     val session = SparderEnv.getSparkSession
     val olapContext = rel.getContext
     var instance: IRealization = null
-    if (olapContext.realization.isInstanceOf[NDataflow])
-    {
+    if (olapContext.realization.isInstanceOf[NDataflow]) {
       instance = olapContext.realization.asInstanceOf[NDataflow]
     } else {
       instance = olapContext.realization.asInstanceOf[HybridRealization]
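
On the Scala side, tableScanEmptySegment builds an empty DataFrame whose schema is derived from the rel's column row type, with dots in column names replaced by underscores, since dotted names are parsed as nested-field access by Spark's column resolver. A simplified sketch of that shape (the column name below is hypothetical; the real code derives names and types from rel.getColumnRowType via SparderTypeUtil.toSparkType):

    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    val columnNames = Seq("DEFAULT.TEST_KYLIN_FACT.LSTG_FORMAT_NAME") // hypothetical qualified name
    val schema = StructType(columnNames.map(n => StructField(n.replaceAll("\\.", "_"), StringType)))
    val df = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    df.select(df.schema.map(f => col(f.name)): _*).show() // zero rows, expected columns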
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala
new file mode 100644
index 0000000000..03e0577013
--- /dev/null
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/runtime/plan/SegmentEmptyTest.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.runtime.plan
+
+import org.apache.kylin.metadata.cube.model.NDataSegment
+import org.apache.spark.sql.common.{LocalMetadata, SharedSparkSession, SparderBaseFunSuite}
+import org.junit.Assert
+
+import java.util
+
+class SegmentEmptyTest extends SparderBaseFunSuite with SharedSparkSession with LocalMetadata {
+
+    val prunedSegment1 = null
+    val prunedSegment2 = new util.LinkedList[NDataSegment]
+    val prunedSegment3 = new util.LinkedList[NDataSegment]
+    prunedSegment3.add(new NDataSegment())
+
+    val prunedStreamingSegment1 = null
+    val prunedStreamingSegment2 = new util.LinkedList[NDataSegment]
+    val prunedStreamingSegment3 = new util.LinkedList[NDataSegment]
+    prunedStreamingSegment3.add(new NDataSegment())
+
+    Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment1))
+    Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment2))
+    Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment1, prunedStreamingSegment3))
+
+    Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment1))
+    Assert.assertTrue(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment2))
+    Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment2, prunedStreamingSegment3))
+
+    Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment1))
+    Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment2))
+    Assert.assertFalse(TableScanPlan.isSegmentsEmpty(prunedSegment3, prunedStreamingSegment3))
+}
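
The test enumerates the full truth table for the new helper. Restated generically (a sketch, not the Kylin signature): the predicate is true only when both lists are null or empty.

    // Generic restatement of TableScanPlan.isSegmentsEmpty for reference.
    def isSegmentsEmpty[A](batch: java.util.List[A], streaming: java.util.List[A]): Boolean =
      (batch == null || batch.isEmpty) && (streaming == null || streaming.isEmpty)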
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
index 2a2d1d02ab..6791718885 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/spark/sql/SparderTypeUtil.scala
@@ -18,11 +18,6 @@
 
 package org.apache.spark.sql.util
 
-import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
-import java.math.BigDecimal
-import java.sql.{Date, Timestamp, Types}
-import java.time.ZoneId
-import java.util.{GregorianCalendar, Locale, TimeZone}
 import org.apache.calcite.avatica.util.TimeUnitRange
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex.RexLiteral
@@ -33,7 +28,7 @@ import org.apache.kylin.common.util.DateFormat
 import org.apache.kylin.metadata.datatype.DataType
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Column
-import org.apache.spark.sql.catalyst.expressions.{Base64, Cast}
+import org.apache.spark.sql.catalyst.expressions.Cast
 import org.apache.spark.sql.catalyst.parser.ParserUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.functions._
@@ -41,6 +36,11 @@ import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.springframework.util.Base64Utils
 
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Long => JLong, Short => JShort}
+import java.math.BigDecimal
+import java.sql.{Date, Timestamp, Types}
+import java.time.ZoneId
+import java.util.{GregorianCalendar, Locale, TimeZone}
 import scala.collection.{immutable, mutable}
 
 object SparderTypeUtil extends Logging {
@@ -117,6 +117,7 @@ object SparderTypeUtil extends Logging {
       case tp if tp.startsWith("extendedcolumn") => BinaryType
       case tp if tp.startsWith("percentile") => BinaryType
       case tp if tp.startsWith("raw") => BinaryType
+      case "any" => StringType
       case _ => throw new IllegalArgumentException(dataTp.toString)
     }
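
Finally, the type mapping gains a fallback: Kylin's "any" type now maps to Spark's StringType instead of hitting the IllegalArgumentException default, presumably to keep schema conversion total when building the empty-scan schema above. A simplified excerpt-style sketch (only the cases visible in this hunk; the real mapping handles many more Kylin types):

    import org.apache.spark.sql.types.{BinaryType, DataType, StringType}

    def toSparkTypeSketch(dataTp: String): DataType = dataTp match {
      case tp if tp.startsWith("percentile") => BinaryType
      case tp if tp.startsWith("raw")        => BinaryType
      case "any"                             => StringType // added by this commit: "any" no longer throws
      case other => throw new IllegalArgumentException(other)
    }

    assert(toSparkTypeSketch("any") == StringType)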
   }