You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2022/04/28 04:46:44 UTC

[pinot] branch master updated: [pinot-spark-connector] Add option to connect using GRPC (#8481)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 232b946419 [pinot-spark-connector] Add option to connect using GRPC (#8481)
232b946419 is described below

commit 232b946419d05b785610e9b2daf7467f5f8bee82
Author: Caner Balci <ca...@gmail.com>
AuthorDate: Wed Apr 27 21:46:37 2022 -0700

    [pinot-spark-connector] Add option to connect using GRPC (#8481)
---
 pinot-connectors/pinot-spark-connector/pom.xml     | 48 ++++++++++++++
 .../spark/connector/PinotClusterClient.scala       | 27 ++++++++
 .../connector/PinotGrpcServerDataFetcher.scala     | 77 ++++++++++++++++++++++
 .../spark/connector/PinotServerDataFetcher.scala   |  2 +-
 .../connector/spark/connector/PinotSplitter.scala  | 37 +++++------
 .../datasource/PinotDataSourceReadOptions.scala    |  9 ++-
 .../spark/datasource/PinotDataSourceReader.scala   | 11 +++-
 .../datasource/PinotInputPartitionReader.scala     | 16 +++--
 .../spark/ExampleSparkPinotConnectorTest.scala     | 32 ++++++++-
 .../spark/connector/PinotSplitterTest.scala        | 71 ++++++++++++++------
 .../PinotDataSourceReadOptionsTest.scala           |  8 ++-
 .../org/apache/pinot/tools/HybridQuickstart.java   |  9 +++
 12 files changed, 294 insertions(+), 53 deletions(-)

diff --git a/pinot-connectors/pinot-spark-connector/pom.xml b/pinot-connectors/pinot-spark-connector/pom.xml
index bf969a0215..14a4e667e9 100644
--- a/pinot-connectors/pinot-spark-connector/pom.xml
+++ b/pinot-connectors/pinot-spark-connector/pom.xml
@@ -140,6 +140,19 @@
             </exclusion>
           </exclusions>
         </dependency>
+        <dependency>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-netty-shaded</artifactId>
+          <scope>runtime</scope>
+        </dependency>
+        <dependency>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-protobuf</artifactId>
+        </dependency>
+        <dependency>
+          <groupId>io.grpc</groupId>
+          <artifactId>grpc-stub</artifactId>
+        </dependency>
         <dependency>
           <groupId>org.scala-lang</groupId>
           <artifactId>scala-library</artifactId>
@@ -167,6 +180,41 @@
       <build>
         <plugins>
           <!-- scala build -->
+          <plugin>
+            <groupId>org.xolstice.maven.plugins</groupId>
+            <artifactId>protobuf-maven-plugin</artifactId>
+            <version>0.6.1</version>
+            <configuration>
+              <protocArtifact>com.google.protobuf:protoc:3.19.2:exe:${os.detected.classifier}</protocArtifact>
+              <pluginId>grpc-java</pluginId>
+              <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.44.1:exe:${os.detected.classifier}</pluginArtifact>
+            </configuration>
+            <executions>
+              <execution>
+                <goals>
+                  <goal>compile</goal>
+                  <goal>compile-custom</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+          <plugin>
+            <artifactId>maven-assembly-plugin</artifactId>
+            <configuration>
+              <descriptorRefs>
+                <descriptorRef>jar-with-dependencies</descriptorRef>
+              </descriptorRefs>
+            </configuration>
+            <executions>
+              <execution>
+                <id>assemble-all</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>single</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
           <plugin>
             <groupId>net.alchim31.maven</groupId>
             <artifactId>scala-maven-plugin</artifactId>
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
index 8ccbd437eb..48abd38c95 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
@@ -40,6 +40,7 @@ private[pinot] object PinotClusterClient extends Logging {
   private val TABLE_BROKER_INSTANCES_TEMPLATE = "http://%s//brokers/tables/%s"
   private val TIME_BOUNDARY_TEMPLATE = "http://%s/debug/timeBoundary/%s"
   private val ROUTING_TABLE_TEMPLATE = "http://%s/debug/routingTable/sql?query=%s"
+  private val INSTANCES_API_TEMPLATE = "http://%s/instances/%s"
 
   def getTableSchema(controllerUrl: String, tableName: String): Schema = {
     val rawTableName = TableNameBuilder.extractRawTableName(tableName)
@@ -176,6 +177,27 @@ private[pinot] object PinotClusterClient extends Logging {
     routingTables
   }
 
+  /**
+   * Reads one instance's host information from the controller's `/instances/{instance}` endpoint.
+   *
+   * @return the decoded InstanceInfo; a non-fatal failure is rethrown wrapped in a PinotException
+   */
+  def getInstanceInfo(controllerUrl: String, instance: String): InstanceInfo = {
+    val instanceInfoAttempt = Try {
+      val instanceUri = new URI(INSTANCES_API_TEMPLATE.format(controllerUrl, instance))
+      decodeTo[InstanceInfo](HttpUtils.sendGetRequest(instanceUri))
+    }
+    instanceInfoAttempt match {
+      case Failure(cause) =>
+        throw PinotException(
+          s"An error occured while reading instance info for: '$instance'",
+          cause
+        )
+      case Success(instanceInfo) =>
+        instanceInfo
+    }
+  }
+
   private def getRoutingTableForQuery(brokerUrl: String, sql: String): Map[String, List[String]] = {
     Try {
       val encodedPqlQueryParam = URLEncoder.encode(sql, "UTF-8")
@@ -201,3 +223,8 @@ private[pinot] case class TimeBoundaryInfo(timeColumn: String, timeValue: String
 
   def getRealtimePredicate: String = s"$timeColumn >= $timeValue"
 }
+
+/** Host and port details of one Pinot instance, decoded from the controller instances API. */
+private[pinot] case class InstanceInfo(
+    instanceName: String, hostName: String,
+    port: String, grpcPort: Int)
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
new file mode 100644
index 0000000000..ff63fa8ab2
--- /dev/null
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import io.grpc.ManagedChannelBuilder
+import org.apache.pinot.common.proto.PinotQueryServerGrpc
+import org.apache.pinot.common.proto.Server.ServerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.common.datatable.DataTableFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Fetches the segments of one [[PinotSplit]] from a Pinot server over plaintext gRPC streaming.
+ * Eg: offline-server1: segment1, segment2, segment3
+ */
+private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit) extends Logging {
+
+  private val server = pinotSplit.serverAndSegments
+  private val channel = ManagedChannelBuilder
+    .forAddress(server.serverHost, server.serverGrpcPort)
+    .usePlaintext()
+    .asInstanceOf[ManagedChannelBuilder[_]].build()
+  private val pinotServerBlockingStub = PinotQueryServerGrpc.newBlockingStub(channel)
+
+  /** Returns the non-empty data tables, or throws PinotException; always shuts the channel down. */
+  def fetchData(): List[DataTable] = {
+    val requestStartTime = System.nanoTime()
+    try {
+      // NOTE(review): offlineSelectQuery is sent even when serverType is REALTIME - confirm intended.
+      val request = ServerRequest.newBuilder()
+        .putMetadata("enableStreaming", "true")
+        .addAllSegments(server.segments.asJava)
+        .setSql(pinotSplit.generatedSQLs.offlineSelectQuery)
+        .build()
+      // `submit` yields an iterator; drain it first so the logged time covers the whole response.
+      val responses = pinotServerBlockingStub.submit(request).asScala.toList
+      val elapsedMillis = (System.nanoTime() - requestStartTime) / 1000000L
+      logInfo(s"Pinot server total response time in millis: $elapsedMillis")
+      val dataTables = responses
+        .filter(_.getMetadataMap.get("responseType") == "data")
+        .map(response => DataTableFactory.getDataTable(response.getPayload().toByteArray))
+        .filter(_.getNumberOfRows > 0)
+      if (dataTables.isEmpty) {
+        throw PinotException(s"Empty response from ${server.serverHost}:${server.serverGrpcPort}")
+      }
+      dataTables
+    } finally {
+      channel.shutdown()
+      logInfo("Pinot server connection closed")
+    }
+  }
+}
+
+object PinotGrpcServerDataFetcher {
+  /** Factory: builds a fetcher for the given split so callers can omit `new`. */
+  def apply(pinotSplit: PinotSplit): PinotGrpcServerDataFetcher =
+    new PinotGrpcServerDataFetcher(pinotSplit)
+}
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
index 900ee35c50..651b87d718 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
@@ -95,7 +95,7 @@ private[pinot] class PinotServerDataFetcher(
     val instanceConfig = new InstanceConfig(nullZkId)
     instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost)
     instanceConfig.setPort(pinotSplit.serverAndSegments.serverPort)
-    // TODO: support grpc and netty-sec
+    // TODO: support netty-sec
     val serverInstance = new ServerInstance(instanceConfig)
     Map(
       serverInstance -> pinotSplit.serverAndSegments.segments.asJava
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala
index ed1c2cc750..1445b3c767 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala
@@ -18,10 +18,8 @@
  */
 package org.apache.pinot.connector.spark.connector
 
-import java.util.regex.{Matcher, Pattern}
-
 import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
-import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
 import org.apache.pinot.connector.spark.utils.Logging
 import org.apache.pinot.spi.config.table.TableType
 
@@ -47,47 +45,43 @@ import org.apache.pinot.spi.config.table.TableType
  *    - partition6: offlineServer10 -> segment20
  */
 private[pinot] object PinotSplitter extends Logging {
-  private val PINOT_SERVER_PATTERN = Pattern.compile("Server_(.*)_(\\d+)")
 
   def generatePinotSplits(
       generatedSQLs: GeneratedSQLs,
       routingTable: Map[TableType, Map[String, List[String]]],
-      segmentsPerSplit: Int): List[PinotSplit] = {
+      instanceInfoReader: String => InstanceInfo,
+      readParameters: PinotDataSourceReadOptions): List[PinotSplit] = {
     routingTable.flatMap {
       case (tableType, serversToSegments) =>
         serversToSegments
-          .map { case (server, segments) => parseServerInput(server, segments) }
+          .map { case (server, segments) => (instanceInfoReader(server), segments) }
           .flatMap {
-            case (matcher, segments) =>
+            case (instanceInfo, segments) =>
               createPinotSplitsFromSubSplits(
                 tableType,
                 generatedSQLs,
-                matcher,
+                instanceInfo,
                 segments,
-                segmentsPerSplit
-              )
+                readParameters.segmentsPerSplit)
           }
     }.toList
   }
 
-  private def parseServerInput(server: String, segments: List[String]): (Matcher, List[String]) = {
-    val matcher = PINOT_SERVER_PATTERN.matcher(server)
-    if (matcher.matches() && matcher.groupCount() == 2) matcher -> segments
-    else throw PinotException(s"'$server' did not match!?")
-  }
-
   private def createPinotSplitsFromSubSplits(
       tableType: TableType,
       generatedSQLs: GeneratedSQLs,
-      serverMatcher: Matcher,
+      instanceInfo: InstanceInfo,
       segments: List[String],
       segmentsPerSplit: Int): Iterator[PinotSplit] = {
-    val serverHost = serverMatcher.group(1)
-    val serverPort = serverMatcher.group(2)
     val maxSegmentCount = Math.min(segments.size, segmentsPerSplit)
     segments.grouped(maxSegmentCount).map { subSegments =>
-      val serverAndSegments =
-        PinotServerAndSegments(serverHost, serverPort, subSegments, tableType)
+      val serverAndSegments = {
+        PinotServerAndSegments(instanceInfo.hostName,
+          instanceInfo.port,
+          instanceInfo.grpcPort,
+          subSegments,
+          tableType)
+      }
       PinotSplit(generatedSQLs, serverAndSegments)
     }
   }
@@ -100,6 +94,7 @@ private[pinot] case class PinotSplit(
 private[pinot] case class PinotServerAndSegments(
     serverHost: String,
     serverPort: String,
+    serverGrpcPort: Int,
     segments: List[String],
     serverType: TableType) {
   override def toString: String = s"$serverHost:$serverPort($serverType)"
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala
index ad9e8c8025..1aa643f99a 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala
@@ -36,10 +36,12 @@ object PinotDataSourceReadOptions {
   val CONFIG_USE_PUSH_DOWN_FILTERS = "usePushDownFilters"
   val CONFIG_SEGMENTS_PER_SPLIT = "segmentsPerSplit"
   val CONFIG_PINOT_SERVER_TIMEOUT_MS = "pinotServerTimeoutMs"
+  val CONFIG_USE_GRPC_SERVER = "useGrpcServer" // option key constant; `val` so it cannot be reassigned
   private[pinot] val DEFAULT_CONTROLLER: String = "localhost:9000"
   private[pinot] val DEFAULT_USE_PUSH_DOWN_FILTERS: Boolean = true
   private[pinot] val DEFAULT_SEGMENTS_PER_SPLIT: Int = 3
   private[pinot] val DEFAULT_PINOT_SERVER_TIMEOUT_MS: Long = 10000
+  private[pinot] val DEFAULT_USE_GRPC_SERVER: Boolean = false
 
   private[pinot] val tableTypes = Seq("OFFLINE", "REALTIME", "HYBRID")
 
@@ -75,6 +77,7 @@ object PinotDataSourceReadOptions {
     val segmentsPerSplit = options.getInt(CONFIG_SEGMENTS_PER_SPLIT, DEFAULT_SEGMENTS_PER_SPLIT)
     val pinotServerTimeoutMs =
       options.getLong(CONFIG_PINOT_SERVER_TIMEOUT_MS, DEFAULT_PINOT_SERVER_TIMEOUT_MS)
+    val useGrpcServer = options.getBoolean(CONFIG_USE_GRPC_SERVER, DEFAULT_USE_GRPC_SERVER)
 
     PinotDataSourceReadOptions(
       tableName,
@@ -83,7 +86,8 @@ object PinotDataSourceReadOptions {
       broker,
       usePushDownFilters,
       segmentsPerSplit,
-      pinotServerTimeoutMs
+      pinotServerTimeoutMs,
+      useGrpcServer
     )
   }
 }
@@ -96,4 +100,5 @@ private[pinot] case class PinotDataSourceReadOptions(
     broker: String,
     usePushDownFilters: Boolean,
     segmentsPerSplit: Int,
-    pinotServerTimeoutMs: Long)
+    pinotServerTimeoutMs: Long,
+    useGrpcServer: Boolean)
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
index 6b9d6b291f..f806a2b268 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.sources.v2.reader.{
 import org.apache.spark.sql.types._
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.Map
 
 /**
  * Spark-Pinot datasource reader to read metadata and create partition splits.
@@ -80,8 +81,16 @@ class PinotDataSourceReader(options: DataSourceOptions, userSchema: Option[Struc
 
     val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, generatedSQLs)
 
+    val instanceInfo : Map[String, InstanceInfo] = Map()
+    val instanceInfoReader = (instance:String) => { // cached reader to reduce network round trips
+      instanceInfo.getOrElseUpdate(
+        instance,
+        PinotClusterClient.getInstanceInfo(readParameters.controller, instance)
+      )
+    }
+
     PinotSplitter
-      .generatePinotSplits(generatedSQLs, routingTable, readParameters.segmentsPerSplit)
+      .generatePinotSplits(generatedSQLs, routingTable, instanceInfoReader, readParameters)
       .zipWithIndex
       .map {
         case (pinotSplit, partitionId) =>
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
index 9a659b3ba1..de42b7a6cf 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.connector.spark.datasource
 
-import org.apache.pinot.connector.spark.connector.{PinotServerDataFetcher, PinotSplit, PinotUtils}
+import org.apache.pinot.connector.spark.connector.{PinotGrpcServerDataFetcher, PinotServerDataFetcher, PinotSplit, PinotUtils}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
 import org.apache.spark.sql.types.StructType
@@ -51,9 +51,15 @@ class PinotInputPartitionReader(
   override def close(): Unit = {}
 
   private def fetchDataAndConvertToInternalRows(): Iterator[InternalRow] = {
-    PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions)
-      .fetchData()
-      .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
-      .toIterator
+    // Only the transport differs between the two modes; the row conversion below is shared.
+    val dataTables =
+      if (dataSourceOptions.useGrpcServer) {
+        PinotGrpcServerDataFetcher(pinotSplit).fetchData()
+      } else {
+        PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions).fetchData()
+      }
+    dataTables
+      .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
+      .toIterator
   }
 }
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
index b49e4fcfa1..0909f920a8 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
@@ -39,8 +39,9 @@ object ExampleSparkPinotConnectorTest extends Logging {
     readOffline()
     readHybrid()
     readHybridWithSpecificSchema()
-    readOfflineWithFilters()
     readHybridWithFilters()
+    readHybridViaGrpc()
+    readHybridWithFiltersViaGrpc()
     readRealtimeWithSelectionColumns()
     applyJustSomeFilters()
   }
@@ -137,6 +138,35 @@ object ExampleSparkPinotConnectorTest extends Logging {
     data.show()
   }
 
+  def readHybridViaGrpc()(implicit spark: SparkSession): Unit = {
+    log.info("## Reading `airlineStats_OFFLINE` table... ##")
+    val data = spark.read
+      .format("pinot")
+      .option("table", "airlineStats")
+      .option("tableType", "offline")
+      .option("useGrpcServer", "true")
+      .load()
+    // NOTE(review): despite the "Hybrid" name, the options above read only the offline table.
+    data.show()
+    print(data.count())
+  }
+
+  def readHybridWithFiltersViaGrpc()(implicit spark: SparkSession): Unit = {
+    import spark.implicits._
+    log.info("## Reading `airlineStats_OFFLINE` table with filter push down... ##")
+    val data = spark.read
+      .format("pinot")
+      .option("table", "airlineStats")
+      .option("tableType", "OFFLINE")
+      .option("useGrpcServer", "true")
+      .load()
+      .filter($"DestStateName" === "Florida")
+    // NOTE(review): despite the "Hybrid" name, tableType is OFFLINE, so only that table is read.
+    data.show()
+    print(data.count())
+  }
+
+
   def applyJustSomeFilters()(implicit spark: SparkSession): Unit = {
     import spark.implicits._
     log.info("## Reading `airlineStats_OFFLINE and airlineStats_REALTIME` tables with filter push down... ##")
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala
index 02061218b4..161a17fea3 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala
@@ -20,14 +20,20 @@ package org.apache.pinot.connector.spark.connector
 
 import org.apache.pinot.connector.spark.BaseTest
 import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
-import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
 import org.apache.pinot.spi.config.table.TableType
+import java.util.regex.Pattern
 
 /**
  * Test num of Spark partitions by routing table and input configs.
  */
 class PinotSplitterTest extends BaseTest {
   private val generatedPql = GeneratedSQLs("tbl", None, "", "")
+  private val mockInstanceInfoReader = (server: String) => { // parses "Server_<host>_<port>" names
+    val matcher = Pattern.compile("Server_(.*)_(\\d+)").matcher(server)
+    require(matcher.matches(), s"'$server' is not of the form Server_<host>_<port>")
+    InstanceInfo(server, matcher.group(1), matcher.group(2), -1) // -1: placeholder gRPC port
+  }
 
   private val routingTable = Map(
     TableType.OFFLINE -> Map(
@@ -41,26 +47,38 @@ class PinotSplitterTest extends BaseTest {
     )
   )
 
+  private val getReadOptionsWithSegmentsPerSplit = (segmentsPerSplit: Int) => {
+    new PinotDataSourceReadOptions(
+      "tableName",
+      Option(TableType.OFFLINE),
+      "controller",
+      "broker",
+      false,
+      segmentsPerSplit,
+      1000,
+      false)
+  }
+
   test("Total 5 partition splits should be created for maxNumSegmentPerServerRequest = 3") {
-    val maxNumSegmentPerServerRequest = 3
+    val readOptions = getReadOptionsWithSegmentsPerSplit(3)
     val splitResults =
-      PinotSplitter.generatePinotSplits(generatedPql, routingTable, maxNumSegmentPerServerRequest)
+      PinotSplitter.generatePinotSplits(generatedPql, routingTable, mockInstanceInfoReader, readOptions)
 
     splitResults.size shouldEqual 5
   }
 
   test("Total 5 partition splits should be created for maxNumSegmentPerServerRequest = 90") {
-    val maxNumSegmentPerServerRequest = 90
+    val readOptions = getReadOptionsWithSegmentsPerSplit(90)
     val splitResults =
-      PinotSplitter.generatePinotSplits(generatedPql, routingTable, maxNumSegmentPerServerRequest)
+      PinotSplitter.generatePinotSplits(generatedPql, routingTable, mockInstanceInfoReader, readOptions)
 
     splitResults.size shouldEqual 5
   }
 
   test("Total 10 partition splits should be created for maxNumSegmentPerServerRequest = 1") {
-    val maxNumSegmentPerServerRequest = 1
+    val readOptions = getReadOptionsWithSegmentsPerSplit(1)
     val splitResults =
-      PinotSplitter.generatePinotSplits(generatedPql, routingTable, maxNumSegmentPerServerRequest)
+      PinotSplitter.generatePinotSplits(generatedPql, routingTable, mockInstanceInfoReader, readOptions)
 
     splitResults.size shouldEqual 10
   }
@@ -69,31 +87,46 @@ class PinotSplitterTest extends BaseTest {
     val inputRoutingTable = Map(
       TableType.REALTIME -> Map("Server_192.168.1.100_9000" -> List("segment1"))
     )
+    val readOptions = getReadOptionsWithSegmentsPerSplit(5)
 
-    val splitResults = PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, 5)
+    val splitResults = PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, mockInstanceInfoReader, readOptions)
     val expectedOutput = List(
       PinotSplit(
         generatedPql,
-        PinotServerAndSegments("192.168.1.100", "9000", List("segment1"), TableType.REALTIME)
+        PinotServerAndSegments("192.168.1.100", "9000", -1, List("segment1"), TableType.REALTIME)
       )
     )
 
     expectedOutput should contain theSameElementsAs splitResults
   }
 
-  test("GeneratePinotSplits method should throw exception due to wrong input Server_HOST_PORT") {
+  test("GeneratePinotSplits with Grpc port reading enabled") {
     val inputRoutingTable = Map(
-      TableType.REALTIME -> Map(
-        "Server_192.168.1.100_9000" -> List("segment1"),
-        "Server_192.168.2.100" -> List("segment5")
-      )
+      TableType.REALTIME -> Map("Server_192.168.1.100_9000" -> List("segment1"))
     )
-
-    val exception = intercept[PinotException] {
-      PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, 5)
+    val inputReadOptions = new PinotDataSourceReadOptions(
+      "tableName",
+      Option(TableType.REALTIME),
+      "controller",
+      "broker",
+      false,
+      1,
+      1000,
+      true)
+
+    val inputGrpcPortReader = (server: String) => {
+      InstanceInfo(server, "192.168.1.100", "9000", 8090)
     }
 
-    exception.getMessage shouldEqual "'Server_192.168.2.100' did not match!?"
-  }
+    val splitResults =
+      PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, inputGrpcPortReader, inputReadOptions)
+    val expectedOutput = List(
+      PinotSplit(
+        generatedPql,
+        PinotServerAndSegments("192.168.1.100", "9000", 8090, List("segment1"), TableType.REALTIME)
+      )
+    )
 
+    expectedOutput should contain theSameElementsAs splitResults
+  }
 }
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala
index 83baf3fe3b..f395e0c543 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala
@@ -18,7 +18,7 @@
  */
 package org.apache.pinot.connector.spark.datasource
 
-import org.apache.pinot.connector.spark.BaseTest
+import org.apache.pinot.connector.spark.{BaseTest, datasource}
 import org.apache.pinot.connector.spark.exceptions.PinotException
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 
@@ -36,7 +36,8 @@ class PinotDataSourceReadOptionsTest extends BaseTest {
       PinotDataSourceReadOptions.CONFIG_CONTROLLER -> "localhost:9000",
       PinotDataSourceReadOptions.CONFIG_BROKER -> "localhost:8000",
       PinotDataSourceReadOptions.CONFIG_SEGMENTS_PER_SPLIT -> "1",
-      PinotDataSourceReadOptions.CONFIG_USE_PUSH_DOWN_FILTERS -> "false"
+      PinotDataSourceReadOptions.CONFIG_USE_PUSH_DOWN_FILTERS -> "false",
+      PinotDataSourceReadOptions.CONFIG_USE_GRPC_SERVER -> "false",
     )
 
     val datasourceOptions = new DataSourceOptions(options.asJava)
@@ -50,7 +51,8 @@ class PinotDataSourceReadOptionsTest extends BaseTest {
         "localhost:8000",
         false,
         1,
-        10000
+        10000,
+        false
       )
 
     pinotDataSourceReadOptions shouldEqual expected
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index 7c738a40c5..61ec1b3a9a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -26,7 +26,9 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.ZkStarter;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -64,6 +66,13 @@ public class HybridQuickstart extends QuickStartBase {
     PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
   }
 
+  public Map<String, Object> getConfigOverrides() {
+    Map<String, Object> overrides = new HashMap<>();
+    overrides.put("pinot.server.grpc.enable", "true"); // presumably enables the server gRPC endpoint
+    overrides.put("pinot.server.grpc.port", "8090"); // same value PinotSplitterTest uses as grpcPort
+    return overrides;
+  }
+
   private QuickstartTableRequest prepareTableRequest(File baseDir)
       throws IOException {
     _schemaFile = new File(baseDir, "airlineStats_schema.json");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org