Posted to commits@pinot.apache.org by xi...@apache.org on 2023/03/20 06:20:36 UTC

[pinot] branch master updated: Add Spark connector option for passing Pinot query options (#10443)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new cb1069f11d Add Spark connector option for passing Pinot query options (#10443)
cb1069f11d is described below

commit cb1069f11d8b89291e777c6e2c7416753bbfff29
Author: Caner Balci <ca...@gmail.com>
AuthorDate: Sun Mar 19 23:20:28 2023 -0700

    Add Spark connector option for passing Pinot query options (#10443)
---
 pinot-connectors/pinot-spark-2-connector/README.md |  2 ++
 .../documentation/read_model.md                    |  1 +
 .../spark/datasource/PinotDataSourceReader.scala   |  9 +++---
 .../connector/spark/datasource/TypeConverter.scala | 14 ++++++---
 .../spark/datasource/TypeConverterTest.scala       | 31 ++++++++++++++++++++
 pinot-connectors/pinot-spark-3-connector/README.md |  1 +
 .../documentation/read_model.md                    |  1 +
 .../spark/v3/datasource/PinotScanBuilder.scala     |  3 +-
 .../spark/v3/datasource/TypeConverter.scala        | 14 ++++++---
 .../spark/v3/datasource/TypeConverterTest.scala    | 33 ++++++++++++++++++++++
 .../spark/common/PinotDataSourceReadOptions.scala  | 11 ++++++--
 .../spark/common/query/ScanQueryGenerator.scala    | 14 ++++++---
 .../common/PinotDataSourceReadOptionsTest.scala    |  4 ++-
 .../connector/spark/common/PinotSplitterTest.scala |  6 ++--
 .../common/query/ScanQueryGeneratorTest.scala      | 22 ++++++++++++---
 15 files changed, 140 insertions(+), 26 deletions(-)
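For context, a user-facing sketch of the new option. This is illustrative only: the `pinot` format short name and the `table`/`tableType` option keys follow the connector's Quick Start documentation and are assumptions here, not part of this diff; only `queryOptions` is introduced by this commit.

```scala
import org.apache.spark.sql.SparkSession

// Minimal read that forwards Pinot query options through the connector.
val spark = SparkSession.builder().appName("pinot-query-options-example").getOrCreate()

val df = spark.read
  .format("pinot")
  .option("table", "airlineStats")      // assumed example table
  .option("tableType", "offline")
  // Comma-separated Pinot query options; the connector prepends each one
  // to the generated scan queries as a SET statement.
  .option("queryOptions", "enableNullHandling=true,skipUpsert=true")
  .load()
```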

diff --git a/pinot-connectors/pinot-spark-2-connector/README.md b/pinot-connectors/pinot-spark-2-connector/README.md
index 48fcd587a7..bbe02ab22a 100644
--- a/pinot-connectors/pinot-spark-2-connector/README.md
+++ b/pinot-connectors/pinot-spark-2-connector/README.md
@@ -28,12 +28,14 @@ Detailed read model documentation is here; [Spark-Pinot Connector Read Model](do
 ## Features
 - Query realtime, offline or hybrid tables
 - Distributed, parallel scan
+- Streaming reads using gRPC (optional)
 - SQL support instead of PQL
 - Column and filter push down to optimize performance
 - Overlap between realtime and offline segments is queried exactly once for hybrid tables
 - Schema discovery 
   - Dynamic inference
   - Static analysis of case class
+- Supports query options
 
 ## Quick Start
 ```scala
diff --git a/pinot-connectors/pinot-spark-2-connector/documentation/read_model.md b/pinot-connectors/pinot-spark-2-connector/documentation/read_model.md
index aa7a2f0e45..675091871d 100644
--- a/pinot-connectors/pinot-spark-2-connector/documentation/read_model.md
+++ b/pinot-connectors/pinot-spark-2-connector/documentation/read_model.md
@@ -138,3 +138,4 @@ val df = spark.read
 | segmentsPerSplit | Represents the maximum segment count that will be scanned by pinot server in one connection | No | 3 | 
 | pinotServerTimeoutMs | The maximum timeout(ms) to get data from pinot server | No | 10 mins |
 | useGrpcServer | Boolean value to enable reads via gRPC. This option is more memory efficient both on Pinot server and Spark executor side because it utilizes streaming. Requires gRPC to be enabled on Pinot server. | No | false |
+| queryOptions | Comma separated list of Pinot query options (e.g. "enableNullHandling=true,skipUpsert=true") | No | "" |
\ No newline at end of file
diff --git a/pinot-connectors/pinot-spark-2-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala b/pinot-connectors/pinot-spark-2-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
index f6112b6a79..d600764e7f 100644
--- a/pinot-connectors/pinot-spark-2-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
+++ b/pinot-connectors/pinot-spark-2-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
@@ -73,15 +73,16 @@ class PinotDataSourceReader(options: DataSourceOptions, userSchema: Option[Struc
       }
 
     val whereCondition = FilterPushDown.compileFiltersToSqlWhereClause(this.acceptedFilters)
-    val generatedSQLs = ScanQueryGenerator.generate(
+    val scanQuery = ScanQueryGenerator.generate(
       readParameters.tableName,
       readParameters.tableType,
       timeBoundaryInfo,
       schema.fieldNames,
-      whereCondition
+      whereCondition,
+      readParameters.queryOptions
     )
 
-    val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, generatedSQLs)
+    val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, scanQuery)
 
     val instanceInfo : Map[String, InstanceInfo] = Map()
     val instanceInfoReader = (instance:String) => { // cached reader to reduce network round trips
@@ -92,7 +93,7 @@ class PinotDataSourceReader(options: DataSourceOptions, userSchema: Option[Struc
     }
 
     PinotSplitter
-      .generatePinotSplits(generatedSQLs, routingTable, instanceInfoReader, readParameters)
+      .generatePinotSplits(scanQuery, routingTable, instanceInfoReader, readParameters)
       .zipWithIndex
       .map {
         case (pinotSplit, partitionId) =>
diff --git a/pinot-connectors/pinot-spark-2-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/TypeConverter.scala b/pinot-connectors/pinot-spark-2-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/TypeConverter.scala
index da85bc823e..eef46d255d 100644
--- a/pinot-connectors/pinot-spark-2-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/TypeConverter.scala
+++ b/pinot-connectors/pinot-spark-2-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/TypeConverter.scala
@@ -66,6 +66,9 @@ private[datasource] object TypeConverter {
       dataTable: DataTable,
       sparkSchema: StructType): Seq[InternalRow] = {
     val dataTableColumnNames = dataTable.getDataSchema.getColumnNames
+    val nullRowIdsByColumn = (0 until dataTable.getDataSchema.size()).map{ col =>
+      dataTable.getNullRowIds(col)
+    }
     (0 until dataTable.getNumberOfRows).map { rowIndex =>
       // spark schema is used to ensure columns order
       val columns = sparkSchema.fields.map { field =>
@@ -73,10 +76,13 @@ private[datasource] object TypeConverter {
         if (colIndex < 0) {
           throw PinotException(s"'${field.name}' not found in Pinot server response")
         } else {
-          // pinot column data type can be used directly,
-          // because all of them is supported in spark schema
-          val columnDataType = dataTable.getDataSchema.getColumnDataType(colIndex)
-          readPinotColumnData(dataTable, columnDataType, rowIndex, colIndex)
+          if (nullRowIdsByColumn(colIndex) != null
+              && nullRowIdsByColumn(colIndex).contains(rowIndex)) {
+            null
+          } else {
+            val columnDataType = dataTable.getDataSchema.getColumnDataType(colIndex)
+            readPinotColumnData(dataTable, columnDataType, rowIndex, colIndex)
+          }
         }
       }
       InternalRow.fromSeq(columns)
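The null handling above relies on Pinot's per-column null bitmaps: `DataTable.getNullRowIds(col)` returns a possibly-null `RoaringBitmap` of row indices that are NULL for that column. A minimal sketch of the lookup the converter performs (names are illustrative):

```scala
import org.roaringbitmap.RoaringBitmap

// A column's null bitmap marks which row indices hold NULL for that column;
// a null bitmap means the column carries no null metadata at all.
def isNullAt(nullRowIds: RoaringBitmap, rowIndex: Int): Boolean =
  nullRowIds != null && nullRowIds.contains(rowIndex)

val strColNulls = new RoaringBitmap()
strColNulls.add(0)            // row 0 of this column is NULL
isNullAt(strColNulls, 0)      // true  -> converter emits null into the InternalRow
isNullAt(strColNulls, 1)      // false -> converter reads the actual column value
isNullAt(null, 0)             // false -> no null metadata for this column
```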
diff --git a/pinot-connectors/pinot-spark-2-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/TypeConverterTest.scala b/pinot-connectors/pinot-spark-2-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/TypeConverterTest.scala
index 80b061188b..1b04b322d5 100644
--- a/pinot-connectors/pinot-spark-2-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/TypeConverterTest.scala
+++ b/pinot-connectors/pinot-spark-2-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/TypeConverterTest.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.connector.spark.datasource
 
+import org.apache.pinot.common.datatable.DataTableFactory
 import org.apache.pinot.common.utils.DataSchema
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType
 import org.apache.pinot.connector.spark.common.PinotException
@@ -27,6 +28,7 @@ import org.apache.pinot.spi.utils.ByteArray
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.roaringbitmap.RoaringBitmap
 
 import scala.io.Source
 
@@ -160,6 +162,35 @@ class TypeConverterTest extends BaseTest {
     exception.getMessage shouldEqual s"'longCol' not found in Pinot server response"
   }
 
+  test("Converter should identify and correctly return null columns") {
+    val columnNames = Array("strCol", "intCol")
+    val columnTypes = Array(ColumnDataType.STRING, ColumnDataType.INT)
+    val dataSchema = new DataSchema(columnNames, columnTypes)
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4)
+
+    val dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema)
+    dataTableBuilder.startRow()
+    dataTableBuilder.setColumn(0, "null")
+    dataTableBuilder.setColumn(1, 5)
+    dataTableBuilder.finishRow()
+
+    val nullRowIds = new RoaringBitmap()
+    nullRowIds.add(0)
+    dataTableBuilder.setNullRowIds(nullRowIds)
+    dataTableBuilder.setNullRowIds(null)
+    val dataTable = dataTableBuilder.build()
+
+    val schema = StructType(
+      Seq(
+        StructField("strCol", StringType, true),
+        StructField("intCol", IntegerType, true)
+      )
+    )
+
+    val result = TypeConverter.pinotDataTableToInternalRows(dataTable, schema).head
+    result.get(0, StringType) shouldEqual null
+  }
+
   test("Pinot schema should be converted to spark schema") {
     val pinotSchemaAsString = Source.fromResource("schema/pinot-schema.json").mkString
     val resultSchema = TypeConverter.pinotSchemaToSparkSchema(Schema.fromString(pinotSchemaAsString))
diff --git a/pinot-connectors/pinot-spark-3-connector/README.md b/pinot-connectors/pinot-spark-3-connector/README.md
index afebc5dfde..9883c90259 100644
--- a/pinot-connectors/pinot-spark-3-connector/README.md
+++ b/pinot-connectors/pinot-spark-3-connector/README.md
@@ -35,6 +35,7 @@ Detailed read model documentation is here; [Spark-Pinot Connector Read Model](do
 - Schema discovery 
   - Dynamic inference
   - Static analysis of case class
+- Supports query options
 
 ## Quick Start
 ```scala
diff --git a/pinot-connectors/pinot-spark-3-connector/documentation/read_model.md b/pinot-connectors/pinot-spark-3-connector/documentation/read_model.md
index aa7a2f0e45..c6fc1b085f 100644
--- a/pinot-connectors/pinot-spark-3-connector/documentation/read_model.md
+++ b/pinot-connectors/pinot-spark-3-connector/documentation/read_model.md
@@ -138,3 +138,4 @@ val df = spark.read
 | segmentsPerSplit | Represents the maximum segment count that will be scanned by pinot server in one connection | No | 3 | 
 | pinotServerTimeoutMs | The maximum timeout(ms) to get data from pinot server | No | 10 mins |
 | useGrpcServer | Boolean value to enable reads via gRPC. This option is more memory efficient both on Pinot server and Spark executor side because it utilizes streaming. Requires gRPC to be enabled on Pinot server. | No | false |
+| queryOptions | Comma separated list of Pinot query options (e.g. "enableNullHandling=true,skipUpsert=true") | No | "" |
diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilder.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilder.scala
index a3e1bf1b02..ff2c0b6898 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilder.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/PinotScanBuilder.scala
@@ -54,7 +54,8 @@ class PinotScanBuilder(readParameters: PinotDataSourceReadOptions)
       readParameters.tableType,
       timeBoundaryInfo,
       currentSchema.fieldNames,
-      whereCondition
+      whereCondition,
+      readParameters.queryOptions
     )
 
     new PinotScan(scanQuery, currentSchema, readParameters)
diff --git a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverter.scala b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverter.scala
index 451bc5af72..a36fe6adc9 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverter.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/main/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverter.scala
@@ -66,6 +66,9 @@ private[pinot] object TypeConverter {
       dataTable: DataTable,
       sparkSchema: StructType): Seq[InternalRow] = {
     val dataTableColumnNames = dataTable.getDataSchema.getColumnNames
+    val nullRowIdsByColumn = (0 until dataTable.getDataSchema.size()).map{ col =>
+      dataTable.getNullRowIds(col)
+    }
     (0 until dataTable.getNumberOfRows).map { rowIndex =>
       // spark schema is used to ensure columns order
       val columns = sparkSchema.fields.map { field =>
@@ -73,10 +76,13 @@ private[pinot] object TypeConverter {
         if (colIndex < 0) {
           throw PinotException(s"'${field.name}' not found in Pinot server response")
         } else {
-          // pinot column data type can be used directly,
-          // because all of them is supported in spark schema
-          val columnDataType = dataTable.getDataSchema.getColumnDataType(colIndex)
-          readPinotColumnData(dataTable, columnDataType, rowIndex, colIndex)
+          if (nullRowIdsByColumn(colIndex) != null
+              && nullRowIdsByColumn(colIndex).contains(rowIndex)) {
+            null
+          } else {
+            val columnDataType = dataTable.getDataSchema.getColumnDataType(colIndex)
+            readPinotColumnData(dataTable, columnDataType, rowIndex, colIndex)
+          }
         }
       }
       InternalRow.fromSeq(columns)
diff --git a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverterTest.scala b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverterTest.scala
index 1b4ade0af1..caee51d0a0 100644
--- a/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverterTest.scala
+++ b/pinot-connectors/pinot-spark-3-connector/src/test/scala/org/apache/pinot/connector/spark/v3/datasource/TypeConverterTest.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.connector.spark.v3.datasource
 
+import org.apache.pinot.common.datatable.DataTableFactory
 import org.apache.pinot.common.utils.DataSchema
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType
 import org.apache.pinot.connector.spark.common.PinotException
@@ -27,6 +28,7 @@ import org.apache.pinot.spi.utils.ByteArray
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.roaringbitmap.RoaringBitmap
 
 import scala.io.Source
 
@@ -160,6 +162,37 @@ class TypeConverterTest extends BaseTest {
     exception.getMessage shouldEqual s"'longCol' not found in Pinot server response"
   }
 
+  test("Converter should identify and correctly return null rows") {
+    val columnNames = Array("strCol", "intCol")
+    val columnTypes = Array(ColumnDataType.STRING, ColumnDataType.INT)
+    val dataSchema = new DataSchema(columnNames, columnTypes)
+    DataTableBuilderFactory.setDataTableVersion(DataTableFactory.VERSION_4)
+
+    val dataTableBuilder = DataTableBuilderFactory.getDataTableBuilder(dataSchema)
+    dataTableBuilder.startRow()
+    dataTableBuilder.setColumn(0, "null")
+    dataTableBuilder.setColumn(1, 5)
+    dataTableBuilder.finishRow()
+
+    val nullRowIds = new RoaringBitmap()
+    nullRowIds.add(0)
+    dataTableBuilder.setNullRowIds(nullRowIds)
+    dataTableBuilder.setNullRowIds(null)
+
+
+    val dataTable = dataTableBuilder.build()
+
+    val schema = StructType(
+      Seq(
+        StructField("strCol", StringType, true),
+        StructField("intCol", IntegerType, true)
+      )
+    )
+
+    val result = TypeConverter.pinotDataTableToInternalRows(dataTable, schema).head
+    result.get(0, StringType) shouldEqual null
+  }
+
   test("Pinot schema should be converted to spark schema") {
     val pinotSchemaAsString = Source.fromResource("schema/pinot-schema.json").mkString
     val resultSchema = TypeConverter.pinotSchemaToSparkSchema(Schema.fromString(pinotSchemaAsString))
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceReadOptions.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceReadOptions.scala
index 03c4e56308..271cb5d7cf 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceReadOptions.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/PinotDataSourceReadOptions.scala
@@ -36,6 +36,8 @@ object PinotDataSourceReadOptions {
   val CONFIG_SEGMENTS_PER_SPLIT = "segmentsPerSplit"
   val CONFIG_PINOT_SERVER_TIMEOUT_MS = "pinotServerTimeoutMs"
   var CONFIG_USE_GRPC_SERVER = "useGrpcServer"
+  val CONFIG_QUERY_OPTIONS = "queryOptions"
+  val QUERY_OPTIONS_DELIMITER = ","
   private[pinot] val DEFAULT_CONTROLLER: String = "localhost:9000"
   private[pinot] val DEFAULT_USE_PUSH_DOWN_FILTERS: Boolean = true
   private[pinot] val DEFAULT_SEGMENTS_PER_SPLIT: Int = 3
@@ -83,6 +85,8 @@ object PinotDataSourceReadOptions {
     val pinotServerTimeoutMs =
       options.getLong(CONFIG_PINOT_SERVER_TIMEOUT_MS, DEFAULT_PINOT_SERVER_TIMEOUT_MS)
     val useGrpcServer = options.getBoolean(CONFIG_USE_GRPC_SERVER, DEFAULT_USE_GRPC_SERVER)
+    val queryOptions = options.getOrDefault(CONFIG_QUERY_OPTIONS, "")
+      .split(QUERY_OPTIONS_DELIMITER).filter(_.nonEmpty).toSet
 
     PinotDataSourceReadOptions(
       tableName,
@@ -92,7 +96,8 @@ object PinotDataSourceReadOptions {
       usePushDownFilters,
       segmentsPerSplit,
       pinotServerTimeoutMs,
-      useGrpcServer
+      useGrpcServer,
+      queryOptions
     )
   }
 }
@@ -106,4 +111,6 @@ private[pinot] case class PinotDataSourceReadOptions(
     usePushDownFilters: Boolean,
     segmentsPerSplit: Int,
     pinotServerTimeoutMs: Long,
-    useGrpcServer: Boolean)
+    useGrpcServer: Boolean,
+    queryOptions: Set[String])
+
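The parsing added above is a plain split on the configured delimiter. A quick sketch of how the raw option string maps onto the `queryOptions` set (mirroring the diff; values are illustrative):

```scala
// Split on ",", drop empty tokens, collect into a Set (as in the diff above).
val QUERY_OPTIONS_DELIMITER = ","
def parseQueryOptions(raw: String): Set[String] =
  raw.split(QUERY_OPTIONS_DELIMITER).filter(_.nonEmpty).toSet

parseQueryOptions("a=1,b=2")  // Set("a=1", "b=2")
parseQueryOptions("")         // Set()  -> no SET statements will be emitted
```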
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/query/ScanQueryGenerator.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/query/ScanQueryGenerator.scala
index d1ac63fd68..4616bbd7c7 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/query/ScanQueryGenerator.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/query/ScanQueryGenerator.scala
@@ -29,7 +29,8 @@ private[pinot] class ScanQueryGenerator(
     tableType: Option[TableType],
     timeBoundaryInfo: Option[TimeBoundaryInfo],
     columns: Array[String],
-    whereClause: Option[String]) {
+    whereClause: Option[String],
+    queryOptions: Set[String]) {
   private val columnsExpression = columnsAsExpression()
 
   def generateSQLs(): ScanQuery = {
@@ -56,7 +57,11 @@ private[pinot] class ScanQueryGenerator(
     }
 
     val tableNameWithType = s"${tableName}_${tableType.toString}"
-    val queryBuilder = new StringBuilder(s"SELECT $columnsExpression FROM $tableNameWithType")
+    val queryBuilder = new StringBuilder()
+
+    // add Query Options and SELECT clause
+    queryOptions.foreach(opt => queryBuilder.append(s"SET $opt;"))
+    queryBuilder.append(s"SELECT $columnsExpression FROM $tableNameWithType")
 
     // add where clause if exists
     whereClause.foreach(c => queryBuilder.append(s" WHERE $c"))
@@ -84,8 +89,9 @@ private[pinot] object ScanQueryGenerator {
       tableType: Option[TableType],
       timeBoundaryInfo: Option[TimeBoundaryInfo],
       columns: Array[String],
-      whereClause: Option[String]): ScanQuery = {
-    new ScanQueryGenerator(tableName, tableType, timeBoundaryInfo, columns, whereClause)
+      whereClause: Option[String],
+      queryOptions: Set[String]): ScanQuery = {
+    new ScanQueryGenerator(tableName, tableType, timeBoundaryInfo, columns, whereClause, queryOptions)
       .generateSQLs()
   }
 }
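With this change the generator prefixes each option as a `SET <option>;` statement ahead of the SELECT, as exercised by the ScanQueryGeneratorTest further down. A simplified sketch of the assembly (the real generator also adds time-boundary/WHERE clauses and a LIMIT):

```scala
// Simplified from the diff: query options become SET statements prepended to the SELECT.
val queryOptions = Set("enableNullHandling=true", "skipUpsert=true")
val queryBuilder = new StringBuilder()
queryOptions.foreach(opt => queryBuilder.append(s"SET $opt;"))
queryBuilder.append("SELECT c1, c2 FROM myTable_OFFLINE")
queryBuilder.toString
// "SET enableNullHandling=true;SET skipUpsert=true;SELECT c1, c2 FROM myTable_OFFLINE"
```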
diff --git a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceReadOptionsTest.scala b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceReadOptionsTest.scala
index 0db0a22ba7..7c08351557 100644
--- a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceReadOptionsTest.scala
+++ b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotDataSourceReadOptionsTest.scala
@@ -34,6 +34,7 @@ class PinotDataSourceReadOptionsTest extends BaseTest {
       PinotDataSourceReadOptions.CONFIG_SEGMENTS_PER_SPLIT -> "1",
       PinotDataSourceReadOptions.CONFIG_USE_PUSH_DOWN_FILTERS -> "false",
       PinotDataSourceReadOptions.CONFIG_USE_GRPC_SERVER -> "false",
+      PinotDataSourceReadOptions.CONFIG_QUERY_OPTIONS -> "a=1,b=2"
     )
 
     val pinotDataSourceReadOptions = PinotDataSourceReadOptions.from(options.asJava)
@@ -47,7 +48,8 @@ class PinotDataSourceReadOptionsTest extends BaseTest {
         false,
         1,
         10000,
-        false
+        false,
+        Set("a=1", "b=2")
       )
 
     pinotDataSourceReadOptions shouldEqual expected
diff --git a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotSplitterTest.scala b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotSplitterTest.scala
index bbab63c797..9b5a057c7a 100644
--- a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotSplitterTest.scala
+++ b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/PinotSplitterTest.scala
@@ -56,7 +56,8 @@ class PinotSplitterTest extends BaseTest {
       false,
       segmentsPerSplit,
       1000,
-      false)
+      false,
+      Set())
   }
 
   test("Total 5 partition splits should be created for maxNumSegmentPerServerRequest = 3") {
@@ -112,7 +113,8 @@ class PinotSplitterTest extends BaseTest {
       false,
       1,
       1000,
-      true)
+      true,
+      Set())
 
     val inputGrpcPortReader = (server: String) => {
       InstanceInfo(server, "192.168.1.100", "9000", 8090)
diff --git a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/query/ScanQueryGeneratorTest.scala b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/query/ScanQueryGeneratorTest.scala
index 351b02408d..9be29e3c44 100644
--- a/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/query/ScanQueryGeneratorTest.scala
+++ b/pinot-connectors/pinot-spark-common/src/test/scala/org/apache/pinot/connector/spark/common/query/ScanQueryGeneratorTest.scala
@@ -33,7 +33,7 @@ class ScanQueryGeneratorTest extends BaseTest {
 
   test("Queries should be created with given filters") {
     val pinotQueries =
-      ScanQueryGenerator.generate(tableName, tableType, None, columns, whereClause)
+      ScanQueryGenerator.generate(tableName, tableType, None, columns, whereClause, Set())
     val expectedRealtimeQuery =
       s"SELECT c1, c2 FROM ${tableName}_REALTIME WHERE ${whereClause.get} $limit"
     val expectedOfflineQuery =
@@ -46,7 +46,7 @@ class ScanQueryGeneratorTest extends BaseTest {
   test("Time boundary info should be added to existing where clause") {
     val timeBoundaryInfo = TimeBoundaryInfo("timeCol", "12345")
     val pinotQueries = ScanQueryGenerator
-      .generate(tableName, tableType, Some(timeBoundaryInfo), columns, whereClause)
+      .generate(tableName, tableType, Some(timeBoundaryInfo), columns, whereClause, Set())
 
     val realtimeWhereClause = s"${whereClause.get} AND timeCol >= 12345"
     val offlineWhereClause = s"${whereClause.get} AND timeCol < 12345"
@@ -62,7 +62,7 @@ class ScanQueryGeneratorTest extends BaseTest {
   test("Time boundary info should be added to where clause") {
     val timeBoundaryInfo = TimeBoundaryInfo("timeCol", "12345")
     val pinotQueries = ScanQueryGenerator
-      .generate(tableName, tableType, Some(timeBoundaryInfo), columns, None)
+      .generate(tableName, tableType, Some(timeBoundaryInfo), columns, None, Set())
 
     val realtimeWhereClause = s"timeCol >= 12345"
     val offlineWhereClause = s"timeCol < 12345"
@@ -77,7 +77,7 @@ class ScanQueryGeneratorTest extends BaseTest {
 
   test("Selection query should be created with '*' column expressions without filters") {
     val pinotQueries = ScanQueryGenerator
-      .generate(tableName, tableType, None, Array.empty, None)
+      .generate(tableName, tableType, None, Array.empty, None, Set())
 
     val expectedRealtimeQuery =
       s"SELECT * FROM ${tableName}_REALTIME $limit"
@@ -88,4 +88,18 @@ class ScanQueryGeneratorTest extends BaseTest {
     pinotQueries.offlineSelectQuery shouldEqual expectedOfflineQuery
   }
 
+  test("Query options should be added to the beginning of the query") {
+    val queryOptions = Set("enableNullHandling=true","skipUpsert=true")
+    val pinotQueries = ScanQueryGenerator
+      .generate(tableName, tableType, None, Array.empty, None, queryOptions)
+
+    val expectedRealtimeQuery =
+      s"SET enableNullHandling=true;SET skipUpsert=true;SELECT * FROM ${tableName}_REALTIME $limit"
+    val expectedOfflineQuery =
+      s"SET enableNullHandling=true;SET skipUpsert=true;SELECT * FROM ${tableName}_OFFLINE $limit"
+
+    pinotQueries.realtimeSelectQuery shouldEqual expectedRealtimeQuery
+    pinotQueries.offlineSelectQuery shouldEqual expectedOfflineQuery
+  }
+
 }

