Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/11/14 13:18:07 UTC

[GitHub] [spark] dengziming opened a new pull request, #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

dengziming opened a new pull request, #38659:
URL: https://github.com/apache/spark/pull/38659

   
   ### What changes were proposed in this pull request?
   This PR adds support for local data in LocalRelation. There are two ways to represent a row:
   1. Use Expression.Literal.Struct
   2. Use Seq[google.protobuf.Any]
   Neither is perfect; I chose the first approach because the second is messier.
   
   
   ### Why are the changes needed?
   Local data is needed for unit tests and validation.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Unit tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dengziming commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1024865486


##########
connector/connect/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -213,7 +213,7 @@ message Deduplicate {
 
 message LocalRelation {
   repeated Expression.QualifiedAttribute attributes = 1;

Review Comment:
   I found that we lack a `fromBatchWithSchemaIterator` method corresponding to `toBatchWithSchemaIterator`, so I will implement one.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1028942267


##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectProtoSuite.scala:
##########
@@ -44,14 +47,18 @@ class SparkConnectProtoSuite extends PlanTest with SparkConnectPlanTest {
 
   lazy val connectTestRelation =
     createLocalRelationProto(
-      Seq(AttributeReference("id", IntegerType)(), AttributeReference("name", StringType)()))
+      Seq(AttributeReference("id", IntegerType)(), AttributeReference("name", StringType)()),
+      Seq())
 
   lazy val connectTestRelation2 =
     createLocalRelationProto(
-      Seq(AttributeReference("id", IntegerType)(), AttributeReference("name", StringType)()))
+      Seq(AttributeReference("id", IntegerType)(), AttributeReference("name", StringType)()),
+      Seq())

Review Comment:
   ```suggestion
         Seq.empty)
   ```





[GitHub] [spark] zhengruifeng commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1314662296

   @dengziming thanks for the contributions!
   
   I think we should use Arrow batches instead of structs in this proto message.
   
   You may refer to https://github.com/apache/spark/pull/38468 for how to update the proto message, and to the implementation of [fromBatchIterator](https://github.com/apache/spark/blob/a45b0811ef22e2b66d52d066784e3fd2d9107f9d/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala#L216-L266) for how to convert Arrow batches into internal rows.
   
   




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1028943446


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -253,16 +253,94 @@ private[sql] object ArrowConverters extends Logging {
         val vectorLoader = new VectorLoader(root)
         vectorLoader.load(arrowRecordBatch)
         arrowRecordBatch.close()
+        vectorSchemaRootToIter(root)
+      }
+    }
+  }
+
+  /**
+   * Maps iterator from serialized ArrowRecordBatches to InternalRows. Different from
+   * [[fromBatchIterator]], each input arrow batch starts with the schema.
+   */
+  private[sql] def fromBatchWithSchemaIterator(

Review Comment:
   Sorry for the late review. Can we deduplicate the logic, as `ArrowBatchWithSchemaIterator` does?





[GitHub] [spark] amaliujia commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
amaliujia commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1318074158

   You can also run the Scala lint locally with `./dev/lint-scala`.




[GitHub] [spark] dengziming commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
dengziming commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1318904845

   I resolved the comments and moved the schema into the Arrow batch. There are still some TODOs left, which I will fix once we all agree on this plan. @amaliujia @grundprinzip @zhengruifeng 




[GitHub] [spark] grundprinzip commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1314254660

   The original idea was for the local data to be sent as Arrow IPC batches as well, since that mirrors what we do on the return path.
   
   In addition, the Arrow IPC message has the schema embedded, so we can validate it on the receive path.
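
As a rough illustration of the receive-path validation mentioned above, the sketch below compares a schema decoded from an incoming message against the schema the server expects. It is a toy model only: `Field`, `SchemaCheck`, and `validate` are hypothetical names, and the field representation is a simplified stand-in rather than an Arrow or Spark type.

```scala
object SchemaCheck {
  // Hypothetical, simplified stand-in for a schema field; not an Arrow or Spark type.
  final case class Field(name: String, dataType: String, nullable: Boolean)

  /** Returns Right(()) when the schemas match, or Left describing every mismatch. */
  def validate(expected: Seq[Field], received: Seq[Field]): Either[String, Unit] = {
    if (expected.length != received.length) {
      Left(s"expected ${expected.length} fields but received ${received.length}")
    } else {
      // Compare fields position by position and collect a message per difference.
      val mismatches = expected.zip(received).collect {
        case (e, r) if e != r => s"expected $e but received $r"
      }
      if (mismatches.isEmpty) Right(()) else Left(mismatches.mkString("; "))
    }
  }
}
```

Returning an `Either` rather than throwing lets the caller decide how to report the mismatch to the client; that choice is part of the sketch, not something the PR prescribes.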




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1029986139


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -271,8 +273,12 @@ class SparkConnectPlanner(session: SparkSession) {
   }
 
   private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = {
-    val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq
-    new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes)
+    val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator(
+      Seq(rel.getData.toByteArray).iterator,

Review Comment:
   I'm fine with removing the row count; it was supported in `collect` only because it was in the initial proto message.





[GitHub] [spark] dengziming commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
dengziming commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1326994258

   @HyukjinKwon Thank you, I'm glad to give it a try, but I'm new to Python, so it will take me some time to get familiar with it.




[GitHub] [spark] grundprinzip commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1327737073

   @dengziming If you don't mind, I would create a quick PR that allows reading the data from a Pandas DF, because that's very quick and gets us to a useful state soonest.
   
   If you're still interested in doing the Python-side work, maybe you can have a look at `createDataFrame` without Pandas, based on a schema and rows.




[GitHub] [spark] cloud-fan commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1030416642


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -272,8 +274,12 @@ class SparkConnectPlanner(session: SparkSession) {
   }
 
   private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = {
-    val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq
-    new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes)
+    val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator(
+      Seq(rel.getData.toByteArray).iterator,

Review Comment:
   nit: `Iterator(rel.getData.toByteArray)`
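
To make the nit concrete, here is a small standalone comparison of the two forms. `SingleElementIterator` and `payload` are hypothetical names, and the byte array is an arbitrary stand-in for `rel.getData.toByteArray`.

```scala
object SingleElementIterator {
  // Arbitrary stand-in for `rel.getData.toByteArray`.
  val payload: Array[Byte] = Array[Byte](1, 2, 3)

  // Form used in the diff: builds a one-element Seq, then asks it for an iterator.
  def viaSeq: Iterator[Array[Byte]] = Seq(payload).iterator

  // Form suggested in the review: builds the one-element iterator directly.
  def direct: Iterator[Array[Byte]] = Iterator(payload)
}
```

Both yield the same single element; the direct form simply skips the intermediate collection.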





[GitHub] [spark] HyukjinKwon commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1325972726

   Merged to master.




[GitHub] [spark] HyukjinKwon closed pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
HyukjinKwon closed pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation
URL: https://github.com/apache/spark/pull/38659




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1024719459


##########
connector/connect/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -213,7 +213,7 @@ message Deduplicate {
 
 message LocalRelation {
   repeated Expression.QualifiedAttribute attributes = 1;
-  // TODO: support local data.
+  repeated bytes data = 2;

Review Comment:
   +1
   





[GitHub] [spark] grundprinzip commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1024147641


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -272,7 +274,14 @@ class SparkConnectPlanner(session: SparkSession) {
 
   private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = {
     val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq
-    new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes)
+
+    val rows = ArrowConverters.fromBatchIterator(
+      rel.getDataList.asScala.map(_.toByteArray).iterator,

Review Comment:
   If `data` is a regular byte array, it becomes a ByteString that you can simply extract here.



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala:
##########
@@ -55,10 +61,18 @@ trait SparkConnectPlanTest extends SharedSparkSession {
    * equivalent in Catalyst and can be easily used for planner testing.
    *
    * @param attrs
+   *   the attributes of LocalRelation
+   * @param literals
+   *   the data of LocalRelation
    * @return
    */
-  def createLocalRelationProto(attrs: Seq[AttributeReference]): proto.Relation = {
+  def createLocalRelationProto(
+      attrs: Seq[AttributeReference],
+      literals: Seq[Array[Byte]]): proto.Relation = {

Review Comment:
   `literals` -> `data`?



##########
connector/connect/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -213,7 +213,7 @@ message Deduplicate {
 
 message LocalRelation {
   repeated Expression.QualifiedAttribute attributes = 1;
-  // TODO: support local data.
+  repeated bytes data = 2;

Review Comment:
   I'm not sure this needs to be `repeated bytes` here because `bytes` itself is binary data of "arbitrary" length.





[GitHub] [spark] dengziming commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
dengziming commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1316683099

   I used the Arrow format without a schema here, since we already define `attributes` in `LocalRelation`. WDYT? @amaliujia @zhengruifeng @grundprinzip 




[GitHub] [spark] grundprinzip commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1027046810


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -271,8 +273,12 @@ class SparkConnectPlanner(session: SparkSession) {
   }
 
   private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = {
-    val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq
-    new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes)
+    val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator(

Review Comment:
   This looks very good. 



##########
connector/connect/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerSuite.scala:
##########
@@ -354,4 +365,16 @@ class SparkConnectPlannerSuite extends SparkFunSuite with SparkConnectPlanTest {
       transform(proto.Relation.newBuilder.setSetOp(intersect).build()))
     assert(e2.getMessage.contains("Intersect does not support union_by_name"))
   }
+
+  test("transform LocalRelation") {
+    val inputRows = (0 until 10).map(InternalRow(_))

Review Comment:
   The test here is kind of bare bones. Before we fully approve the PR we need to extend the test coverage a bit. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -76,21 +72,26 @@ private[sql] object ArrowConverters extends Logging {
       schema: StructType,
       maxRecordsPerBatch: Long,
       timeZoneId: String,
-      context: TaskContext) extends Iterator[Array[Byte]] {
+      context: TaskContext)
+      extends Iterator[Array[Byte]] {

Review Comment:
   why these changes?



##########
connector/connect/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -271,9 +271,7 @@ message Deduplicate {
 
 // A relation that does not need to be qualified by name.
 message LocalRelation {
-  // (Optional) A list qualified attributes.
-  repeated Expression.QualifiedAttribute attributes = 1;
-  // TODO: support local data.
+  bytes data = 1;

Review Comment:
   Please add a comment mentioning that the data is stored as Arrow IPC message streams and that, since the IPC streams contain the schema, we don't need to qualify it. 



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -37,10 +34,9 @@ import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.ArrowUtils
-import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnarBatch, ColumnVector}
+import org.apache.spark.sql.vectorized.{ArrowColumnVector, ColumnVector, ColumnarBatch}

Review Comment:
   AFAIK this change should break the Scala style check, as `ColumnarBatch` (CB) comes before `ColumnVector` (CV).





[GitHub] [spark] hvanhovell commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
hvanhovell commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1029714730


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -271,8 +273,12 @@ class SparkConnectPlanner(session: SparkSession) {
   }
 
   private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = {
-    val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq
-    new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes)
+    val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator(
+      Seq(rel.getData.toByteArray).iterator,

Review Comment:
   We may want to do this the other way around, right? The row count in the current Arrow batch message is not needed; that information is already encoded inside the Arrow IPC stream.





[GitHub] [spark] amaliujia commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
amaliujia commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1314224623

   also cc @hvanhovell 




[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1028944132


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -271,8 +273,12 @@ class SparkConnectPlanner(session: SparkSession) {
   }
 
   private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = {
-    val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq
-    new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes)
+    val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator(
+      Seq(rel.getData.toByteArray).iterator,

Review Comment:
   Should we use the same protobuf message you added, @zhengruifeng ?





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1028942760


##########
core/src/main/scala/org/apache/spark/util/Utils.scala:
##########
@@ -3257,6 +3257,14 @@ private[spark] object Utils extends Logging {
       case _ => math.max(sortedSize(len / 2), 1)
     }
   }
+
+  def closeAll(closeables: AutoCloseable*): Unit = {

Review Comment:
   I think it is too much to add this as a common util in the core module. It's only used twice.
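
For reference, a helper of this kind is small enough to live next to its two call sites. The sketch below keeps the `closeAll(closeables: AutoCloseable*)` signature from the diff, but the body is an assumption, since the PR's implementation is not quoted here; `CloseAllSketch` is a hypothetical name.

```scala
import scala.util.control.NonFatal

object CloseAllSketch {
  /**
   * Closes every resource in the order given. A failure does not stop the
   * remaining resources from being closed; the first error is rethrown at the
   * end with any later errors attached as suppressed exceptions.
   */
  def closeAll(closeables: AutoCloseable*): Unit = {
    var first: Throwable = null
    closeables.foreach { c =>
      try {
        if (c != null) c.close()
      } catch {
        case NonFatal(t) =>
          if (first == null) first = t else first.addSuppressed(t)
      }
    }
    if (first != null) throw first
  }
}
```

A caller that tracks resources in opening order would pass them reversed, so that, for example, a vector root is closed before the allocator that backs it.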





[GitHub] [spark] grundprinzip commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1029721265


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -213,58 +214,115 @@ private[sql] object ArrowConverters extends Logging {
     }.next()
   }
 
-  /**
-   * Maps iterator from serialized ArrowRecordBatches to InternalRows.
-   */
-  private[sql] def fromBatchIterator(
+  private[sql] abstract class InternalRowIterator(

Review Comment:
   Please add some comment on how this is supposed to be used. The name sounds very innocent :)
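
One way to document the intended use is a stripped-down model of the pattern in this diff: each batch yields its rows together with its schema, and the iterator flattens the batches while rejecting a batch whose schema differs from the first. The sketch below is not the PR's code. `BatchFlattening` and `FlatteningIterator` are hypothetical names, `R` and `S` are placeholders for `InternalRow` and `StructType`, and resource handling is left out.

```scala
object BatchFlattening {
  /**
   * Flattens an iterator of (rows, schema) batches into a single row iterator.
   * Throws IllegalArgumentException if a later batch carries a schema that
   * differs from the schema of the first batch.
   */
  final class FlatteningIterator[R, S](batches: Iterator[(Iterator[R], S)])
      extends Iterator[R] {
    private var current: Iterator[R] = Iterator.empty
    private var schemaOpt: Option[S] = None

    /** Schema of the first batch, available once a batch has been read. */
    def schema: Option[S] = schemaOpt

    override def hasNext: Boolean = {
      // Skip empty batches until a row is found or the input is exhausted.
      while (!current.hasNext && batches.hasNext) {
        val (rows, s) = batches.next()
        schemaOpt match {
          case Some(first) if first != s =>
            throw new IllegalArgumentException(
              s"Batches have different schemas: $first and $s")
          case None => schemaOpt = Some(s)
          case _ => // same schema as the first batch, nothing to do
        }
        current = rows
      }
      current.hasNext
    }

    override def next(): R = {
      if (!hasNext) throw new NoSuchElementException("no more rows")
      current.next()
    }
  }
}
```

Unlike the class in the diff, this sketch reads the first batch lazily and loops past empty batches; both are simplifications chosen for the example rather than recommendations for the PR.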



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -213,58 +214,115 @@ private[sql] object ArrowConverters extends Logging {
     }.next()
   }
 
-  /**
-   * Maps iterator from serialized ArrowRecordBatches to InternalRows.
-   */
-  private[sql] def fromBatchIterator(
+  private[sql] abstract class InternalRowIterator(
       arrowBatchIter: Iterator[Array[Byte]],
-      schema: StructType,
-      timeZoneId: String,
-      context: TaskContext): Iterator[InternalRow] = {
-    val allocator =
-      ArrowUtils.rootAllocator.newChildAllocator("fromBatchIterator", 0, Long.MaxValue)
-
-    val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
-    val root = VectorSchemaRoot.create(arrowSchema, allocator)
-
-    new Iterator[InternalRow] {
-      private var rowIter = if (arrowBatchIter.hasNext) nextBatch() else Iterator.empty
-
-      if (context != null) context.addTaskCompletionListener[Unit] { _ =>
-        root.close()
-        allocator.close()
-      }
+      context: TaskContext)
+      extends Iterator[InternalRow] {
+    // Keep all the resources we have opened in order, should be closed in reverse order finally.
+    val resources = new ArrayBuffer[AutoCloseable]()
+    protected val allocator: BufferAllocator = ArrowUtils.rootAllocator.newChildAllocator(
+      s"to${this.getClass.getSimpleName}",
+      0,
+      Long.MaxValue)
+    resources.append(allocator)
+
+    private var rowIterAndSchema =
+      if (arrowBatchIter.hasNext) nextBatch() else (Iterator.empty, null)
+    // We will ensure schemas parsed from every batch are the same
+    val schema: StructType = rowIterAndSchema._2
+
+    if (context != null) context.addTaskCompletionListener[Unit] { _ =>
+      closeAll(resources.reverse: _*)
+    }
 
-      override def hasNext: Boolean = rowIter.hasNext || {
-        if (arrowBatchIter.hasNext) {
-          rowIter = nextBatch()
-          true
-        } else {
-          root.close()
-          allocator.close()
-          false
+    override def hasNext: Boolean = rowIterAndSchema._1.hasNext || {
+      if (arrowBatchIter.hasNext) {
+        rowIterAndSchema = nextBatch()
+        if (schema != rowIterAndSchema._2) {
+          throw new IllegalArgumentException(
+            s"ArrowBatch iterator contain 2 batches with" +
+              s" different schema: $schema and ${rowIterAndSchema._2}")
         }
+        rowIterAndSchema._1.hasNext
+      } else {
+        closeAll(resources.reverse: _*)
+        false
       }
+    }
 
-      override def next(): InternalRow = rowIter.next()
+    override def next(): InternalRow = rowIterAndSchema._1.next()
 
-      private def nextBatch(): Iterator[InternalRow] = {
-        val arrowRecordBatch = ArrowConverters.loadBatch(arrowBatchIter.next(), allocator)
-        val vectorLoader = new VectorLoader(root)
-        vectorLoader.load(arrowRecordBatch)
-        arrowRecordBatch.close()
+    def nextBatch(): (Iterator[InternalRow], StructType)
+  }
 
-        val columns = root.getFieldVectors.asScala.map { vector =>
-          new ArrowColumnVector(vector).asInstanceOf[ColumnVector]
-        }.toArray
+  private[sql] class InternalRowIteratorWithoutSchema(
+      arrowBatchIter: Iterator[Array[Byte]],
+      schema: StructType,
+      timeZoneId: String,
+      context: TaskContext)
+      extends InternalRowIterator(arrowBatchIter, context) {
+
+    override def nextBatch(): (Iterator[InternalRow], StructType) = {
+      val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+      val root = VectorSchemaRoot.create(arrowSchema, allocator)
+      resources.append(root)
+      val arrowRecordBatch = ArrowConverters.loadBatch(arrowBatchIter.next(), allocator)
+      val vectorLoader = new VectorLoader(root)
+      vectorLoader.load(arrowRecordBatch)
+      arrowRecordBatch.close()
+      (vectorSchemaRootToIter(root), schema)
+    }
+  }
 
-        val batch = new ColumnarBatch(columns)
-        batch.setNumRows(root.getRowCount)
-        batch.rowIterator().asScala
+  private[sql] class InternalRowIteratorWithSchema(
+      arrowBatchIter: Iterator[Array[Byte]],
+      context: TaskContext)
+      extends InternalRowIterator(arrowBatchIter, context) {
+    override def nextBatch(): (Iterator[InternalRow], StructType) = {
+      val reader =
+        new ArrowStreamReader(new ByteArrayInputStream(arrowBatchIter.next()), allocator)
+      val root = if (reader.loadNextBatch()) reader.getVectorSchemaRoot else null
+      resources.append(reader, root)
+      if (root == null) {
+        (Iterator.empty, null)
+      } else {
+        (vectorSchemaRootToIter(root), ArrowUtils.fromArrowSchema(root.getSchema))
       }
     }
   }
 
+  /**
+   * Maps iterator from serialized ArrowRecordBatches to InternalRows.
+   */
+  private[sql] def fromBatchIterator(
+      arrowBatchIter: Iterator[Array[Byte]],
+      schema: StructType,
+      timeZoneId: String,
+      context: TaskContext): Iterator[InternalRow] = new InternalRowIteratorWithoutSchema(
+    arrowBatchIter, schema, timeZoneId, context
+  )
+
+  /**
+   * Maps iterator from serialized ArrowRecordBatches to InternalRows. Different from
+   * [[fromBatchIterator]], each input arrow batch starts with the schema.
+   */
+  private[sql] def fromBatchWithSchemaIterator(
+      arrowBatchIter: Iterator[Array[Byte]],
+      context: TaskContext): (Iterator[InternalRow], StructType) = {
+    val iterator = new InternalRowIteratorWithSchema(arrowBatchIter, context)
+    (iterator, iterator.schema)
+  }
+
+  private def vectorSchemaRootToIter(root: VectorSchemaRoot): Iterator[InternalRow] = {

Review Comment:
   Is this really such a trivial change? Can we maybe add a little bit of doc?
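
The resource handling in the diff above (append every opened resource to a buffer, then close them in reverse order once the input is exhausted) can be illustrated in isolation. The following is a minimal sketch, not the PR's implementation: `Tracked` and `ClosingIterator` are hypothetical names, and `Tracked` merely stands in for an Arrow allocator or `VectorSchemaRoot`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for an Arrow allocator or VectorSchemaRoot:
// it only records its name when it is closed.
final class Tracked(name: String, log: ArrayBuffer[String]) extends AutoCloseable {
  override def close(): Unit = { log += name; () }
}

// Hypothetical wrapper: resources are registered in the order they are opened
// and closed in reverse order the first time the underlying iterator runs dry.
final class ClosingIterator[A](underlying: Iterator[A]) extends Iterator[A] {
  private val resources = ArrayBuffer.empty[AutoCloseable]
  private var closed = false

  def register[R <: AutoCloseable](r: R): R = { resources += r; r }

  private def closeAll(): Unit = if (!closed) {
    closed = true
    resources.reverseIterator.foreach(_.close())
  }

  override def hasNext: Boolean = underlying.hasNext || { closeAll(); false }

  override def next(): A = underlying.next()
}
```

Closing in reverse order matters when a later resource depends on an earlier one, as a vector root depends on its allocator. The `closed` flag keeps repeated `hasNext` calls from closing anything twice.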



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -213,58 +214,115 @@ private[sql] object ArrowConverters extends Logging {
     }.next()
   }
 
-  /**
-   * Maps iterator from serialized ArrowRecordBatches to InternalRows.
-   */
-  private[sql] def fromBatchIterator(
+  private[sql] abstract class InternalRowIterator(
       arrowBatchIter: Iterator[Array[Byte]],
-      schema: StructType,
-      timeZoneId: String,
-      context: TaskContext): Iterator[InternalRow] = {
-    val allocator =
-      ArrowUtils.rootAllocator.newChildAllocator("fromBatchIterator", 0, Long.MaxValue)
-
-    val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
-    val root = VectorSchemaRoot.create(arrowSchema, allocator)
-
-    new Iterator[InternalRow] {
-      private var rowIter = if (arrowBatchIter.hasNext) nextBatch() else Iterator.empty
-
-      if (context != null) context.addTaskCompletionListener[Unit] { _ =>
-        root.close()
-        allocator.close()
-      }
+      context: TaskContext)
+      extends Iterator[InternalRow] {
+    // Keep all the resources we have opened in order, should be closed in reverse order finally.
+    val resources = new ArrayBuffer[AutoCloseable]()
+    protected val allocator: BufferAllocator = ArrowUtils.rootAllocator.newChildAllocator(
+      s"to${this.getClass.getSimpleName}",
+      0,
+      Long.MaxValue)
+    resources.append(allocator)
+
+    private var rowIterAndSchema =
+      if (arrowBatchIter.hasNext) nextBatch() else (Iterator.empty, null)
+    // We will ensure schemas parsed from every batch are the same

Review Comment:
   ```suggestion
       // We will ensure schemas parsed from every batch are the same.
   ```
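
The schema-consistency rule mentioned in that comment can be sketched without Arrow. This is only an illustration under simplifying assumptions: `Schema`, `Batch`, and `rowsWithSchema` are hypothetical stand-ins for `StructType`, a decoded Arrow batch, and `fromBatchWithSchemaIterator`, and the sketch returns an `Option` where the diff uses `null` for empty input.

```scala
object SchemaCheck {
  // Hypothetical, simplified stand-ins for StructType and a decoded Arrow batch.
  final case class Schema(fields: Seq[(String, String)])
  final case class Batch(schema: Schema, rows: Seq[Seq[Any]])

  // Flattens batches into rows. The first batch fixes the schema; a later batch
  // with a different schema fails lazily, when that batch is reached.
  def rowsWithSchema(batches: Iterator[Batch]): (Iterator[Seq[Any]], Option[Schema]) =
    if (!batches.hasNext) (Iterator.empty, None)
    else {
      val first = batches.next()
      val checkedRest = batches.map { b =>
        if (b.schema != first.schema) {
          throw new IllegalArgumentException(
            s"Batches have different schemas: ${first.schema} and ${b.schema}")
        }
        b
      }
      val rows = (Iterator.single(first) ++ checkedRest).flatMap(_.rows.iterator)
      (rows, Some(first.schema))
    }
}
```

Because the check runs as batches are consumed, a mismatch in a late batch surfaces only after the earlier rows have been returned. That matches the lazy behaviour of the `hasNext` check in the diff.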





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1026348675


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala:
##########
@@ -21,24 +21,22 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.Locale
-

Review Comment:
   Let's keep these newlines. I think the Scala linter would complain about this.





[GitHub] [spark] zhengruifeng commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1024753970


##########
connector/connect/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -213,7 +213,7 @@ message Deduplicate {
 
 message LocalRelation {
   repeated Expression.QualifiedAttribute attributes = 1;

Review Comment:
   Each arrow_batch in collect starts with the schema; it would be consistent to do the same in createDataFrame.





[GitHub] [spark] dengziming commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1028263508


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala:
##########
@@ -21,24 +21,22 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 import java.text.SimpleDateFormat
 import java.util.Locale
-

Review Comment:
   Thank you, I have reverted these changes.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -76,21 +72,26 @@ private[sql] object ArrowConverters extends Logging {
       schema: StructType,
       maxRecordsPerBatch: Long,
       timeZoneId: String,
-      context: TaskContext) extends Iterator[Array[Byte]] {
+      context: TaskContext)
+      extends Iterator[Array[Byte]] {

Review Comment:
   Those were made by the IDE's format plugin; I have reverted them.





[GitHub] [spark] amaliujia commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1030009940


##########
connector/connect/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -271,8 +273,12 @@ class SparkConnectPlanner(session: SparkSession) {
   }
 
   private def transformLocalRelation(rel: proto.LocalRelation): LogicalPlan = {
-    val attributes = rel.getAttributesList.asScala.map(transformAttribute(_)).toSeq
-    new org.apache.spark.sql.catalyst.plans.logical.LocalRelation(attributes)
+    val (rows, structType) = ArrowConverters.fromBatchWithSchemaIterator(
+      Seq(rel.getData.toByteArray).iterator,

Review Comment:
   If the information is already in the Arrow IPC stream, +1 to removing `row count`.
   
   I also don't think `row count` was used properly in the initial implementation (e.g. it was probably not used in the CSV version).





[GitHub] [spark] HyukjinKwon commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1326211520

   @dengziming this is really awesome. Thanks for addressing all the comments and landing this feature. Since you implemented this, are you also interested in supporting the `spark.createDataFrame(pandasDF)` case too? Pandas input is arguably far more common than plain `spark.createDataFrame(others)`. It shouldn't be very complicated to implement.




[GitHub] [spark] AmplabJenkins commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1315548580

   Can one of the admins verify this patch?




[GitHub] [spark] dengziming commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
dengziming commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1328016930

   @grundprinzip Thank you, I would like to review your code.




[GitHub] [spark] zhengruifeng commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1024720949


##########
connector/connect/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -213,7 +213,7 @@ message Deduplicate {
 
 message LocalRelation {
   repeated Expression.QualifiedAttribute attributes = 1;

Review Comment:
   What about removing this field and reading the schema from the Arrow batch instead?  @grundprinzip 





[GitHub] [spark] zhengruifeng commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
zhengruifeng commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1318017962

   You can reformat the Scala code with
   `./build/mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false -Dscalafmt.validateOnly=false -Dscalafmt.changedOnly=false -pl connector/connect`




[GitHub] [spark] amaliujia commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1024749559


##########
connector/connect/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -213,7 +213,7 @@ message Deduplicate {
 
 message LocalRelation {
   repeated Expression.QualifiedAttribute attributes = 1;

Review Comment:
   In the short term it should work. We probably only need `name` and `type` for the server side to construct such attributes for the local relation.
   
   In the longer term I am not sure. It depends on whether LocalRelation needs any other information from such attributes.





[GitHub] [spark] dengziming commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
dengziming commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1314664845

   Thank you all for your reviews @zhengruifeng @amaliujia @grundprinzip , there may be some delay since I need some time to get familiar with Arrow.🤝




[GitHub] [spark] amaliujia commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
amaliujia commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1314224467

   Question: can we re-use the ARROW collection we have done here?
   
   cc @zhengruifeng 




[GitHub] [spark] amaliujia commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
amaliujia commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1315710131

   @dengziming thanks!
   
   BTW you can convert this PR to `draft` and then re-open it when you think it is ready for review again.




[GitHub] [spark] amaliujia commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
amaliujia commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1024759018


##########
connector/connect/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -213,7 +213,7 @@ message Deduplicate {
 
 message LocalRelation {
   repeated Expression.QualifiedAttribute attributes = 1;

Review Comment:
   Sure, I am not against using the Arrow schema for now.





[GitHub] [spark] dengziming commented on a diff in pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
dengziming commented on code in PR #38659:
URL: https://github.com/apache/spark/pull/38659#discussion_r1024864116


##########
connector/connect/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -213,7 +213,7 @@ message Deduplicate {
 
 message LocalRelation {
   repeated Expression.QualifiedAttribute attributes = 1;
-  // TODO: support local data.
+  repeated bytes data = 2;

Review Comment:
   Thank you. I used `repeated bytes` in case the batch size is larger than maxRecordsPerBatch; I think it is enough to use `bytes` here, since `LocalRelation` is mostly used for debugging.





[GitHub] [spark] dengziming commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
dengziming commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1324659424

   Thank you @grundprinzip for your review. I have addressed the comments; let's wait for @hvanhovell and @cloud-fan. 🤝




[GitHub] [spark] grundprinzip commented on pull request #38659: [SPARK-41114][CONNECT] Support local data for LocalRelation

Posted by GitBox <gi...@apache.org>.
grundprinzip commented on PR #38659:
URL: https://github.com/apache/spark/pull/38659#issuecomment-1328019653

   @dengziming please have a look at https://github.com/apache/spark/pull/38803

