You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/16 07:01:44 UTC
[kylin] 13/15: fix query NPE of fusion model
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 39f87aa8a762f75ddd28f5cd9a3eac13a6391817
Author: binbin.zheng <bi...@kyligence.io>
AuthorDate: Tue Oct 25 19:34:33 2022 +0800
fix query NPE of fusion model
Co-authored-by: binbin.zheng <bi...@kyligence.io>
---
src/spark-project/sparder/pom.xml | 5 +
.../apache/spark/sql/KylinDataFrameManager.scala | 31 +++---
.../kylin/query/sql/KylinDataFrameManagerTest.java | 116 +++++++++++++++++++++
3 files changed, 137 insertions(+), 15 deletions(-)
diff --git a/src/spark-project/sparder/pom.xml b/src/spark-project/sparder/pom.xml
index e6f02b8f1d..1cccda51c6 100644
--- a/src/spark-project/sparder/pom.xml
+++ b/src/spark-project/sparder/pom.xml
@@ -128,6 +128,11 @@
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
index d23651eb8f..115e6c9f28 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinDataFrameManager.scala
@@ -19,10 +19,11 @@
package org.apache.spark.sql
import java.sql.Timestamp
-import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataflow, NDataflowManager}
-import org.apache.kylin.metadata.model.FusionModelManager
+
import io.kyligence.kap.secondstorage.SecondStorage
import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.metadata.cube.model.{LayoutEntity, NDataflow, NDataflowManager}
+import org.apache.kylin.metadata.model.FusionModelManager
import org.apache.spark.sql.datasource.storage.StorageStoreFactory
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType
@@ -78,26 +79,26 @@ class KylinDataFrameManager(sparkSession: SparkSession) {
option("pruningInfo", pruningInfo)
if (dataflow.isStreaming && dataflow.getModel.isFusionModel) {
val fusionModel = FusionModelManager.getInstance(KylinConfig.getInstanceFromEnv, dataflow.getProject)
- .getFusionModel(dataflow.getModel.getFusionId)
+ .getFusionModel(dataflow.getModel.getFusionId)
val batchModelId = fusionModel.getBatchModel.getUuid
val batchDataflow = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv, dataflow.getProject).getDataflow(batchModelId)
val end = batchDataflow.getDateRangeEnd
val partition = dataflow.getModel.getPartitionDesc.getPartitionDateColumnRef
val id = layout.getOrderedDimensions.inverse().get(partition)
- SecondStorage.trySecondStorage(sparkSession, dataflow, layout, pruningInfo).getOrElse {
- var df = StorageStoreFactory.create(dataflow.getModel.getStorageType)
- .read(dataflow, layout, sparkSession, extraOptions.toMap)
- if (end != Long.MinValue) {
- df = df.filter(col(id.toString).geq(new Timestamp(end)))
- }
- df
- }
- } else {
- SecondStorage.trySecondStorage(sparkSession, dataflow, layout, pruningInfo).getOrElse {
- StorageStoreFactory.create(dataflow.getModel.getStorageType)
- .read(dataflow, layout, sparkSession, extraOptions.toMap)
+ var df = read(dataflow, layout, pruningInfo)
+ if (id != null && end != Long.MinValue) {
+ df = df.filter(col(id.toString).geq(new Timestamp(end)))
}
+ return df
+ }
+ read(dataflow, layout, pruningInfo)
+ }
+
+ def read(dataflow: NDataflow, layout: LayoutEntity, pruningInfo: String): DataFrame = {
+ SecondStorage.trySecondStorage(sparkSession, dataflow, layout, pruningInfo).getOrElse {
+ StorageStoreFactory.create(dataflow.getModel.getStorageType)
+ .read(dataflow, layout, sparkSession, extraOptions.toMap)
}
}
diff --git a/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java
new file mode 100644
index 0000000000..7327c443d0
--- /dev/null
+++ b/src/spark-project/sparder/src/test/java/org/apache/kylin/query/sql/KylinDataFrameManagerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.query.sql;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.junit.annotation.MetadataInfo;
+import org.apache.kylin.metadata.cube.model.LayoutEntity;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.model.FusionModelManager;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.TimeRange;
+import org.apache.spark.sql.KylinDataFrameManager;
+import org.apache.spark.sql.SparkSession;
+import org.junit.Assert;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import org.springframework.util.ReflectionUtils;
+
+import com.google.common.collect.ImmutableBiMap;
+
+import lombok.val;
+import lombok.var;
+
+@MetadataInfo(project = "streaming_test")
+class KylinDataFrameManagerTest {
+
+ @Test
+ void testCuboidTableOfFusionModel() {
+ val ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate();
+ val config = KylinConfig.getInstanceFromEnv();
+ val dataflowManager = NDataflowManager.getInstance(config, "streaming_test");
+ var dataflow = dataflowManager.getDataflow("4965c827-fbb4-4ea1-a744-3f341a3b030d");
+ Assert.assertTrue(dataflow.isStreaming() && dataflow.getModel().isFusionModel());
+
+ val kylinDataFrameManager = Mockito.spy(new KylinDataFrameManager(ss));
+ kylinDataFrameManager.option("isFastBitmapEnabled", "false");
+ {
+ // condition: id != null && end != Long.MinValue -- the layout contains the partition date column, so the timestamp filter branch is exercised
+ val partitionTblCol = dataflow.getModel().getPartitionDesc().getPartitionDateColumnRef();
+ val layoutEntity = Mockito.spy(new LayoutEntity());
+ ImmutableBiMap.Builder<Integer, TblColRef> dimsBuilder = ImmutableBiMap.builder();
+ ImmutableBiMap<Integer, TblColRef> orderedDimensions = dimsBuilder.put(1, partitionTblCol).build();
+ Mockito.when(layoutEntity.getOrderedDimensions()).thenReturn(orderedDimensions);
+ val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "3e560d22-b749-48c3-9f64-d4230207f120");
+ Assert.assertEquals(1, df.columns().length);
+ }
+ {
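+ // condition: id == null -- a bare LayoutEntity has no partition date dimension, so the filter must be skipped instead of throwing an NPE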
+ val df = kylinDataFrameManager.cuboidTable(dataflow, new LayoutEntity(),
+ "3e560d22-b749-48c3-9f64-d4230207f120");
+ Assert.assertEquals(0, df.columns().length);
+ }
+
+ {
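+ // condition: end == Long.MinValue -- set up below by overwriting each batch segment's TimeRange end with Long.MIN_VALUE, so the filter is skipped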
+ val partitionTblCol = dataflow.getModel().getPartitionDesc().getPartitionDateColumnRef();
+ val layoutEntity = Mockito.spy(new LayoutEntity());
+ ImmutableBiMap.Builder<Integer, TblColRef> dimsBuilder = ImmutableBiMap.builder();
+ ImmutableBiMap<Integer, TblColRef> orderedDimensions = dimsBuilder.put(1, partitionTblCol).build();
+ Mockito.when(layoutEntity.getOrderedDimensions()).thenReturn(orderedDimensions);
+ val fusionModel = FusionModelManager.getInstance(config, dataflow.getProject())
+ .getFusionModel(dataflow.getModel().getFusionId());
+ val batchModelId = fusionModel.getBatchModel().getUuid();
+ val batchDataflow = NDataflowManager.getInstance(config, dataflow.getProject()).getDataflow(batchModelId);
+
+ dataflowManager.updateDataflow(batchDataflow.getId(), updater -> {
+ updater.getSegments().forEach(seg -> {
+ try {
+ val timeRange = seg.getTSRange();
+ val field = TimeRange.class.getDeclaredField("end");
+ field.setAccessible(true);
+ ReflectionUtils.setField(field, timeRange, Long.MIN_VALUE);
+ seg.setTimeRange(timeRange);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ });
+ });
+ val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "3e560d22-b749-48c3-9f64-d4230207f120");
+ Assert.assertEquals(1, df.columns().length);
+ }
+ ss.stop();
+ }
+
+ @Test
+ void testCuboidTableOfBatchModel() {
+ val ss = SparkSession.builder().appName("local").master("local[1]").getOrCreate();
+ val config = KylinConfig.getInstanceFromEnv();
+ val dataflowManager = NDataflowManager.getInstance(config, "streaming_test");
+ val dataflow = dataflowManager.getDataflow("cd2b9a23-699c-4699-b0dd-38c9412b3dfd");
+ Assert.assertFalse(dataflow.isStreaming());
+ val kylinDataFrameManager = Mockito.spy(new KylinDataFrameManager(ss));
+ kylinDataFrameManager.option("isFastBitmapEnabled", "false");
+ val layoutEntity = new LayoutEntity();
+ {
+ val df = kylinDataFrameManager.cuboidTable(dataflow, layoutEntity, "86b5daaa-e295-4e8c-b877-f97bda69bee5");
+ Assert.assertEquals(0, df.columns().length);
+ }
+ ss.stop();
+ }
+}