Posted to commits@pinot.apache.org by xi...@apache.org on 2022/04/28 04:46:44 UTC
[pinot] branch master updated: [pinot-spark-connector] Add option to connect using GRPC (#8481)
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 232b946419 [pinot-spark-connector] Add option to connect using GRPC (#8481)
232b946419 is described below
commit 232b946419d05b785610e9b2daf7467f5f8bee82
Author: Caner Balci <ca...@gmail.com>
AuthorDate: Wed Apr 27 21:46:37 2022 -0700
[pinot-spark-connector] Add option to connect using GRPC (#8481)
---
pinot-connectors/pinot-spark-connector/pom.xml | 48 ++++++++++++++
.../spark/connector/PinotClusterClient.scala | 27 ++++++++
.../connector/PinotGrpcServerDataFetcher.scala | 77 ++++++++++++++++++++++
.../spark/connector/PinotServerDataFetcher.scala | 2 +-
.../connector/spark/connector/PinotSplitter.scala | 37 +++++------
.../datasource/PinotDataSourceReadOptions.scala | 9 ++-
.../spark/datasource/PinotDataSourceReader.scala | 11 +++-
.../datasource/PinotInputPartitionReader.scala | 16 +++--
.../spark/ExampleSparkPinotConnectorTest.scala | 32 ++++++++-
.../spark/connector/PinotSplitterTest.scala | 71 ++++++++++++++------
.../PinotDataSourceReadOptionsTest.scala | 8 ++-
.../org/apache/pinot/tools/HybridQuickstart.java | 9 +++
12 files changed, 294 insertions(+), 53 deletions(-)
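A minimal usage sketch of the new option (the Spark session setup and table name are illustrative, mirroring the readHybridViaGrpc example added to ExampleSparkPinotConnectorTest below):

    // Sketch only: session setup and table name are illustrative.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("pinot-grpc-read")
      .master("local")
      .getOrCreate()

    val data = spark.read
      .format("pinot")
      .option("table", "airlineStats")
      .option("tableType", "offline")
      .option("useGrpcServer", "true") // new option in this commit; defaults to false
      .load()
    data.show()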
diff --git a/pinot-connectors/pinot-spark-connector/pom.xml b/pinot-connectors/pinot-spark-connector/pom.xml
index bf969a0215..14a4e667e9 100644
--- a/pinot-connectors/pinot-spark-connector/pom.xml
+++ b/pinot-connectors/pinot-spark-connector/pom.xml
@@ -140,6 +140,19 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-netty-shaded</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-protobuf</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-stub</artifactId>
+ </dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
@@ -167,6 +180,41 @@
<build>
<plugins>
<!-- scala build -->
+ <plugin>
+ <groupId>org.xolstice.maven.plugins</groupId>
+ <artifactId>protobuf-maven-plugin</artifactId>
+ <version>0.6.1</version>
+ <configuration>
+ <protocArtifact>com.google.protobuf:protoc:3.19.2:exe:${os.detected.classifier}</protocArtifact>
+ <pluginId>grpc-java</pluginId>
+ <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.44.1:exe:${os.detected.classifier}</pluginArtifact>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>compile-custom</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>assemble-all</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
index 8ccbd437eb..48abd38c95 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotClusterClient.scala
@@ -40,6 +40,7 @@ private[pinot] object PinotClusterClient extends Logging {
private val TABLE_BROKER_INSTANCES_TEMPLATE = "http://%s//brokers/tables/%s"
private val TIME_BOUNDARY_TEMPLATE = "http://%s/debug/timeBoundary/%s"
private val ROUTING_TABLE_TEMPLATE = "http://%s/debug/routingTable/sql?query=%s"
+ private val INSTANCES_API_TEMPLATE = "http://%s/instances/%s"
def getTableSchema(controllerUrl: String, tableName: String): Schema = {
val rawTableName = TableNameBuilder.extractRawTableName(tableName)
@@ -176,6 +177,27 @@ private[pinot] object PinotClusterClient extends Logging {
routingTables
}
+ /**
+ * Get host information for a Pinot instance
+ *
+ * @return InstanceInfo
+ */
+ def getInstanceInfo(controllerUrl: String, instance: String): InstanceInfo = {
+ Try {
+ val uri = new URI(String.format(INSTANCES_API_TEMPLATE, controllerUrl, instance))
+ val response = HttpUtils.sendGetRequest(uri)
+ decodeTo[InstanceInfo](response)
+ } match {
+ case Success(decodedResponse) =>
+ decodedResponse
+ case Failure(exception) =>
+ throw PinotException(
+ s"An error occured while reading instance info for: '$instance'",
+ exception
+ )
+ }
+ }
+
private def getRoutingTableForQuery(brokerUrl: String, sql: String): Map[String, List[String]] = {
Try {
val encodedPqlQueryParam = URLEncoder.encode(sql, "UTF-8")
@@ -201,3 +223,8 @@ private[pinot] case class TimeBoundaryInfo(timeColumn: String, timeValue: String
def getRealtimePredicate: String = s"$timeColumn >= $timeValue"
}
+
+private[pinot] case class InstanceInfo(instanceName: String,
+ hostName: String,
+ port: String,
+ grpcPort: Int)
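A short sketch of how the new helper and case class fit together (controller URL, instance name, and port values are illustrative; grpcPort is whatever the server registered under pinot.server.grpc.port):

    // Illustrative only; all concrete values here are assumptions.
    val info: InstanceInfo =
      PinotClusterClient.getInstanceInfo("localhost:9000", "Server_192.168.1.100_9000")
    // e.g. InstanceInfo("Server_192.168.1.100_9000", "192.168.1.100", "9000", 8090)
    val grpcAddress = s"${info.hostName}:${info.grpcPort}"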
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
new file mode 100644
index 0000000000..ff63fa8ab2
--- /dev/null
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.connector.spark.connector
+
+import io.grpc.ManagedChannelBuilder
+import org.apache.pinot.common.proto.PinotQueryServerGrpc
+import org.apache.pinot.common.proto.Server.ServerRequest
+import org.apache.pinot.common.utils.DataTable
+import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.utils.Logging
+import org.apache.pinot.core.common.datatable.DataTableFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Data fetcher that reads from a Pinot server over gRPC for a given set of segments.
+ * E.g. offline-server1: segment1, segment2, segment3
+ */
+private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
+ extends Logging {
+
+ private val channel = ManagedChannelBuilder
+ .forAddress(pinotSplit.serverAndSegments.serverHost, pinotSplit.serverAndSegments.serverGrpcPort)
+ .usePlaintext()
+ .asInstanceOf[ManagedChannelBuilder[_]].build()
+ private val pinotServerBlockingStub = PinotQueryServerGrpc.newBlockingStub(channel)
+
+ def fetchData(): List[DataTable] = {
+ val requestStartTime = System.nanoTime()
+ val request = ServerRequest.newBuilder()
+ .putMetadata("enableStreaming", "true")
+ .addAllSegments(pinotSplit.serverAndSegments.segments.asJava)
+ .setSql(pinotSplit.generatedSQLs.offlineSelectQuery)
+ .build()
+ val serverResponse = pinotServerBlockingStub.submit(request)
+ logInfo(s"Pinot server total response time in millis: ${System.nanoTime() - requestStartTime}")
+
+ try {
+ val dataTables = (for {
+ serverResponse <- serverResponse.asScala.toList
+ if serverResponse.getMetadataMap.get("responseType") == "data"
+ } yield DataTableFactory.getDataTable(serverResponse.getPayload().toByteArray))
+ .filter(_.getNumberOfRows > 0)
+
+ if (dataTables.isEmpty) {
+ throw PinotException(s"Empty response from ${pinotSplit.serverAndSegments.serverHost}:${pinotSplit.serverAndSegments.serverGrpcPort}")
+ }
+
+ dataTables
+ } finally {
+ channel.shutdown()
+ logInfo("Pinot server connection closed")
+ }
+ }
+}
+
+object PinotGrpcServerDataFetcher {
+ def apply(pinotSplit: PinotSplit): PinotGrpcServerDataFetcher = {
+ new PinotGrpcServerDataFetcher(pinotSplit)
+ }
+}
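With enableStreaming=true the server streams back several payload messages tagged responseType=data plus a trailing metadata-only message, and the fetcher above keeps only the data tables; note it always submits the split's offlineSelectQuery. A hedged sketch of the consuming side, as wired up in PinotInputPartitionReader further below (schema is the Spark read schema for the split):

    // Sketch of consuming the fetcher, matching PinotInputPartitionReader below.
    import org.apache.spark.sql.catalyst.InternalRow

    val rows: Iterator[InternalRow] =
      PinotGrpcServerDataFetcher(pinotSplit)
        .fetchData()
        .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
        .toIterator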
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
index 900ee35c50..651b87d718 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotServerDataFetcher.scala
@@ -95,7 +95,7 @@ private[pinot] class PinotServerDataFetcher(
val instanceConfig = new InstanceConfig(nullZkId)
instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost)
instanceConfig.setPort(pinotSplit.serverAndSegments.serverPort)
- // TODO: support grpc and netty-sec
+ // TODO: support netty-sec
val serverInstance = new ServerInstance(instanceConfig)
Map(
serverInstance -> pinotSplit.serverAndSegments.segments.asJava
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala
index ed1c2cc750..1445b3c767 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotSplitter.scala
@@ -18,10 +18,8 @@
*/
package org.apache.pinot.connector.spark.connector
-import java.util.regex.{Matcher, Pattern}
-
import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
-import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
import org.apache.pinot.connector.spark.utils.Logging
import org.apache.pinot.spi.config.table.TableType
@@ -47,47 +45,43 @@ import org.apache.pinot.spi.config.table.TableType
* - partition6: offlineServer10 -> segment20
*/
private[pinot] object PinotSplitter extends Logging {
- private val PINOT_SERVER_PATTERN = Pattern.compile("Server_(.*)_(\\d+)")
def generatePinotSplits(
generatedSQLs: GeneratedSQLs,
routingTable: Map[TableType, Map[String, List[String]]],
- segmentsPerSplit: Int): List[PinotSplit] = {
+ instanceInfoReader: String => InstanceInfo,
+ readParameters: PinotDataSourceReadOptions): List[PinotSplit] = {
routingTable.flatMap {
case (tableType, serversToSegments) =>
serversToSegments
- .map { case (server, segments) => parseServerInput(server, segments) }
+ .map { case (server, segments) => (instanceInfoReader(server), segments) }
.flatMap {
- case (matcher, segments) =>
+ case (instanceInfo, segments) =>
createPinotSplitsFromSubSplits(
tableType,
generatedSQLs,
- matcher,
+ instanceInfo,
segments,
- segmentsPerSplit
- )
+ readParameters.segmentsPerSplit)
}
}.toList
}
- private def parseServerInput(server: String, segments: List[String]): (Matcher, List[String]) = {
- val matcher = PINOT_SERVER_PATTERN.matcher(server)
- if (matcher.matches() && matcher.groupCount() == 2) matcher -> segments
- else throw PinotException(s"'$server' did not match!?")
- }
-
private def createPinotSplitsFromSubSplits(
tableType: TableType,
generatedSQLs: GeneratedSQLs,
- serverMatcher: Matcher,
+ instanceInfo: InstanceInfo,
segments: List[String],
segmentsPerSplit: Int): Iterator[PinotSplit] = {
- val serverHost = serverMatcher.group(1)
- val serverPort = serverMatcher.group(2)
val maxSegmentCount = Math.min(segments.size, segmentsPerSplit)
segments.grouped(maxSegmentCount).map { subSegments =>
- val serverAndSegments =
- PinotServerAndSegments(serverHost, serverPort, subSegments, tableType)
+ val serverAndSegments = {
+ PinotServerAndSegments(instanceInfo.hostName,
+ instanceInfo.port,
+ instanceInfo.grpcPort,
+ subSegments,
+ tableType)
+ }
PinotSplit(generatedSQLs, serverAndSegments)
}
}
@@ -100,6 +94,7 @@ private[pinot] case class PinotSplit(
private[pinot] case class PinotServerAndSegments(
serverHost: String,
serverPort: String,
+ serverGrpcPort: Int,
segments: List[String],
serverType: TableType) {
override def toString: String = s"$serverHost:$serverPort($serverType)"
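The split sizing above reduces to segments.grouped(maxSegmentCount); a small worked example of that grouping (segment names illustrative):

    // Worked example: 4 segments with segmentsPerSplit = 3 yield two sub-splits,
    // hence two Spark partitions for this server.
    val subSplits = List("seg1", "seg2", "seg3", "seg4").grouped(3).toList
    // => List(List("seg1", "seg2", "seg3"), List("seg4"))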
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala
index ad9e8c8025..1aa643f99a 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptions.scala
@@ -36,10 +36,12 @@ object PinotDataSourceReadOptions {
val CONFIG_USE_PUSH_DOWN_FILTERS = "usePushDownFilters"
val CONFIG_SEGMENTS_PER_SPLIT = "segmentsPerSplit"
val CONFIG_PINOT_SERVER_TIMEOUT_MS = "pinotServerTimeoutMs"
+ val CONFIG_USE_GRPC_SERVER = "useGrpcServer"
private[pinot] val DEFAULT_CONTROLLER: String = "localhost:9000"
private[pinot] val DEFAULT_USE_PUSH_DOWN_FILTERS: Boolean = true
private[pinot] val DEFAULT_SEGMENTS_PER_SPLIT: Int = 3
private[pinot] val DEFAULT_PINOT_SERVER_TIMEOUT_MS: Long = 10000
+ private[pinot] val DEFAULT_USE_GRPC_SERVER: Boolean = false
private[pinot] val tableTypes = Seq("OFFLINE", "REALTIME", "HYBRID")
@@ -75,6 +77,7 @@ object PinotDataSourceReadOptions {
val segmentsPerSplit = options.getInt(CONFIG_SEGMENTS_PER_SPLIT, DEFAULT_SEGMENTS_PER_SPLIT)
val pinotServerTimeoutMs =
options.getLong(CONFIG_PINOT_SERVER_TIMEOUT_MS, DEFAULT_PINOT_SERVER_TIMEOUT_MS)
+ val useGrpcServer = options.getBoolean(CONFIG_USE_GRPC_SERVER, DEFAULT_USE_GRPC_SERVER)
PinotDataSourceReadOptions(
tableName,
@@ -83,7 +86,8 @@ object PinotDataSourceReadOptions {
broker,
usePushDownFilters,
segmentsPerSplit,
- pinotServerTimeoutMs
+ pinotServerTimeoutMs,
+ useGrpcServer
)
}
}
@@ -96,4 +100,5 @@ private[pinot] case class PinotDataSourceReadOptions(
broker: String,
usePushDownFilters: Boolean,
segmentsPerSplit: Int,
- pinotServerTimeoutMs: Long)
+ pinotServerTimeoutMs: Long,
+ useGrpcServer: Boolean)
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
index 6b9d6b291f..f806a2b268 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReader.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.sources.v2.reader.{
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._
+import scala.collection.mutable.Map
/**
* Spark-Pinot datasource reader to read metadata and create partition splits.
@@ -80,8 +81,16 @@ class PinotDataSourceReader(options: DataSourceOptions, userSchema: Option[Struc
val routingTable = PinotClusterClient.getRoutingTable(readParameters.broker, generatedSQLs)
+ val instanceInfo: Map[String, InstanceInfo] = Map()
+ val instanceInfoReader = (instance: String) => { // cached reader to reduce network round trips
+ instanceInfo.getOrElseUpdate(
+ instance,
+ PinotClusterClient.getInstanceInfo(readParameters.controller, instance)
+ )
+ }
+
PinotSplitter
- .generatePinotSplits(generatedSQLs, routingTable, readParameters.segmentsPerSplit)
+ .generatePinotSplits(generatedSQLs, routingTable, instanceInfoReader, readParameters)
.zipWithIndex
.map {
case (pinotSplit, partitionId) =>
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
index 9a659b3ba1..de42b7a6cf 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.connector.spark.datasource
-import org.apache.pinot.connector.spark.connector.{PinotServerDataFetcher, PinotSplit, PinotUtils}
+import org.apache.pinot.connector.spark.connector.{PinotGrpcServerDataFetcher, PinotServerDataFetcher, PinotSplit, PinotUtils}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.types.StructType
@@ -51,9 +51,15 @@ class PinotInputPartitionReader(
override def close(): Unit = {}
private def fetchDataAndConvertToInternalRows(): Iterator[InternalRow] = {
- PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions)
- .fetchData()
- .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
- .toIterator
+ if (dataSourceOptions.useGrpcServer)
+ PinotGrpcServerDataFetcher(pinotSplit)
+ .fetchData()
+ .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
+ .toIterator
+ else
+ PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions)
+ .fetchData()
+ .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
+ .toIterator
}
}
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
index b49e4fcfa1..0909f920a8 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/ExampleSparkPinotConnectorTest.scala
@@ -39,8 +39,9 @@ object ExampleSparkPinotConnectorTest extends Logging {
readOffline()
readHybrid()
readHybridWithSpecificSchema()
- readOfflineWithFilters()
readHybridWithFilters()
+ readHybridViaGrpc()
+ readHybridWithFiltersViaGrpc()
readRealtimeWithSelectionColumns()
applyJustSomeFilters()
}
@@ -137,6 +138,35 @@ object ExampleSparkPinotConnectorTest extends Logging {
data.show()
}
+ def readHybridViaGrpc()(implicit spark: SparkSession): Unit = {
+ log.info("## Reading `airlineStats_OFFLINE` table... ##")
+ val data = spark.read
+ .format("pinot")
+ .option("table", "airlineStats")
+ .option("tableType", "offline")
+ .option("useGrpcServer", "true")
+ .load()
+
+ data.show()
+ print(data.count())
+ }
+
+ def readHybridWithFiltersViaGrpc()(implicit spark: SparkSession): Unit = {
+ import spark.implicits._
+ log.info("## Reading `airlineStats_OFFLINE` table with filter push down... ##")
+ val data = spark.read
+ .format("pinot")
+ .option("table", "airlineStats")
+ .option("tableType", "OFFLINE")
+ .option("useGrpcServer", "true")
+ .load()
+ .filter($"DestStateName" === "Florida")
+
+ data.show()
+ print(data.count())
+ }
+
+
def applyJustSomeFilters()(implicit spark: SparkSession): Unit = {
import spark.implicits._
log.info("## Reading `airlineStats_OFFLINE and airlineStats_REALTIME` tables with filter push down... ##")
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala
index 02061218b4..161a17fea3 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/connector/PinotSplitterTest.scala
@@ -20,14 +20,20 @@ package org.apache.pinot.connector.spark.connector
import org.apache.pinot.connector.spark.BaseTest
import org.apache.pinot.connector.spark.connector.query.GeneratedSQLs
-import org.apache.pinot.connector.spark.exceptions.PinotException
+import org.apache.pinot.connector.spark.datasource.PinotDataSourceReadOptions
import org.apache.pinot.spi.config.table.TableType
+import java.util.regex.Pattern
/**
* Test num of Spark partitions by routing table and input configs.
*/
class PinotSplitterTest extends BaseTest {
private val generatedPql = GeneratedSQLs("tbl", None, "", "")
+ private val mockInstanceInfoReader = (server: String) => {
+ val matcher = Pattern.compile("Server_(.*)_(\\d+)").matcher(server)
+ matcher.matches() // must be called before group(); the Boolean result is intentionally unused
+ InstanceInfo(server, matcher.group(1), matcher.group(2), -1)
+ }
private val routingTable = Map(
TableType.OFFLINE -> Map(
@@ -41,26 +47,38 @@ class PinotSplitterTest extends BaseTest {
)
)
+ private val getReadOptionsWithSegmentsPerSplit = (segmentsPerSplit: Int) => {
+ new PinotDataSourceReadOptions(
+ "tableName",
+ Option(TableType.OFFLINE),
+ "controller",
+ "broker",
+ false,
+ segmentsPerSplit,
+ 1000,
+ false)
+ }
+
test("Total 5 partition splits should be created for maxNumSegmentPerServerRequest = 3") {
- val maxNumSegmentPerServerRequest = 3
+ val readOptions = getReadOptionsWithSegmentsPerSplit(3)
val splitResults =
- PinotSplitter.generatePinotSplits(generatedPql, routingTable, maxNumSegmentPerServerRequest)
+ PinotSplitter.generatePinotSplits(generatedPql, routingTable, mockInstanceInfoReader, readOptions)
splitResults.size shouldEqual 5
}
test("Total 5 partition splits should be created for maxNumSegmentPerServerRequest = 90") {
- val maxNumSegmentPerServerRequest = 90
+ val readOptions = getReadOptionsWithSegmentsPerSplit(90)
val splitResults =
- PinotSplitter.generatePinotSplits(generatedPql, routingTable, maxNumSegmentPerServerRequest)
+ PinotSplitter.generatePinotSplits(generatedPql, routingTable, mockInstanceInfoReader, readOptions)
splitResults.size shouldEqual 5
}
test("Total 10 partition splits should be created for maxNumSegmentPerServerRequest = 1") {
- val maxNumSegmentPerServerRequest = 1
+ val readOptions = getReadOptionsWithSegmentsPerSplit(1)
val splitResults =
- PinotSplitter.generatePinotSplits(generatedPql, routingTable, maxNumSegmentPerServerRequest)
+ PinotSplitter.generatePinotSplits(generatedPql, routingTable, mockInstanceInfoReader, readOptions)
splitResults.size shouldEqual 10
}
@@ -69,31 +87,46 @@ class PinotSplitterTest extends BaseTest {
val inputRoutingTable = Map(
TableType.REALTIME -> Map("Server_192.168.1.100_9000" -> List("segment1"))
)
+ val readOptions = getReadOptionsWithSegmentsPerSplit(5)
- val splitResults = PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, 5)
+ val splitResults = PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, mockInstanceInfoReader, readOptions)
val expectedOutput = List(
PinotSplit(
generatedPql,
- PinotServerAndSegments("192.168.1.100", "9000", List("segment1"), TableType.REALTIME)
+ PinotServerAndSegments("192.168.1.100", "9000", -1, List("segment1"), TableType.REALTIME)
)
)
expectedOutput should contain theSameElementsAs splitResults
}
- test("GeneratePinotSplits method should throw exception due to wrong input Server_HOST_PORT") {
+ test("GeneratePinotSplits with Grpc port reading enabled") {
val inputRoutingTable = Map(
- TableType.REALTIME -> Map(
- "Server_192.168.1.100_9000" -> List("segment1"),
- "Server_192.168.2.100" -> List("segment5")
- )
+ TableType.REALTIME -> Map("Server_192.168.1.100_9000" -> List("segment1"))
)
-
- val exception = intercept[PinotException] {
- PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, 5)
+ val inputReadOptions = new PinotDataSourceReadOptions(
+ "tableName",
+ Option(TableType.REALTIME),
+ "controller",
+ "broker",
+ false,
+ 1,
+ 1000,
+ true)
+
+ val inputGrpcPortReader = (server: String) => {
+ InstanceInfo(server, "192.168.1.100", "9000", 8090)
}
- exception.getMessage shouldEqual "'Server_192.168.2.100' did not match!?"
- }
+ val splitResults =
+ PinotSplitter.generatePinotSplits(generatedPql, inputRoutingTable, inputGrpcPortReader, inputReadOptions)
+ val expectedOutput = List(
+ PinotSplit(
+ generatedPql,
+ PinotServerAndSegments("192.168.1.100", "9000", 8090, List("segment1"), TableType.REALTIME)
+ )
+ )
+ expectedOutput should contain theSameElementsAs splitResults
+ }
}
diff --git a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala
index 83baf3fe3b..f395e0c543 100644
--- a/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala
+++ b/pinot-connectors/pinot-spark-connector/src/test/scala/org/apache/pinot/connector/spark/datasource/PinotDataSourceReadOptionsTest.scala
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.connector.spark.datasource
-import org.apache.pinot.connector.spark.BaseTest
+import org.apache.pinot.connector.spark.{BaseTest, datasource}
import org.apache.pinot.connector.spark.exceptions.PinotException
import org.apache.spark.sql.sources.v2.DataSourceOptions
@@ -36,7 +36,8 @@ class PinotDataSourceReadOptionsTest extends BaseTest {
PinotDataSourceReadOptions.CONFIG_CONTROLLER -> "localhost:9000",
PinotDataSourceReadOptions.CONFIG_BROKER -> "localhost:8000",
PinotDataSourceReadOptions.CONFIG_SEGMENTS_PER_SPLIT -> "1",
- PinotDataSourceReadOptions.CONFIG_USE_PUSH_DOWN_FILTERS -> "false"
+ PinotDataSourceReadOptions.CONFIG_USE_PUSH_DOWN_FILTERS -> "false",
+ PinotDataSourceReadOptions.CONFIG_USE_GRPC_SERVER -> "false",
)
val datasourceOptions = new DataSourceOptions(options.asJava)
@@ -50,7 +51,8 @@ class PinotDataSourceReadOptionsTest extends BaseTest {
"localhost:8000",
false,
1,
- 10000
+ 10000,
+ false
)
pinotDataSourceReadOptions shouldEqual expected
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index 7c738a40c5..61ec1b3a9a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -26,7 +26,9 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -64,6 +66,13 @@ public class HybridQuickstart extends QuickStartBase {
PinotAdministrator.main(arguments.toArray(new String[arguments.size()]));
}
+ public Map<String, Object> getConfigOverrides() {
+ Map<String, Object> overrides = new HashMap<>();
+ overrides.put("pinot.server.grpc.enable", "true");
+ overrides.put("pinot.server.grpc.port", "8090");
+ return overrides;
+ }
+
private QuickstartTableRequest prepareTableRequest(File baseDir)
throws IOException {
_schemaFile = new File(baseDir, "airlineStats_schema.json");
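Putting the pieces together, one way to exercise the new path end to end is to start this quickstart (which now enables gRPC on port 8090 via the overrides above) and read from it with the connector; a hedged sketch, where the controller/broker option keys are assumed from CONFIG_CONTROLLER and CONFIG_BROKER:

    // End-to-end sketch against a locally running HybridQuickstart; the
    // controller/broker option keys are assumptions, and the host/ports
    // match the quickstart defaults shown above.
    val df = spark.read
      .format("pinot")
      .option("table", "airlineStats")
      .option("tableType", "offline")
      .option("controller", "localhost:9000")
      .option("broker", "localhost:8000")
      .option("useGrpcServer", "true")
      .load()
    println(df.count())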
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org