You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/02/02 16:54:55 UTC
[pinot] branch master updated: [spark-connector] Memory optimization for GRPC data fetcher (#10209)
This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5a89491ce1 [spark-connector] Memory optimization for GRPC data fetcher (#10209)
5a89491ce1 is described below
commit 5a89491ce17618a60a8ed952bf7da59e291aae11
Author: Caner Balci <ca...@gmail.com>
AuthorDate: Thu Feb 2 08:54:44 2023 -0800
[spark-connector] Memory optimization for GRPC data fetcher (#10209)
* Return an Iterator from the dataFetcher rather than a List
---
.../connector/PinotGrpcServerDataFetcher.scala | 17 +++++++------
.../datasource/PinotInputPartitionReader.scala | 29 ++++++++++++++--------
2 files changed, 28 insertions(+), 18 deletions(-)
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
index f543b5b7c9..5c4bd3e48e 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
@@ -25,6 +25,7 @@ import org.apache.pinot.common.proto.Server.ServerRequest
import org.apache.pinot.connector.spark.utils.Logging
import org.apache.pinot.spi.config.table.TableType
+import java.io.Closeable
import scala.collection.JavaConverters._
/**
@@ -32,7 +33,7 @@ import scala.collection.JavaConverters._
* Eg: offline-server1: segment1, segment2, segment3
*/
private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
- extends Logging {
+ extends Logging with Closeable {
private val channel = ManagedChannelBuilder
.forAddress(pinotSplit.serverAndSegments.serverHost, pinotSplit.serverAndSegments.serverGrpcPort)
@@ -41,8 +42,7 @@ private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
.asInstanceOf[ManagedChannelBuilder[_]].build()
private val pinotServerBlockingStub = PinotQueryServerGrpc.newBlockingStub(channel)
- def fetchData(): List[DataTable] = {
- val requestStartTime = System.nanoTime()
+ def fetchData(): Iterator[DataTable] = {
val request = ServerRequest.newBuilder()
.putMetadata("enableStreaming", "true")
.addAllSegments(pinotSplit.serverAndSegments.segments.asJava)
@@ -56,20 +56,23 @@ private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
)
.build()
val serverResponse = pinotServerBlockingStub.submit(request)
- logInfo(s"Pinot server total response time in millis: ${System.nanoTime() - requestStartTime}")
-
try {
val dataTables = for {
- serverResponse <- serverResponse.asScala.toList
+ serverResponse <- serverResponse.asScala
if serverResponse.getMetadataMap.get("responseType") == "data"
} yield DataTableFactory.getDataTable(serverResponse.getPayload.toByteArray)
dataTables.filter(_.getNumberOfRows > 0)
+
} catch {
case e: io.grpc.StatusRuntimeException =>
logError(s"Caught exception when reading data from ${pinotSplit.serverAndSegments.serverHost}:${pinotSplit.serverAndSegments.serverGrpcPort}: ${e}")
throw e
- } finally {
+ }
+ }
+
+ def close(): Unit = {
+ if (!channel.isShutdown) {
channel.shutdown()
logInfo("Pinot server connection closed")
}
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
index de42b7a6cf..ba3966f472 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
@@ -23,9 +23,11 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
import org.apache.spark.sql.types.StructType
+import java.io.Closeable
+
/**
* Actual data reader on spark worker side.
- * Represents a spark partition, and is receive data from specified pinot server-segment list.
+ * Represents a spark partition, and receives data from specified pinot server-segment list.
*/
class PinotInputPartitionReader(
schema: StructType,
@@ -33,7 +35,8 @@ class PinotInputPartitionReader(
pinotSplit: PinotSplit,
dataSourceOptions: PinotDataSourceReadOptions)
extends InputPartitionReader[InternalRow] {
- private val responseIterator: Iterator[InternalRow] = fetchDataAndConvertToInternalRows()
+
+ private val (responseIterator: Iterator[InternalRow], source: Closeable) = getIteratorAndSource()
private[this] var currentRow: InternalRow = _
override def next(): Boolean = {
@@ -48,18 +51,22 @@ class PinotInputPartitionReader(
currentRow
}
- override def close(): Unit = {}
+ override def close(): Unit = {
+ source.close()
+ }
- private def fetchDataAndConvertToInternalRows(): Iterator[InternalRow] = {
- if (dataSourceOptions.useGrpcServer)
- PinotGrpcServerDataFetcher(pinotSplit)
- .fetchData()
+ private def getIteratorAndSource(): (Iterator[InternalRow], Closeable) = {
+ if (dataSourceOptions.useGrpcServer) {
+ val dataFetcher = PinotGrpcServerDataFetcher(pinotSplit)
+ val iterable = dataFetcher.fetchData()
.flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
- .toIterator
- else
- PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions)
+ (iterable, dataFetcher)
+ } else {
+ (PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions)
.fetchData()
.flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
- .toIterator
+ .toIterator,
+ () => {})
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org