You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/16 07:01:44 UTC

[kylin] 13/15: fix query NPE of fusion model

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 39f87aa8a762f75ddd28f5cd9a3eac13a6391817
Author: binbin.zheng <bi...@kyligence.io>
AuthorDate: Tue Oct 25 19:34:33 2022 +0800

    fix query NPE of fusion model
    
    Co-authored-by: binbin.zheng <bi...@kyligence.io>
---
 src/spark-project/sparder/pom.xml                  |   5 +
 .../apache/spark/sql/KylinDataFrameManager.scala   |  31 +++---
 .../kylin/query/sql/KylinDataFrameManagerTest.java | 116 +++++++++++++++++++++
 3 files changed, 137 insertions(+), 15 deletions(-)

diff --git a/src/spark-project/sparder/pom.xml b/src/spark-project/sparder/pom.xml
index e6f02b8f1d..1cccda51c6 100644
--- a/src/spark-project/sparder/pom.xml
+++ b/src/spark-project/sparder/pom.xml
@@ -128,6 +128,11 @@
             <artifactId>junit-vintage-engine</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
index d23651eb8f..115e6c9f28 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
@@ -19,10 +19,11 @@
 package org.apache.spark.sql
 
 import java.sql.Timestamp
-import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataflow, NDataflowManager}
-import org.apache.kylin.metadata.model.FusionModelManager
+
 import io.kyligence.kap.secondstorage.SecondStorage
 import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataflow, NDataflowManager}
+import org.apache.kylin.metadata.model.FusionModelManager
 import org.apache.spark.sql.datasource.storage.StorageStoreFactory
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.StructType
@@ -78,26 +79,26 @@ class KylinDataFrameManager(sparkSession: SparkSession) {
     option("pruningInfo", pruningInfo)
     if (dataflow.isStreaming && dataflow.getModel.isFusionModel) {
       val fusionModel = FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv, dataflow.getProject)
-              .getFusionModel(dataflow.getModel.getFusionId)
+        .getFusionModel(dataflow.getModel.getFusionId)
       val batchModelId = fusionModel.getBatchModel.getUuid
       val batchDataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv, dataflow.getProject).getDataflow(batchModelId)
       val end = batchDataflow.getDateRangeEnd
 
       val partition = dataflow.getModel.getPartitionDesc.getPartitionDateColumnRef
       val id = layout.getOrderedDimensions.inverse().get(partition)
-      SecondStorage.trySecondStorage(sparkSession, dataflow, layout, pruningInfo).getOrElse {
-        var df = StorageStoreFactory.create(dataflow.getModel.getStorageType)
-          .read(dataflow, layout, sparkSession, extraOptions.toMap)
-        if (end != Long.MinValue) {
-          df = df.filter(col(id.toString).geq(new Timestamp(end)))
-        }
-        df
-      }
-    } else {
-      SecondStorage.trySecondStorage(sparkSession, dataflow, layout, pruningInfo).getOrElse {
-        StorageStoreFactory.create(dataflow.getModel.getStorageType)
-          .read(dataflow, layout, sparkSession, extraOptions.toMap)
+      var df = read(dataflow, layout, pruningInfo)
+      if (id != null && end != Long.MinValue) {
+        df = df.filter(col(id.toString).geq(new Timestamp(end)))
       }
+      return df
+    }
+    read(dataflow, layout, pruningInfo)
+  }
+
+  def read(dataflow: NDataflow, layout: LayoutEntity, pruningInfo: String): DataFrame = {
+    SecondStorage.trySecondStorage(sparkSession, dataflow, layout, pruningInfo).getOrElse {
+      StorageStoreFactory.create(dataflow.getModel.getStorageType)
+        .read(dataflow, layout, sparkSession, extraOptions.toMap)
     }
   }
 
diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java
new file mode 100644
index 0000000000..7327c443d0
--- /dev/null
+++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.query.sql;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.model.FusionModelManager;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.TimeRange;
+import org.apache.spark.sql.KylinDataFrameManager;
+import org.apache.spark.sql.SparkSession;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.util.ReflectionUtils;
+
+import com.google.common.collect.ImmutableBiMap;
+
+import lombok.val;
+import lombok.var;
+
+@MetadataInfo(project = "streaming_test")
+class KylinDataFrameManagerTest {
+
+    @Test
+    void testCuboidTableOfFusionModel() {
+        val ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate();
+        val config = KylinConfig.getInstanceFromEnv();
+        val dataflowManager = NDataflowManager.getInstance(config, "streaming_test");
+        var dataflow = dataflowManager.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d");
+        Assert.assertTrue(dataflow.isStreaming() && dataflow.getModel().isFusionModel());
+
+        val kylinDataFrameManager = Mockito.spy(new KylinDataFrameManager(ss));
+        kylinDataFrameManager.option("isFastBitmapEnabled", "false");
+        {
+            // condition: id != null && end != Long.MinValue
+            val partitionTblCol = dataflow.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            val layoutEntity = Mockito.spy(new LayoutEntity());
+            ImmutableBiMap.Builder<Integer, TblColRef> dimsBuilder = ImmutableBiMap.builder();
+            ImmutableBiMap<Integer, TblColRef> orderedDimensions = dimsBuilder.put(1, partitionTblCol).build();
+            Mockito.when(layoutEntity.getOrderedDimensions()).thenReturn(orderedDimensions);
+            val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "3e560d22-b749-48c3-9f64-d4230207f120");
+            Assert.assertEquals(1, df.columns().length);
+        }
+        {
+            // condition: id == null
+            val df = kylinDataFrameManager.cuboidTable(dataflow, new LayoutEntity(),
+                    "3e560d22-b749-48c3-9f64-d4230207f120");
+            Assert.assertEquals(0, df.columns().length);
+        }
+
+        {
+            // condition: end == Long.MinValue
+            val partitionTblCol = dataflow.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            val layoutEntity = Mockito.spy(new LayoutEntity());
+            ImmutableBiMap.Builder<Integer, TblColRef> dimsBuilder = ImmutableBiMap.builder();
+            ImmutableBiMap<Integer, TblColRef> orderedDimensions = dimsBuilder.put(1, partitionTblCol).build();
+            Mockito.when(layoutEntity.getOrderedDimensions()).thenReturn(orderedDimensions);
+            val fusionModel = FusionModelManager.getInstance(config, dataflow.getProject())
+                    .getFusionModel(dataflow.getModel().getFusionId());
+            val batchModelId = fusionModel.getBatchModel().getUuid();
+            val batchDataflow = NDataflowManager.getInstance(config, dataflow.getProject()).getDataflow(batchModelId);
+
+            dataflowManager.updateDataflow(batchDataflow.getId(), updater -> {
+                updater.getSegments().forEach(seg -> {
+                    try {
+                        val timeRange = seg.getTSRange();
+                        val field = TimeRange.class.getDeclaredField("end");
+                        field.setAccessible(true);
+                        ReflectionUtils.setField(field, timeRange, Long.MIN_VALUE);
+                        seg.setTimeRange(timeRange);
+                    } catch (Exception e) {
+                        Assert.fail(e.getMessage());
+                    }
+                });
+            });
+            val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "3e560d22-b749-48c3-9f64-d4230207f120");
+            Assert.assertEquals(1, df.columns().length);
+        }
+        ss.stop();
+    }
+
+    @Test
+    void testCuboidTableOfBatchModel() {
+        val ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate();
+        val config = KylinConfig.getInstanceFromEnv();
+        val dataflowManager = NDataflowManager.getInstance(config, "streaming_test");
+        val dataflow = dataflowManager.getDataflow("cd2b9a23-699c-4699-b0dd-38c9412b3dfd");
+        Assert.assertFalse(dataflow.isStreaming());
+        val kylinDataFrameManager = Mockito.spy(new KylinDataFrameManager(ss));
+        kylinDataFrameManager.option("isFastBitmapEnabled", "false");
+        val layoutEntity = new LayoutEntity();
+        {
+            val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "86b5daaa-e295-4e8c-b877-f97bda69bee5");
+            Assert.assertEquals(0, df.columns().length);
+        }
+        ss.stop();
+    }
+}