Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/10/27 01:00:23 UTC

[GitHub] [spark] xuechendi opened a new pull request #34396: [SPARK-37124]Add ArrowWritableColumnVector

xuechendi opened a new pull request #34396:
URL: https://github.com/apache/spark/pull/34396


   ### What changes were proposed in this pull request?
   This PR aims to add the Arrow format as an alternative ColumnVector implementation.
   
   ### Why are the changes needed?
   The current ArrowColumnVector is not fully equivalent to OnHeapColumnVector/OffHeapColumnVector in Spark. Meanwhile, the Arrow API has become more stable, and pandas UDFs perform much better than Python UDFs.
   
   ### What has been done in this pull request?
   This PR creates a new class in the same package as OnHeapColumnVector/OffHeapColumnVector. It extends WritableColumnVector and supports all put APIs.
   
   ### How was this patch tested?
   Unit tests cover all data formats, both writing to and reading from the column vector. I also added 3 unit tests for loading from ArrowRecordBatch and for allocateColumns.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   Signed-off-by: Chendi Xue <ch...@intel.com>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] github-actions[bot] closed pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #34396:
URL: https://github.com/apache/spark/pull/34396


   




[GitHub] [spark] BryanCutler commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r748648880



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       Sounds like this is working towards similar things. @xuechendi the reason I brought up the intermediate conversion to bytes is that it's an expensive step and not necessary if you are just converting  `ArrowRecordBatch` <-> `ColumnarBatch`. It's done in `ArrowConverters` specifically to send/read Arrow messages over a socket with Spark.
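
The cost difference described above can be illustrated with a minimal sketch. It uses only the Java standard library; `IntBatchSketch` and `BatchHandoffSketch` are hypothetical stand-ins, not Arrow or Spark classes, and the byte layout is an assumption made for illustration only.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical stand-in for an in-memory record batch; not Arrow's ArrowRecordBatch.
final class IntBatchSketch {
  final int[] values;
  IntBatchSketch(int[] values) { this.values = values; }
}

final class BatchHandoffSketch {
  // Path 1a: serialize to bytes (as one would before writing to a socket).
  static byte[] serialize(IntBatchSketch batch) {
    ByteBuffer buf = ByteBuffer.allocate(4 + 4 * batch.values.length);
    buf.putInt(batch.values.length);
    for (int v : batch.values) buf.putInt(v);
    return buf.array();
  }

  // Path 1b: parse the bytes back. Together with serialize, every value is copied twice.
  static IntBatchSketch deserialize(byte[] bytes) {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    int[] values = new int[buf.getInt()];
    for (int i = 0; i < values.length; i++) values[i] = buf.getInt();
    return new IntBatchSketch(values);
  }

  // Path 2: hand the in-memory batch over directly; only the reference moves.
  static IntBatchSketch wrapDirectly(IntBatchSketch batch) {
    return batch;
  }

  public static void main(String[] args) {
    IntBatchSketch batch = new IntBatchSketch(new int[] {1, 2, 3});
    IntBatchSketch viaBytes = deserialize(serialize(batch));
    IntBatchSketch direct = wrapDirectly(batch);
    System.out.println(Arrays.equals(batch.values, viaBytes.values)); // true: same contents
    System.out.println(viaBytes.values == batch.values);              // false: a copy
    System.out.println(direct.values == batch.values);                // true: shared
  }
}
```

Within a single process both paths yield the same data; the byte round trip only pays off when the batch actually has to cross a process boundary.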






[GitHub] [spark] HyukjinKwon commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747197776



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       Same question here too. Why would we need another conversion step of Arrow format between `InternalRow` and `ColumnarBatch`? Are you looking for a Scala version of https://github.com/apache/spark/pull/34505?






[GitHub] [spark] xuechendi commented on pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
xuechendi commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-953449947


   @sunchao, yes, that is what we planned and did. We implemented the most commonly used operators/expressions on top of an Arrow-based ColumnVector (e.g. project/filter, join, sort, aggregate, window). For Spark, we are planning to submit our Arrow data source as a new spark.read.format that loads Parquet/ORC directly. We also plan to submit our Arrow-based InMemoryStore (RDD cache), RowToColumnarExec, and PandasUDF serializer (which copies the buffers underlying ArrowColumnVector directly to the Python context) once the Spark community is aligned on using ArrowWritableColumnVector as an alternative to the current WritableColumnVector.
   The short answer to why we add ArrowWritableColumnVector here, instead of simply loading into Arrow and using ArrowColumnVector to read the data, is that we want to use the WritableColumnVector APIs in RowToColumnarExec and the Parquet reader to write to the Arrow format.




[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r750306067



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
##########
@@ -71,61 +71,114 @@ private[sql] class ArrowBatchStreamWriter(
 private[sql] object ArrowConverters {
 
   /**
-   * Maps Iterator from InternalRow to serialized ArrowRecordBatches. Limit ArrowRecordBatch size
+   * Maps Iterator from InternalRow to ArrowRecordBatche. Limit ArrowRecordBatch size
    * in a batch by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
    */
-  private[sql] def toBatchIterator(
+  private[sql] def toArrowRecordBatchIterator(

Review comment:
       @BryanCutler and @HyukjinKwon, based on our previous discussion, I moved all Arrow-related conversions to `ArrowConverters`. I added or modified the following 6 functions:
   * `toArrowRecordBatchIterator`: `Iterator[InternalRow]` -> `Iterator[ArrowRecordBatch]`
   * `fromArrowRecordBatchIterator`: `Iterator[ArrowRecordBatch]` -> `Iterator[InternalRow]`
   
   * `toBatchIterator`: `Iterator[InternalRow]` -> `Iterator[Array[Byte]]` (internally calls `toArrowRecordBatchIterator`)
   * `fromBatchIterator`: `Iterator[Array[Byte]]` -> `Iterator[InternalRow]` (internally calls `fromArrowRecordBatchIterator`)
   
   * `toColumnarBatchIterator`: `Iterator[ArrowRecordBatch]` -> `Iterator[ColumnarBatch]` (Arrow-backed)
   * `fromColumnarBatchIterator`: `Iterator[ColumnarBatch]` -> `Iterator[ArrowRecordBatch]`
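
The layering in the list above (a bytes-producing iterator built on top of an in-memory batch iterator) might look roughly like the following sketch. It is written against plain Java collections, not Arrow or Spark: `ConverterLayeringSketch`, its method names, and the use of a UTF-8 string as the "serialized" form are all assumptions for illustration. The only behavior taken from the quoted diff is that `maxRecordsPerBatch` limits the batch size and that 0 fully consumes the row iterator.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

final class ConverterLayeringSketch {
  // Groups rows into batches of at most maxRecordsPerBatch rows;
  // 0 (or less) means "consume the whole row iterator into one batch".
  static <T> Iterator<List<T>> toRecordBatchIterator(Iterator<T> rows, int maxRecordsPerBatch) {
    return new Iterator<List<T>>() {
      @Override public boolean hasNext() { return rows.hasNext(); }
      @Override public List<T> next() {
        if (!rows.hasNext()) throw new NoSuchElementException();
        List<T> batch = new ArrayList<>();
        while (rows.hasNext()
            && (maxRecordsPerBatch <= 0 || batch.size() < maxRecordsPerBatch)) {
          batch.add(rows.next());
        }
        return batch;
      }
    };
  }

  // Lazily applies f to each element of the input iterator.
  static <A, B> Iterator<B> map(Iterator<A> in, Function<A, B> f) {
    return new Iterator<B>() {
      @Override public boolean hasNext() { return in.hasNext(); }
      @Override public B next() { return f.apply(in.next()); }
    };
  }

  // The bytes-producing variant is the in-memory variant plus a serialization step.
  static Iterator<byte[]> toBatchIterator(Iterator<Integer> rows, int maxRecordsPerBatch) {
    return map(
        toRecordBatchIterator(rows, maxRecordsPerBatch),
        batch -> batch.toString().getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    Iterator<List<Integer>> batches =
        toRecordBatchIterator(Arrays.asList(1, 2, 3, 4, 5).iterator(), 2);
    while (batches.hasNext()) {
      System.out.println(batches.next()); // [1, 2] then [3, 4] then [5]
    }
  }
}
```

Callers that stay inside the JVM can stop at `toRecordBatchIterator`; only callers that need bytes pay for the extra `map` step.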






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r745365973



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       @BryanCutler, my goal here is to convert Iterator[InternalRow] to Iterator[ColumnarBatch] with ArrowColumnVectors as the internal ColumnVector. Converting to byte arrays first was inspired by @sunchao's previous comment. Quoted:
   >  instead of having a writable Arrow vector, currently other projects such as Iceberg and Spark RAPIDS choose to first construct the Arrow ValueVectors and then wrap them to Spark with ArrowColumnVector.
   
   So I figured that when I need to read from InternalRow and write to the Arrow columnar format, there are two ways to make that happen: 1) read and write to an ArrowRecordBatch and load it as a ColumnarBatch; 2) use the Arrow Java put APIs to write.
   Based on @sunchao's comment, I think option 1 makes more sense, since Spark already has the `ArrowConverters.toBatchIterator` function.






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747267707



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       @HyukjinKwon, I just realized I misunderstood what you meant. What do you mean by `ColumnarToArrow` and `ArrowToColumnar`? Does Arrow here refer to the byte arrays of an `ArrowRecordBatch`, and Columnar to `ArrowColumnVector`?






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r750308135



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +461,24 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        ArrowConverters.toColumnarBatchIterator(
+          ArrowConverters
+            .toArrowRecordBatchIterator(
+              rowIterator,
+              localSchema,
+              maxRecordsPerBatch,
+              timeZoneId,
+              context),
+          localSchema,
+          timeZoneId,
+          context)
+      }
+    }

Review comment:
       @BryanCutler and @HyukjinKwon, for the implementation of RowToColumnarExec, I now use `toArrowRecordBatchIterator` + `toColumnarBatchIterator` to do the conversion, so there is no serialization or deserialization step.






[GitHub] [spark] cloud-fan commented on a change in pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r737237402



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowWritableColumnVector.java
##########
@@ -0,0 +1,1322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.lang.*;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.ArrowUtils;
+import org.apache.spark.sql.vectorized.*;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A column backed by an in memory JVM array. This stores the NULLs as a byte
+ * per value and a java array for the values.
+ */
+public final class ArrowWritableColumnVector extends WritableColumnVector {

Review comment:
       I'm fine with using the Arrow format more widely in operators and/or data sources. But I still don't get why we need a new builder for Arrow vectors. Can't we use the Arrow APIs directly?






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r745359478



##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
##########
@@ -38,6 +43,29 @@
   private final ArrowVectorAccessor accessor;
   private ArrowColumnVector[] childColumns;
 
+  public static ArrowColumnVector[] loadColumns(StructType schema,
+                                                ArrowRecordBatch recordBatch) {

Review comment:
       @BryanCutler, yes, the code is very similar because that is how Arrow is designed to load an ArrowRecordBatch into a Java List[Vector]. Do you mean I should move this loadColumns function to ArrowConverters and name it something like `ArrowConverters.toColumnarBatchIterator : Iterator[ColumnarBatch]`? I am OK with that; I just felt it could fit in either scope, `ArrowConverters` or `ArrowColumnVector`.






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r737197861



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowWritableColumnVector.java
##########
@@ -0,0 +1,1322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.lang.*;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.ArrowUtils;
+import org.apache.spark.sql.vectorized.*;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A column backed by an in memory JVM array. This stores the NULLs as a byte
+ * per value and a java array for the values.
+ */
+public final class ArrowWritableColumnVector extends WritableColumnVector {

Review comment:
       @cloud-fan, thanks for reviewing.
   Yes, `ArrowWritableColumnVector` wraps `ArrowColumnVector` and adds all put APIs.
   The reason is that this lets `ArrowWritableColumnVector` serve as a third ColumnVector option, so operators that already support ColumnVector, such as RowToArrowColumnarExec / InMemoryStore / ArrowUDF, can use Arrow as their memory layout format. (The code is ready in our repo.)
   We are also thinking about adding an Arrow data source as another Spark read format in the future, so that Parquet/ORC can be loaded directly as Arrow-format ColumnVectors.
   So with this PR I want to propose the idea and see whether it makes sense to you.
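
The idea of a column that keeps an Arrow-style memory layout while also exposing put APIs can be sketched as follows. This is a simplified illustration, not the class from this PR: `ReadableIntColumnSketch` and `WritableIntColumnSketch` are hypothetical names, the column holds only `int` values on the JVM heap, and the growth policy is an assumption. The one Arrow detail relied on is the validity bitmap (one bit per row, 1 meaning non-null).

```java
import java.util.Arrays;

// Hypothetical read-only view, standing in for a read-only column vector.
interface ReadableIntColumnSketch {
  boolean isNullAt(int rowId);
  int getInt(int rowId);
}

// Hypothetical writable column that keeps an Arrow-style layout:
// a values buffer plus a validity bitmap (1 bit per row, 1 = non-null).
final class WritableIntColumnSketch implements ReadableIntColumnSketch {
  private int[] values;
  private byte[] validity;

  WritableIntColumnSketch(int capacity) {
    values = new int[capacity];
    validity = new byte[(capacity + 7) / 8];
  }

  // Grows both buffers so that rowId is addressable; new rows start as null.
  private void reserve(int rowId) {
    if (rowId >= values.length) {
      int newCapacity = Math.max(rowId + 1, values.length * 2);
      values = Arrays.copyOf(values, newCapacity);
      validity = Arrays.copyOf(validity, (newCapacity + 7) / 8);
    }
  }

  // Writes a value and marks the row as non-null.
  void putInt(int rowId, int value) {
    reserve(rowId);
    values[rowId] = value;
    validity[rowId >> 3] |= (byte) (1 << (rowId & 7));
  }

  // Clears the validity bit; the values slot is left untouched.
  void putNull(int rowId) {
    reserve(rowId);
    validity[rowId >> 3] &= (byte) ~(1 << (rowId & 7));
  }

  @Override public boolean isNullAt(int rowId) {
    return (validity[rowId >> 3] & (1 << (rowId & 7))) == 0;
  }

  @Override public int getInt(int rowId) {
    return values[rowId];
  }
}
```

Because readers only see `ReadableIntColumnSketch`, code that consumes columns does not need to know whether the producer wrote through put APIs or loaded a finished batch.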






[GitHub] [spark] HyukjinKwon commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747959540



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       Oh, okay. I guess there was a bit of miscommunication here.
   
   I am more worried about symmetry, and I am talking more at the SQL physical plan level. I am thinking about having plans such as:
   
   ```scala
   RowsToColumnarExec
   ColumnarToRowsExec
   ArrowToColumnarExec
   ColumnarToArrowExec
   ```
   
   What you want to do, I guess, is use Arrow-backed Columnar instances, right? You could, for example, pattern match on `RowsToColumnarExec(child)` and replace it with something like `ColumnarToArrowExec(RowsToColumnarExec(child))` in Spark extensions.
   
   I presume the overhead between `ColumnarBatch` and `ArrowBatch` is trivial.
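
The pattern-match-and-wrap rewrite suggested above can be sketched on a toy plan tree. None of the classes below are Spark classes: `PlanSketch`, the node types, and `ArrowRewriteRuleSketch` are hypothetical stand-ins, and a real rule would be registered through Spark's extension points rather than called directly.

```java
// Hypothetical, minimal plan nodes; these are not Spark's SparkPlan classes.
abstract class PlanSketch {
  final PlanSketch child; // null for leaves
  PlanSketch(PlanSketch child) { this.child = child; }
  abstract PlanSketch withChild(PlanSketch newChild);
  abstract String name();
  @Override public String toString() {
    return child == null ? name() : name() + "(" + child + ")";
  }
}

final class ScanSketch extends PlanSketch {
  ScanSketch() { super(null); }
  @Override PlanSketch withChild(PlanSketch c) { return this; }
  @Override String name() { return "Scan"; }
}

final class ProjectSketch extends PlanSketch {
  ProjectSketch(PlanSketch child) { super(child); }
  @Override PlanSketch withChild(PlanSketch c) { return new ProjectSketch(c); }
  @Override String name() { return "Project"; }
}

final class RowsToColumnarSketch extends PlanSketch {
  RowsToColumnarSketch(PlanSketch child) { super(child); }
  @Override PlanSketch withChild(PlanSketch c) { return new RowsToColumnarSketch(c); }
  @Override String name() { return "RowsToColumnar"; }
}

final class ColumnarToArrowSketch extends PlanSketch {
  ColumnarToArrowSketch(PlanSketch child) { super(child); }
  @Override PlanSketch withChild(PlanSketch c) { return new ColumnarToArrowSketch(c); }
  @Override String name() { return "ColumnarToArrow"; }
}

final class ArrowRewriteRuleSketch {
  // Bottom-up rewrite: wraps every RowsToColumnar node in ColumnarToArrow.
  static PlanSketch apply(PlanSketch plan) {
    if (plan instanceof ColumnarToArrowSketch && plan.child instanceof RowsToColumnarSketch) {
      // Already wrapped: only rewrite below the RowsToColumnar node.
      PlanSketch rowsToColumnar = plan.child;
      return plan.withChild(rowsToColumnar.withChild(apply(rowsToColumnar.child)));
    }
    PlanSketch rewritten = plan.child == null ? plan : plan.withChild(apply(plan.child));
    if (rewritten instanceof RowsToColumnarSketch) {
      return new ColumnarToArrowSketch(rewritten);
    }
    return rewritten;
  }

  public static void main(String[] args) {
    PlanSketch plan = new ProjectSketch(new RowsToColumnarSketch(new ScanSketch()));
    System.out.println(apply(plan)); // Project(ColumnarToArrow(RowsToColumnar(Scan)))
  }
}
```

The already-wrapped check keeps the rule idempotent, so applying it repeatedly does not stack conversion nodes.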
   
   






[GitHub] [spark] HyukjinKwon commented on pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-952464162


   I think this is something @BryanCutler pointed out before. cc @tgravescs @revans2 @cloud-fan FYI




[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r748941892



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       @BryanCutler, are you saying that converting between `ArrowRecordBatch` <-> `ColumnarBatch` is expensive? As far as I understand the code, using an Arrow `VectorSchemaRoot` to load an `ArrowRecordBatch` just increments the `ArrowBuf` reference count and then constructs an Arrow List[ValueVector], and this is the same method `ArrowConverters.fromBatchIterator` uses.
   It would be great if you could help me understand the approach you expect, thanks!
   I would like to use an existing method in ArrowConverters or add some to it. The thing is, the method you mentioned previously, [`fromBatchIterator`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala#L132-L136), converts to RDD[InternalRow], not RDD[ColumnarBatch] (this is why I didn't use it). I can extract the shared part of the code into a new function that is called by both `fromBatchIterator` (RDD[InternalRow]) and a new function, say `toColumnarBatchIterator` (RDD[ColumnarBatch]).
   Does that make sense?
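
The claim that loading a batch only bumps a reference count, rather than copying data, can be pictured with the sketch below. It is a simplified model of reference-counted buffer sharing, not Arrow's implementation: `RefCountedBufferSketch` and `LoadedVectorSketch` are hypothetical classes, and how Arrow actually manages `ArrowBuf` lifetimes is not reproduced here.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reference-counted buffer; a stand-in for the idea behind Arrow's ArrowBuf.
final class RefCountedBufferSketch {
  final int[] data;
  private final AtomicInteger refCount = new AtomicInteger(1); // the creator holds one reference

  RefCountedBufferSketch(int[] data) { this.data = data; }

  void retain() { refCount.incrementAndGet(); }

  // Returns true when the last reference was dropped.
  boolean release() { return refCount.decrementAndGet() == 0; }

  int refCount() { return refCount.get(); }
}

// Hypothetical vector that shares the batch's buffer instead of copying it.
final class LoadedVectorSketch implements AutoCloseable {
  final RefCountedBufferSketch buffer;

  LoadedVectorSketch(RefCountedBufferSketch buffer) {
    buffer.retain(); // loading only bumps the count; no values are copied
    this.buffer = buffer;
  }

  int get(int i) { return buffer.data[i]; }

  @Override public void close() { buffer.release(); }
}
```

Under this model the cost of loading is independent of the batch size, which is the property the comment above relies on.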
   






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r737922896



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowWritableColumnVector.java
##########
@@ -0,0 +1,1322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.lang.*;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.ArrowUtils;
+import org.apache.spark.sql.vectorized.*;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A column backed by an in memory JVM array. This stores the NULLs as a byte
+ * per value and a java array for the values.
+ */
+public final class ArrowWritableColumnVector extends WritableColumnVector {

Review comment:
       @cloud-fan, yes, that is what I wanted: to use `WritableColumnVector` as the interface for any ColumnVector implementation.






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747267707



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       @HyukjinKwon, I just realized I misunderstood what you meant. What do you mean by `ColumnarToArrow` and `ArrowToColumnar`? Does Arrow here refer to the byte-array form of an ArrowRecordBatch, and Columnar to ArrowColumnVector?






[GitHub] [spark] xuechendi commented on pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-957064031


   @cloud-fan @viirya @sunchao, hi all, I realized that what you said makes sense. Instead of writing data to Arrow the way the other WritableColumnVector implementations do, I can simply add a load API to ArrowColumnVector and call it from RowToColumnarExec; other operators only need the 'get' APIs, which already exist in ArrowColumnVector.
   So I updated my PR and added some UTs that test the load APIs and the RowToColumnarExec operator.




[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r737262703



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowWritableColumnVector.java
##########
@@ -0,0 +1,1322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.lang.*;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.ArrowUtils;
+import org.apache.spark.sql.vectorized.*;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A column backed by an in memory JVM array. This stores the NULLs as a byte
+ * per value and a java array for the values.
+ */
+public final class ArrowWritableColumnVector extends WritableColumnVector {

Review comment:
       @cloud-fan, and I think this class can be an alternative to OnHeap/OffHeapColumnVector for other operators and readers only if we can seamlessly call the WritableColumnVector APIs to get / put Arrow data.






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747265107



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       @HyukjinKwon, `ArrowToColumnarExec` is not needed because `ArrowColumnVector` is derived from `ColumnVector`, so the current `ColumnarToRowExec` is already capable of converting an Arrow ColumnarBatch to InternalRow.
   And for `ColumnarToArrowExec`, there are some built-in rules ([code link](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L512-L558)) that will add a `ColumnarToRowExec` between two SparkPlans; if we use a new SparkPlan, we would need to change that code as well. So I am hoping we don't add these two new physical plans.
   For the mapInArrow case, how about we add a new parameter to RowToColumnarExec to tell it to use Arrow?






[GitHub] [spark] HyukjinKwon commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747240849



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       Will you be able to add a separate `SparkPlan` that converts rows (or Columnar) to Arrow, and Arrow to Columnar, instead of adding a configuration? Then I think I can leverage that physical plan in `mapInArrow`, and the change won't be invasive either.
   
   For example,  you can add some physical plans like:
   
   ```scala
   ColumnarToArrowExec
   ArrowToColumnarExec
   ```
   
   and then you use it via `SparkExtensions`. Does it make sense?
   






[GitHub] [spark] HyukjinKwon commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747990678



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       (FWIW, I think we can deduplicate `toArrowBatchRdd` by using that new physical plan.) And also cc @tgravescs and @revans2 FYI






[GitHub] [spark] HyukjinKwon commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747960153



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       or you could replace `RowToColumnarExec(child)` with `RowsToArrowExec(child)`, as an example.






[GitHub] [spark] xuechendi commented on pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-976099176


   @BryanCutler and @HyukjinKwon, please help review; I updated this PR per our discussion last week.
   Here is my latest update:
   I moved all Arrow-related conversions to ArrowConverters, so I have now added or modified the following six functions:
   
   toArrowRecordBatchIterator: Iterator[InternalRow] -> Iterator[ArrowRecordBatch]
   fromArrowRecordBatchIterator: Iterator[ArrowRecordBatch] -> Iterator[InternalRow]
   toBatchIterator: Iterator[InternalRow] -> Iterator[Array[Byte]] (internally calls toArrowRecordBatchIterator)
   fromBatchIterator: Iterator[Array[Byte]] -> Iterator[InternalRow] (internally calls fromArrowRecordBatchIterator)
   toColumnarBatchIterator: Iterator[ArrowRecordBatch] -> Iterator[ColumnarBatch] (Arrow backend)
   fromColumnarBatchIterator: Iterator[ColumnarBatch] -> Iterator[ArrowRecordBatch]
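
The layering in the list above, where the byte-level functions delegate to the record-batch-level ones, can be sketched as follows. This is only an illustration under loud assumptions: a "row" is an `Integer`, a "record batch" is an `int[]`, and every name in `ConverterLayeringSketch` is hypothetical. It is not the code of Spark's `ArrowConverters`, and the serialization is a plain 4-bytes-per-value dump, not the Arrow IPC format.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-ins: a "row" is an Integer and a "record batch" is an int[].
final class ConverterLayeringSketch {

    // Rows -> record batches: group up to maxRecordsPerBatch (> 0) rows into each batch.
    static Iterator<int[]> toRecordBatchIterator(Iterator<Integer> rows, int maxRecordsPerBatch) {
        return new Iterator<int[]>() {
            public boolean hasNext() { return rows.hasNext(); }
            public int[] next() {
                if (!rows.hasNext()) throw new NoSuchElementException();
                int[] buf = new int[maxRecordsPerBatch];
                int n = 0;
                while (n < maxRecordsPerBatch && rows.hasNext()) buf[n++] = rows.next();
                return Arrays.copyOf(buf, n);
            }
        };
    }

    // Record batches -> rows (flattened eagerly to keep the sketch short).
    static Iterator<Integer> fromRecordBatchIterator(Iterator<int[]> batches) {
        List<Integer> rows = new ArrayList<>();
        while (batches.hasNext()) {
            for (int v : batches.next()) rows.add(v);
        }
        return rows.iterator();
    }

    // Toy serialization: 4 bytes per value.
    static byte[] serialize(int[] batch) {
        ByteBuffer out = ByteBuffer.allocate(4 * batch.length);
        for (int v : batch) out.putInt(v);
        return out.array();
    }

    static int[] deserialize(byte[] bytes) {
        ByteBuffer in = ByteBuffer.wrap(bytes);
        int[] batch = new int[bytes.length / 4];
        for (int i = 0; i < batch.length; i++) batch[i] = in.getInt();
        return batch;
    }

    // Rows -> bytes: delegates to toRecordBatchIterator, then serializes each batch.
    static Iterator<byte[]> toBatchIterator(Iterator<Integer> rows, int maxRecordsPerBatch) {
        Iterator<int[]> batches = toRecordBatchIterator(rows, maxRecordsPerBatch);
        return new Iterator<byte[]>() {
            public boolean hasNext() { return batches.hasNext(); }
            public byte[] next() { return serialize(batches.next()); }
        };
    }

    // Bytes -> rows: deserializes each batch, then delegates to fromRecordBatchIterator.
    static Iterator<Integer> fromBatchIterator(Iterator<byte[]> bytes) {
        Iterator<int[]> batches = new Iterator<int[]>() {
            public boolean hasNext() { return bytes.hasNext(); }
            public int[] next() { return deserialize(bytes.next()); }
        };
        return fromRecordBatchIterator(batches);
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 2, 3, 4, 5);
        Iterator<byte[]> bytes = toBatchIterator(rows.iterator(), 2);
        List<Integer> roundTrip = new ArrayList<>();
        fromBatchIterator(bytes).forEachRemaining(roundTrip::add);
        System.out.println(roundTrip); // prints [1, 2, 3, 4, 5]
    }
}
```

With this shape, a caller that wants in-memory batches uses the record-batch functions directly, while a caller that needs serialized bytes pays the serialization cost only in the outer wrapper.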




[GitHub] [spark] viirya commented on pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-953470870


   I think this only makes sense if there are operators that want to directly consume Arrow vectors through the ColumnVector interface (i.e. they use the Arrow API), so the producer operators must write out Arrow-based ColumnVectors.
   
   Currently in iceberg and spark-rapids, they have some scan operators that directly output Arrow. To make them compatible with Spark, they just wrap into `ArrowColumnVector` as @sunchao mentioned.
   
   If the requirement is to make some operators that output Arrow vectors fit into Spark, `ArrowColumnVector` is enough for the purpose.
   
   If these operators all work with the ColumnVector interface, they should be fine with any kind of underlying data format (on-heap, off-heap, or others if any).
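
The point above, that a consumer written against the column interface does not care about the backing format, can be sketched like this. The types are simplified assumptions: `IntColumn`, `HeapIntColumn` and `BufferIntColumn` are hypothetical stand-ins for this example, not Spark's `ColumnVector`, `OnHeapColumnVector` or `ArrowColumnVector`.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical read-only column interface; a simplified stand-in for a ColumnVector.
interface IntColumn {
    int numRows();
    int getInt(int rowId);
}

// Backed by a JVM int[] (on-heap style).
final class HeapIntColumn implements IntColumn {
    private final int[] values;
    HeapIntColumn(int[] values) { this.values = values; }
    public int numRows() { return values.length; }
    public int getInt(int rowId) { return values[rowId]; }
}

// Backed by a little-endian direct buffer of 4-byte values, loosely mimicking an Arrow-style data buffer.
final class BufferIntColumn implements IntColumn {
    private final ByteBuffer data;
    private final int numRows;

    BufferIntColumn(int[] values) {
        data = ByteBuffer.allocateDirect(4 * values.length).order(ByteOrder.LITTLE_ENDIAN);
        for (int v : values) data.putInt(v);
        numRows = values.length;
    }

    public int numRows() { return numRows; }
    // Absolute read at the value's byte offset; does not move the buffer position.
    public int getInt(int rowId) { return data.getInt(4 * rowId); }
}

final class ColumnInterfaceSketch {
    // A consumer written only against the interface; it never sees the memory layout.
    static long sum(IntColumn col) {
        long total = 0;
        for (int i = 0; i < col.numRows(); i++) total += col.getInt(i);
        return total;
    }

    public static void main(String[] args) {
        int[] values = {10, 20, 30};
        System.out.println(sum(new HeapIntColumn(values)));   // prints 60
        System.out.println(sum(new BufferIntColumn(values))); // prints 60
    }
}
```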
   
   
   > And the quick answer of adding ArrowWritableColumnVector here instead of using simply load to arrow and use ArrowColumnVector to get data is because we want to use WritableColumnVector APIs in RowToColumnarExec and Parquet Reader to write to arrow format.
   
   If the Parquet reader already writes Arrow format, it seems easy to wrap it into `ArrowColumnVector`? Why is there a need to change it to use the `WritableColumnVector` API? I think the Arrow API is more widely used outside Spark than the internal and private `WritableColumnVector` API. So I suppose the Parquet reader should output Arrow vectors? Except for the case where you write a new Parquet reader from scratch and write into `WritableColumnVector`. But if that is true, since it can write into `ColumnVector`, why does it matter to the reader whether it is Arrow or not?
   
   




[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747988830



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       I see. I am not sure if you are familiar with [toArrowBatchRdd](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3857-L3872) in Dataset; it really looks like what `RowsToArrowExec` would be.
   And if you are looking for
   
   > converts 2. More specifically Rows -> 3 -> 2 transition.
   
   I think that is what I am proposing in `RowToColumnarExec`: if we add a parameter to `RowToColumnarExec` to specify Arrow as the backend, it already gets us what we want.
   
   After all, if you think adding `RowToArrowExec` won't duplicate `Dataset.toArrowBatchRdd`, I am OK with adding these two new physical plans and then combining them to convert Row -> 3 -> 2. I can do it in a separate PR, but I would still like to make sure that adding two new physical plans is the desired way. @cloud-fan @sunchao @BryanCutler
   
   So the proposal here will be
   1. add `ArrowToColumnarExec`, which converts `RDD[ArrowRecordBatch]` to `RDD[ColumnarBatch backed by Arrow]`
   2. add `ColumnarToArrowExec`, which converts `RDD[ColumnarBatch backed by Arrow]` to `RDD[ArrowRecordBatch]`
   3. add `RowToArrowExec`, doing what the current [toArrowBatchRdd](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3857-L3872) in Dataset does
   






[GitHub] [spark] HyukjinKwon commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747972652



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       Okay, so there are:
   1. `ColumnarBatch`
   2. `ColumnarBatch` backed by Arrow
   3. `ArrowRecordBatch`
   
   and what this PR does is create 2. from rows (instead of 1.). More specifically, a `Rows -> 2` transition.
   
   I was thinking something like:
   
   `RowsToArrowExec` converts rows to 3., and `ArrowToColumnarExec` converts 3. to 2. More specifically, a `Rows -> 3 -> 2` transition.
   
   I haven't taken a deeper look, but I think we had better have such physical plans and separations. I have been looking through related Arrow feature requests, and my gut says it would be very useful to have these. At least I would like to reuse them.
   
   Does it make sense?
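
The two-stage `Rows -> 3 -> 2` transition suggested above can be sketched as two small functions that are composed. Everything here is an assumption made for illustration: `RecordBatch` stands in for representation 3, `ArrowBackedBatch` for representation 2, and `rowsToArrow` / `arrowToColumnar` are hypothetical names that mirror the proposed plans; none of this is Spark or Arrow code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class TwoStageTransitionSketch {
    // Stand-in for representation 3 (a record batch): one int[] per column.
    static final class RecordBatch {
        final int[][] columns;
        RecordBatch(int[][] columns) { this.columns = columns; }
    }

    // Stand-in for representation 2: a columnar view backed by the record batch's arrays.
    static final class ArrowBackedBatch {
        private final RecordBatch backing;
        ArrowBackedBatch(RecordBatch backing) { this.backing = backing; }
        int numRows() { return backing.columns.length == 0 ? 0 : backing.columns[0].length; }
        int getInt(int column, int row) { return backing.columns[column][row]; }
        boolean sharesStorageWith(RecordBatch batch) { return backing.columns == batch.columns; }
    }

    // Stage "Rows -> 3": transpose row-major data into per-column arrays.
    static RecordBatch rowsToArrow(List<int[]> rows) {
        int numCols = rows.isEmpty() ? 0 : rows.get(0).length;
        int[][] columns = new int[numCols][rows.size()];
        for (int r = 0; r < rows.size(); r++) {
            for (int c = 0; c < numCols; c++) columns[c][r] = rows.get(r)[c];
        }
        return new RecordBatch(columns);
    }

    // Stage "3 -> 2": wrap the column arrays without copying them.
    static ArrowBackedBatch arrowToColumnar(RecordBatch batch) {
        return new ArrowBackedBatch(batch);
    }

    public static void main(String[] args) {
        // Each stage is reusable on its own; composing them gives Rows -> 3 -> 2.
        Function<List<int[]>, RecordBatch> first = TwoStageTransitionSketch::rowsToArrow;
        Function<List<int[]>, ArrowBackedBatch> rowsToColumnar =
            first.andThen(TwoStageTransitionSketch::arrowToColumnar);

        ArrowBackedBatch batch =
            rowsToColumnar.apply(Arrays.asList(new int[] {1, 10}, new int[] {2, 20}));
        System.out.println(batch.numRows());    // prints 2
        System.out.println(batch.getInt(1, 0)); // prints 10
    }
}
```

Keeping the stages separate is what allows the first one to be reused where only record batches are needed, which is the reuse argument made in the comment.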






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747993176



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       Cool, I'll work on that. Will send out a PR later.






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747265107



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       @HyukjinKwon, `ArrowToColumnarExec` is not needed because `ArrowColumnVector` is derived from `ColumnVector`, so the current `ColumnarToRowExec` is already capable of converting an Arrow ColumnarBatch to InternalRow.
   And for `RowToArrowExec`, there are some built-in rules ([code link](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L512-L558)) that will add a `RowToColumnarExec` between two SparkPlans; if we use a new SparkPlan, we would need to change that code as well. So I am hoping we don't add these two new physical plans.
   For the mapInArrow case, how about we add a new parameter to `RowToColumnarExec` to tell it to use Arrow?






[GitHub] [spark] xuechendi commented on pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-963903913


   > @xuechendi thanks for the PR, I'd like to hear a little more about your use case especially how you will with batches serialized to byte arrays, and I'm also curious how much this has in common with RAPIDS use of columnar batches.
   
   My use case is that we intend to use an Arrow-backed ColumnarBatch as our ColumnarBatch, and we have some operators implemented on top of it. If this PR is acceptable, then Spark will support Arrow as a third candidate for the ColumnarBatch solution (since it can convert rows to Arrow-based columnar data).
   
   I think our use case is very similar to how RAPIDS uses columnar batches, so this PR should work for both sides?




[GitHub] [spark] xuechendi commented on pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-957064031


   @cloud-fan @viirya @sunchao , hi all, I realized that what you said makes sense. Instead of writing data to Arrow the way the other WritableColumnVector implementations do, I can simply add a load API to ArrowColumnVector and call it from RowToColumnarExec; other operators only need the 'get' APIs, which already exist in ArrowColumnVector.
   So I updated my PR and wrote some UTs that test the load APIs and the RowToColumnarExec operator.




[GitHub] [spark] HyukjinKwon commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747230550



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       Yeah, I am working on a Scala version of it at SPARK-37229, and it will require a Spark physical plan that converts rows (or Columnar) to Arrow, and Arrow to Columnar. I am wondering if you can leverage this physical plan by using Spark extensions.






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747988830



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       I see. I am not sure if you are familiar with [toArrowBatchRdd](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L3857-L3872) in Dataset; it looks very much like what `RowsToArrowExec` would be.
   And if you are looking for
   
   > converts 2. More specifically Rows -> 3 -> 2 transition.
   
   I think that is what I am proposing in `RowToColumnarExec`: if we add a parameter to `RowToColumnarExec` that selects Arrow as the backend, we already get what we want.
   
   That said, if you think adding `RowToArrowExec` won't duplicate `Dataset.toArrowBatchRdd`, I am OK with adding these two new physical plans and then combining them to convert Row -> 3 -> 2. I can do it in a separate PR, but I would still like to confirm that adding two new physical plans is the desired approach. @cloud-fan @sunchao @BryanCutler 
   






[GitHub] [spark] HyukjinKwon commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747990678



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       (FWIW, I think we can deduplicate `toArrowBatchRdd` by using that new physical plan.)






[GitHub] [spark] HyukjinKwon commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r748657399



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       Oh yeah. I think we shouldn't do the byte conversion step.






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r750311542



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala
##########
@@ -1393,6 +1393,55 @@ class ArrowConvertersSuite extends SharedSparkSession {
     assert(count == inputRows.length)
   }
 
+  test("convert from ArrowRecordBatch to ColumnarBatch") {

Review comment:
       @BryanCutler and @HyukjinKwon , a new UT has been added in `ArrowConvertersSuite.scala`.






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r750310289



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -426,7 +428,8 @@ trait RowToColumnarTransition extends UnaryExecNode
  * populate with [[RowToColumnConverter]], but the performance requirements are different and it
  * would only be to reduce code.
  */
-case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
+case class RowToColumnarExec(child: SparkPlan, useArrow: Boolean = false)

Review comment:
       @HyukjinKwon , after adding 4 extra conversion functions to `ArrowConverters`, I felt they could also be used by the mapInArrow function, so here I only added a parameter that forces Arrow as the columnar backend.






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747208132



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       @HyukjinKwon , I looked into your PR and I am not sure what the difference is between your mapInArrow and pandas_udf, so I want to explain how my PR differs from pandas_udf, in case I have misunderstood your PR.
   
   Compared with passing InternalRows to pandas_udf, yes, the implementation here is very similar, but the purpose is very different. My PR adds Arrow support to the RowToColumnarExec SparkPlan. With that, downstream operators such as sort, join, aggregate, and project/filter can compute on the Arrow format instead of On/OffHeapColumnVector.
   
   As I understand it, pandas_udf is used for data transformation with pandas built-in computation or by calling other third-party Python packages.
   
   Does that answer your question?






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r748942238



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       @HyukjinKwon , which byte conversion step are you referring to? Is it Iterator[InternalRow] to Iterator[ArrowRecordBatch], or Iterator[ArrowRecordBatch] to Iterator[ColumnarBatch]? (As I understand it, the latter does not do a heavy memory copy, as I explained in some detail in my last comment.)
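
For the first of those two steps, the shape of the computation is an iterator that drains rows and emits groups of at most `maxRecordsPerBatch`. The sketch below models only that grouping in plain Java. `BatchingIterator` is a hypothetical name, rows are arbitrary objects rather than `InternalRow`, batches are lists rather than Arrow record batches, and the limit is assumed to be positive; it is not the `ArrowConverters` implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class BatchingIterator<T> implements Iterator<List<T>> {
    private final Iterator<T> rows;
    private final int maxRecordsPerBatch;

    public BatchingIterator(Iterator<T> rows, int maxRecordsPerBatch) {
        // Assumption for this sketch: only positive limits are supported.
        if (maxRecordsPerBatch <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        this.rows = rows;
        this.maxRecordsPerBatch = maxRecordsPerBatch;
    }

    @Override public boolean hasNext() { return rows.hasNext(); }

    /** Pulls rows lazily until the batch is full or the input is exhausted. */
    @Override public List<T> next() {
        if (!rows.hasNext()) throw new NoSuchElementException();
        List<T> batch = new ArrayList<>();
        while (rows.hasNext() && batch.size() < maxRecordsPerBatch) {
            batch.add(rows.next());
        }
        return batch;
    }

    public static void main(String[] args) {
        Iterator<Integer> rows = Arrays.asList(1, 2, 3, 4, 5).iterator();
        BatchingIterator<Integer> batches = new BatchingIterator<>(rows, 2);
        while (batches.hasNext()) System.out.println(batches.next());
    }
}
```

Because batches are built lazily, only one batch of rows is materialized at a time, which is the property that matters when each batch is then handed to a columnar consumer.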






[GitHub] [spark] AmplabJenkins commented on pull request #34396: [SPARK-37124]Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-952450706


   Can one of the admins verify this patch?




[GitHub] [spark] cloud-fan commented on a change in pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r737271557



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowWritableColumnVector.java
##########
@@ -0,0 +1,1322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.lang.*;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.ArrowUtils;
+import org.apache.spark.sql.vectorized.*;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A column backed by an in memory JVM array. This stores the NULLs as a byte
+ * per value and a java array for the values.
+ */
+public final class ArrowWritableColumnVector extends WritableColumnVector {

Review comment:
       So basically you want to always use `WritableColumnVector` to simplify the code, and under the hood, the implementation can be either arrow or Spark on/off heap vectors, right?






[GitHub] [spark] xuechendi closed pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi closed pull request #34396:
URL: https://github.com/apache/spark/pull/34396


   




[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747962594



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       Yes, my question is why we need to add two extra physical plans.
   If we make ArrowColumnVector another backend of ColumnarBatch in `RowToColumnarExec`, it can also be used by the Scala version of `mapInArrow`, can't it?
   Another thing that confuses me is what `Arrow` and `Columnar` refer to in the context of `ArrowToColumnarExec` and `ColumnarToArrowExec`. To me, Arrow is columnar, so are you talking about adding two physical plans to convert between `ArrowRecordBatch` (of type `RDD[Array[Byte]]`) and `ColumnarBatch` (of type `RDD[ColumnarBatch]`)?






[GitHub] [spark] HyukjinKwon commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r748657399



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       Oh yeah. I think we shouldn't do the conversion step.






[GitHub] [spark] HyukjinKwon commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747240849



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       Would you be able to add a separate `SparkPlan` that converts rows (or Columnar) to Arrow, and Arrow to Columnar, instead of adding a configuration? Then I think I can leverage that physical plan in the Scala version of `mapInArrow`, and the change won't be invasive either.
   
   For example,  you can add some physical plans like:
   
   ```scala
   ColumnarToArrowExec
   ArrowToColumnarExec
   ```
   
   and then you use it via `SparkExtensions`. Does it make sense?
   






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r737258784



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowWritableColumnVector.java
##########
@@ -0,0 +1,1322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.lang.*;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.ArrowUtils;
+import org.apache.spark.sql.vectorized.*;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A column backed by an in memory JVM array. This stores the NULLs as a byte
+ * per value and a java array for the values.
+ */
+public final class ArrowWritableColumnVector extends WritableColumnVector {

Review comment:
       @cloud-fan, yes. What `ArrowWritableColumnVector` does internally is call the Arrow APIs, but there are some small differences between the Arrow APIs and the `WritableColumnVector` APIs, mostly for Decimal, String, Binary, Array, Map and Struct. Is that what you are asking about?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r737237402



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowWritableColumnVector.java
##########
@@ -0,0 +1,1322 @@
+package org.apache.spark.sql.execution.vectorized;
+
+import java.lang.*;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.ArrowUtils;
+import org.apache.spark.sql.vectorized.*;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A column backed by an in memory JVM array. This stores the NULLs as a byte
+ * per value and a java array for the values.
+ */
+public final class ArrowWritableColumnVector extends WritableColumnVector {

Review comment:
       I'm fine with using the Arrow format more widely in operators and/or data sources. But I still don't get why we need a new builder/updater for Arrow vectors. Can't we use the Arrow APIs directly?






[GitHub] [spark] sunchao commented on pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
sunchao commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-953441743


   @xuechendi I'm not aware of a broader plan to enable Arrow in Spark. As @viirya pointed out, instead of having a writable Arrow vector, other projects such as Iceberg and Spark RAPIDS currently construct the Arrow `ValueVector`s first and then wrap them for Spark with `ArrowColumnVector`. This seems to work pretty well so far. I'm curious what new use cases you have in mind (and it'd be great if you have any example code to share).

   One thing we have discussed is that if Spark eventually implements columnar execution, operators may need to write out intermediate `ColumnVector`s, in which case we may need an Arrow writable vector so that inputs and outputs share the same in-memory format. This is a big if, though.
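
   The build-then-wrap pattern mentioned above can be sketched roughly as follows. This is only a toy model written against the plain JDK so that it runs without Arrow or Spark on the classpath: `ToyIntVector` stands in for an Arrow `ValueVector` and `ReadOnlyIntColumn` for Spark's `ArrowColumnVector`. Both classes and all of their methods are hypothetical and are not the real APIs.

```java
import java.util.BitSet;

final class BuildThenWrapDemo {

  /** Hypothetical stand-in for an Arrow value vector: it owns the data and has its own write API. */
  static final class ToyIntVector {
    private final int[] values;
    private final BitSet validity = new BitSet(); // a set bit marks a non-null slot
    private int valueCount;

    ToyIntVector(int capacity) { this.values = new int[capacity]; }

    void set(int index, int value) { values[index] = value; validity.set(index); }
    void setNull(int index) { validity.clear(index); }
    void setValueCount(int count) { this.valueCount = count; }

    int get(int index) { return values[index]; }
    boolean isNull(int index) { return !validity.get(index); }
    int getValueCount() { return valueCount; }
  }

  /** Hypothetical read-only wrapper in the role of ArrowColumnVector: it exposes no put methods. */
  static final class ReadOnlyIntColumn {
    private final ToyIntVector vector;

    ReadOnlyIntColumn(ToyIntVector vector) { this.vector = vector; }

    boolean isNullAt(int rowId) { return vector.isNull(rowId); }
    int getInt(int rowId) { return vector.get(rowId); }
    int numRows() { return vector.getValueCount(); }
  }

  /** Step 1: fill the vector through its own API. Step 2: wrap it for the reader. */
  static ReadOnlyIntColumn build() {
    ToyIntVector vector = new ToyIntVector(3);
    vector.set(0, 10);
    vector.setNull(1);
    vector.set(2, 30);
    vector.setValueCount(3);
    return new ReadOnlyIntColumn(vector);
  }

  public static void main(String[] args) {
    ReadOnlyIntColumn column = build();
    for (int i = 0; i < column.numRows(); i++) {
      System.out.println(column.isNullAt(i) ? "null" : String.valueOf(column.getInt(i)));
    }
  }
}
```

   The point of the sketch is that all writes go through the vector's own API, so the wrapper only needs the read side.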




[GitHub] [spark] cloud-fan commented on a change in pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r737188529



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowWritableColumnVector.java
##########
@@ -0,0 +1,1322 @@
+package org.apache.spark.sql.execution.vectorized;
+
+import java.lang.*;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.vectorized.ArrowColumnVector;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.*;
+import org.apache.arrow.vector.complex.*;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.ArrowUtils;
+import org.apache.spark.sql.vectorized.*;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A column backed by an in memory JVM array. This stores the NULLs as a byte
+ * per value and a java array for the values.
+ */
+public final class ArrowWritableColumnVector extends WritableColumnVector {

Review comment:
       The `WritableColumnVector` is used by Spark's built-in file sources to build on-heap/off-heap columnar vectors, which is why it's in the `execution` package, which is private. It is the only builder of the internal on-heap/off-heap columnar vectors.
   
   I expect that for arrow vectors, we can build the arrow vector using arrow APIs and then wrap it with the Spark `ArrowColumnVector`. I don't see why an `ArrowWritableColumnVector` is needed. Please correct me if I miss anything.






[GitHub] [spark] BryanCutler commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r745202340



##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
##########
@@ -38,6 +43,29 @@
   private final ArrowVectorAccessor accessor;
   private ArrowColumnVector[] childColumns;
 
+  public static ArrowColumnVector[] loadColumns(StructType schema,
+                                                ArrowRecordBatch recordBatch) {

Review comment:
       I'm not really sure this makes sense here. It seems like it's mainly there to convert an `ArrowRecordBatch` to a Spark `ColumnarBatch`. That is basically what's done in `ArrowConverters.fromBatchIterator`, so maybe you could move that into its own function there?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       I'm not quite sure of your use case for this code. Could you explain a bit more what this does for you, especially why you serialize the record batch to a byte array?






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747265107



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       @HyukjinKwon, `ArrowToRowExec` is not needed because `ArrowColumnVector` is derived from `ColumnVector`, so the current `ColumnarToRowExec` can already convert an Arrow-backed `ColumnarBatch` to `InternalRow`.
   As for `RowToArrowExec`, there are built-in rules ([code link](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L512-L558)) that add a `RowToColumnarExec` between two SparkPlans. If we used a new SparkPlan, we would need to change that code as well, so I am hoping we don't add these two new physical plans.
   For the mapInArrow case, how about adding a new parameter to `RowToColumnarExec` that tells it to use Arrow?
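
   To illustrate why a converter written against the base column type needs no Arrow-specific variant, here is a rough JDK-only sketch. `ToyColumn`, `HeapColumn`, `BufferColumn` and `toRows` are all hypothetical names; they only imitate the relationship between `ColumnVector`, its on-heap and Arrow-backed subclasses, and a columnar-to-row conversion, and they are not Spark's real classes.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ColumnarToRowDemo {

  /** Hypothetical read-side contract standing in for Spark's ColumnVector. */
  abstract static class ToyColumn {
    abstract boolean isNullAt(int rowId);
    abstract int getInt(int rowId);
  }

  /** On-heap flavour: a Java array for values plus one null flag per value. */
  static final class HeapColumn extends ToyColumn {
    private final int[] data;
    private final boolean[] nulls;

    HeapColumn(int[] data, boolean[] nulls) { this.data = data; this.nulls = nulls; }

    @Override boolean isNullAt(int rowId) { return nulls[rowId]; }
    @Override int getInt(int rowId) { return data[rowId]; }
  }

  /** Arrow-like flavour: values packed in a little-endian buffer, nulls kept in a validity bitmap. */
  static final class BufferColumn extends ToyColumn {
    private final ByteBuffer data;
    private final long validity; // bit i set means row i is non-null (at most 64 rows in this toy)

    BufferColumn(int[] values, long validity) {
      this.data = ByteBuffer.allocate(values.length * Integer.BYTES).order(ByteOrder.LITTLE_ENDIAN);
      for (int value : values) { data.putInt(value); }
      this.validity = validity;
    }

    @Override boolean isNullAt(int rowId) { return ((validity >>> rowId) & 1L) == 0L; }
    @Override int getInt(int rowId) { return data.getInt(rowId * Integer.BYTES); }
  }

  /** One converter for every column flavour: it only uses the base-class read methods. */
  static List<Object[]> toRows(ToyColumn[] columns, int numRows) {
    List<Object[]> rows = new ArrayList<>();
    for (int r = 0; r < numRows; r++) {
      Object[] row = new Object[columns.length];
      for (int c = 0; c < columns.length; c++) {
        if (!columns[c].isNullAt(r)) { row[c] = columns[c].getInt(r); }
      }
      rows.add(row);
    }
    return rows;
  }

  static List<Object[]> sample() {
    ToyColumn heap = new HeapColumn(new int[] {1, 0, 3}, new boolean[] {false, true, false});
    ToyColumn buffer = new BufferColumn(new int[] {7, 8, 0}, 0b011L); // rows 0 and 1 are valid
    return toRows(new ToyColumn[] {heap, buffer}, 3);
  }

  public static void main(String[] args) {
    for (Object[] row : sample()) { System.out.println(Arrays.toString(row)); }
  }
}
```

   In this toy the same `toRows` handles both storage layouts in one batch, which is the property being relied on for `ColumnarToRowExec`.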






[GitHub] [spark] xuechendi commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
xuechendi commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r747235627



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
##########
@@ -458,6 +462,34 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
     // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
     // plan (this) in the closure.
     val localSchema = this.schema
+    if (enableArrowColumnVector) {
+      val maxRecordsPerBatch = SQLConf.get.arrowMaxRecordsPerBatch
+      val timeZoneId = SQLConf.get.sessionLocalTimeZone
+      return child.execute().mapPartitionsInternal { rowIterator =>
+        val context = TaskContext.get()
+        val allocator = ArrowUtils.getDefaultAllocator
+        val bytesIterator = ArrowConverters
+          .toBatchIterator(rowIterator, localSchema, maxRecordsPerBatch, timeZoneId, context)

Review comment:
       @HyukjinKwon, yes, that is what we do. We have implemented some operators and plug them into Spark using the Spark extension configuration plus columnar rules to replace the original row-based physical plans. That is why we need `RowToColumnarExec` and `ColumnarToRowExec` to convert between `InternalRow` and an Arrow-backed `ColumnarBatch`. I think one difference from mapInArrow (Scala version) is that we want to leverage the built-in `supportsColumnar` attribute to avoid code changes to the Spark application.
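
   As a rough illustration of how a per-operator columnar flag can drive where conversions are placed, here is a simplified JDK-only sketch. It models a plan as a linear chain rather than a tree, and `Op`, `insertTransitions` and the transition names are hypothetical stand-ins, not Spark's actual rule or node classes. It also assumes that whoever consumes the final output expects rows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TransitionDemo {

  /** Hypothetical operator: only a name and whether it consumes and produces columnar batches. */
  static final class Op {
    final String name;
    final boolean supportsColumnar;

    Op(String name, boolean supportsColumnar) {
      this.name = name;
      this.supportsColumnar = supportsColumnar;
    }
  }

  /**
   * Takes a chain ordered from the data source to the final operator and returns the
   * operator names with a transition inserted wherever the data format changes.
   */
  static List<String> insertTransitions(List<Op> chain) {
    List<String> planned = new ArrayList<>();
    boolean upstreamColumnar = false;
    boolean isSource = true; // the first operator has no input, so there is nothing to convert
    for (Op op : chain) {
      if (!isSource && op.supportsColumnar && !upstreamColumnar) {
        planned.add("RowToColumnar");
      } else if (!isSource && !op.supportsColumnar && upstreamColumnar) {
        planned.add("ColumnarToRow");
      }
      planned.add(op.name);
      upstreamColumnar = op.supportsColumnar;
      isSource = false;
    }
    if (upstreamColumnar) {
      planned.add("ColumnarToRow"); // assumption: the final consumer expects rows
    }
    return planned;
  }

  static List<Op> sample() {
    return Arrays.asList(
        new Op("Scan", false), new Op("Filter", true),
        new Op("Project", true), new Op("Sort", false));
  }

  public static void main(String[] args) {
    System.out.println(insertTransitions(sample()));
  }
}
```

   For the sample chain this yields `[Scan, RowToColumnar, Filter, Project, ColumnarToRow, Sort]`: the application's query stays the same, and only the operators' flags decide where conversions appear.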






[GitHub] [spark] BryanCutler commented on a change in pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
BryanCutler commented on a change in pull request #34396:
URL: https://github.com/apache/spark/pull/34396#discussion_r748647406



##########
File path: sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ArrowColumnVector.java
##########
@@ -38,6 +43,29 @@
   private final ArrowVectorAccessor accessor;
   private ArrowColumnVector[] childColumns;
 
+  public static ArrowColumnVector[] loadColumns(StructType schema,
+                                                ArrowRecordBatch recordBatch) {

Review comment:
       I was suggesting something more like adding `ArrowConverters.toColumnarBatch(recordBatch: ArrowRecordBatch): ColumnarBatch`. That conversion is basically already done internally in `ArrowConverters.fromBatchIterator`, so instead of duplicating it, just move the existing code into a function if it fits your needs too.
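
   The shape of that refactoring might look roughly like the JDK-only sketch below. It borrows only the two method names from the comment; `ToyRecordBatch`, `ToyColumnarBatch` and the signatures are hypothetical and much simpler than the real `ArrowConverters` code. The per-batch conversion lives in one function, and the iterator path calls it instead of repeating the logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class SharedConversionDemo {

  /** Hypothetical stand-in for a record batch: a single int column. */
  static final class ToyRecordBatch {
    final int[] values;
    ToyRecordBatch(int... values) { this.values = values; }
  }

  /** Hypothetical stand-in for a columnar batch: a row count plus the column it exposes. */
  static final class ToyColumnarBatch {
    final int numRows;
    final int[] column;
    ToyColumnarBatch(int numRows, int[] column) { this.numRows = numRows; this.column = column; }
  }

  /** The extracted function: the only place where one batch is converted. */
  static ToyColumnarBatch toColumnarBatch(ToyRecordBatch batch) {
    return new ToyColumnarBatch(batch.values.length, batch.values);
  }

  /** The iterator path reuses toColumnarBatch and flattens the batches into rows lazily. */
  static Iterator<Integer> fromBatchIterator(final Iterator<ToyRecordBatch> batches) {
    return new Iterator<Integer>() {
      private ToyColumnarBatch current = null;
      private int row = 0;

      @Override public boolean hasNext() {
        // Skip exhausted or empty batches until a row is available or the input ends.
        while ((current == null || row >= current.numRows) && batches.hasNext()) {
          current = toColumnarBatch(batches.next());
          row = 0;
        }
        return current != null && row < current.numRows;
      }

      @Override public Integer next() {
        if (!hasNext()) { throw new NoSuchElementException(); }
        return current.column[row++];
      }
    };
  }

  static List<Integer> sample() {
    Iterator<ToyRecordBatch> batches = Arrays.asList(
        new ToyRecordBatch(1, 2), new ToyRecordBatch(), new ToyRecordBatch(3)).iterator();
    List<Integer> rows = new ArrayList<>();
    fromBatchIterator(batches).forEachRemaining(rows::add);
    return rows;
  }

  public static void main(String[] args) {
    System.out.println(sample());
  }
}
```

   A caller that already holds a single batch can then call `toColumnarBatch` directly without going through the iterator.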






[GitHub] [spark] github-actions[bot] commented on pull request #34396: [SPARK-37124][SQL] Support RowToColumnarExec with Arrow format

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-1058691919


   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] sunchao commented on pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
sunchao commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-953430379


   cc @viirya I remember we have talked about something similar




[GitHub] [spark] viirya commented on pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-953436784


   @sunchao Yeah, something basically similar was done in Iceberg and spark-rapids, IIRC.




[GitHub] [spark] xuechendi commented on pull request #34396: [SPARK-37124][SQL] Add ArrowWritableColumnVector

Posted by GitBox <gi...@apache.org>.
xuechendi commented on pull request #34396:
URL: https://github.com/apache/spark/pull/34396#issuecomment-953431968


   Hi @sunchao @viirya @cloud-fan, if there is a broader scope/plan for enabling Arrow in Spark, do you think we could set up a meeting to align quickly?

