You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ro...@apache.org on 2023/02/02 16:54:55 UTC

[pinot] branch master updated: [spark-connector] Memory optimization for GRPC data fetcher (#10209)

This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a89491ce1 [spark-connector] Memory optimization for GRPC data fetcher (#10209)
5a89491ce1 is described below

commit 5a89491ce17618a60a8ed952bf7da59e291aae11
Author: Caner Balci <ca...@gmail.com>
AuthorDate: Thu Feb 2 08:54:44 2023 -0800

    [spark-connector] Memory optimization for GRPC data fetcher (#10209)
    
    * Return an Iterator from the dataFetcher rather than a List
---
 .../connector/PinotGrpcServerDataFetcher.scala     | 17 +++++++------
 .../datasource/PinotInputPartitionReader.scala     | 29 ++++++++++++++--------
 2 files changed, 28 insertions(+), 18 deletions(-)

diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
index f543b5b7c9..5c4bd3e48e 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/connector/PinotGrpcServerDataFetcher.scala
@@ -25,6 +25,7 @@ import org.apache.pinot.common.proto.Server.ServerRequest
 import org.apache.pinot.connector.spark.utils.Logging
 import org.apache.pinot.spi.config.table.TableType
 
+import java.io.Closeable
 import scala.collection.JavaConverters._
 
 /**
@@ -32,7 +33,7 @@ import scala.collection.JavaConverters._
  * Eg: offline-server1: segment1, segment2, segment3
  */
 private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
-  extends Logging {
+  extends Logging with Closeable {
 
   private val channel = ManagedChannelBuilder
     .forAddress(pinotSplit.serverAndSegments.serverHost, pinotSplit.serverAndSegments.serverGrpcPort)
@@ -41,8 +42,7 @@ private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
     .asInstanceOf[ManagedChannelBuilder[_]].build()
   private val pinotServerBlockingStub = PinotQueryServerGrpc.newBlockingStub(channel)
 
-  def fetchData(): List[DataTable] = {
-    val requestStartTime = System.nanoTime()
+  def fetchData(): Iterator[DataTable] = {
     val request = ServerRequest.newBuilder()
       .putMetadata("enableStreaming", "true")
       .addAllSegments(pinotSplit.serverAndSegments.segments.asJava)
@@ -56,20 +56,23 @@ private[pinot] class PinotGrpcServerDataFetcher(pinotSplit: PinotSplit)
       )
       .build()
     val serverResponse = pinotServerBlockingStub.submit(request)
-    logInfo(s"Pinot server total response time in millis: ${System.nanoTime() - requestStartTime}")
-
     try {
       val dataTables = for {
-        serverResponse <- serverResponse.asScala.toList
+        serverResponse <- serverResponse.asScala
         if serverResponse.getMetadataMap.get("responseType") == "data"
       } yield DataTableFactory.getDataTable(serverResponse.getPayload.toByteArray)
 
       dataTables.filter(_.getNumberOfRows > 0)
+
     } catch {
       case e: io.grpc.StatusRuntimeException =>
         logError(s"Caught exception when reading data from ${pinotSplit.serverAndSegments.serverHost}:${pinotSplit.serverAndSegments.serverGrpcPort}: ${e}")
         throw e
-    } finally {
+    }
+  }
+
+  def close(): Unit = {
+    if (!channel.isShutdown) {
       channel.shutdown()
       logInfo("Pinot server connection closed")
     }
diff --git a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
index de42b7a6cf..ba3966f472 100644
--- a/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
+++ b/pinot-connectors/pinot-spark-connector/src/main/scala/org/apache/pinot/connector/spark/datasource/PinotInputPartitionReader.scala
@@ -23,9 +23,11 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources.v2.reader.InputPartitionReader
 import org.apache.spark.sql.types.StructType
 
+import java.io.Closeable
+
 /**
  * Actual data reader on spark worker side.
- * Represents a spark partition, and is receive data from specified pinot server-segment list.
+ * Represents a spark partition, and receives data from specified pinot server-segment list.
  */
 class PinotInputPartitionReader(
     schema: StructType,
@@ -33,7 +35,8 @@ class PinotInputPartitionReader(
     pinotSplit: PinotSplit,
     dataSourceOptions: PinotDataSourceReadOptions)
   extends InputPartitionReader[InternalRow] {
-  private val responseIterator: Iterator[InternalRow] = fetchDataAndConvertToInternalRows()
+
+  private val (responseIterator: Iterator[InternalRow], source: Closeable) = getIteratorAndSource()
   private[this] var currentRow: InternalRow = _
 
   override def next(): Boolean = {
@@ -48,18 +51,22 @@ class PinotInputPartitionReader(
     currentRow
   }
 
-  override def close(): Unit = {}
+  override def close(): Unit = {
+    source.close()
+  }
 
-  private def fetchDataAndConvertToInternalRows(): Iterator[InternalRow] = {
-    if (dataSourceOptions.useGrpcServer)
-      PinotGrpcServerDataFetcher(pinotSplit)
-        .fetchData()
+  private def getIteratorAndSource(): (Iterator[InternalRow], Closeable) = {
+    if (dataSourceOptions.useGrpcServer) {
+      val dataFetcher = PinotGrpcServerDataFetcher(pinotSplit)
+      val iterable = dataFetcher.fetchData()
         .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
-        .toIterator
-    else
-      PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions)
+      (iterable, dataFetcher)
+    } else {
+      (PinotServerDataFetcher(partitionId, pinotSplit, dataSourceOptions)
         .fetchData()
         .flatMap(PinotUtils.pinotDataTableToInternalRows(_, schema))
-        .toIterator
+        .toIterator,
+        () => {})
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org