Posted to reviews@spark.apache.org by ueshin <gi...@git.apache.org> on 2017/07/17 12:49:53 UTC

[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

GitHub user ueshin opened a pull request:

    https://github.com/apache/spark/pull/18655

    [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and add DecimalType, ArrayType and StructType support.

    ## What changes were proposed in this pull request?
    
    This is a refactoring of `ArrowConverters` and related classes.
    
    1. Introduce `ArrowColumnVector` to read Arrow data in Java.
    2. Refactor `ColumnWriter` as `ArrowWriter`.
    3. Add `DecimalType`, `ArrayType` and `StructType` support.
    4. Refactor `ArrowConverters` to skip intermediate `ArrowRecordBatch` creation.
    
    ## How was this patch tested?
    
    Added some tests and ran the existing tests.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ueshin/apache-spark issues/SPARK-21440

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18655.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18655
    
----
commit 1439fe6d320208ccb565d18fc4b7485210068330
Author: Takuya UESHIN <ue...@databricks.com>
Date:   2017-07-13T08:45:33Z

    Introduce ArrowWriter and ArrowColumnVector.

commit 6fcf700d381a18704b3ee485083ee51c82bbd2f8
Author: Takuya UESHIN <ue...@databricks.com>
Date:   2017-07-14T05:48:33Z

    Use ArrowWriter for ArrowConverters.

commit 579def2db0a0f015760a458032d3bd916669201c
Author: Takuya UESHIN <ue...@databricks.com>
Date:   2017-07-14T09:42:32Z

    Refactor ArrowConverters.

commit 58cd46506b02800269380f7c8acb5f9825664cad
Author: Takuya UESHIN <ue...@databricks.com>
Date:   2017-07-17T06:30:17Z

    Move releasing memory into task completion listener.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79744 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79744/testReport)** for PR 18655 at commit [`a50a271`](https://github.com/apache/spark/commit/a50a271b2738cf25748f2376935d5b30bf4bc3aa).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128312860
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se
       def asPythonSerializable: Array[Byte] = payload
     }
     
    -private[sql] object ArrowPayload {
    -
    -  /**
    -   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
    -   */
    -  def apply(
    -      batch: ArrowRecordBatch,
    -      schema: StructType,
    -      allocator: BufferAllocator): ArrowPayload = {
    -    new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator))
    -  }
    -}
    -
     private[sql] object ArrowConverters {
     
       /**
    -   * Map a Spark DataType to ArrowType.
    -   */
    -  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
    -    dataType match {
    -      case BooleanType => ArrowType.Bool.INSTANCE
    -      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
    -      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
    -      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
    -      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
    -      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
    -      case ByteType => new ArrowType.Int(8, true)
    -      case StringType => ArrowType.Utf8.INSTANCE
    -      case BinaryType => ArrowType.Binary.INSTANCE
    -      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
    -    }
    -  }
    -
    -  /**
    -   * Convert a Spark Dataset schema to Arrow schema.
    -   */
    -  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
    -    val arrowFields = schema.fields.map { f =>
    -      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
    -    }
    -    new Schema(arrowFields.toList.asJava)
    -  }
    -
    -  /**
        * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
        * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
        */
       private[sql] def toPayloadIterator(
           rowIter: Iterator[InternalRow],
           schema: StructType,
    -      maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
    -    new Iterator[ArrowPayload] {
    -      private val _allocator = new RootAllocator(Long.MaxValue)
    -      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +      maxRecordsPerBatch: Int,
    +      context: TaskContext): Iterator[ArrowPayload] = {
     
    -      override def hasNext: Boolean = _nextPayload != null
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val allocator =
    +      ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue)
     
    -      override def next(): ArrowPayload = {
    -        val obj = _nextPayload
    -        if (hasNext) {
    -          if (rowIter.hasNext) {
    -            _nextPayload = convert()
    -          } else {
    -            _allocator.close()
    -            _nextPayload = null
    -          }
    -        }
    -        obj
    -      }
    -
    -      private def convert(): ArrowPayload = {
    -        val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch)
    -        ArrowPayload(batch, schema, _allocator)
    -      }
    -    }
    -  }
    -
    -  /**
    -   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed
    -   * or the number of records in the batch equals maxRecordsInBatch.  If maxRecordsPerBatch is 0,
    -   * then rowIter will be fully consumed.
    -   */
    -  private def internalRowIterToArrowBatch(
    -      rowIter: Iterator[InternalRow],
    -      schema: StructType,
    -      allocator: BufferAllocator,
    -      maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
    -
    -    val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) =>
    -      ColumnWriter(field.dataType, ordinal, allocator).init()
    -    }
    +    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    +    val arrowWriter = ArrowWriter.create(root)
     
    -    val writerLength = columnWriters.length
    -    var recordsInBatch = 0
    -    while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) {
    -      val row = rowIter.next()
    -      var i = 0
    -      while (i < writerLength) {
    -        columnWriters(i).write(row)
    -        i += 1
    -      }
    -      recordsInBatch += 1
    +    context.addTaskCompletionListener { _ =>
    +      root.close()
    +      allocator.close()
    --- End diff --
    
    Yeah, good point. What about closing resources in both ways? Have the listener close them in case something fails; otherwise, close them immediately once the row iterator is fully consumed. I'm not sure exactly when the task completion listener callback runs. Does it depend on any IO?
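
A minimal sketch of the "close in both ways" idea from the comment above, written in plain Java with no Spark or Arrow dependency. `CountingResource`, `ClosingIterator` and `closeOnce` are hypothetical names standing in for the Arrow root/allocator, the payload iterator, and the shared cleanup; they are not part of the patch. The assumption is that a single guarded close can be called both when the iterator is exhausted and from a task completion callback, so the resource is released exactly once on either path.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the Arrow root and allocator: counts close() calls. */
final class CountingResource implements AutoCloseable {
  final AtomicInteger closeCount = new AtomicInteger();

  @Override
  public void close() {
    closeCount.incrementAndGet();
  }
}

/** Iterator that releases its resource once exhausted, and also on demand. */
final class ClosingIterator<T> implements Iterator<T> {
  private final Iterator<T> rows;
  private final AutoCloseable resource;
  private final AtomicBoolean closed = new AtomicBoolean(false);

  ClosingIterator(Iterator<T> rows, AutoCloseable resource) {
    this.rows = rows;
    this.resource = resource;
  }

  /** Safe to call from the iterator itself and from a completion callback. */
  void closeOnce() {
    // compareAndSet lets only the first caller through.
    if (closed.compareAndSet(false, true)) {
      try {
        resource.close();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }

  @Override
  public boolean hasNext() {
    boolean more = !closed.get() && rows.hasNext();
    if (!more) {
      closeOnce(); // normal path: release as soon as the rows run out
    }
    return more;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return rows.next();
  }
}
```

In this sketch the completion callback would simply call `closeOnce()`. If the iterator was already drained, the call is a no-op; if the task failed midway, the callback performs the cleanup.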




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79696 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79696/testReport)** for PR 18655 at commit [`8ffedda`](https://github.com/apache/spark/commit/8ffedda9f05d379d700aef95dca049a751374f87).




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r127734562
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.vectorized;
    +
    +import org.apache.arrow.vector.*;
    +import org.apache.arrow.vector.complex.*;
    +import org.apache.arrow.vector.holders.NullableVarCharHolder;
    +
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.types.*;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +/**
    + * A column backed by Apache Arrow.
    + */
    +public final class ArrowColumnVector extends ColumnVector {
    +
    +  private ValueVector vector;
    +  private ValueVector.Accessor nulls;
    +
    +  private NullableBitVector boolData;
    +  private NullableTinyIntVector byteData;
    +  private NullableSmallIntVector shortData;
    +  private NullableIntVector intData;
    +  private NullableBigIntVector longData;
    +
    +  private NullableFloat4Vector floatData;
    +  private NullableFloat8Vector doubleData;
    +  private NullableDecimalVector decimalData;
    +
    +  private NullableVarCharVector stringData;
    +
    +  private NullableVarBinaryVector binaryData;
    +
    +  private UInt4Vector listOffsetData;
    +
    +  public ArrowColumnVector(ValueVector vector) {
    +    super(vector.getValueCapacity(), DataTypes.NullType, MemoryMode.OFF_HEAP);
    +    initialize(vector);
    +  }
    +
    +  @Override
    +  public long nullsNativeAddress() {
    +    throw new RuntimeException("Cannot get native address for arrow column");
    +  }
    +
    +  @Override
    +  public long valuesNativeAddress() {
    +    throw new RuntimeException("Cannot get native address for arrow column");
    +  }
    +
    +  @Override
    +  public void close() {
    +    if (childColumns != null) {
    +      for (int i = 0; i < childColumns.length; i++) {
    +        childColumns[i].close();
    +      }
    +    }
    +    vector.close();
    +  }
    +
    +  //
    +  // APIs dealing with nulls
    +  //
    +
    +  @Override
    +  public void putNotNull(int rowId) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void putNull(int rowId) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void putNulls(int rowId, int count) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void putNotNulls(int rowId, int count) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean isNullAt(int rowId) {
    +    return nulls.isNull(rowId);
    +  }
    +
    +  //
    +  // APIs dealing with Booleans
    +  //
    +
    +  @Override
    +  public void putBoolean(int rowId, boolean value) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void putBooleans(int rowId, int count, boolean value) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean getBoolean(int rowId) {
    +    return boolData.getAccessor().get(rowId) == 1;
    --- End diff --
    
    Can we use `nulls`? If so, it would be better to use another name instead of `nulls`.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129480076
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -77,95 +59,55 @@ private[sql] object ArrowConverters {
       private[sql] def toPayloadIterator(
           rowIter: Iterator[InternalRow],
           schema: StructType,
    -      maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
    -    new Iterator[ArrowPayload] {
    -      private val _allocator = new RootAllocator(Long.MaxValue)
    -      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +      maxRecordsPerBatch: Int,
    +      context: TaskContext): Iterator[ArrowPayload] = {
     
    -      override def hasNext: Boolean = _nextPayload != null
    -
    -      override def next(): ArrowPayload = {
    -        val obj = _nextPayload
    -        if (hasNext) {
    -          if (rowIter.hasNext) {
    -            _nextPayload = convert()
    -          } else {
    -            _allocator.close()
    -            _nextPayload = null
    -          }
    -        }
    -        obj
    -      }
    -
    -      private def convert(): ArrowPayload = {
    -        val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch)
    -        ArrowPayload(batch, schema, _allocator)
    -      }
    -    }
    -  }
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val allocator =
    +      ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue)
     
    -  /**
    -   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed
    -   * or the number of records in the batch equals maxRecordsInBatch.  If maxRecordsPerBatch is 0,
    -   * then rowIter will be fully consumed.
    -   */
    -  private def internalRowIterToArrowBatch(
    -      rowIter: Iterator[InternalRow],
    -      schema: StructType,
    -      allocator: BufferAllocator,
    -      maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
    +    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    +    val arrowWriter = ArrowWriter.create(root)
     
    -    val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) =>
    -      ColumnWriter(field.dataType, ordinal, allocator).init()
    -    }
    +    var closed = false
     
    -    val writerLength = columnWriters.length
    -    var recordsInBatch = 0
    -    while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) {
    -      val row = rowIter.next()
    -      var i = 0
    -      while (i < writerLength) {
    -        columnWriters(i).write(row)
    -        i += 1
    +    context.addTaskCompletionListener { _ =>
    +      if (!closed) {
    --- End diff --
    
    Is this a bug in Arrow? cc @BryanCutler




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79995 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79995/testReport)** for PR 18655 at commit [`0bac10d`](https://github.com/apache/spark/commit/0bac10d95637c1afa632210b5feca079a61a35d2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79696/
    Test FAILed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79955 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79955/testReport)** for PR 18655 at commit [`5bbb46f`](https://github.com/apache/spark/commit/5bbb46f55a90bae3008ee374ef2c0349a3489c09).




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79741 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79741/testReport)** for PR 18655 at commit [`a50a271`](https://github.com/apache/spark/commit/a50a271b2738cf25748f2376935d5b30bf4bc3aa).




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128146856
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/vectorized/ArrowWriter.scala ---
    @@ -0,0 +1,405 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.vectorized
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.arrow.vector._
    +import org.apache.arrow.vector.complex._
    +import org.apache.arrow.vector.util.DecimalUtility
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
    +import org.apache.spark.sql.types._
    +
    +object ArrowWriter {
    +
    +  def create(schema: StructType): ArrowWriter = {
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator)
    +    create(root)
    +  }
    +
    +  def create(root: VectorSchemaRoot): ArrowWriter = {
    +    val children = root.getFieldVectors().asScala.map { vector =>
    +      vector.allocateNew()
    +      createFieldWriter(vector)
    +    }
    +    new ArrowWriter(root, children.toArray)
    +  }
    +
    +  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
    +    val field = vector.getField()
    +    ArrowUtils.fromArrowField(field) match {
    +      case BooleanType =>
    +        new BooleanWriter(vector.asInstanceOf[NullableBitVector])
    +      case ByteType =>
    +        new ByteWriter(vector.asInstanceOf[NullableTinyIntVector])
    +      case ShortType =>
    +        new ShortWriter(vector.asInstanceOf[NullableSmallIntVector])
    +      case IntegerType =>
    +        new IntegerWriter(vector.asInstanceOf[NullableIntVector])
    +      case LongType =>
    +        new LongWriter(vector.asInstanceOf[NullableBigIntVector])
    +      case FloatType =>
    +        new FloatWriter(vector.asInstanceOf[NullableFloat4Vector])
    +      case DoubleType =>
    +        new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector])
    +      case DecimalType.Fixed(precision, scale) =>
    +        new DecimalWriter(vector.asInstanceOf[NullableDecimalVector], precision, scale)
    +      case StringType =>
    +        new StringWriter(vector.asInstanceOf[NullableVarCharVector])
    +      case BinaryType =>
    +        new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector])
    +      case ArrayType(_, _) =>
    +        val v = vector.asInstanceOf[ListVector]
    +        val elementVector = createFieldWriter(v.getDataVector())
    +        new ArrayWriter(v, elementVector)
    +      case StructType(_) =>
    +        val v = vector.asInstanceOf[NullableMapVector]
    +        val children = (0 until v.size()).map { ordinal =>
    +          createFieldWriter(v.getChildByOrdinal(ordinal))
    +        }
    +        new StructWriter(v, children.toArray)
    +    }
    +  }
    +}
    +
    +class ArrowWriter(
    +    val root: VectorSchemaRoot,
    +    fields: Array[ArrowFieldWriter]) {
    +
    +  def schema: StructType = StructType(fields.map { f =>
    +    StructField(f.name, f.dataType, f.nullable)
    +  })
    +
    +  private var count: Int = 0
    +
    +  def write(row: InternalRow): Unit = {
    +    var i = 0
    +    while (i < fields.size) {
    +      fields(i).write(row, i)
    +      i += 1
    +    }
    +    count += 1
    +  }
    +
    +  def finish(): Unit = {
    +    root.setRowCount(count)
    +    fields.foreach(_.finish())
    +  }
    +
    +  def reset(): Unit = {
    +    root.setRowCount(0)
    +    count = 0
    +    fields.foreach(_.reset())
    +  }
    +}
    +
    +private[sql] abstract class ArrowFieldWriter {
    +
    +  def valueVector: ValueVector
    +  def valueMutator: ValueVector.Mutator
    +
    +  def name: String = valueVector.getField().getName()
    +  def dataType: DataType = ArrowUtils.fromArrowField(valueVector.getField())
    +  def nullable: Boolean = valueVector.getField().isNullable()
    +
    +  def setNull(): Unit
    +  def setValue(input: SpecializedGetters, ordinal: Int): Unit
    +  def skip(): Unit
    --- End diff --
    
    This is for the case where the value of the struct type is null.
    I believe that if the value of the struct type is null, the fields should still have some values for the same row.
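
A small sketch of why a `skip()` operation is useful for null structs, under the assumption that each child column must stay the same length as its parent struct column. It uses plain Java lists rather than Arrow vectors; `ChildColumn` and `StructColumn` are hypothetical names for illustration only, not classes from the patch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical child column: one slot per parent row, null meaning "no value". */
final class ChildColumn {
  final List<Integer> slots = new ArrayList<>();

  void write(int value) {
    slots.add(value);
  }

  /** Advance by one slot without storing a value. */
  void skip() {
    slots.add(null);
  }
}

/** Hypothetical struct column with a validity list and two int children. */
final class StructColumn {
  final List<Boolean> valid = new ArrayList<>();
  final ChildColumn x = new ChildColumn();
  final ChildColumn y = new ChildColumn();

  /** Pass null to write a null struct. */
  void write(int[] xy) {
    if (xy == null) {
      valid.add(false);
      // Children still advance, so later rows land at the same index as the parent.
      x.skip();
      y.skip();
    } else {
      valid.add(true);
      x.write(xy[0]);
      y.write(xy[1]);
    }
  }
}
```

Writing `{1, 2}`, then a null struct, then `{5, 6}` leaves all three lists with three entries, so row 2 of the struct still lines up with row 2 of each child. Without the `skip()` calls, the children would have only two entries and the third row's values would be read at the wrong index.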




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79698 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79698/testReport)** for PR 18655 at commit [`8ffedda`](https://github.com/apache/spark/commit/8ffedda9f05d379d700aef95dca049a751374f87).




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79996/
    Test PASSed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    +1 on holding off for `DecimalType` support




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79786 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79786/testReport)** for PR 18655 at commit [`7084b38`](https://github.com/apache/spark/commit/7084b388d87c8347b79898827658d7827bf5649d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    @BryanCutler @wesm @cpcloud I filed a JIRA issue for decimal type support, [SPARK-21552](https://issues.apache.org/jira/browse/SPARK-21552), and opened a WIP PR for it, #18754.
    Let's move the discussion of decimal type support there.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128146851
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se
       def asPythonSerializable: Array[Byte] = payload
     }
     
    -private[sql] object ArrowPayload {
    -
    -  /**
    -   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
    -   */
    -  def apply(
    -      batch: ArrowRecordBatch,
    -      schema: StructType,
    -      allocator: BufferAllocator): ArrowPayload = {
    -    new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator))
    -  }
    -}
    -
     private[sql] object ArrowConverters {
     
       /**
    -   * Map a Spark DataType to ArrowType.
    -   */
    -  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
    -    dataType match {
    -      case BooleanType => ArrowType.Bool.INSTANCE
    -      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
    -      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
    -      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
    -      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
    -      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
    -      case ByteType => new ArrowType.Int(8, true)
    -      case StringType => ArrowType.Utf8.INSTANCE
    -      case BinaryType => ArrowType.Binary.INSTANCE
    -      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
    -    }
    -  }
    -
    -  /**
    -   * Convert a Spark Dataset schema to Arrow schema.
    -   */
    -  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
    -    val arrowFields = schema.fields.map { f =>
    -      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
    -    }
    -    new Schema(arrowFields.toList.asJava)
    -  }
    -
    -  /**
        * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
        * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
        */
       private[sql] def toPayloadIterator(
           rowIter: Iterator[InternalRow],
           schema: StructType,
    -      maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
    -    new Iterator[ArrowPayload] {
    -      private val _allocator = new RootAllocator(Long.MaxValue)
    -      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +      maxRecordsPerBatch: Int,
    +      context: TaskContext): Iterator[ArrowPayload] = {
     
    -      override def hasNext: Boolean = _nextPayload != null
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val allocator =
    +      ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue)
     
    -      override def next(): ArrowPayload = {
    -        val obj = _nextPayload
    -        if (hasNext) {
    -          if (rowIter.hasNext) {
    -            _nextPayload = convert()
    -          } else {
    -            _allocator.close()
    -            _nextPayload = null
    -          }
    -        }
    -        obj
    -      }
    -
    -      private def convert(): ArrowPayload = {
    -        val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch)
    -        ArrowPayload(batch, schema, _allocator)
    -      }
    -    }
    -  }
    -
    -  /**
    -   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed
    -   * or the number of records in the batch equals maxRecordsInBatch.  If maxRecordsPerBatch is 0,
    -   * then rowIter will be fully consumed.
    -   */
    -  private def internalRowIterToArrowBatch(
    -      rowIter: Iterator[InternalRow],
    -      schema: StructType,
    -      allocator: BufferAllocator,
    -      maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
    -
    -    val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) =>
    -      ColumnWriter(field.dataType, ordinal, allocator).init()
    -    }
    +    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    +    val arrowWriter = ArrowWriter.create(root)
     
    -    val writerLength = columnWriters.length
    -    var recordsInBatch = 0
    -    while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) {
    -      val row = rowIter.next()
    -      var i = 0
    -      while (i < writerLength) {
    -        columnWriters(i).write(row)
    -        i += 1
    -      }
    -      recordsInBatch += 1
    +    context.addTaskCompletionListener { _ =>
    +      root.close()
    +      allocator.close()
         }
     
    -    val (fieldNodes, bufferArrays) = columnWriters.map(_.finish()).unzip
    -    val buffers = bufferArrays.flatten
    -
    -    val rowLength = if (fieldNodes.nonEmpty) fieldNodes.head.getLength else 0
    -    val recordBatch = new ArrowRecordBatch(rowLength,
    -      fieldNodes.toList.asJava, buffers.toList.asJava)
    +    new Iterator[ArrowPayload] {
     
    -    buffers.foreach(_.release())
    -    recordBatch
    -  }
    +      override def hasNext: Boolean = rowIter.hasNext
     
    -  /**
    -   * Convert an ArrowRecordBatch to a byte array and close batch to release resources. Once closed,
    -   * the batch can no longer be used.
    -   */
    -  private[arrow] def batchToByteArray(
    -      batch: ArrowRecordBatch,
    -      schema: StructType,
    -      allocator: BufferAllocator): Array[Byte] = {
    -    val arrowSchema = ArrowConverters.schemaToArrowSchema(schema)
    -    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    -    val out = new ByteArrayOutputStream()
    -    val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
    +      override def next(): ArrowPayload = {
    +        val out = new ByteArrayOutputStream()
    +        val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
    +
    +        Utils.tryWithSafeFinally {
    +          var rowId = 0
    --- End diff --
    
    Thanks! I'll update it.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128598428
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
    @@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
         collectAndValidate(df, json, "floating_point-double_precision.json")
       }
     
    +  ignore("decimal conversion") {
    --- End diff --
    
    Arrow integration support for DecimalType isn't slated until v0.6, so it might work, but there is no guarantee that a record batch written in Java will be equal when that batch is read by Python/C++.  We also can't test it here until the `JsonFileReader` supports it.  I filed the Arrow JIRA here: https://issues.apache.org/jira/browse/ARROW-1238




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129484715
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -77,95 +59,55 @@ private[sql] object ArrowConverters {
       private[sql] def toPayloadIterator(
           rowIter: Iterator[InternalRow],
           schema: StructType,
    -      maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
    -    new Iterator[ArrowPayload] {
    -      private val _allocator = new RootAllocator(Long.MaxValue)
    -      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +      maxRecordsPerBatch: Int,
    +      context: TaskContext): Iterator[ArrowPayload] = {
     
    -      override def hasNext: Boolean = _nextPayload != null
    -
    -      override def next(): ArrowPayload = {
    -        val obj = _nextPayload
    -        if (hasNext) {
    -          if (rowIter.hasNext) {
    -            _nextPayload = convert()
    -          } else {
    -            _allocator.close()
    -            _nextPayload = null
    -          }
    -        }
    -        obj
    -      }
    -
    -      private def convert(): ArrowPayload = {
    -        val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch)
    -        ArrowPayload(batch, schema, _allocator)
    -      }
    -    }
    -  }
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val allocator =
    +      ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue)
     
    -  /**
    -   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed
    -   * or the number of records in the batch equals maxRecordsInBatch.  If maxRecordsPerBatch is 0,
    -   * then rowIter will be fully consumed.
    -   */
    -  private def internalRowIterToArrowBatch(
    -      rowIter: Iterator[InternalRow],
    -      schema: StructType,
    -      allocator: BufferAllocator,
    -      maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
    +    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    +    val arrowWriter = ArrowWriter.create(root)
     
    -    val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) =>
    -      ColumnWriter(field.dataType, ordinal, allocator).init()
    -    }
    +    var closed = false
     
    -    val writerLength = columnWriters.length
    -    var recordsInBatch = 0
    -    while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) {
    -      val row = rowIter.next()
    -      var i = 0
    -      while (i < writerLength) {
    -        columnWriters(i).write(row)
    -        i += 1
    +    context.addTaskCompletionListener { _ =>
    +      if (!closed) {
    --- End diff --
    
    The root just releases the buffers from the FieldVectors, so I would think it should be able to handle being closed twice.  I'll check tomorrow whether that seems reasonable.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79744 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79744/testReport)** for PR 18655 at commit [`a50a271`](https://github.com/apache/spark/commit/a50a271b2738cf25748f2376935d5b30bf4bc3aa).




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79741 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79741/testReport)** for PR 18655 at commit [`a50a271`](https://github.com/apache/spark/commit/a50a271b2738cf25748f2376935d5b30bf4bc3aa).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79798/
    Test PASSed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79955 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79955/testReport)** for PR 18655 at commit [`5bbb46f`](https://github.com/apache/spark/commit/5bbb46f55a90bae3008ee374ef2c0349a3489c09).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79744/
    Test PASSed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79737 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79737/testReport)** for PR 18655 at commit [`b5988f9`](https://github.com/apache/spark/commit/b5988f9a223de407b7709f239fca672bb02b60aa).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129735129
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -77,95 +59,55 @@ private[sql] object ArrowConverters {
       private[sql] def toPayloadIterator(
           rowIter: Iterator[InternalRow],
           schema: StructType,
    -      maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
    -    new Iterator[ArrowPayload] {
    -      private val _allocator = new RootAllocator(Long.MaxValue)
    -      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +      maxRecordsPerBatch: Int,
    +      context: TaskContext): Iterator[ArrowPayload] = {
     
    -      override def hasNext: Boolean = _nextPayload != null
    -
    -      override def next(): ArrowPayload = {
    -        val obj = _nextPayload
    -        if (hasNext) {
    -          if (rowIter.hasNext) {
    -            _nextPayload = convert()
    -          } else {
    -            _allocator.close()
    -            _nextPayload = null
    -          }
    -        }
    -        obj
    -      }
    -
    -      private def convert(): ArrowPayload = {
    -        val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch)
    -        ArrowPayload(batch, schema, _allocator)
    -      }
    -    }
    -  }
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val allocator =
    +      ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue)
     
    -  /**
    -   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed
    -   * or the number of records in the batch equals maxRecordsInBatch.  If maxRecordsPerBatch is 0,
    -   * then rowIter will be fully consumed.
    -   */
    -  private def internalRowIterToArrowBatch(
    -      rowIter: Iterator[InternalRow],
    -      schema: StructType,
    -      allocator: BufferAllocator,
    -      maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
    +    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    +    val arrowWriter = ArrowWriter.create(root)
     
    -    val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) =>
    -      ColumnWriter(field.dataType, ordinal, allocator).init()
    -    }
    +    var closed = false
     
    -    val writerLength = columnWriters.length
    -    var recordsInBatch = 0
    -    while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) {
    -      val row = rowIter.next()
    -      var i = 0
    -      while (i < writerLength) {
    -        columnWriters(i).write(row)
    -        i += 1
    +    context.addTaskCompletionListener { _ =>
    +      if (!closed) {
    --- End diff --
    
    I filed https://issues.apache.org/jira/browse/ARROW-1283 to fix this.  For now, it looks like we need this.
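
    For readers following the thread, the `closed` guard in the diff can be illustrated with a minimal, dependency-free sketch. `FragileResource` and `CloseOnce` are hypothetical names invented for this illustration; they are not Arrow or Spark classes, and the assumption that a second `close()` throws is only a stand-in for whatever the real failure mode is.

```scala
// Hypothetical stand-in for a resource whose close() must not be called twice.
// (Assumption for illustration only; not the actual Arrow behavior.)
final class FragileResource(name: String) extends AutoCloseable {
  private var released = false
  override def close(): Unit = {
    if (released) throw new IllegalStateException(s"$name was already closed")
    released = true
  }
}

// Wraps a cleanup action so that it runs at most once.
// Not thread-safe: this sketch assumes all calls come from a single thread.
final class CloseOnce(cleanup: () => Unit) {
  private var closed = false
  def close(): Unit = {
    if (!closed) {
      closed = true // set first, so a failing cleanup is not retried
      cleanup()
    }
  }
}

object CloseOnceDemo {
  // Returns how many times the cleanup body actually ran.
  def run(): Int = {
    var cleanups = 0
    val root = new FragileResource("root")
    val allocator = new FragileResource("allocator")
    val guard = new CloseOnce(() => {
      root.close()
      allocator.close()
      cleanups += 1
    })
    guard.close() // e.g. when the iterator has been fully consumed
    guard.close() // e.g. later, from a task completion listener: a no-op
    cleanups
  }
}
```

    Calling `CloseOnceDemo.run()` exercises both close paths. The second call is skipped by the flag, which is the same role the `closed` variable plays in the diff above.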




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128146843
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se
       def asPythonSerializable: Array[Byte] = payload
     }
     
    -private[sql] object ArrowPayload {
    -
    -  /**
    -   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
    -   */
    -  def apply(
    -      batch: ArrowRecordBatch,
    -      schema: StructType,
    -      allocator: BufferAllocator): ArrowPayload = {
    -    new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator))
    -  }
    -}
    -
     private[sql] object ArrowConverters {
     
       /**
    -   * Map a Spark DataType to ArrowType.
    -   */
    -  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
    -    dataType match {
    -      case BooleanType => ArrowType.Bool.INSTANCE
    -      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
    -      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
    -      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
    -      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
    -      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
    -      case ByteType => new ArrowType.Int(8, true)
    -      case StringType => ArrowType.Utf8.INSTANCE
    -      case BinaryType => ArrowType.Binary.INSTANCE
    -      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
    -    }
    -  }
    -
    -  /**
    -   * Convert a Spark Dataset schema to Arrow schema.
    -   */
    -  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
    -    val arrowFields = schema.fields.map { f =>
    -      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
    -    }
    -    new Schema(arrowFields.toList.asJava)
    -  }
    -
    -  /**
        * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
        * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
        */
       private[sql] def toPayloadIterator(
           rowIter: Iterator[InternalRow],
           schema: StructType,
    -      maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
    -    new Iterator[ArrowPayload] {
    -      private val _allocator = new RootAllocator(Long.MaxValue)
    -      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +      maxRecordsPerBatch: Int,
    +      context: TaskContext): Iterator[ArrowPayload] = {
     
    -      override def hasNext: Boolean = _nextPayload != null
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val allocator =
    +      ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue)
     
    -      override def next(): ArrowPayload = {
    -        val obj = _nextPayload
    -        if (hasNext) {
    -          if (rowIter.hasNext) {
    -            _nextPayload = convert()
    -          } else {
    -            _allocator.close()
    -            _nextPayload = null
    -          }
    -        }
    -        obj
    -      }
    -
    -      private def convert(): ArrowPayload = {
    -        val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch)
    -        ArrowPayload(batch, schema, _allocator)
    -      }
    -    }
    -  }
    -
    -  /**
    -   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed
    -   * or the number of records in the batch equals maxRecordsInBatch.  If maxRecordsPerBatch is 0,
    -   * then rowIter will be fully consumed.
    -   */
    -  private def internalRowIterToArrowBatch(
    -      rowIter: Iterator[InternalRow],
    -      schema: StructType,
    -      allocator: BufferAllocator,
    -      maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
    -
    -    val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) =>
    -      ColumnWriter(field.dataType, ordinal, allocator).init()
    -    }
    +    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    +    val arrowWriter = ArrowWriter.create(root)
     
    -    val writerLength = columnWriters.length
    -    var recordsInBatch = 0
    -    while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) {
    -      val row = rowIter.next()
    -      var i = 0
    -      while (i < writerLength) {
    -        columnWriters(i).write(row)
    -        i += 1
    -      }
    -      recordsInBatch += 1
    +    context.addTaskCompletionListener { _ =>
    +      root.close()
    +      allocator.close()
    --- End diff --
    
    I was worried about a memory leak when an exception happens during iteration. In that case, the task will fail before the row iterator is completely consumed.
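
The concern above can be illustrated with a minimal, self-contained sketch. It does not use Spark or Arrow: `MiniTaskContext` and `TrackedResource` are hypothetical stand-ins for `TaskContext` and for the root/allocator pair, and `batches` is an illustrative name. The point is only that a cleanup callback registered up front still runs when the iterator fails part-way through.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's TaskContext: it records completion
// callbacks and runs them when the task ends, whether it succeeded or failed.
final class MiniTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]
  def addCompletionListener(f: () => Unit): Unit = { listeners += f }
  def markTaskCompleted(): Unit = listeners.foreach(f => f())
}

// Hypothetical stand-in for an off-heap resource such as an allocator.
final class TrackedResource(val name: String) extends AutoCloseable {
  var closed = false
  override def close(): Unit = { closed = true }
}

object CleanupSketch {
  // Groups rows into batches of at most maxPerBatch rows (0 or less means
  // "consume everything in one batch"). The resource is tied to the task's
  // lifetime, not to the iterator being fully consumed.
  def batches[T](
      rows: Iterator[T],
      maxPerBatch: Int,
      resource: TrackedResource,
      context: MiniTaskContext): Iterator[Seq[T]] = {
    context.addCompletionListener(() => resource.close())
    new Iterator[Seq[T]] {
      override def hasNext: Boolean = rows.hasNext
      override def next(): Seq[T] = {
        val buf = ArrayBuffer.empty[T]
        while (rows.hasNext && (maxPerBatch <= 0 || buf.size < maxPerBatch)) {
          buf += rows.next()
        }
        buf.toList
      }
    }
  }
}
```

If `rows.next()` throws in the middle of a batch, the iterator is never exhausted, so cleanup that only runs on exhaustion would be skipped; the completion callback covers that path.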


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Jenkins, retest this please.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79957/
    Test FAILed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    I see, I'll move the files back to the `arrow` package.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79798 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79798/testReport)** for PR 18655 at commit [`6fc4da0`](https://github.com/apache/spark/commit/6fc4da05a84ee55ec8fd98c078c16d5671a303cc).




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79798 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79798/testReport)** for PR 18655 at commit [`6fc4da0`](https://github.com/apache/spark/commit/6fc4da05a84ee55ec8fd98c078c16d5671a303cc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class ArrowWriterSuite extends SparkFunSuite `




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128064385
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/vectorized/ArrowWriter.scala ---
    @@ -0,0 +1,405 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.vectorized
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.arrow.vector._
    +import org.apache.arrow.vector.complex._
    +import org.apache.arrow.vector.util.DecimalUtility
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
    +import org.apache.spark.sql.types._
    +
    +object ArrowWriter {
    +
    +  def create(schema: StructType): ArrowWriter = {
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator)
    +    create(root)
    +  }
    +
    +  def create(root: VectorSchemaRoot): ArrowWriter = {
    +    val children = root.getFieldVectors().asScala.map { vector =>
    +      vector.allocateNew()
    +      createFieldWriter(vector)
    +    }
    +    new ArrowWriter(root, children.toArray)
    +  }
    +
    +  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
    +    val field = vector.getField()
    +    ArrowUtils.fromArrowField(field) match {
    +      case BooleanType =>
    +        new BooleanWriter(vector.asInstanceOf[NullableBitVector])
    +      case ByteType =>
    +        new ByteWriter(vector.asInstanceOf[NullableTinyIntVector])
    +      case ShortType =>
    +        new ShortWriter(vector.asInstanceOf[NullableSmallIntVector])
    +      case IntegerType =>
    +        new IntegerWriter(vector.asInstanceOf[NullableIntVector])
    +      case LongType =>
    +        new LongWriter(vector.asInstanceOf[NullableBigIntVector])
    +      case FloatType =>
    +        new FloatWriter(vector.asInstanceOf[NullableFloat4Vector])
    +      case DoubleType =>
    +        new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector])
    +      case DecimalType.Fixed(precision, scale) =>
    +        new DecimalWriter(vector.asInstanceOf[NullableDecimalVector], precision, scale)
    +      case StringType =>
    +        new StringWriter(vector.asInstanceOf[NullableVarCharVector])
    +      case BinaryType =>
    +        new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector])
    +      case ArrayType(_, _) =>
    +        val v = vector.asInstanceOf[ListVector]
    +        val elementVector = createFieldWriter(v.getDataVector())
    +        new ArrayWriter(v, elementVector)
    +      case StructType(_) =>
    +        val v = vector.asInstanceOf[NullableMapVector]
    +        val children = (0 until v.size()).map { ordinal =>
    +          createFieldWriter(v.getChildByOrdinal(ordinal))
    +        }
    +        new StructWriter(v, children.toArray)
    +    }
    +  }
    +}
    +
    +class ArrowWriter(
    +    val root: VectorSchemaRoot,
    +    fields: Array[ArrowFieldWriter]) {
    +
    +  def schema: StructType = StructType(fields.map { f =>
    +    StructField(f.name, f.dataType, f.nullable)
    +  })
    +
    +  private var count: Int = 0
    +
    +  def write(row: InternalRow): Unit = {
    +    var i = 0
    +    while (i < fields.size) {
    +      fields(i).write(row, i)
    +      i += 1
    +    }
    +    count += 1
    +  }
    +
    +  def finish(): Unit = {
    +    root.setRowCount(count)
    +    fields.foreach(_.finish())
    +  }
    +
    +  def reset(): Unit = {
    +    root.setRowCount(0)
    +    count = 0
    +    fields.foreach(_.reset())
    +  }
    +}
    +
    +private[sql] abstract class ArrowFieldWriter {
    +
    +  def valueVector: ValueVector
    +  def valueMutator: ValueVector.Mutator
    +
    +  def name: String = valueVector.getField().getName()
    +  def dataType: DataType = ArrowUtils.fromArrowField(valueVector.getField())
    +  def nullable: Boolean = valueVector.getField().isNullable()
    +
    +  def setNull(): Unit
    +  def setValue(input: SpecializedGetters, ordinal: Int): Unit
    +  def skip(): Unit
    --- End diff --
    
    What's the purpose of the `skip()` method?




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    > For ArrowConverters, I thought we can skip the intermediate ArrowRecordBatch creation in ArrowConverters.toPayloadIterator(). What do you think about that?
    
    OK, I see. By using `ArrowWriter` directly on the root, the `ArrowFileWriter` can use that same root when creating the byte array, so there is no need to create an intermediate `ArrowRecordBatch`. That sounds good to me!
    
    > For ColumnWriter, at first I'd like to support complex types like ArrayType and StructType, so I refactored it based on your ColumnWriter implementation. And then I renamed and moved the package 
    
    That's fine, but do they need to be in `o.a.s.sql.execution.vectorized`?  If so, then what's the point of having a `o.a.s.sql.execution.arrow` package if `ArrowUtils` and `ArrowWriter` are not even there?




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79668/
    Test PASSed.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/18655




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79957 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79957/testReport)** for PR 18655 at commit [`19f3973`](https://github.com/apache/spark/commit/19f3973c4acf1b05ae51c338481d975cebf66a98).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129481638
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala ---
    @@ -0,0 +1,383 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.arrow
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.arrow.vector._
    +import org.apache.arrow.vector.complex._
    +import org.apache.arrow.vector.util.DecimalUtility
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
    +import org.apache.spark.sql.types._
    +
    +object ArrowWriter {
    +
    +  def create(schema: StructType): ArrowWriter = {
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator)
    +    create(root)
    +  }
    +
    +  def create(root: VectorSchemaRoot): ArrowWriter = {
    +    val children = root.getFieldVectors().asScala.map { vector =>
    +      vector.allocateNew()
    +      createFieldWriter(vector)
    +    }
    +    new ArrowWriter(root, children.toArray)
    +  }
    +
    +  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
    +    val field = vector.getField()
    +    ArrowUtils.fromArrowField(field) match {
    --- End diff --
    
    Would it be better to do as below?
    
    ```scala
        (ArrowUtils.fromArrowField(field), vector) match {
          case (_: BooleanType, vector: NullableBitVector) => new BooleanWriter(vector)
          case (_: ByteType, vector: NullableTinyIntVector) => new ByteWriter(vector)
      ...
    ```
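
To make the suggestion above concrete, here is a self-contained sketch of the tuple-match style. The types are hypothetical stand-ins that only borrow Spark and Arrow names; they are plain case objects and classes, so the patterns match the objects directly instead of using `_: BooleanType`. What it is meant to show is that matching on the (type, vector) pair removes the `asInstanceOf` casts and turns a mismatched pair into an explicit error.

```scala
// Hypothetical stand-ins for Spark data types.
sealed trait DataType
case object BooleanType extends DataType
case object ByteType extends DataType

// Hypothetical stand-ins for Arrow vectors.
sealed trait ValueVector
final class BitVector extends ValueVector
final class TinyIntVector extends ValueVector

// Hypothetical stand-ins for the field writers.
sealed trait FieldWriter
final case class BooleanWriter(vector: BitVector) extends FieldWriter
final case class ByteWriter(vector: TinyIntVector) extends FieldWriter

object WriterFactory {
  // Match on the (type, vector) pair so each case binds an already-typed
  // vector; no cast is needed when constructing the writer.
  def create(dataType: DataType, vector: ValueVector): FieldWriter =
    (dataType, vector) match {
      case (BooleanType, v: BitVector) => BooleanWriter(v)
      case (ByteType, v: TinyIntVector) => ByteWriter(v)
      case (dt, v) =>
        throw new UnsupportedOperationException(
          s"Unsupported pair: $dt with ${v.getClass.getSimpleName}")
    }
}
```

With the cast-based version, a schema/vector mismatch would surface as a `ClassCastException`; here it falls through to the last case and reports both sides of the pair.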




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79741/
    Test FAILed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79957 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79957/testReport)** for PR 18655 at commit [`19f3973`](https://github.com/apache/spark/commit/19f3973c4acf1b05ae51c338481d975cebf66a98).




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by wesm <gi...@git.apache.org>.
Github user wesm commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    On DecimalType, I want to point out that we haven't hardened the memory format and integration tests between Java<->C++ within Arrow. It would be great if you could help with this -- we ran into a problem in C++ where we needed an extra sign bit with 16-byte high precision decimals. So we have 3 memory representations:
    
    - 4 byte decimals (low precision)
    - 8 byte
    - 16 byte plus sign bitmap
    
    What is Spark's internal memory representation? cc @cpcloud
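
As background for the question above, and not as an authoritative answer: to my understanding, Spark's `Decimal` keeps the unscaled value in a `Long` when the precision is at most 18 digits and falls back to a `BigDecimal` otherwise. The sketch below only mimics that split with the JDK; `DecimalRepr`, `Compact`, `Wide` and `encode` are hypothetical names, and the byte layout shown is simply what `BigInteger.toByteArray` produces (big-endian two's complement, so the sign needs no separate bitmap).

```scala
import java.math.{BigDecimal => JBigDecimal}

object DecimalRepr {
  // Assumed threshold: the largest precision whose unscaled value always fits in a Long.
  val MaxLongDigits = 18

  sealed trait Repr
  // Low precision: unscaled value held in a single 8-byte long.
  final case class Compact(unscaled: Long, scale: Int) extends Repr
  // High precision: unscaled value as big-endian two's-complement bytes.
  final case class Wide(unscaledBytes: Array[Byte], scale: Int) extends Repr

  def encode(value: JBigDecimal, precision: Int, scale: Int): Repr = {
    // setScale without a rounding mode throws ArithmeticException if digits would be lost.
    val scaled = value.setScale(scale)
    if (precision <= MaxLongDigits) {
      Compact(scaled.unscaledValue.longValueExact, scale)
    } else {
      Wide(scaled.unscaledValue.toByteArray, scale)
    }
  }
}
```

For example, `123.45` with precision 10 and scale 2 is held as the long `12345`, while a precision-38 value is held as raw bytes.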



[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129486171
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala ---
    @@ -0,0 +1,383 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.arrow
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.arrow.vector._
    +import org.apache.arrow.vector.complex._
    +import org.apache.arrow.vector.util.DecimalUtility
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
    +import org.apache.spark.sql.types._
    +
    +object ArrowWriter {
    +
    +  def create(schema: StructType): ArrowWriter = {
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator)
    +    create(root)
    +  }
    +
    +  def create(root: VectorSchemaRoot): ArrowWriter = {
    +    val children = root.getFieldVectors().asScala.map { vector =>
    +      vector.allocateNew()
    +      createFieldWriter(vector)
    +    }
    +    new ArrowWriter(root, children.toArray)
    +  }
    +
    +  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
    +    val field = vector.getField()
    +    ArrowUtils.fromArrowField(field) match {
    +      case BooleanType =>
    +        new BooleanWriter(vector.asInstanceOf[NullableBitVector])
    +      case ByteType =>
    +        new ByteWriter(vector.asInstanceOf[NullableTinyIntVector])
    +      case ShortType =>
    +        new ShortWriter(vector.asInstanceOf[NullableSmallIntVector])
    +      case IntegerType =>
    +        new IntegerWriter(vector.asInstanceOf[NullableIntVector])
    +      case LongType =>
    +        new LongWriter(vector.asInstanceOf[NullableBigIntVector])
    +      case FloatType =>
    +        new FloatWriter(vector.asInstanceOf[NullableFloat4Vector])
    +      case DoubleType =>
    +        new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector])
    +      case StringType =>
    +        new StringWriter(vector.asInstanceOf[NullableVarCharVector])
    +      case BinaryType =>
    +        new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector])
    +      case ArrayType(_, _) =>
    +        val v = vector.asInstanceOf[ListVector]
    +        val elementVector = createFieldWriter(v.getDataVector())
    +        new ArrayWriter(v, elementVector)
    +      case StructType(_) =>
    +        val v = vector.asInstanceOf[NullableMapVector]
    +        val children = (0 until v.size()).map { ordinal =>
    +          createFieldWriter(v.getChildByOrdinal(ordinal))
    +        }
    +        new StructWriter(v, children.toArray)
    +      case dt =>
    +        throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}")
    +    }
    +  }
    +}
    +
    +class ArrowWriter(
    +    val root: VectorSchemaRoot,
    +    fields: Array[ArrowFieldWriter]) {
    +
    +  def schema: StructType = StructType(fields.map { f =>
    +    StructField(f.name, f.dataType, f.nullable)
    +  })
    +
    +  private var count: Int = 0
    +
    +  def write(row: InternalRow): Unit = {
    +    var i = 0
    +    while (i < fields.size) {
    +      fields(i).write(row, i)
    +      i += 1
    +    }
    +    count += 1
    +  }
    +
    +  def finish(): Unit = {
    +    root.setRowCount(count)
    +    fields.foreach(_.finish())
    +  }
    +
    +  def reset(): Unit = {
    +    root.setRowCount(0)
    +    count = 0
    +    fields.foreach(_.reset())
    +  }
    +}
    +
    +private[arrow] abstract class ArrowFieldWriter {
    +
    +  def valueVector: ValueVector
    +  def valueMutator: ValueVector.Mutator
    +
    +  def name: String = valueVector.getField().getName()
    +  def dataType: DataType = ArrowUtils.fromArrowField(valueVector.getField())
    +  def nullable: Boolean = valueVector.getField().isNullable()
    +
    +  def setNull(): Unit
    +  def setValue(input: SpecializedGetters, ordinal: Int): Unit
    +  def skip(): Unit
    +
    +  protected var count: Int = 0
    +
    +  def write(input: SpecializedGetters, ordinal: Int): Unit = {
    +    if (input.isNullAt(ordinal)) {
    +      setNull()
    +    } else {
    +      setValue(input, ordinal)
    +    }
    +    count += 1
    +  }
    +
    +  def writeSkip(): Unit = {
    +    skip()
    +    count += 1
    --- End diff --
    
    Basically, yes, it's enough except for `StructType`, but should we set the null bit to `1` for a skipped value?
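
One way to read the role of `skip()` here is that it keeps child vectors aligned with their parent when a whole struct is null. The sketch below is a simplified model under that assumption, not the PR's code: `IntColumn` and `StructColumn` are hypothetical, a column is just a buffer of optional values, and a skipped slot is recorded as null.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified child column: one slot per row.
final class IntColumn {
  val values = ArrayBuffer.empty[Option[Int]]
  def write(v: Option[Int]): Unit = { values += v }
  // Advance by one slot without a real value; the slot is marked as null here.
  def writeSkip(): Unit = { values += None }
}

// Hypothetical, simplified struct column with its own validity buffer.
final class StructColumn(val children: Array[IntColumn]) {
  val validity = ArrayBuffer.empty[Boolean]

  def write(row: Option[Seq[Option[Int]]]): Unit = row match {
    case Some(fields) =>
      children.zip(fields).foreach { case (child, field) => child.write(field) }
      validity += true
    case None =>
      // A null struct still occupies one slot in every child, otherwise
      // later rows would land at the wrong index.
      children.foreach(_.writeSkip())
      validity += false
  }
}
```

Writing a struct, then a null struct, then another struct leaves every child with three slots, so row indices stay consistent across the parent and its children.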


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129488362
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
    @@ -857,6 +857,449 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
         collectAndValidate(df, json, "nanData-floating_point.json")
       }
     
    +  test("array type conversion") {
    +    val json =
    +      s"""
    +         |{
    +         |  "schema" : {
    +         |    "fields" : [ {
    +         |      "name" : "a_arr",
    +         |      "nullable" : true,
    +         |      "type" : {
    +         |        "name" : "list"
    +         |      },
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "nullable" : false,
    +         |        "type" : {
    +         |          "name" : "int",
    +         |          "bitWidth" : 32,
    +         |          "isSigned" : true
    +         |        },
    +         |        "children" : [ ],
    +         |        "typeLayout" : {
    +         |          "vectors" : [ {
    +         |            "type" : "VALIDITY",
    +         |            "typeBitWidth" : 1
    +         |          }, {
    +         |            "type" : "DATA",
    +         |            "typeBitWidth" : 32
    +         |          } ]
    +         |        }
    +         |      } ],
    +         |      "typeLayout" : {
    +         |        "vectors" : [ {
    +         |          "type" : "VALIDITY",
    +         |          "typeBitWidth" : 1
    +         |        }, {
    +         |          "type" : "OFFSET",
    +         |          "typeBitWidth" : 32
    +         |        } ]
    +         |      }
    +         |    }, {
    +         |      "name" : "b_arr",
    +         |      "nullable" : true,
    +         |      "type" : {
    +         |        "name" : "list"
    +         |      },
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "nullable" : false,
    +         |        "type" : {
    +         |          "name" : "int",
    +         |          "bitWidth" : 32,
    +         |          "isSigned" : true
    +         |        },
    +         |        "children" : [ ],
    +         |        "typeLayout" : {
    +         |          "vectors" : [ {
    +         |            "type" : "VALIDITY",
    +         |            "typeBitWidth" : 1
    +         |          }, {
    +         |            "type" : "DATA",
    +         |            "typeBitWidth" : 32
    +         |          } ]
    +         |        }
    +         |      } ],
    +         |      "typeLayout" : {
    +         |        "vectors" : [ {
    +         |          "type" : "VALIDITY",
    +         |          "typeBitWidth" : 1
    +         |        }, {
    +         |          "type" : "OFFSET",
    +         |          "typeBitWidth" : 32
    +         |        } ]
    +         |      }
    +         |    }, {
    +         |      "name" : "c_arr",
    +         |      "nullable" : true,
    +         |      "type" : {
    +         |        "name" : "list"
    +         |      },
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "nullable" : true,
    +         |        "type" : {
    +         |          "name" : "int",
    +         |          "bitWidth" : 32,
    +         |          "isSigned" : true
    +         |        },
    +         |        "children" : [ ],
    +         |        "typeLayout" : {
    +         |          "vectors" : [ {
    +         |            "type" : "VALIDITY",
    +         |            "typeBitWidth" : 1
    +         |          }, {
    +         |            "type" : "DATA",
    +         |            "typeBitWidth" : 32
    +         |          } ]
    +         |        }
    +         |      } ],
    +         |      "typeLayout" : {
    +         |        "vectors" : [ {
    +         |          "type" : "VALIDITY",
    +         |          "typeBitWidth" : 1
    +         |        }, {
    +         |          "type" : "OFFSET",
    +         |          "typeBitWidth" : 32
    +         |        } ]
    +         |      }
    +         |    }, {
    +         |      "name" : "d_arr",
    +         |      "nullable" : true,
    +         |      "type" : {
    +         |        "name" : "list"
    +         |      },
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "nullable" : true,
    +         |        "type" : {
    +         |          "name" : "list"
    +         |        },
    +         |        "children" : [ {
    +         |          "name" : "element",
    +         |          "nullable" : false,
    +         |          "type" : {
    +         |            "name" : "int",
    +         |            "bitWidth" : 32,
    +         |            "isSigned" : true
    +         |          },
    +         |          "children" : [ ],
    +         |          "typeLayout" : {
    +         |            "vectors" : [ {
    +         |              "type" : "VALIDITY",
    +         |              "typeBitWidth" : 1
    +         |            }, {
    +         |              "type" : "DATA",
    +         |              "typeBitWidth" : 32
    +         |            } ]
    +         |          }
    +         |        } ],
    +         |        "typeLayout" : {
    +         |          "vectors" : [ {
    +         |            "type" : "VALIDITY",
    +         |            "typeBitWidth" : 1
    +         |          }, {
    +         |            "type" : "OFFSET",
    +         |            "typeBitWidth" : 32
    +         |          } ]
    +         |        }
    +         |      } ],
    +         |      "typeLayout" : {
    +         |        "vectors" : [ {
    +         |          "type" : "VALIDITY",
    +         |          "typeBitWidth" : 1
    +         |        }, {
    +         |          "type" : "OFFSET",
    +         |          "typeBitWidth" : 32
    +         |        } ]
    +         |      }
    +         |    } ]
    +         |  },
    +         |  "batches" : [ {
    +         |    "count" : 4,
    +         |    "columns" : [ {
    +         |      "name" : "a_arr",
    +         |      "count" : 4,
    +         |      "VALIDITY" : [ 1, 1, 1, 1 ],
    +         |      "OFFSET" : [ 0, 2, 4, 4, 5 ],
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "count" : 5,
    +         |        "VALIDITY" : [ 1, 1, 1, 1, 1 ],
    +         |        "DATA" : [ 1, 2, 3, 4, 5 ]
    +         |      } ]
    +         |    }, {
    +         |      "name" : "b_arr",
    +         |      "count" : 4,
    +         |      "VALIDITY" : [ 1, 0, 1, 0 ],
    +         |      "OFFSET" : [ 0, 2, 2, 2, 2 ],
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "count" : 2,
    +         |        "VALIDITY" : [ 1, 1 ],
    +         |        "DATA" : [ 1, 2 ]
    +         |      } ]
    +         |    }, {
    +         |      "name" : "c_arr",
    +         |      "count" : 4,
    +         |      "VALIDITY" : [ 1, 1, 1, 1 ],
    +         |      "OFFSET" : [ 0, 2, 4, 4, 5 ],
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "count" : 5,
    +         |        "VALIDITY" : [ 1, 1, 1, 0, 1 ],
    +         |        "DATA" : [ 1, 2, 3, 0, 5 ]
    +         |      } ]
    +         |    }, {
    +         |      "name" : "d_arr",
    +         |      "count" : 4,
    +         |      "VALIDITY" : [ 1, 1, 1, 1 ],
    +         |      "OFFSET" : [ 0, 1, 3, 3, 4 ],
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "count" : 4,
    +         |        "VALIDITY" : [ 1, 1, 1, 1 ],
    +         |        "OFFSET" : [ 0, 2, 3, 3, 4 ],
    +         |        "children" : [ {
    +         |          "name" : "element",
    +         |          "count" : 4,
    +         |          "VALIDITY" : [ 1, 1, 1, 1 ],
    +         |          "DATA" : [ 1, 2, 3, 5 ]
    +         |        } ]
    +         |      } ]
    +         |    } ]
    +         |  } ]
    +         |}
    +       """.stripMargin
    +
    +    val a_arr = Seq(Seq(1, 2), Seq(3, 4), Seq(), Seq(5))
    +    val b_arr = Seq(Some(Seq(1, 2)), None, Some(Seq()), None)
    +    val c_arr = Seq(Seq(Some(1), Some(2)), Seq(Some(3), None), Seq(), Seq(Some(5)))
    +    val d_arr = Seq(Seq(Seq(1, 2)), Seq(Seq(3), Seq()), Seq(), Seq(Seq(5)))
    --- End diff --
    
    Thanks, I'll modify it.
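
To illustrate how the expected `OFFSET` arrays in the test JSON above relate to the input sequences, here is a minimal sketch in plain Scala with no Arrow dependency. The `ListOffsets` object and its `offsets` method are hypothetical names introduced for this example, and the sketch assumes that a null list contributes zero elements, as the `b_arr` expectation suggests.

```scala
// Hypothetical helper, not part of the PR: computes list offsets the way the
// expected JSON lays them out.
object ListOffsets {
  // offsets(i) until offsets(i + 1) delimits list i in the flattened child vector.
  // A null list (None) is assumed to add no elements.
  def offsets[A](lists: Seq[Option[Seq[A]]]): Seq[Int] =
    lists.scanLeft(0)((acc, list) => acc + list.map(_.size).getOrElse(0))
}
```

For the `a_arr` input, `Seq(Seq(1, 2), Seq(3, 4), Seq(), Seq(5))` with every list wrapped in `Some`, this yields `0, 2, 4, 4, 5`, which matches the expected column.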




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by wesm <gi...@git.apache.org>.
Github user wesm commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    There are a bunch of open JIRAs about decimals in Arrow: https://issues.apache.org/jira/issues/?filter=12334829&jql=project%20%3D%20ARROW%20AND%20status%20in%20(%22In%20Review%22%2C%20Open%2C%20%22In%20Progress%22%2C%20Reopened)%20AND%20text%20~%20%22decimal%22
    Between these JIRAs and the mailing list, it would be good to come up with a game plan for integration tests between Java and C++ (and thus Python) so we can enable Spark to send Python decimals.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    cc @cloud-fan @BryanCutler 




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79960 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79960/testReport)** for PR 18655 at commit [`19f3973`](https://github.com/apache/spark/commit/19f3973c4acf1b05ae51c338481d975cebf66a98).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Yes, let's leave decimal support for follow-ups.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    LGTM, merging to master!




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129477483
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -77,95 +59,55 @@ private[sql] object ArrowConverters {
       private[sql] def toPayloadIterator(
           rowIter: Iterator[InternalRow],
           schema: StructType,
    -      maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
    -    new Iterator[ArrowPayload] {
    -      private val _allocator = new RootAllocator(Long.MaxValue)
    -      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +      maxRecordsPerBatch: Int,
    +      context: TaskContext): Iterator[ArrowPayload] = {
     
    -      override def hasNext: Boolean = _nextPayload != null
    -
    -      override def next(): ArrowPayload = {
    -        val obj = _nextPayload
    -        if (hasNext) {
    -          if (rowIter.hasNext) {
    -            _nextPayload = convert()
    -          } else {
    -            _allocator.close()
    -            _nextPayload = null
    -          }
    -        }
    -        obj
    -      }
    -
    -      private def convert(): ArrowPayload = {
    -        val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch)
    -        ArrowPayload(batch, schema, _allocator)
    -      }
    -    }
    -  }
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val allocator =
    +      ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue)
     
    -  /**
    -   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed
    -   * or the number of records in the batch equals maxRecordsInBatch.  If maxRecordsPerBatch is 0,
    -   * then rowIter will be fully consumed.
    -   */
    -  private def internalRowIterToArrowBatch(
    -      rowIter: Iterator[InternalRow],
    -      schema: StructType,
    -      allocator: BufferAllocator,
    -      maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
    +    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    +    val arrowWriter = ArrowWriter.create(root)
     
    -    val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) =>
    -      ColumnWriter(field.dataType, ordinal, allocator).init()
    -    }
    +    var closed = false
     
    -    val writerLength = columnWriters.length
    -    var recordsInBatch = 0
    -    while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) {
    -      val row = rowIter.next()
    -      var i = 0
    -      while (i < writerLength) {
    -        columnWriters(i).write(row)
    -        i += 1
    +    context.addTaskCompletionListener { _ =>
    +      if (!closed) {
    --- End diff --
    
    The `allocator` can be closed twice, but the `root` throws an exception after `allocator` is closed.
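
The `closed` flag in the diff above guards against running the cleanup a second time. A minimal sketch of that pattern follows. It uses hypothetical stand-in classes (`FakeResource`, `CloseOnce`) instead of the real Arrow root and allocator, and it assumes single-threaded use.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a resource that throws when closed a second time.
final class FakeResource(val name: String, log: ArrayBuffer[String]) {
  private var open = true
  def close(): Unit = {
    if (!open) throw new IllegalStateException(s"$name is already closed")
    log += name
    open = false
  }
}

// Runs the cleanup body at most once, however many times close() is called.
// Not thread-safe; this is an assumption of the sketch.
final class CloseOnce(cleanup: () => Unit) {
  private var closed = false
  def close(): Unit = {
    if (!closed) {
      closed = true
      cleanup()
    }
  }
}
```

With this wrapper, both the normal end of iteration and a completion callback can call `close()` without the second call reaching the underlying resources.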




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79786 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79786/testReport)** for PR 18655 at commit [`7084b38`](https://github.com/apache/spark/commit/7084b388d87c8347b79898827658d7827bf5649d).




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79737 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79737/testReport)** for PR 18655 at commit [`b5988f9`](https://github.com/apache/spark/commit/b5988f9a223de407b7709f239fca672bb02b60aa).




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128313325
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
    @@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
         collectAndValidate(df, json, "floating_point-double_precision.json")
       }
     
    +  ignore("decimal conversion") {
    --- End diff --
    
    That might be true, I haven't looked into it yet.  I can work on adding support on the Arrow side, so I'll try to check on that and see where it stands in the upcoming 0.5 release.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128061312
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se
       def asPythonSerializable: Array[Byte] = payload
     }
     
    -private[sql] object ArrowPayload {
    -
    -  /**
    -   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
    -   */
    -  def apply(
    -      batch: ArrowRecordBatch,
    -      schema: StructType,
    -      allocator: BufferAllocator): ArrowPayload = {
    -    new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator))
    -  }
    -}
    -
     private[sql] object ArrowConverters {
     
       /**
    -   * Map a Spark DataType to ArrowType.
    -   */
    -  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
    -    dataType match {
    -      case BooleanType => ArrowType.Bool.INSTANCE
    -      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
    -      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
    -      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
    -      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
    -      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
    -      case ByteType => new ArrowType.Int(8, true)
    -      case StringType => ArrowType.Utf8.INSTANCE
    -      case BinaryType => ArrowType.Binary.INSTANCE
    -      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
    -    }
    -  }
    -
    -  /**
    -   * Convert a Spark Dataset schema to Arrow schema.
    -   */
    -  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
    -    val arrowFields = schema.fields.map { f =>
    -      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
    -    }
    -    new Schema(arrowFields.toList.asJava)
    -  }
    -
    -  /**
        * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
        * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
        */
       private[sql] def toPayloadIterator(
           rowIter: Iterator[InternalRow],
           schema: StructType,
    -      maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
    -    new Iterator[ArrowPayload] {
    -      private val _allocator = new RootAllocator(Long.MaxValue)
    -      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +      maxRecordsPerBatch: Int,
    +      context: TaskContext): Iterator[ArrowPayload] = {
     
    -      override def hasNext: Boolean = _nextPayload != null
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val allocator =
    +      ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue)
     
    -      override def next(): ArrowPayload = {
    -        val obj = _nextPayload
    -        if (hasNext) {
    -          if (rowIter.hasNext) {
    -            _nextPayload = convert()
    -          } else {
    -            _allocator.close()
    -            _nextPayload = null
    -          }
    -        }
    -        obj
    -      }
    -
    -      private def convert(): ArrowPayload = {
    -        val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch)
    -        ArrowPayload(batch, schema, _allocator)
    -      }
    -    }
    -  }
    -
    -  /**
    -   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed
    -   * or the number of records in the batch equals maxRecordsInBatch.  If maxRecordsPerBatch is 0,
    -   * then rowIter will be fully consumed.
    -   */
    -  private def internalRowIterToArrowBatch(
    -      rowIter: Iterator[InternalRow],
    -      schema: StructType,
    -      allocator: BufferAllocator,
    -      maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
    -
    -    val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) =>
    -      ColumnWriter(field.dataType, ordinal, allocator).init()
    -    }
    +    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    +    val arrowWriter = ArrowWriter.create(root)
     
    -    val writerLength = columnWriters.length
    -    var recordsInBatch = 0
    -    while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) {
    -      val row = rowIter.next()
    -      var i = 0
    -      while (i < writerLength) {
    -        columnWriters(i).write(row)
    -        i += 1
    -      }
    -      recordsInBatch += 1
    +    context.addTaskCompletionListener { _ =>
    +      root.close()
    +      allocator.close()
    --- End diff --
    
    It seems a little odd to me to tie an iterator to a TaskContext. Why not just close resources as soon as the row iterator is consumed?
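
One way to picture the alternative raised above is an iterator wrapper that releases resources when the underlying iterator is exhausted. The sketch below is plain Scala, and `ClosingIterator` and `onExhausted` are hypothetical names introduced here. It is not the code from the PR.

```scala
// Hypothetical wrapper: invokes onExhausted exactly once, the first time the
// underlying iterator reports that it has no more elements.
final class ClosingIterator[A](underlying: Iterator[A], onExhausted: () => Unit)
    extends Iterator[A] {

  private var closed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !closed) {
      closed = true
      onExhausted()
    }
    more
  }

  override def next(): A = {
    if (!hasNext) throw new NoSuchElementException("next on exhausted iterator")
    underlying.next()
  }
}
```

In this sketch the cleanup runs only if the consumer reads the iterator to the end. A consumer that stops early would leave the resources open, so a completion callback may still be needed as a fallback.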




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129480466
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.arrow
    +
    +import org.apache.spark.SparkFunSuite
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.util.ArrayData
    +import org.apache.spark.sql.execution.vectorized.ArrowColumnVector
    +import org.apache.spark.sql.types._
    +import org.apache.spark.unsafe.types.UTF8String
    +
    +class ArrowWriterSuite extends SparkFunSuite {
    +
    +  test("simple") {
    +    def check(dt: DataType, data: Seq[Any], get: (ArrowColumnVector, Int) => Any): Unit = {
    +      val schema = new StructType().add("value", dt, nullable = true)
    +      val writer = ArrowWriter.create(schema)
    +      assert(writer.schema === schema)
    +
    +      data.foreach { datum =>
    +        writer.write(InternalRow(datum))
    +      }
    +      writer.finish()
    +
    +      val reader = new ArrowColumnVector(writer.root.getFieldVectors().get(0))
    +      data.zipWithIndex.foreach {
    +        case (null, rowId) => assert(reader.isNullAt(rowId))
    +        case (datum, rowId) => assert(get(reader, rowId) === datum)
    --- End diff --
    
    We can do something like
    ```
    dt match {
      case BooleanType => reader.getBoolean(rowId)
      case IntegerType => ...
      ...
    }
    ```
    Then the caller side doesn't need to pass in a `get`.
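
A self-contained sketch of the type-dispatch idea suggested above is shown below. It uses hypothetical stand-ins (`SimpleType`, `SimpleReader`, `TypedGet`) in place of Spark's `DataType` and `ArrowColumnVector`, so none of these names come from the PR.

```scala
// Hypothetical stand-ins for a few Spark data types.
sealed trait SimpleType
case object BoolT extends SimpleType
case object IntT extends SimpleType
case object StringT extends SimpleType

// Hypothetical reader with typed getters, standing in for a column vector.
final class SimpleReader(values: IndexedSeq[Any]) {
  def isNullAt(rowId: Int): Boolean = values(rowId) == null
  def getBoolean(rowId: Int): Boolean = values(rowId).asInstanceOf[Boolean]
  def getInt(rowId: Int): Int = values(rowId).asInstanceOf[Int]
  def getString(rowId: Int): String = values(rowId).asInstanceOf[String]
}

object TypedGet {
  // Chooses the getter from the declared type, so callers pass no accessor function.
  def get(dt: SimpleType, reader: SimpleReader, rowId: Int): Any = dt match {
    case BoolT => reader.getBoolean(rowId)
    case IntT => reader.getInt(rowId)
    case StringT => reader.getString(rowId)
  }
}
```

A test helper built this way would take only the data type and the data, at the cost of one `match` arm per supported type.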




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79960/




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Thank you for your comments.
    I agree that we should split this into smaller PRs. I'll push another commit to remove `ArrowColumnVector` from this as soon as possible.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129480291
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala ---
    @@ -0,0 +1,383 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.arrow
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.arrow.vector._
    +import org.apache.arrow.vector.complex._
    +import org.apache.arrow.vector.util.DecimalUtility
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
    +import org.apache.spark.sql.types._
    +
    +object ArrowWriter {
    +
    +  def create(schema: StructType): ArrowWriter = {
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator)
    +    create(root)
    +  }
    +
    +  def create(root: VectorSchemaRoot): ArrowWriter = {
    +    val children = root.getFieldVectors().asScala.map { vector =>
    +      vector.allocateNew()
    +      createFieldWriter(vector)
    +    }
    +    new ArrowWriter(root, children.toArray)
    +  }
    +
    +  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
    +    val field = vector.getField()
    +    ArrowUtils.fromArrowField(field) match {
    +      case BooleanType =>
    +        new BooleanWriter(vector.asInstanceOf[NullableBitVector])
    +      case ByteType =>
    +        new ByteWriter(vector.asInstanceOf[NullableTinyIntVector])
    +      case ShortType =>
    +        new ShortWriter(vector.asInstanceOf[NullableSmallIntVector])
    +      case IntegerType =>
    +        new IntegerWriter(vector.asInstanceOf[NullableIntVector])
    +      case LongType =>
    +        new LongWriter(vector.asInstanceOf[NullableBigIntVector])
    +      case FloatType =>
    +        new FloatWriter(vector.asInstanceOf[NullableFloat4Vector])
    +      case DoubleType =>
    +        new DoubleWriter(vector.asInstanceOf[NullableFloat8Vector])
    +      case StringType =>
    +        new StringWriter(vector.asInstanceOf[NullableVarCharVector])
    +      case BinaryType =>
    +        new BinaryWriter(vector.asInstanceOf[NullableVarBinaryVector])
    +      case ArrayType(_, _) =>
    +        val v = vector.asInstanceOf[ListVector]
    +        val elementVector = createFieldWriter(v.getDataVector())
    +        new ArrayWriter(v, elementVector)
    +      case StructType(_) =>
    +        val v = vector.asInstanceOf[NullableMapVector]
    +        val children = (0 until v.size()).map { ordinal =>
    +          createFieldWriter(v.getChildByOrdinal(ordinal))
    +        }
    +        new StructWriter(v, children.toArray)
    +      case dt =>
    +        throw new UnsupportedOperationException(s"Unsupported data type: ${dt.simpleString}")
    +    }
    +  }
    +}
    +
    +class ArrowWriter(
    +    val root: VectorSchemaRoot,
    +    fields: Array[ArrowFieldWriter]) {
    +
    +  def schema: StructType = StructType(fields.map { f =>
    +    StructField(f.name, f.dataType, f.nullable)
    +  })
    +
    +  private var count: Int = 0
    +
    +  def write(row: InternalRow): Unit = {
    +    var i = 0
    +    while (i < fields.size) {
    +      fields(i).write(row, i)
    +      i += 1
    +    }
    +    count += 1
    +  }
    +
    +  def finish(): Unit = {
    +    root.setRowCount(count)
    +    fields.foreach(_.finish())
    +  }
    +
    +  def reset(): Unit = {
    +    root.setRowCount(0)
    +    count = 0
    +    fields.foreach(_.reset())
    +  }
    +}
    +
    +private[arrow] abstract class ArrowFieldWriter {
    +
    +  def valueVector: ValueVector
    +  def valueMutator: ValueVector.Mutator
    +
    +  def name: String = valueVector.getField().getName()
    +  def dataType: DataType = ArrowUtils.fromArrowField(valueVector.getField())
    +  def nullable: Boolean = valueVector.getField().isNullable()
    +
    +  def setNull(): Unit
    +  def setValue(input: SpecializedGetters, ordinal: Int): Unit
    +  def skip(): Unit
    +
    +  protected var count: Int = 0
    +
    +  def write(input: SpecializedGetters, ordinal: Int): Unit = {
    +    if (input.isNullAt(ordinal)) {
    +      setNull()
    +    } else {
    +      setValue(input, ordinal)
    +    }
    +    count += 1
    +  }
    +
    +  def writeSkip(): Unit = {
    +    skip()
    +    count += 1
    --- End diff --
    
    To skip a value, is it enough to just do `count += 1`? For example, with `vector.set(1, v1); vector.set(3, v3)`, the value at index 2 is skipped.
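
The question above can be illustrated with a small, self-contained Java sketch. It does not use Arrow: `ToyNullableIntVector` and `ToyIntWriter` are hypothetical stand-ins invented for this example. The sketch assumes that a slot that was never set reads back as null, which holds for the toy vector because its validity bits start cleared. Whether Arrow's own vectors give the same guarantee is not established in this thread.

```java
import java.util.BitSet;

public class SkipSketch {

    /** Hypothetical stand-in for a nullable vector: values plus a validity bitmap. */
    public static final class ToyNullableIntVector {
        private final int[] values;
        // All bits start cleared, so every slot is null until it is set.
        private final BitSet validity = new BitSet();

        public ToyNullableIntVector(int capacity) {
            this.values = new int[capacity];
        }

        public void set(int index, int value) {
            values[index] = value;
            validity.set(index);
        }

        public boolean isNull(int index) {
            return !validity.get(index);
        }

        public int get(int index) {
            return values[index];
        }
    }

    /** Hypothetical field writer that tracks the next write position in count. */
    public static final class ToyIntWriter {
        public final ToyNullableIntVector vector;
        private int count = 0;

        public ToyIntWriter(ToyNullableIntVector vector) {
            this.vector = vector;
        }

        public void write(int value) {
            vector.set(count, value);
            count += 1;
        }

        /** Skips one slot by only advancing the position; nothing is written. */
        public void writeSkip() {
            count += 1;
        }

        public int count() {
            return count;
        }
    }

    public static void main(String[] args) {
        ToyIntWriter writer = new ToyIntWriter(new ToyNullableIntVector(4));
        writer.write(10);
        writer.writeSkip();
        writer.write(30);
        // The skipped slot was never set, so the toy vector reports it as null.
        System.out.println("slot 1 is null: " + writer.vector.isNull(1));
    }
}
```

In this toy model, advancing `count` alone is enough because the surrounding slots are written at explicit indices. A vector that tracks offsets per slot (for example a variable-width or list vector) may still need extra bookkeeping on skip.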




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r127704333
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java ---
    @@ -0,0 +1,502 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.vectorized;
    +
    +import org.apache.arrow.vector.*;
    +import org.apache.arrow.vector.complex.*;
    +import org.apache.arrow.vector.holders.NullableVarCharHolder;
    +
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.types.*;
    +import org.apache.spark.unsafe.types.UTF8String;
    +
    +/**
    + * A column backed by Apache Arrow.
    + */
    +public final class ArrowColumnVector extends ColumnVector {
    +
    +  private ValueVector vector;
    +  private ValueVector.Accessor nulls;
    +
    +  private NullableBitVector boolData;
    +  private NullableTinyIntVector byteData;
    +  private NullableSmallIntVector shortData;
    +  private NullableIntVector intData;
    +  private NullableBigIntVector longData;
    +
    +  private NullableFloat4Vector floatData;
    +  private NullableFloat8Vector doubleData;
    +  private NullableDecimalVector decimalData;
    +
    +  private NullableVarCharVector stringData;
    +
    +  private NullableVarBinaryVector binaryData;
    +
    +  private UInt4Vector listOffsetData;
    +
    +  public ArrowColumnVector(ValueVector vector) {
    +    super(vector.getValueCapacity(), DataTypes.NullType, MemoryMode.OFF_HEAP);
    +    initialize(vector);
    +  }
    +
    +  @Override
    +  public long nullsNativeAddress() {
    +    throw new RuntimeException("Cannot get native address for arrow column");
    +  }
    +
    +  @Override
    +  public long valuesNativeAddress() {
    +    throw new RuntimeException("Cannot get native address for arrow column");
    +  }
    +
    +  @Override
    +  public void close() {
    +    if (childColumns != null) {
    +      for (int i = 0; i < childColumns.length; i++) {
    +        childColumns[i].close();
    +      }
    +    }
    +    vector.close();
    +  }
    +
    +  //
    +  // APIs dealing with nulls
    +  //
    +
    +  @Override
    +  public void putNotNull(int rowId) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void putNull(int rowId) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void putNulls(int rowId, int count) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void putNotNulls(int rowId, int count) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean isNullAt(int rowId) {
    +    return nulls.isNull(rowId);
    +  }
    +
    +  //
    +  // APIs dealing with Booleans
    +  //
    +
    +  @Override
    +  public void putBoolean(int rowId, boolean value) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void putBooleans(int rowId, int count, boolean value) {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean getBoolean(int rowId) {
    +    return boolData.getAccessor().get(rowId) == 1;
    +  }
    +
    +  @Override
    +  public boolean[] getBooleans(int rowId, int count) {
    +    assert(dictionary == null);
    +    boolean[] array = new boolean[count];
    +    for (int i = 0; i < count; ++i) {
    +      array[i] = (boolData.getAccessor().get(rowId + i) == 1);
    --- End diff --
    
    Can we move `boolData.getAccessor()` out of the loop if it is a loop invariant?
    Ditto for other types (e.g. `getBytes()`).
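
The hoisting suggested above can be sketched in plain Java as follows. `Accessor` and `ToyBitVector` are hypothetical stand-ins made up for this example, not Arrow classes. The sketch assumes, as the comment does, that `getAccessor()` returns the same object for the whole loop.

```java
public class HoistSketch {

    /** Hypothetical read-only accessor, loosely modelled on the reviewed code. */
    public interface Accessor {
        int get(int index);
    }

    /** Hypothetical bit vector that stores each boolean as 0 or 1. */
    public static final class ToyBitVector {
        private final int[] bits;
        private final Accessor accessor;

        public ToyBitVector(int... bits) {
            this.bits = bits.clone();
            this.accessor = index -> this.bits[index];
        }

        public Accessor getAccessor() {
            return accessor;
        }
    }

    /** Reads count booleans starting at rowId, fetching the accessor only once. */
    public static boolean[] getBooleans(ToyBitVector boolData, int rowId, int count) {
        // Loop-invariant call moved out of the loop.
        Accessor accessor = boolData.getAccessor();
        boolean[] array = new boolean[count];
        for (int i = 0; i < count; ++i) {
            array[i] = accessor.get(rowId + i) == 1;
        }
        return array;
    }

    public static void main(String[] args) {
        boolean[] result = getBooleans(new ToyBitVector(1, 0, 1, 1), 1, 3);
        System.out.println(java.util.Arrays.toString(result));
    }
}
```

A JIT compiler may well hoist such a call on its own, so the change is mainly about readability and about not depending on that optimization.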




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79955/




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    @BryanCutler @wesm @cpcloud Thank you for reviewing this.
    If the only remaining issue here is `DecimalType` support, I'd like to separate it from this PR and merge this first to avoid duplicating effort around the writers.
    What do you think?




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79996 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79996/testReport)** for PR 18655 at commit [`b85dc23`](https://github.com/apache/spark/commit/b85dc231d05f5e1a1a3d8b0bcbc778b85d83c533).




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79698 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79698/testReport)** for PR 18655 at commit [`8ffedda`](https://github.com/apache/spark/commit/8ffedda9f05d379d700aef95dca049a751374f87).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Jenkins, retest this please.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79698/




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79786/




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79960 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79960/testReport)** for PR 18655 at commit [`19f3973`](https://github.com/apache/spark/commit/19f3973c4acf1b05ae51c338481d975cebf66a98).




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129486273
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowWriterSuite.scala ---
    @@ -0,0 +1,247 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.arrow
    +
    +import org.apache.spark.SparkFunSuite
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.util.ArrayData
    +import org.apache.spark.sql.execution.vectorized.ArrowColumnVector
    +import org.apache.spark.sql.types._
    +import org.apache.spark.unsafe.types.UTF8String
    +
    +class ArrowWriterSuite extends SparkFunSuite {
    +
    +  test("simple") {
    +    def check(dt: DataType, data: Seq[Any], get: (ArrowColumnVector, Int) => Any): Unit = {
    +      val schema = new StructType().add("value", dt, nullable = true)
    +      val writer = ArrowWriter.create(schema)
    +      assert(writer.schema === schema)
    +
    +      data.foreach { datum =>
    +        writer.write(InternalRow(datum))
    +      }
    +      writer.finish()
    +
    +      val reader = new ArrowColumnVector(writer.root.getFieldVectors().get(0))
    +      data.zipWithIndex.foreach {
    +        case (null, rowId) => assert(reader.isNullAt(rowId))
    +        case (datum, rowId) => assert(get(reader, rowId) === datum)
    --- End diff --
    
    Thanks, I'll update it.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79996 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79996/testReport)** for PR 18655 at commit [`b85dc23`](https://github.com/apache/spark/commit/b85dc231d05f5e1a1a3d8b0bcbc778b85d83c533).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128061219
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se
       def asPythonSerializable: Array[Byte] = payload
     }
     
    -private[sql] object ArrowPayload {
    -
    -  /**
    -   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
    -   */
    -  def apply(
    -      batch: ArrowRecordBatch,
    -      schema: StructType,
    -      allocator: BufferAllocator): ArrowPayload = {
    -    new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator))
    -  }
    -}
    -
     private[sql] object ArrowConverters {
     
       /**
    -   * Map a Spark DataType to ArrowType.
    -   */
    -  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
    -    dataType match {
    -      case BooleanType => ArrowType.Bool.INSTANCE
    -      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
    -      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
    -      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
    -      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
    -      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
    -      case ByteType => new ArrowType.Int(8, true)
    -      case StringType => ArrowType.Utf8.INSTANCE
    -      case BinaryType => ArrowType.Binary.INSTANCE
    -      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
    -    }
    -  }
    -
    -  /**
    -   * Convert a Spark Dataset schema to Arrow schema.
    -   */
    -  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
    -    val arrowFields = schema.fields.map { f =>
    -      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
    -    }
    -    new Schema(arrowFields.toList.asJava)
    -  }
    -
    -  /**
        * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
        * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
        */
       private[sql] def toPayloadIterator(
           rowIter: Iterator[InternalRow],
           schema: StructType,
    -      maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
    -    new Iterator[ArrowPayload] {
    -      private val _allocator = new RootAllocator(Long.MaxValue)
    -      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +      maxRecordsPerBatch: Int,
    +      context: TaskContext): Iterator[ArrowPayload] = {
     
    -      override def hasNext: Boolean = _nextPayload != null
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val allocator =
    +      ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue)
     
    -      override def next(): ArrowPayload = {
    -        val obj = _nextPayload
    -        if (hasNext) {
    -          if (rowIter.hasNext) {
    -            _nextPayload = convert()
    -          } else {
    -            _allocator.close()
    -            _nextPayload = null
    -          }
    -        }
    -        obj
    -      }
    -
    -      private def convert(): ArrowPayload = {
    -        val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch)
    -        ArrowPayload(batch, schema, _allocator)
    -      }
    -    }
    -  }
    -
    -  /**
    -   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed
    -   * or the number of records in the batch equals maxRecordsInBatch.  If maxRecordsPerBatch is 0,
    -   * then rowIter will be fully consumed.
    -   */
    -  private def internalRowIterToArrowBatch(
    -      rowIter: Iterator[InternalRow],
    -      schema: StructType,
    -      allocator: BufferAllocator,
    -      maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
    -
    -    val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) =>
    -      ColumnWriter(field.dataType, ordinal, allocator).init()
    -    }
    +    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    +    val arrowWriter = ArrowWriter.create(root)
     
    -    val writerLength = columnWriters.length
    -    var recordsInBatch = 0
    -    while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) {
    -      val row = rowIter.next()
    -      var i = 0
    -      while (i < writerLength) {
    -        columnWriters(i).write(row)
    -        i += 1
    -      }
    -      recordsInBatch += 1
    +    context.addTaskCompletionListener { _ =>
    +      root.close()
    +      allocator.close()
    --- End diff --
    
    It seems a little odd to me to tie an iterator to a TaskContext. Why not just close the resources as soon as the row iterator is consumed?
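
The alternative raised above, closing resources once the row iterator is consumed, can be sketched in plain Java like this. `ClosingIterator` is a hypothetical helper written for this example; it is not part of Spark or Arrow, and the `AutoCloseable` stands in for whatever would wrap `root.close()` and `allocator.close()`.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class ClosingIteratorSketch {

    /** Hypothetical wrapper that closes a resource once the iterator is exhausted. */
    public static final class ClosingIterator<T> implements Iterator<T> {
        private final Iterator<T> underlying;
        private final AutoCloseable resource;
        private boolean closed = false;

        public ClosingIterator(Iterator<T> underlying, AutoCloseable resource) {
            this.underlying = underlying;
            this.resource = resource;
        }

        @Override
        public boolean hasNext() {
            boolean more = underlying.hasNext();
            if (!more) {
                closeOnce();
            }
            return more;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return underlying.next();
        }

        public boolean isClosed() {
            return closed;
        }

        /** Closes the resource at most once, however often hasNext() is called. */
        private void closeOnce() {
            if (closed) {
                return;
            }
            closed = true;
            try {
                resource.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        ClosingIterator<Integer> it = new ClosingIterator<>(
            Arrays.asList(1, 2).iterator(),
            () -> System.out.println("resources closed"));
        while (it.hasNext()) {
            System.out.println(it.next());
        }
    }
}
```

One trade-off with this approach: if the consumer abandons the iterator before exhausting it, or an exception interrupts iteration, the resource is never closed. A completion callback covers that case, and the two approaches can be combined.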




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Thanks for this, @ueshin. I agree with @kiszk that it would be easier to review if you could split this into smaller PRs. Maybe keep the additional type support separate? I'm all for refactoring this too, but could you elaborate on why you are refactoring `ColumnWriter` and `ArrowConverters`? Thanks!




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79668 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79668/testReport)** for PR 18655 at commit [`58cd465`](https://github.com/apache/spark/commit/58cd46506b02800269380f7c8acb5f9825664cad).




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79737/
    Test PASSed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79696 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79696/testReport)** for PR 18655 at commit [`8ffedda`](https://github.com/apache/spark/commit/8ffedda9f05d379d700aef95dca049a751374f87).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129487716
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowWriter.scala ---
    @@ -0,0 +1,383 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.arrow
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.arrow.vector._
    +import org.apache.arrow.vector.complex._
    +import org.apache.arrow.vector.util.DecimalUtility
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
    +import org.apache.spark.sql.types._
    +
    +object ArrowWriter {
    +
    +  def create(schema: StructType): ArrowWriter = {
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val root = VectorSchemaRoot.create(arrowSchema, ArrowUtils.rootAllocator)
    +    create(root)
    +  }
    +
    +  def create(root: VectorSchemaRoot): ArrowWriter = {
    +    val children = root.getFieldVectors().asScala.map { vector =>
    +      vector.allocateNew()
    +      createFieldWriter(vector)
    +    }
    +    new ArrowWriter(root, children.toArray)
    +  }
    +
    +  private def createFieldWriter(vector: ValueVector): ArrowFieldWriter = {
    +    val field = vector.getField()
    +    ArrowUtils.fromArrowField(field) match {
    --- End diff --
    
    Thanks, I'll modify it.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128062715
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -55,145 +51,55 @@ private[sql] class ArrowPayload private[arrow] (payload: Array[Byte]) extends Se
       def asPythonSerializable: Array[Byte] = payload
     }
     
    -private[sql] object ArrowPayload {
    -
    -  /**
    -   * Create an ArrowPayload from an ArrowRecordBatch and Spark schema.
    -   */
    -  def apply(
    -      batch: ArrowRecordBatch,
    -      schema: StructType,
    -      allocator: BufferAllocator): ArrowPayload = {
    -    new ArrowPayload(ArrowConverters.batchToByteArray(batch, schema, allocator))
    -  }
    -}
    -
     private[sql] object ArrowConverters {
     
       /**
    -   * Map a Spark DataType to ArrowType.
    -   */
    -  private[arrow] def sparkTypeToArrowType(dataType: DataType): ArrowType = {
    -    dataType match {
    -      case BooleanType => ArrowType.Bool.INSTANCE
    -      case ShortType => new ArrowType.Int(8 * ShortType.defaultSize, true)
    -      case IntegerType => new ArrowType.Int(8 * IntegerType.defaultSize, true)
    -      case LongType => new ArrowType.Int(8 * LongType.defaultSize, true)
    -      case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)
    -      case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
    -      case ByteType => new ArrowType.Int(8, true)
    -      case StringType => ArrowType.Utf8.INSTANCE
    -      case BinaryType => ArrowType.Binary.INSTANCE
    -      case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dataType")
    -    }
    -  }
    -
    -  /**
    -   * Convert a Spark Dataset schema to Arrow schema.
    -   */
    -  private[arrow] def schemaToArrowSchema(schema: StructType): Schema = {
    -    val arrowFields = schema.fields.map { f =>
    -      new Field(f.name, f.nullable, sparkTypeToArrowType(f.dataType), List.empty[Field].asJava)
    -    }
    -    new Schema(arrowFields.toList.asJava)
    -  }
    -
    -  /**
        * Maps Iterator from InternalRow to ArrowPayload. Limit ArrowRecordBatch size in ArrowPayload
        * by setting maxRecordsPerBatch or use 0 to fully consume rowIter.
        */
       private[sql] def toPayloadIterator(
           rowIter: Iterator[InternalRow],
           schema: StructType,
    -      maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
    -    new Iterator[ArrowPayload] {
    -      private val _allocator = new RootAllocator(Long.MaxValue)
    -      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +      maxRecordsPerBatch: Int,
    +      context: TaskContext): Iterator[ArrowPayload] = {
     
    -      override def hasNext: Boolean = _nextPayload != null
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val allocator =
    +      ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue)
     
    -      override def next(): ArrowPayload = {
    -        val obj = _nextPayload
    -        if (hasNext) {
    -          if (rowIter.hasNext) {
    -            _nextPayload = convert()
    -          } else {
    -            _allocator.close()
    -            _nextPayload = null
    -          }
    -        }
    -        obj
    -      }
    -
    -      private def convert(): ArrowPayload = {
    -        val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch)
    -        ArrowPayload(batch, schema, _allocator)
    -      }
    -    }
    -  }
    -
    -  /**
    -   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed
    -   * or the number of records in the batch equals maxRecordsInBatch.  If maxRecordsPerBatch is 0,
    -   * then rowIter will be fully consumed.
    -   */
    -  private def internalRowIterToArrowBatch(
    -      rowIter: Iterator[InternalRow],
    -      schema: StructType,
    -      allocator: BufferAllocator,
    -      maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
    -
    -    val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) =>
    -      ColumnWriter(field.dataType, ordinal, allocator).init()
    -    }
    +    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    +    val arrowWriter = ArrowWriter.create(root)
     
    -    val writerLength = columnWriters.length
    -    var recordsInBatch = 0
    -    while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) {
    -      val row = rowIter.next()
    -      var i = 0
    -      while (i < writerLength) {
    -        columnWriters(i).write(row)
    -        i += 1
    -      }
    -      recordsInBatch += 1
    +    context.addTaskCompletionListener { _ =>
    +      root.close()
    +      allocator.close()
         }
     
    -    val (fieldNodes, bufferArrays) = columnWriters.map(_.finish()).unzip
    -    val buffers = bufferArrays.flatten
    -
    -    val rowLength = if (fieldNodes.nonEmpty) fieldNodes.head.getLength else 0
    -    val recordBatch = new ArrowRecordBatch(rowLength,
    -      fieldNodes.toList.asJava, buffers.toList.asJava)
    +    new Iterator[ArrowPayload] {
     
    -    buffers.foreach(_.release())
    -    recordBatch
    -  }
    +      override def hasNext: Boolean = rowIter.hasNext
     
    -  /**
    -   * Convert an ArrowRecordBatch to a byte array and close batch to release resources. Once closed,
    -   * the batch can no longer be used.
    -   */
    -  private[arrow] def batchToByteArray(
    -      batch: ArrowRecordBatch,
    -      schema: StructType,
    -      allocator: BufferAllocator): Array[Byte] = {
    -    val arrowSchema = ArrowConverters.schemaToArrowSchema(schema)
    -    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    -    val out = new ByteArrayOutputStream()
    -    val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
    +      override def next(): ArrowPayload = {
    +        val out = new ByteArrayOutputStream()
    +        val writer = new ArrowFileWriter(root, null, Channels.newChannel(out))
    +
    +        Utils.tryWithSafeFinally {
    +          var rowId = 0
    --- End diff --
    
    nit: maybe `rowCount` instead of `rowId`, since it counts how many rows are in the batch so far and is not a unique id?
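
To illustrate the naming point, here is a minimal sketch of the batching loop with the counter named `rowCount`. It is not the PR's code: `BatchingSketch` and `toBatches` are hypothetical names, the Arrow writer is replaced by a plain `Vector`, and the loop condition is copied from the removed `internalRowIterToArrowBatch` shown in the diff, where a non-positive `maxRecordsPerBatch` means the iterator is fully consumed.

```scala
object BatchingSketch {
  // Hypothetical helper, not Spark API: groups rows into batches.
  // maxRecordsPerBatch <= 0 means "no limit", as in the loop condition quoted in the diff.
  def toBatches[T](rowIter: Iterator[T], maxRecordsPerBatch: Int): Iterator[Vector[T]] =
    new Iterator[Vector[T]] {
      override def hasNext: Boolean = rowIter.hasNext

      override def next(): Vector[T] = {
        if (!rowIter.hasNext) throw new NoSuchElementException("no more rows")
        val batch = Vector.newBuilder[T]
        var rowCount = 0 // number of rows written to the current batch so far
        while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || rowCount < maxRecordsPerBatch)) {
          batch += rowIter.next()
          rowCount += 1
        }
        batch.result()
      }
    }
}
```

The counter restarts at zero for every batch, which is why a name suggesting a count reads more naturally than one suggesting a per-row identifier.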




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128064603
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
    @@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
         collectAndValidate(df, json, "floating_point-double_precision.json")
       }
     
    +  ignore("decimal conversion") {
    --- End diff --
    
    Why ignore this?




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129477058
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala ---
    @@ -77,95 +59,55 @@ private[sql] object ArrowConverters {
       private[sql] def toPayloadIterator(
           rowIter: Iterator[InternalRow],
           schema: StructType,
    -      maxRecordsPerBatch: Int): Iterator[ArrowPayload] = {
    -    new Iterator[ArrowPayload] {
    -      private val _allocator = new RootAllocator(Long.MaxValue)
    -      private var _nextPayload = if (rowIter.nonEmpty) convert() else null
    +      maxRecordsPerBatch: Int,
    +      context: TaskContext): Iterator[ArrowPayload] = {
     
    -      override def hasNext: Boolean = _nextPayload != null
    -
    -      override def next(): ArrowPayload = {
    -        val obj = _nextPayload
    -        if (hasNext) {
    -          if (rowIter.hasNext) {
    -            _nextPayload = convert()
    -          } else {
    -            _allocator.close()
    -            _nextPayload = null
    -          }
    -        }
    -        obj
    -      }
    -
    -      private def convert(): ArrowPayload = {
    -        val batch = internalRowIterToArrowBatch(rowIter, schema, _allocator, maxRecordsPerBatch)
    -        ArrowPayload(batch, schema, _allocator)
    -      }
    -    }
    -  }
    +    val arrowSchema = ArrowUtils.toArrowSchema(schema)
    +    val allocator =
    +      ArrowUtils.rootAllocator.newChildAllocator("toPayloadIterator", 0, Long.MaxValue)
     
    -  /**
    -   * Iterate over InternalRows and write to an ArrowRecordBatch, stopping when rowIter is consumed
    -   * or the number of records in the batch equals maxRecordsInBatch.  If maxRecordsPerBatch is 0,
    -   * then rowIter will be fully consumed.
    -   */
    -  private def internalRowIterToArrowBatch(
    -      rowIter: Iterator[InternalRow],
    -      schema: StructType,
    -      allocator: BufferAllocator,
    -      maxRecordsPerBatch: Int = 0): ArrowRecordBatch = {
    +    val root = VectorSchemaRoot.create(arrowSchema, allocator)
    +    val arrowWriter = ArrowWriter.create(root)
     
    -    val columnWriters = schema.fields.zipWithIndex.map { case (field, ordinal) =>
    -      ColumnWriter(field.dataType, ordinal, allocator).init()
    -    }
    +    var closed = false
     
    -    val writerLength = columnWriters.length
    -    var recordsInBatch = 0
    -    while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || recordsInBatch < maxRecordsPerBatch)) {
    -      val row = rowIter.next()
    -      var i = 0
    -      while (i < writerLength) {
    -        columnWriters(i).write(row)
    -        i += 1
    +    context.addTaskCompletionListener { _ =>
    +      if (!closed) {
    --- End diff --
    
    Do we really need this? I think it's OK to close twice.
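
Whether the `closed` flag is needed depends on whether the underlying `close()` calls are idempotent, which this thread does not settle. As a minimal sketch of the alternative, assuming nothing about Arrow's behavior, a hypothetical `CloseOnce` wrapper can make any release action safe to trigger from both the iterator and the task completion listener:

```scala
import java.util.concurrent.atomic.AtomicBoolean

object CloseOnceSketch {
  // Hypothetical wrapper, not part of Spark or Arrow: runs `release` at most once,
  // no matter how many times close() is called or from which thread.
  final class CloseOnce(release: () => Unit) extends AutoCloseable {
    private val closed = new AtomicBoolean(false)

    override def close(): Unit =
      if (closed.compareAndSet(false, true)) release()
  }
}
```

With such a wrapper the call sites need no flag of their own; if the wrapped resources already tolerate repeated closing, the wrapper is unnecessary.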




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Yeah, let's put `ArrowColumnVector` and its tests in a new PR and merge that first.
    
    `ArrowWriter` will also be used for pandas UDF (see https://issues.apache.org/jira/browse/SPARK-21190 for more details), so it makes sense to move it to a separate file.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Jenkins, retest this please.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r129482346
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
    @@ -857,6 +857,449 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
         collectAndValidate(df, json, "nanData-floating_point.json")
       }
     
    +  test("array type conversion") {
    +    val json =
    +      s"""
    +         |{
    +         |  "schema" : {
    +         |    "fields" : [ {
    +         |      "name" : "a_arr",
    +         |      "nullable" : true,
    +         |      "type" : {
    +         |        "name" : "list"
    +         |      },
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "nullable" : false,
    +         |        "type" : {
    +         |          "name" : "int",
    +         |          "bitWidth" : 32,
    +         |          "isSigned" : true
    +         |        },
    +         |        "children" : [ ],
    +         |        "typeLayout" : {
    +         |          "vectors" : [ {
    +         |            "type" : "VALIDITY",
    +         |            "typeBitWidth" : 1
    +         |          }, {
    +         |            "type" : "DATA",
    +         |            "typeBitWidth" : 32
    +         |          } ]
    +         |        }
    +         |      } ],
    +         |      "typeLayout" : {
    +         |        "vectors" : [ {
    +         |          "type" : "VALIDITY",
    +         |          "typeBitWidth" : 1
    +         |        }, {
    +         |          "type" : "OFFSET",
    +         |          "typeBitWidth" : 32
    +         |        } ]
    +         |      }
    +         |    }, {
    +         |      "name" : "b_arr",
    +         |      "nullable" : true,
    +         |      "type" : {
    +         |        "name" : "list"
    +         |      },
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "nullable" : false,
    +         |        "type" : {
    +         |          "name" : "int",
    +         |          "bitWidth" : 32,
    +         |          "isSigned" : true
    +         |        },
    +         |        "children" : [ ],
    +         |        "typeLayout" : {
    +         |          "vectors" : [ {
    +         |            "type" : "VALIDITY",
    +         |            "typeBitWidth" : 1
    +         |          }, {
    +         |            "type" : "DATA",
    +         |            "typeBitWidth" : 32
    +         |          } ]
    +         |        }
    +         |      } ],
    +         |      "typeLayout" : {
    +         |        "vectors" : [ {
    +         |          "type" : "VALIDITY",
    +         |          "typeBitWidth" : 1
    +         |        }, {
    +         |          "type" : "OFFSET",
    +         |          "typeBitWidth" : 32
    +         |        } ]
    +         |      }
    +         |    }, {
    +         |      "name" : "c_arr",
    +         |      "nullable" : true,
    +         |      "type" : {
    +         |        "name" : "list"
    +         |      },
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "nullable" : true,
    +         |        "type" : {
    +         |          "name" : "int",
    +         |          "bitWidth" : 32,
    +         |          "isSigned" : true
    +         |        },
    +         |        "children" : [ ],
    +         |        "typeLayout" : {
    +         |          "vectors" : [ {
    +         |            "type" : "VALIDITY",
    +         |            "typeBitWidth" : 1
    +         |          }, {
    +         |            "type" : "DATA",
    +         |            "typeBitWidth" : 32
    +         |          } ]
    +         |        }
    +         |      } ],
    +         |      "typeLayout" : {
    +         |        "vectors" : [ {
    +         |          "type" : "VALIDITY",
    +         |          "typeBitWidth" : 1
    +         |        }, {
    +         |          "type" : "OFFSET",
    +         |          "typeBitWidth" : 32
    +         |        } ]
    +         |      }
    +         |    }, {
    +         |      "name" : "d_arr",
    +         |      "nullable" : true,
    +         |      "type" : {
    +         |        "name" : "list"
    +         |      },
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "nullable" : true,
    +         |        "type" : {
    +         |          "name" : "list"
    +         |        },
    +         |        "children" : [ {
    +         |          "name" : "element",
    +         |          "nullable" : false,
    +         |          "type" : {
    +         |            "name" : "int",
    +         |            "bitWidth" : 32,
    +         |            "isSigned" : true
    +         |          },
    +         |          "children" : [ ],
    +         |          "typeLayout" : {
    +         |            "vectors" : [ {
    +         |              "type" : "VALIDITY",
    +         |              "typeBitWidth" : 1
    +         |            }, {
    +         |              "type" : "DATA",
    +         |              "typeBitWidth" : 32
    +         |            } ]
    +         |          }
    +         |        } ],
    +         |        "typeLayout" : {
    +         |          "vectors" : [ {
    +         |            "type" : "VALIDITY",
    +         |            "typeBitWidth" : 1
    +         |          }, {
    +         |            "type" : "OFFSET",
    +         |            "typeBitWidth" : 32
    +         |          } ]
    +         |        }
    +         |      } ],
    +         |      "typeLayout" : {
    +         |        "vectors" : [ {
    +         |          "type" : "VALIDITY",
    +         |          "typeBitWidth" : 1
    +         |        }, {
    +         |          "type" : "OFFSET",
    +         |          "typeBitWidth" : 32
    +         |        } ]
    +         |      }
    +         |    } ]
    +         |  },
    +         |  "batches" : [ {
    +         |    "count" : 4,
    +         |    "columns" : [ {
    +         |      "name" : "a_arr",
    +         |      "count" : 4,
    +         |      "VALIDITY" : [ 1, 1, 1, 1 ],
    +         |      "OFFSET" : [ 0, 2, 4, 4, 5 ],
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "count" : 5,
    +         |        "VALIDITY" : [ 1, 1, 1, 1, 1 ],
    +         |        "DATA" : [ 1, 2, 3, 4, 5 ]
    +         |      } ]
    +         |    }, {
    +         |      "name" : "b_arr",
    +         |      "count" : 4,
    +         |      "VALIDITY" : [ 1, 0, 1, 0 ],
    +         |      "OFFSET" : [ 0, 2, 2, 2, 2 ],
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "count" : 2,
    +         |        "VALIDITY" : [ 1, 1 ],
    +         |        "DATA" : [ 1, 2 ]
    +         |      } ]
    +         |    }, {
    +         |      "name" : "c_arr",
    +         |      "count" : 4,
    +         |      "VALIDITY" : [ 1, 1, 1, 1 ],
    +         |      "OFFSET" : [ 0, 2, 4, 4, 5 ],
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "count" : 5,
    +         |        "VALIDITY" : [ 1, 1, 1, 0, 1 ],
    +         |        "DATA" : [ 1, 2, 3, 0, 5 ]
    +         |      } ]
    +         |    }, {
    +         |      "name" : "d_arr",
    +         |      "count" : 4,
    +         |      "VALIDITY" : [ 1, 1, 1, 1 ],
    +         |      "OFFSET" : [ 0, 1, 3, 3, 4 ],
    +         |      "children" : [ {
    +         |        "name" : "element",
    +         |        "count" : 4,
    +         |        "VALIDITY" : [ 1, 1, 1, 1 ],
    +         |        "OFFSET" : [ 0, 2, 3, 3, 4 ],
    +         |        "children" : [ {
    +         |          "name" : "element",
    +         |          "count" : 4,
    +         |          "VALIDITY" : [ 1, 1, 1, 1 ],
    +         |          "DATA" : [ 1, 2, 3, 5 ]
    +         |        } ]
    +         |      } ]
    +         |    } ]
    +         |  } ]
    +         |}
    +       """.stripMargin
    +
    +    val a_arr = Seq(Seq(1, 2), Seq(3, 4), Seq(), Seq(5))
    +    val b_arr = Seq(Some(Seq(1, 2)), None, Some(Seq()), None)
    +    val c_arr = Seq(Seq(Some(1), Some(2)), Seq(Some(3), None), Seq(), Seq(Some(5)))
    +    val d_arr = Seq(Seq(Seq(1, 2)), Seq(Seq(3), Seq()), Seq(), Seq(Seq(5)))
    --- End diff --
    
    How about camelCase naming?
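
As an aid to reading the expected JSON in this test, the following sketch recomputes the `OFFSET` and `VALIDITY` arrays from the input sequences, using camelCase names as suggested. `ListOffsetsSketch` and its helpers are hypothetical and only model the list layout visible in the JSON above: each offset marks where a list starts in the flattened child vector, and a null list contributes no elements.

```scala
object ListOffsetsSketch {
  // Offsets for a list column: entry i is the start of list i in the flattened
  // child vector; the final entry is the total number of child elements.
  def offsets[T](lists: Seq[Option[Seq[T]]]): Seq[Int] =
    lists.scanLeft(0)((acc, list) => acc + list.map(_.length).getOrElse(0))

  // Validity bits: 1 for a present list, 0 for a null list.
  def validity[T](lists: Seq[Option[Seq[T]]]): Seq[Int] =
    lists.map(list => if (list.isDefined) 1 else 0)

  // The same data as in the test, with camelCase names.
  val aArr: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4), Seq(), Seq(5))
  val bArr: Seq[Option[Seq[Int]]] = Seq(Some(Seq(1, 2)), None, Some(Seq()), None)
  val dArr: Seq[Seq[Seq[Int]]] = Seq(Seq(Seq(1, 2)), Seq(Seq(3), Seq()), Seq(), Seq(Seq(5)))
}
```

For the nested column, the outer offsets come from `dArr` itself and the inner offsets from `dArr.flatten`, which matches the two `OFFSET` arrays listed under `d_arr`.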




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by cpcloud <gi...@git.apache.org>.
Github user cpcloud commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Based on this code: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/ColumnType.scala#L429-L547
    
    It looks like there are two types:
    
    1. 8 byte compact decimal that fits in a Java `long`
    2. An `Array[Byte]` representation of up to 16 bytes (based on the `BINARY` type).
    
    I think Arrow's Decimal representation in Java is almost identical to this.
    
    Looking at the `BigInteger` Java implementation (which is what `BigDecimal` sits on top of), the sign is carried in the first `byte` of the two's-complement array returned by `toByteArray`.
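
The two representations and the sign handling can be checked with the JDK alone. The sketch below is illustrative, with hypothetical names (`DecimalBytesSketch`, `unscaledBytes`, `signBitSet`, `maxUnscaled`). It relies on `BigInteger.toByteArray` returning big-endian two's-complement bytes, and it assumes, from my understanding of Spark rather than from this thread, that the compact form covers precisions up to 18 digits and that the maximum precision is 38.

```scala
import java.math.{BigDecimal => JBigDecimal, BigInteger}

object DecimalBytesSketch {
  // Unscaled value of a decimal as big-endian two's-complement bytes.
  def unscaledBytes(d: JBigDecimal): Array[Byte] = d.unscaledValue.toByteArray

  // True when the most significant bit of the first byte is set, i.e. the value is negative.
  def signBitSet(bytes: Array[Byte]): Boolean = (bytes(0) & 0x80) != 0

  // Largest unscaled value with `precision` decimal digits: 10^precision - 1.
  def maxUnscaled(precision: Int): BigInteger =
    BigInteger.TEN.pow(precision).subtract(BigInteger.ONE)

  def main(args: Array[String]): Unit = {
    val negative = unscaledBytes(new JBigDecimal("-123.45")) // unscaled value is -12345
    println(negative.map(b => f"${b & 0xff}%02x").mkString(" "))
    println(signBitSet(negative))
    // 18 digits fit in a signed 64-bit long (bit length at most 63); 19 digits may not.
    println(maxUnscaled(18).bitLength)
    // 38 digits fit in 16 bytes, sign bit included.
    println(maxUnscaled(38).toByteArray.length)
  }
}
```

Note that a positive value whose top bit would otherwise be set gets a leading zero byte (255 is encoded as `00 ff`), so the byte length is not simply the magnitude's length.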



[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79668 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79668/testReport)** for PR 18655 at commit [`58cd465`](https://github.com/apache/spark/commit/58cd46506b02800269380f7c8acb5f9825664cad).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConvert...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18655#discussion_r128146875
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala ---
    @@ -391,6 +392,85 @@ class ArrowConvertersSuite extends SharedSQLContext with BeforeAndAfterAll {
         collectAndValidate(df, json, "floating_point-double_precision.json")
       }
     
    +  ignore("decimal conversion") {
    --- End diff --
    
    Oh, I'm sorry, I should have mentioned it.
    It seems like `JsonFileReader` doesn't support `DecimalType`, so I ignored the test for now.
    But now I'm wondering: if Arrow 0.4.0 has a bug in the decimal type as you said, should I remove decimal type support from this PR and add it in a follow-up PR?




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by kiszk <gi...@git.apache.org>.
Github user kiszk commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Good feature, but can we split this PR into smaller PRs for ease of review, since it looks large?
    For example, since `ArrowColumnVector` is not used in the refactored code, that part can be moved to another PR.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    @BryanCutler I'd like to share the motivation for refactoring `ArrowConverters` and `ColumnWriter`.
    
    For `ColumnWriter`, I first wanted to support complex types like `ArrayType` and `StructType`, so I refactored it based on your `ColumnWriter` implementation. Then I renamed it and moved it to another package so that we can also use it for pandas UDFs, as @cloud-fan mentioned. As you may have seen earlier, I'll also introduce `ArrowColumnVector` as a reader for Arrow vectors.
    
    For `ArrowConverters`, I thought we could skip the intermediate `ArrowRecordBatch` creation in `ArrowConverters.toPayloadIterator()`. What do you think about that?
    
    Thanks!




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    **[Test build #79995 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/79995/testReport)** for PR 18655 at commit [`0bac10d`](https://github.com/apache/spark/commit/0bac10d95637c1afa632210b5feca079a61a35d2).




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #18655: [SPARK-21440][SQL][PYSPARK] Refactor ArrowConverters and...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/18655
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/79995/

