Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/17 21:17:58 UTC

[GitHub] [iceberg] huaxingao opened a new pull request, #5079: Make VectorizedParquetDefinitionLevelReader work with off heap byte buffer

huaxingao opened a new pull request, #5079:
URL: https://github.com/apache/iceberg/pull/5079

   Currently, `VectorizedParquetDefinitionLevelReader` works only with a `HeapByteBuffer`. With an off-heap (direct) byte buffer, it throws an `UnsupportedOperationException` because `ByteBuffer.array()` is not supported there:
   ```
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 1 times, most recent failure: Lost task 0.0 in stage 5.0 (TID 5) (192.168.1.126 executor driver): java.lang.UnsupportedOperationException
   	at java.base/java.nio.ByteBuffer.array(ByteBuffer.java:1049)
   	at org.apache.iceberg.arrow.vectorized.parquet.VectorizedParquetDefinitionLevelReader$VarWidthReader.nextVal(VectorizedParquetDefinitionLevelReader.java:418)
   	at org.apache.iceberg.arrow.vectorized.parquet.VectorizedParquetDefinitionLevelReader$BaseReader.nextBatch(VectorizedParquetDefinitionLevelReader.java:247)
   	at org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator$VarWidthTypePageReader.nextVal(VectorizedPageIterator.java:353)
   	at org.apache.iceberg.arrow.vectorized.parquet.VectorizedPageIterator$BagePageReader.nextBatch(VectorizedPageIterator.java:161)
   	at org.apache.iceberg.arrow.vectorized.parquet.VectorizedColumnIterator$VarWidthTypeBatchReader.nextBatchOf(VectorizedColumnIterator.java:191)
   	at org.apache.iceberg.arrow.vectorized.parquet.VectorizedColumnIterator$BatchReader.nextBatch(VectorizedColumnIterator.java:74)
   	at org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.read(VectorizedArrowReader.java:151)
   	at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.readDataToColumnVectors(ColumnarBatchReader.java:101)
   	at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.loadDataToColumnBatch(ColumnarBatchReader.java:86)
   	at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader$ColumnBatchLoader.<init>(ColumnarBatchReader.java:81)
   	at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:69)
   	at org.apache.iceberg.spark.data.vectorized.ColumnarBatchReader.read(ColumnarBatchReader.java:44)
   	at org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.next(VectorizedParquetReader.java:134)
   	at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:102)
   	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:93)
   	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:130)
   	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
   	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
   	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
   	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
   	at scala.collection.Iterator.foreach(Iterator.scala:943)
   	at scala.collection.Iterator.foreach$(Iterator.scala:943)
   	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.foreach(WholeStageCodegenExec.scala:757)
   	at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
   	at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
   	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2257)
   	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
   	at org.apache.spark.scheduler.Task.run(Task.scala:131)
   	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
   	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1467)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   ```
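
The root cause in the trace above can be reproduced with plain `java.nio`, independent of Iceberg or Arrow. The following is a minimal sketch, assuming an OpenJDK runtime where direct buffers expose no backing array; the class and method names are hypothetical and only serve the illustration.

```java
import java.nio.ByteBuffer;

public class DirectBufferArrayDemo {
  // Hypothetical helper: reports whether array() is unsupported for this buffer.
  static boolean arrayThrows(ByteBuffer buffer) {
    try {
      buffer.array();
      return false;
    } catch (UnsupportedOperationException e) {
      // Same exception type as at the top of the stack trace.
      return true;
    }
  }

  public static void main(String[] args) {
    ByteBuffer heap = ByteBuffer.allocate(8);         // backed by a byte[]
    ByteBuffer direct = ByteBuffer.allocateDirect(8); // off-heap, no byte[] (assumption: OpenJDK)

    System.out.println("heap:   hasArray=" + heap.hasArray()
        + ", array() throws=" + arrayThrows(heap));
    System.out.println("direct: hasArray=" + direct.hasArray()
        + ", array() throws=" + arrayThrows(direct));
  }
}
```

Code that must accept both kinds of buffer would either check `hasArray()` first or, as in this PR, avoid `array()` altogether.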


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] huaxingao commented on a diff in pull request #5079: Make VectorizedParquetDefinitionLevelReader work with off heap byte buffer

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #5079:
URL: https://github.com/apache/iceberg/pull/5079#discussion_r902058715


##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java:
##########
@@ -415,7 +415,7 @@ protected void nextVal(
       int startOffset = ((BaseVariableWidthVector) vector).getStartOffset(idx);
       // It is possible that the data buffer was reallocated. So it is important to
       // not cache the data buffer reference but instead use vector.getDataBuffer().
-      vector.getDataBuffer().setBytes(startOffset, buffer.array(), buffer.position() + buffer.arrayOffset(),
+      vector.getDataBuffer().setBytes(startOffset, buffer, buffer.position(),

Review Comment:
   Fixed. Thanks!





[GitHub] [iceberg] rdblue commented on a diff in pull request #5079: Make VectorizedParquetDefinitionLevelReader work with off heap byte buffer

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5079:
URL: https://github.com/apache/iceberg/pull/5079#discussion_r901942514


##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java:
##########
@@ -415,8 +415,13 @@ protected void nextVal(
       int startOffset = ((BaseVariableWidthVector) vector).getStartOffset(idx);
       // It is possible that the data buffer was reallocated. So it is important to
       // not cache the data buffer reference but instead use vector.getDataBuffer().
-      vector.getDataBuffer().setBytes(startOffset, buffer.array(), buffer.position() + buffer.arrayOffset(),
-          buffer.limit() - buffer.position());
+      if (buffer.isDirect()) {
+        vector.getDataBuffer().setBytes(startOffset, buffer, buffer.position(),
+            buffer.limit() - buffer.position());

Review Comment:
   If the buffer can be passed directly, why not do this in both direct and on-heap cases?





[GitHub] [iceberg] rdblue merged pull request #5079: Make VectorizedParquetDefinitionLevelReader work with off heap byte buffer

Posted by GitBox <gi...@apache.org>.
rdblue merged PR #5079:
URL: https://github.com/apache/iceberg/pull/5079




[GitHub] [iceberg] huaxingao commented on pull request #5079: Make VectorizedParquetDefinitionLevelReader work with off heap byte buffer

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #5079:
URL: https://github.com/apache/iceberg/pull/5079#issuecomment-1163839966

   Thank you very much! @rdblue 




[GitHub] [iceberg] huaxingao commented on pull request #5079: Make VectorizedParquetDefinitionLevelReader work with off heap byte buffer

Posted by GitBox <gi...@apache.org>.
huaxingao commented on PR #5079:
URL: https://github.com/apache/iceberg/pull/5079#issuecomment-1159288566

   ping @samarthjain could you please take a look when you have a moment? Thanks a lot!




[GitHub] [iceberg] huaxingao commented on a diff in pull request #5079: Make VectorizedParquetDefinitionLevelReader work with off heap byte buffer

Posted by GitBox <gi...@apache.org>.
huaxingao commented on code in PR #5079:
URL: https://github.com/apache/iceberg/pull/5079#discussion_r901987433


##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java:
##########
@@ -415,8 +415,13 @@ protected void nextVal(
       int startOffset = ((BaseVariableWidthVector) vector).getStartOffset(idx);
       // It is possible that the data buffer was reallocated. So it is important to
       // not cache the data buffer reference but instead use vector.getDataBuffer().
-      vector.getDataBuffer().setBytes(startOffset, buffer.array(), buffer.position() + buffer.arrayOffset(),
-          buffer.limit() - buffer.position());
+      if (buffer.isDirect()) {
+        vector.getDataBuffer().setBytes(startOffset, buffer, buffer.position(),
+            buffer.limit() - buffer.position());

Review Comment:
   Thanks for taking a look! I changed the fix to a single line.





[GitHub] [iceberg] rdblue commented on a diff in pull request #5079: Make VectorizedParquetDefinitionLevelReader work with off heap byte buffer

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5079:
URL: https://github.com/apache/iceberg/pull/5079#discussion_r902032003


##########
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java:
##########
@@ -415,7 +415,7 @@ protected void nextVal(
       int startOffset = ((BaseVariableWidthVector) vector).getStartOffset(idx);
       // It is possible that the data buffer was reallocated. So it is important to
       // not cache the data buffer reference but instead use vector.getDataBuffer().
-      vector.getDataBuffer().setBytes(startOffset, buffer.array(), buffer.position() + buffer.arrayOffset(),
+      vector.getDataBuffer().setBytes(startOffset, buffer, buffer.position(),

Review Comment:
   I think it's even easier. Arrow has a `setBytes(long, ByteBuffer)` method that writes `bytes.remaining() = buffer.limit() - buffer.position()` bytes starting at `buffer.position()`. This should just be `vector.getDataBuffer().setBytes(startOffset, buffer)`.
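
The "remaining bytes from the current position" semantics described in this comment can be sketched with plain `java.nio`, without depending on Arrow. This is only a stand-in under stated assumptions: `copyRemaining`, `sample`, and `copyToString` are hypothetical helpers, a `byte[]` plays the role of the destination buffer, and nothing here is Arrow's implementation. It shows that one code path can serve heap and direct buffers alike because it never calls `array()`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RemainingBytesCopy {
  // Hypothetical helper: copies src.remaining() bytes, starting at src.position(),
  // into dst at dstOffset. It reads through a duplicate, so the caller's
  // position and limit are left unchanged.
  static int copyRemaining(ByteBuffer src, byte[] dst, int dstOffset) {
    int length = src.remaining(); // limit() - position()
    src.duplicate().get(dst, dstOffset, length);
    return length;
  }

  // Builds a heap or direct buffer holding "xxhello" with position set past "xx".
  static ByteBuffer sample(boolean direct) {
    byte[] payload = "xxhello".getBytes(StandardCharsets.US_ASCII);
    ByteBuffer buffer = direct
        ? ByteBuffer.allocateDirect(payload.length)
        : ByteBuffer.allocate(payload.length);
    buffer.put(payload);
    buffer.flip();
    buffer.position(2);
    return buffer;
  }

  // Copies the remaining bytes of a sample buffer and decodes them.
  static String copyToString(boolean direct) {
    ByteBuffer src = sample(direct);
    byte[] dst = new byte[src.remaining()];
    int copied = copyRemaining(src, dst, 0);
    return new String(dst, 0, copied, StandardCharsets.US_ASCII);
  }

  public static void main(String[] args) {
    System.out.println(copyToString(false)); // heap buffer
    System.out.println(copyToString(true));  // direct buffer
  }
}
```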





[GitHub] [iceberg] rdblue commented on pull request #5079: Make VectorizedParquetDefinitionLevelReader work with off heap byte buffer

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #5079:
URL: https://github.com/apache/iceberg/pull/5079#issuecomment-1160911106

   @huaxingao, looks like `FixedWidthBinaryReader` needs to be updated as well.




[GitHub] [iceberg] rdblue commented on pull request #5079: Make VectorizedParquetDefinitionLevelReader work with off heap byte buffer

Posted by GitBox <gi...@apache.org>.
rdblue commented on PR #5079:
URL: https://github.com/apache/iceberg/pull/5079#issuecomment-1163837835

   Thanks, @huaxingao!

