Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/18 20:26:56 UTC

[GitHub] [spark] grundprinzip opened a new pull request, #38300: [SPARK-XXX] [CONNECT] Use proper JSON encoding until we have Arrow collection.

grundprinzip opened a new pull request, #38300:
URL: https://github.com/apache/spark/pull/38300

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   This patch provides a temporary implementation of result batches as JSON instead of the broken CSV format, which simply generated unescaped CSV lines. The implementation leverages Spark's existing JSON serialization and converts the output into result batches for Spark Connect.
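
   To see why the unescaped CSV path was broken, consider this illustrative snippet (not code from this PR): any value containing the delimiter makes the row ambiguous, whereas JSON escaping keeps records intact.

   ```scala
   // Illustrative only: the old path joined field values with "|" without escaping.
   val fields = Seq("a|b", "c")
   println(fields.mkString("|")) // a|b|c -- a reader cannot tell two fields from three
   // Spark's toJSON would serialize the same row as {"c1":"a|b","c2":"c"},
   // where the delimiter inside the value is harmless.
   ```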
   
   ### Why are the changes needed?
   Cleanup
   
   ### Does this PR introduce _any_ user-facing change?
   No / Experimental
   
   ### How was this patch tested?
   E2E tests for the Python Client.




[GitHub] [spark] AmplabJenkins commented on pull request #38300: [SPARK-XXX] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38300:
URL: https://github.com/apache/spark/pull/38300#issuecomment-1284423465

   Can one of the admins verify this patch?




[GitHub] [spark] grundprinzip commented on a diff in pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38300:
URL: https://github.com/apache/spark/pull/38300#discussion_r1001541171


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -57,21 +60,67 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
     processRows(request.getClientId, rows)
   }
 
-  private def processRows(clientId: String, rows: DataFrame) = {
+  private[connect] def processRows(clientId: String, rows: DataFrame): Unit = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
-    val schema =
-      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, timeZoneId).toByteArray)
-
-    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
-    val data = rows.collect().map(x => x.toSeq.mkString("|")).mkString("\n")
-    val bbb = proto.Response.CSVBatch.newBuilder
-      .setRowCount(-1)
-      .setData(textSchema ++ "\n" ++ data)
-      .build()
-    val response = proto.Response.newBuilder().setClientId(clientId).setCsvBatch(bbb).build()
 
-    // Send all the data
-    responseObserver.onNext(response)
+    // Only process up to 10MB of data.
+    val sb = new StringBuilder
+    var rowCount = 0
+    rows.toJSON
+      .toLocalIterator()

Review Comment:
   I moved this back to `collect()`





[GitHub] [spark] HyukjinKwon commented on pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #38300:
URL: https://github.com/apache/spark/pull/38300#issuecomment-1286853167

   All related tests passed.
   
   Merged to master.




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38300:
URL: https://github.com/apache/spark/pull/38300#discussion_r1001301224


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -57,21 +60,67 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
     processRows(request.getClientId, rows)
   }
 
-  private def processRows(clientId: String, rows: DataFrame) = {
+  private[connect] def processRows(clientId: String, rows: DataFrame): Unit = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
-    val schema =
-      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, timeZoneId).toByteArray)
-
-    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
-    val data = rows.collect().map(x => x.toSeq.mkString("|")).mkString("\n")
-    val bbb = proto.Response.CSVBatch.newBuilder
-      .setRowCount(-1)
-      .setData(textSchema ++ "\n" ++ data)
-      .build()
-    val response = proto.Response.newBuilder().setClientId(clientId).setCsvBatch(bbb).build()
 
-    // Send all the data
-    responseObserver.onNext(response)
+    // Only process up to 10MB of data.
+    val sb = new StringBuilder
+    var rowCount = 0
+    rows.toJSON
+      .toLocalIterator()

Review Comment:
   Actually, it's better to avoid using `toLocalIterator` ... this can trigger multiple Spark jobs if the plan includes a shuffle, IIRC. We would have to implement logic like `collectAsArrowToPython` or `collectAsArrowToR` manually.
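
   For context, a rough, hypothetical sketch of the single-job pattern this refers to (`streamPartitions` is a made-up helper, not a Spark or Spark Connect API, and the real `collectAsArrowToPython` does more work, e.g. buffering out-of-order partition results):

   ```scala
   import org.apache.spark.sql.DataFrame

   // Runs ONE Spark job over all partitions and hands each partition's payload
   // to a driver-side handler as its task completes, instead of the multiple
   // jobs toLocalIterator can trigger when the plan includes a shuffle.
   def streamPartitions(df: DataFrame)(handle: Array[String] => Unit): Unit = {
     val rdd = df.toJSON.rdd
     df.sparkSession.sparkContext.runJob(
       rdd,
       (iter: Iterator[String]) => iter.toArray,      // runs on executors
       (_: Int, part: Array[String]) => handle(part)) // runs on the driver
   }
   ```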





[GitHub] [spark] amaliujia commented on pull request #38300: [SPARK-XXX] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on PR #38300:
URL: https://github.com/apache/spark/pull/38300#issuecomment-1284945658

   Looks reasonable. 
   
   Only one question: the CSV batch was producing a pandas DataFrame directly on the Python client side. Will the JSON batch do the same, or will it give a List[Row]?




[GitHub] [spark] grundprinzip commented on a diff in pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38300:
URL: https://github.com/apache/spark/pull/38300#discussion_r1000995568


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -59,19 +61,48 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
 
   private def processRows(clientId: String, rows: DataFrame) = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
-    val schema =
-      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, timeZoneId).toByteArray)
-
-    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
-    val data = rows.collect().map(x => x.toSeq.mkString("|")).mkString("\n")
-    val bbb = proto.Response.CSVBatch.newBuilder
-      .setRowCount(-1)
-      .setData(textSchema ++ "\n" ++ data)
-      .build()
-    val response = proto.Response.newBuilder().setClientId(clientId).setCsvBatch(bbb).build()
 
-    // Send all the data
-    responseObserver.onNext(response)
+    // Only process up to 10MB of data.
+    val sb = new StringBuilder
+    var rowCount = 0
+    rows.toJSON
+      .collect()
+      .foreach(row => {
+        if (sb.size + row.size > MAX_BATCH_SIZE) {
+          val response = proto.Response.newBuilder().setClientId(clientId)
+          val batch = proto.Response.JSONBatch
+            .newBuilder()
+            .setData(ByteString.copyFromUtf8(sb.toString()))
+            .setRowCount(rowCount)
+            .build()
+          response.setJsonBatch(batch)
+          responseObserver.onNext(response.build())
+          // When the data is sent, we have to clear the batch data and reset the row count for
+          // this batch.
+          sb.clear()
+          rowCount = 0
+        } else {
+          // Make sure to put the newline delimiters only between items and not at the end.
+          if (rowCount > 0) {
+            sb.append("\n")

Review Comment:
   A JSON string cannot contain a raw `\n` without it being escaped, so using a plain newline as the record delimiter should work.
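
   For the record, a standalone way to check this (a minimal sketch assuming a local Spark session; not part of this PR):

   ```scala
   import org.apache.spark.sql.SparkSession

   object NewlineEscapeCheck extends App {
     val spark = SparkSession.builder().master("local[1]").appName("check").getOrCreate()
     import spark.implicits._

     // A string value that contains a raw newline.
     val df = Seq(("line1\nline2", 1)).toDF("text", "id")

     // Spark's JSON writer escapes control characters, so this prints the single
     // physical line {"text":"line1\nline2","id":1} -- safe to split on '\n'.
     df.toJSON.collect().foreach(println)
     spark.stop()
   }
   ```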





[GitHub] [spark] grundprinzip commented on pull request #38300: [SPARK-XXX] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #38300:
URL: https://github.com/apache/spark/pull/38300#issuecomment-1284477114

   I will create a proper JIRA if the patch makes sense to push.




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38300:
URL: https://github.com/apache/spark/pull/38300#discussion_r1000317203


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -59,19 +61,48 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
 
   private def processRows(clientId: String, rows: DataFrame) = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
-    val schema =
-      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, timeZoneId).toByteArray)
-
-    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
-    val data = rows.collect().map(x => x.toSeq.mkString("|")).mkString("\n")
-    val bbb = proto.Response.CSVBatch.newBuilder
-      .setRowCount(-1)
-      .setData(textSchema ++ "\n" ++ data)
-      .build()
-    val response = proto.Response.newBuilder().setClientId(clientId).setCsvBatch(bbb).build()
 
-    // Send all the data
-    responseObserver.onNext(response)
+    // Only process up to 10MB of data.
+    val sb = new StringBuilder
+    var rowCount = 0
+    rows.toJSON
+      .collect()

Review Comment:
   I think we should use `toLocalIterator` instead of `collect` here to support large datasets



##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -59,19 +61,48 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
 
   private def processRows(clientId: String, rows: DataFrame) = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
-    val schema =
-      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, timeZoneId).toByteArray)
-
-    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
-    val data = rows.collect().map(x => x.toSeq.mkString("|")).mkString("\n")
-    val bbb = proto.Response.CSVBatch.newBuilder
-      .setRowCount(-1)
-      .setData(textSchema ++ "\n" ++ data)
-      .build()
-    val response = proto.Response.newBuilder().setClientId(clientId).setCsvBatch(bbb).build()
 
-    // Send all the data
-    responseObserver.onNext(response)
+    // Only process up to 10MB of data.
+    val sb = new StringBuilder
+    var rowCount = 0
+    rows.toJSON
+      .collect()
+      .foreach(row => {
+        if (sb.size + row.size > MAX_BATCH_SIZE) {
+          val response = proto.Response.newBuilder().setClientId(clientId)
+          val batch = proto.Response.JSONBatch
+            .newBuilder()
+            .setData(ByteString.copyFromUtf8(sb.toString()))
+            .setRowCount(rowCount)
+            .build()
+          response.setJsonBatch(batch)
+          responseObserver.onNext(response.build())
+          // When the data is sent, we have to clear the batch data and reset the row count for
+          // this batch.
+          sb.clear()

Review Comment:
   missing `sb.append(row)` ?
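
   For clarity, a minimal sketch of the loop with the append included (a hypothetical shape, not the PR's final code; `send` stands in for building and emitting the proto response):

   ```scala
   // Packs newline-delimited JSON rows into batches of at most maxBatchSize
   // characters. Key point: after flushing a full batch, the current row is
   // still appended, so no row is dropped.
   def batchRows(rows: Iterator[String], maxBatchSize: Int)(send: (String, Int) => Unit): Unit = {
     val sb = new StringBuilder
     var rowCount = 0
     rows.foreach { row =>
       if (rowCount > 0 && sb.size + row.size > maxBatchSize) {
         send(sb.toString(), rowCount) // emit the accumulated batch
         sb.clear()
         rowCount = 0
       }
       if (rowCount > 0) sb.append("\n") // delimiter only between records
       sb.append(row)
       rowCount += 1
     }
     if (rowCount > 0) send(sb.toString(), rowCount) // final partial batch
   }
   ```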



##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -59,19 +61,48 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
 
   private def processRows(clientId: String, rows: DataFrame) = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
-    val schema =
-      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, timeZoneId).toByteArray)
-
-    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
-    val data = rows.collect().map(x => x.toSeq.mkString("|")).mkString("\n")
-    val bbb = proto.Response.CSVBatch.newBuilder
-      .setRowCount(-1)
-      .setData(textSchema ++ "\n" ++ data)
-      .build()
-    val response = proto.Response.newBuilder().setClientId(clientId).setCsvBatch(bbb).build()
 
-    // Send all the data
-    responseObserver.onNext(response)
+    // Only process up to 10MB of data.
+    val sb = new StringBuilder
+    var rowCount = 0
+    rows.toJSON
+      .collect()
+      .foreach(row => {
+        if (sb.size + row.size > MAX_BATCH_SIZE) {
+          val response = proto.Response.newBuilder().setClientId(clientId)
+          val batch = proto.Response.JSONBatch
+            .newBuilder()
+            .setData(ByteString.copyFromUtf8(sb.toString()))
+            .setRowCount(rowCount)
+            .build()
+          response.setJsonBatch(batch)
+          responseObserver.onNext(response.build())
+          // When the data is sent, we have to clear the batch data and reset the row count for
+          // this batch.
+          sb.clear()
+          rowCount = 0
+        } else {
+          // Make sure to put the newline delimiters only between items and not at the end.
+          if (rowCount > 0) {
+            sb.append("\n")

Review Comment:
   Is it safe to use `\n` as the delimiter? A `row` may also contain `\n`.
   What about using a `repeated string` field to carry the rows?





[GitHub] [spark] grundprinzip commented on a diff in pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38300:
URL: https://github.com/apache/spark/pull/38300#discussion_r1000995972


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -59,19 +61,48 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
 
   private def processRows(clientId: String, rows: DataFrame) = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
-    val schema =
-      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, timeZoneId).toByteArray)
-
-    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
-    val data = rows.collect().map(x => x.toSeq.mkString("|")).mkString("\n")
-    val bbb = proto.Response.CSVBatch.newBuilder
-      .setRowCount(-1)
-      .setData(textSchema ++ "\n" ++ data)
-      .build()
-    val response = proto.Response.newBuilder().setClientId(clientId).setCsvBatch(bbb).build()
 
-    // Send all the data
-    responseObserver.onNext(response)
+    // Only process up to 10MB of data.
+    val sb = new StringBuilder
+    var rowCount = 0
+    rows.toJSON
+      .collect()
+      .foreach(row => {
+        if (sb.size + row.size > MAX_BATCH_SIZE) {
+          val response = proto.Response.newBuilder().setClientId(clientId)
+          val batch = proto.Response.JSONBatch
+            .newBuilder()
+            .setData(ByteString.copyFromUtf8(sb.toString()))
+            .setRowCount(rowCount)
+            .build()
+          response.setJsonBatch(batch)
+          responseObserver.onNext(response.build())
+          // When the data is sent, we have to clear the batch data and reset the row count for
+          // this batch.
+          sb.clear()

Review Comment:
   yes! Thank you





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38300:
URL: https://github.com/apache/spark/pull/38300#discussion_r1001332857


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -57,21 +60,67 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
     processRows(request.getClientId, rows)
   }
 
-  private def processRows(clientId: String, rows: DataFrame) = {
+  private[connect] def processRows(clientId: String, rows: DataFrame): Unit = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
-    val schema =
-      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, timeZoneId).toByteArray)
-
-    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
-    val data = rows.collect().map(x => x.toSeq.mkString("|")).mkString("\n")
-    val bbb = proto.Response.CSVBatch.newBuilder
-      .setRowCount(-1)
-      .setData(textSchema ++ "\n" ++ data)
-      .build()
-    val response = proto.Response.newBuilder().setClientId(clientId).setCsvBatch(bbb).build()
 
-    // Send all the data
-    responseObserver.onNext(response)
+    // Only process up to 10MB of data.
+    val sb = new StringBuilder
+    var rowCount = 0
+    rows.toJSON
+      .toLocalIterator()

Review Comment:
   Okay, I read that this is a temporary workaround. SGTM





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38300:
URL: https://github.com/apache/spark/pull/38300#discussion_r1001301371


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -57,21 +60,67 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
     processRows(request.getClientId, rows)
   }
 
-  private def processRows(clientId: String, rows: DataFrame) = {
+  private[connect] def processRows(clientId: String, rows: DataFrame): Unit = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
-    val schema =
-      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, timeZoneId).toByteArray)
-
-    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
-    val data = rows.collect().map(x => x.toSeq.mkString("|")).mkString("\n")
-    val bbb = proto.Response.CSVBatch.newBuilder
-      .setRowCount(-1)
-      .setData(textSchema ++ "\n" ++ data)
-      .build()
-    val response = proto.Response.newBuilder().setClientId(clientId).setCsvBatch(bbb).build()
 
-    // Send all the data
-    responseObserver.onNext(response)
+    // Only process up to 10MB of data.
+    val sb = new StringBuilder
+    var rowCount = 0
+    rows.toJSON
+      .toLocalIterator()

Review Comment:
   Can we directly implement Arrow batches instead of switching to JSON? We have enough time to implement this before the Spark 3.4 release.
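
   For reference, a rough standalone sketch of producing an Arrow IPC payload with the Arrow Java API (illustrative only, not Spark Connect code; a real server would populate vectors from query results rather than by hand):

   ```scala
   import java.io.ByteArrayOutputStream
   import org.apache.arrow.memory.RootAllocator
   import org.apache.arrow.vector.{IntVector, VectorSchemaRoot}
   import org.apache.arrow.vector.ipc.ArrowStreamWriter

   // Build a tiny single-column record batch and serialize it as an Arrow IPC
   // stream into a byte array -- the kind of payload an Arrow batch response
   // could carry instead of JSON text.
   val allocator = new RootAllocator(Long.MaxValue)
   val vec = new IntVector("id", allocator)
   vec.allocateNew(3)
   (0 until 3).foreach(i => vec.set(i, i))
   vec.setValueCount(3)

   val root = VectorSchemaRoot.of(vec)
   val out = new ByteArrayOutputStream()
   val writer = new ArrowStreamWriter(root, null, out)
   writer.start(); writer.writeBatch(); writer.end()
   val payload = out.toByteArray // e.g. ByteString.copyFrom(payload) in the proto
   writer.close(); root.close(); allocator.close()
   ```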





[GitHub] [spark] amaliujia commented on pull request #38300: [SPARK-XXX] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on PR #38300:
URL: https://github.com/apache/spark/pull/38300#issuecomment-1284519696

   I will take a look by the end of the day.





[GitHub] [spark] grundprinzip commented on pull request #38300: [SPARK-XXX] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #38300:
URL: https://github.com/apache/spark/pull/38300#issuecomment-1284359563

   @cloud-fan @amaliujia @HyukjinKwon If you have some cycles, it would be great if you could review.




[GitHub] [spark] grundprinzip commented on pull request #38300: [SPARK-XXX] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #38300:
URL: https://github.com/apache/spark/pull/38300#issuecomment-1284964479

   > Looks reasonable.
   >
   > Only one question: the CSV batch was producing a pandas DataFrame directly on the Python client side. Will the JSON batch do the same, or will it give a List[Row]?
   
   If you look at the Python changes, it simply changes which pandas function is called; in both cases we rely on pandas to do the proper deserialization for now.
   
   Once we have Arrow IPC batches, we can/should look into producing both the pandas DataFrame and the list of rows.
   
   For now there is no public-facing change.




[GitHub] [spark] amaliujia commented on pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
amaliujia commented on PR #38300:
URL: https://github.com/apache/spark/pull/38300#issuecomment-1287280054

   Thanks for working on this!




[GitHub] [spark] HyukjinKwon closed pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
HyukjinKwon closed pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.
URL: https://github.com/apache/spark/pull/38300




[GitHub] [spark] grundprinzip commented on a diff in pull request #38300: [SPARK-40854] [CONNECT] Use proper JSON encoding until we have Arrow collection.

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38300:
URL: https://github.com/apache/spark/pull/38300#discussion_r1001010668


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala:
##########
@@ -59,19 +61,48 @@ class SparkConnectStreamHandler(responseObserver: StreamObserver[Response]) exte
 
   private def processRows(clientId: String, rows: DataFrame) = {
     val timeZoneId = SQLConf.get.sessionLocalTimeZone
-    val schema =
-      ByteString.copyFrom(ArrowUtils.toArrowSchema(rows.schema, timeZoneId).toByteArray)
-
-    val textSchema = rows.schema.fields.map(f => f.name).mkString("|")
-    val data = rows.collect().map(x => x.toSeq.mkString("|")).mkString("\n")
-    val bbb = proto.Response.CSVBatch.newBuilder
-      .setRowCount(-1)
-      .setData(textSchema ++ "\n" ++ data)
-      .build()
-    val response = proto.Response.newBuilder().setClientId(clientId).setCsvBatch(bbb).build()
 
-    // Send all the data
-    responseObserver.onNext(response)
+    // Only process up to 10MB of data.
+    val sb = new StringBuilder
+    var rowCount = 0
+    rows.toJSON
+      .collect()

Review Comment:
   Thanks, will do!


