Posted to dev@iceberg.apache.org by Mayur Srivastava <Ma...@twosigma.com> on 2021/02/11 15:22:38 UTC

Reading data from Iceberg table into Apache Arrow in Java

Hi,


We have an existing time series data access service based on Arrow/Flight, which uses Apache Arrow format data to perform writes and reads (using time range queries) against a bespoke table-backend built on S3-compatible storage.


We are trying to replace our bespoke table-backend with Iceberg tables. For integrating with Iceberg, we are using Iceberg core+data+parquet modules directly to write and read data. I would like to note that our service cannot use the Spark route to write or read the data. In our current Iceberg reader integration code, we are using IcebergGenerics.read(table).select(...).where(...).build() to iterate through the data row-by-row. Instead of this (potentially slower) read path which needs conversion between rows and Arrow VectorSchemaRoot, we want to use a vectorized read path which directly returns an Arrow VectorSchemaRoot as a callback or Arrow record batches as the result set.
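
The row-to-column conversion that this read path requires can be pictured with a minimal sketch. It assumes nothing about the Iceberg or Arrow APIs: `RowPivot` and `toColumnBatches` are hypothetical names, `long[]` rows stand in for generic records, and `long[][]` stands in for an Arrow batch. It only shows the per-cell copy that a vectorized reader would avoid.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: plain arrays stand in for Arrow vectors,
// and each long[] stands in for one row returned by a row-based reader.
final class RowPivot {
  private RowPivot() {}

  // Copies row-oriented values into columnar batches of at most batchSize rows.
  // In each batch, batch[c][r] holds column c of the r-th row of that batch.
  static List<long[][]> toColumnBatches(List<long[]> rows, int numColumns, int batchSize) {
    List<long[][]> batches = new ArrayList<>();
    for (int start = 0; start < rows.size(); start += batchSize) {
      int count = Math.min(batchSize, rows.size() - start);
      long[][] batch = new long[numColumns][count];
      for (int r = 0; r < count; r++) {
        long[] row = rows.get(start + r);
        for (int c = 0; c < numColumns; c++) {
          batch[c][r] = row[c]; // one copy per cell
        }
      }
      batches.add(batch);
    }
    return batches;
  }
}
```

With three rows and a batch size of two, this yields one batch of two rows and one batch of one row.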


I have noticed that Iceberg already has an Arrow module: https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow. I have also looked into https://github.com/apache/iceberg/issues/9 and https://github.com/apache/iceberg/milestone/2, but I'm not sure about the current status of the vectorized reader support. I'm also not sure how this Arrow module is used to perform a vectorized read when executing a query on an Iceberg table with the core/data/parquet libraries.


I have a few questions regarding the Vectorized reader/Arrow support:

1.      Is it possible to run a vectorized read on an Iceberg table to return data in Arrow format using a non-Spark reader in Java?

2.      Is there an example of reading data in Arrow format from an Iceberg table?

3.      Is the Spark read path completely vectorized? I ask in order to find out whether we can borrow from the vectorized Spark reader or move code from it into the Iceberg core library.


Let me know if you have any questions for me.


Thanks,

Mayur


Re: Reading data from Iceberg table into Apache Arrow in Java

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Yes, I think we should move forward with reads that don't need to merge
deletes and have a check that there are no deletes to merge. That will work
in many cases and we can add read support for v2 later.
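
A check of this kind could look roughly like the sketch below. It is not Iceberg code: `PlannedTask` and `NoDeletesGuard` are hypothetical names, and delete files are represented as plain path strings. The only point is to fail fast when a task carries deletes that the vectorized path would not merge.

```java
import java.util.List;

// Hypothetical stand-in for a planned file scan task; this is not Iceberg's FileScanTask API.
final class PlannedTask {
  final String dataFile;
  final List<String> deleteFiles;

  PlannedTask(String dataFile, List<String> deleteFiles) {
    this.dataFile = dataFile;
    this.deleteFiles = deleteFiles;
  }
}

final class NoDeletesGuard {
  private NoDeletesGuard() {}

  // True only when no task carries delete files that would have to be merged.
  static boolean hasNoDeleteFiles(List<PlannedTask> tasks) {
    for (PlannedTask task : tasks) {
      if (!task.deleteFiles.isEmpty()) {
        return false;
      }
    }
    return true;
  }

  // Fails fast instead of silently returning rows that should have been deleted.
  static void checkVectorizedReadAllowed(List<PlannedTask> tasks) {
    if (!hasNoDeleteFiles(tasks)) {
      throw new UnsupportedOperationException(
          "Vectorized read does not merge deletes; fall back to the row-based reader");
    }
  }
}
```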

-- 
Ryan Blue
Software Engineer
Netflix

RE: Reading data from Iceberg table into Apache Arrow in Java

Posted by Mayur Srivastava <Ma...@twosigma.com>.
>> Should we proceed with this pr and later add support for vectorized reads in a separate pr?
I meant adding support for deletes in the vectorized reader.

Thanks,
Mayur

From: Mayur Srivastava <Ma...@twosigma.com>
Sent: Wednesday, March 3, 2021 6:41 AM
To: dev@iceberg.apache.org
Cc: Ryan Blue <rb...@netflix.com>
Subject: RE: Reading data from Iceberg table into Apache Arrow in Java

Thanks for finding out, Peter.

Should we proceed with this pr and later add support for vectorized reads in a separate pr?
There are also some other limitations in the current pr (listed in the pr) which could be addressed in subsequent prs.

Thanks,
Mayur

From: Peter Vary <pv...@cloudera.com.INVALID>
Sent: Tuesday, March 2, 2021 10:38 AM
To: Iceberg Dev List <de...@iceberg.apache.org>
Cc: Ryan Blue <rb...@netflix.com>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

After some more digging, I think there is no solution yet for vectorized reads with deletes.

this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
     (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));


On Tue, Mar 2, 2021 at 15:48, Mayur Srivastava <Ma...@twosigma.com> wrote:
Hi Peter,

Good point.

Most of the ArrowReader code is inspired by Spark’s vectorized reader (src: https://github.com/apache/iceberg/blob/631efec2f9ce1f0526d6613c81ac8fd0ccb95b5e/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java#L68).


We’ll probably need some help from someone who understands the Spark vectorized read path.

But, I’ll read the code to understand the deletes.

Thanks,
Mayur

From: Peter Vary <pv...@cloudera.com.INVALID>
Sent: Tuesday, March 2, 2021 8:51 AM
To: Iceberg Dev List <de...@iceberg.apache.org>
Cc: rblue@netflix.com
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

Hi Mayur,

I have been playing around with the idea of implementing vectorized reads for Hive, so your message came just in time :)

I took a quick look at the code, but I do not really understand how vectorized reads handle deletes.

In the non-vectorized code path, I found this, which filters the rows one by one:
https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277

          DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema);
          Schema requiredSchema = deletes.requiredSchema();
          return deletes.filter(openTask(currentTask, requiredSchema));
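
To make "filters the rows one by one" concrete, here is a minimal sketch of a row-level filter. It is not the `GenericDeleteFilter` implementation: `PositionDeleteFilter` is a hypothetical name, only position-style deletes are modeled (rows identified by their ordinal in the file), and the extra columns that `requiredSchema()` may add are ignored.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: drops rows whose ordinal position is listed as deleted.
final class PositionDeleteFilter {
  private PositionDeleteFilter() {}

  static <T> List<T> filter(Iterator<T> rows, Set<Long> deletedPositions) {
    List<T> kept = new ArrayList<>();
    long position = 0;
    while (rows.hasNext()) {
      T row = rows.next();
      // Every row is inspected individually, which is what a batch reader would have to replace.
      if (!deletedPositions.contains(position)) {
        kept.add(row);
      }
      position++;
    }
    return kept;
  }
}
```

A vectorized equivalent would need to apply the same decision to a whole batch at once, which is the part the thread identifies as missing.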

In your code, I found that the delete files' encryption keys are collected, but I am not sure how they are used:
https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59

    task.files().stream()
        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
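
The effect of that stream can be reproduced with a small self-contained sketch. `EncryptedFile`, `TaskFiles`, and `KeyMetadataCollector` are hypothetical stand-ins, and key metadata is modeled as a string rather than a byte buffer. The sketch only shows that each task contributes its data file plus all of its delete files, so the map ends up with one entry per file path. What the reader then does with the map is not stated in the thread.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

// Hypothetical stand-in for a data or delete file with its key metadata.
final class EncryptedFile {
  final String path;
  final String keyMetadata;

  EncryptedFile(String path, String keyMetadata) {
    this.path = path;
    this.keyMetadata = keyMetadata;
  }
}

// Hypothetical stand-in for one scan task: a data file and its delete files.
final class TaskFiles {
  final EncryptedFile dataFile;
  final List<EncryptedFile> deletes;

  TaskFiles(EncryptedFile dataFile, List<EncryptedFile> deletes) {
    this.dataFile = dataFile;
    this.deletes = deletes;
  }
}

final class KeyMetadataCollector {
  private KeyMetadataCollector() {}

  // Builds a path -> key metadata map covering data files and delete files alike.
  static Map<String, String> collect(List<TaskFiles> tasks) {
    Map<String, String> keyMetadata = new HashMap<>();
    tasks.stream()
        .flatMap(task -> Stream.concat(Stream.of(task.dataFile), task.deletes.stream()))
        .forEach(file -> keyMetadata.put(file.path, file.keyMetadata));
    return keyMetadata;
  }
}
```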

Could you please help me with some quick pointers?

Thanks,
Peter

On Mar 1, 2021, at 16:17, Mayur Srivastava <Ma...@twosigma.com> wrote:

Hi Ryan,

I’ve submitted a PR (https://github.com/apache/iceberg/pull/2286) for the vectorized Arrow reader.

This is my first Iceberg pull request - I'm not fully aware of the contributing conventions of this repo, so let me know if any changes are needed in the PR.
I've refactored some code from the Spark vectorized reader and added an ArrowReader, which is a vectorized reader in Iceberg core.

About the ArrowReader:
1.      I’ve put the ArrowReader in the iceberg-data module because it needed to access the Iceberg table scan. Let me know if the reader needs to be moved.
2.      I had to add a dependency on ‘iceberg-arrow’ to the iceberg-data module. Specifically for ArrowReaderTest, I had to add the following. Let me know if there is a better way to do this.
compileOnly("org.apache.arrow:arrow-vector") {
  exclude group: 'io.netty', module: 'netty-buffer'
  exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
3.      Most of the code in ArrowReader is taken from the Spark vectorized reader. I think there is potential to share ‘BaseArrowFileScanTaskReader’ between both versions, but I have not attempted it yet.
4.      ArrowReader returns an iterator of VectorSchemaRoot and the behavior is explained in the Javadoc.
5.      Some small changes were needed in IcebergGenerics to expose the table scan object, and in VectorizedArrowReader to allocate different timestamp Arrow vectors depending on whether the type has a timezone.
6.      All pre-push Gradle tests pass except one which is still running (and it seems very slow - TestFlinkIcebergSink).
7.      I've not performed any performance tests with the implementation yet. I'm planning to do so this week.

Following are some limitations/questions for this implementation:
1.      The Arrow vector type is coupled to the physical data type in the Parquet file: when column data contains a constant value, the column is dictionary encoded and the returned Arrow type is int32 irrespective of the Iceberg data type. I think that the Arrow vector type should be consistent with the logical Iceberg data type (and not change due to the physical data type). There is a test, ArrowReaderTest.testReadAllWithConstantRecords(), that is currently ignored.
2.      Type promotion does not work: in ArrowReaderTest, the data for column ‘int_promotion’ was written as int and the type was then promoted from int to long, but the Arrow reader still returns an IntVector. I think that the Arrow vector type should be consistent with the promoted logical Iceberg data type.
3.      Data type limitations:
a.      Types not tested: UUID, FixedType, DecimalType. In ArrowReaderTest, the Parquet write was failing for these data types (due to a null pointer exception in ParquetMetadataConverter.addRowGroup: columnMetadata.getStatistics() was null). Are there unit tests with these types that write to Parquet?
b.      Types not supported: TimeType, ListType, MapType, StructType. What is the path to add Arrow support for these data types?
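
Limitations 1 and 2 both ask that the returned vector follow the logical type rather than the physical encoding. A minimal sketch of what that means, with plain arrays standing in for Arrow vectors: `LogicalTypeSketch`, `decodeDictionary`, and `promoteIntToLong` are hypothetical names and do not describe how the PR or the Arrow module resolves either issue.

```java
// Hypothetical sketch: plain arrays stand in for Arrow vectors.
final class LogicalTypeSketch {
  private LogicalTypeSketch() {}

  // Expands int32 dictionary ids into the logical long values they refer to,
  // so the caller never sees the ids themselves.
  static long[] decodeDictionary(int[] ids, long[] dictionary) {
    long[] values = new long[ids.length];
    for (int i = 0; i < ids.length; i++) {
      values[i] = dictionary[ids[i]];
    }
    return values;
  }

  // Widens values physically stored as int to the promoted logical type long.
  static long[] promoteIntToLong(int[] stored) {
    long[] promoted = new long[stored.length];
    for (int i = 0; i < stored.length; i++) {
      promoted[i] = stored[i]; // implicit widening conversion preserves the value
    }
    return promoted;
  }
}
```

In both cases the caller receives values in the table's logical type, regardless of how the file stored them.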

Thanks,
Mayur

From: Mayur Srivastava <Ma...@twosigma.com>
Sent: Friday, February 12, 2021 7:41 PM
To: dev@iceberg.apache.org; rblue@netflix.com
Subject: RE: Reading data from Iceberg table into Apache Arrow in Java

Thank you Ryan.

I’ll dig into the file scan plan and Spark codebase to learn about the internals of the Iceberg vectorized read path. Then, I’ll try to implement the vectorized reader using core components only. I’ll be happy to work with you to contribute it back upstream. I’ll get back to you if I have any questions or need any more pointers.

Thanks,
Mayur

From: Ryan Blue <rb...@netflix.com.INVALID>
Sent: Friday, February 12, 2021 2:26 PM
To: Iceberg Dev List <de...@iceberg.apache.org>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

Hi Mayur,

We built the Arrow support with Spark as the first use case, so the best examples of how to use it are in Spark.

The generic reader does two things: it plans a scan and sets up an iterator of file readers to produce generic records. What you want to do is the same thing, but set up the file readers to produce Arrow batches. You can do that by changing the `Parquet.read` call and passing the callback to create an Arrow batch reader rather than a generic row reader. I don't think there is a public example of this, but maybe someone else knows about one. This isn't available in Iceberg yet, but if you want to add it we'd be happy to help you get it in.
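
The shape of that change can be sketched without any Iceberg types: the planning loop stays the same and only the per-file reader callback differs. In the sketch below, `ScanSketch.readAll` and the `openFile` parameter are hypothetical, file names stand in for planned scan tasks, and lists are used instead of iterators for brevity. It does not show the real `Parquet.read` builder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of "plan once, plug in a different file reader".
final class ScanSketch {
  private ScanSketch() {}

  // Opens every planned file with the supplied reader callback and concatenates the results.
  // T is a row type for a row reader, or a batch type for a vectorized reader.
  static <T> List<T> readAll(List<String> plannedFiles, Function<String, List<T>> openFile) {
    List<T> results = new ArrayList<>();
    for (String file : plannedFiles) {
      results.addAll(openFile.apply(file));
    }
    return results;
  }
}
```

Passing a callback that returns rows gives the row-based path, and passing one that returns batches gives the vectorized path, with no change to the loop that walks the planned files.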

The Spark read path has a good example, but it also wraps the Arrow batches so Spark can read them. Also, keep in mind that the Arrow integration only supports flat schemas right now, not fully nested schemas, so you'd still need to fall back to the row-based path for those. (Side note: if you have code to convert generics to Arrow, that would be really useful to post somewhere.)

I hope that helps. It would be great to work with you to improve this in a couple of PRs!

rb


RE: Reading data from Iceberg table into Apache Arrow in Java

Posted by Mayur Srivastava <Ma...@twosigma.com>.
Thanks for finding out, Peter.

Should we proceed with this pr and later add support for vectorized reads in a separate pr?
There are also some other limitations in the current pr (listed in the pr) which could be addressed in subsequent prs.

Thanks,
Mayur

From: Peter Vary <pv...@cloudera.com.INVALID>
Sent: Tuesday, March 2, 2021 10:38 AM
To: Iceberg Dev List <de...@iceberg.apache.org>
Cc: Ryan Blue <rb...@netflix.com>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

After some more digging, I think there is no solution yet for vectorized reads with deletes.

this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
     (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));


Mayur Srivastava <Ma...@twosigma.com>> ezt írta (időpont: 2021. márc. 2., Ke 15:48):
Hi Peter,

Good point.

Most of the ArrowReader code is inspired from the Spark’s vectorized reader (src: https://github.com/apache/iceberg/blob/631efec2f9ce1f0526d6613c81ac8fd0ccb95b5e/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java#L68).


We’ll probably need some help from someone who understands the Spark vectorized read path.

But, I’ll read the code to understand the deletes.

Thanks,
Mayur

From: Peter Vary <pv...@cloudera.com.INVALID>>
Sent: Tuesday, March 2, 2021 8:51 AM
To: Iceberg Dev List <de...@iceberg.apache.org>>
Cc: rblue@netflix.com<ma...@netflix.com>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

Hi Mayur,

Playing around with the idea of implementing vectorized reads for Hive so your message come just in time :)

Took a quick look at the code but I do not really understand how vectorized reads handle deletes.

In non-vectorized code-path I have found this which filters the rows one-by-one:
https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277

          DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema);
          Schema requiredSchema = deletes.requiredSchema();
          return deletes.filter(openTask(currentTask, requiredSchema));

In your code I have found that the delete files encryption keys are collected, but not sure how they are used:
https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59

    task.files().stream()
        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));

Could you please help me with some quick pointers?

Thanks,
Peter

On Mar 1, 2021, at 16:17, Mayur Srivastava <Ma...@twosigma.com>> wrote:

Hi Ryan,

I’ve submitted a pr (https://github.com/apache/iceberg/pull/2286) for the vectorized arrow reader:

This is my first Iceberg pull request - I'm not fully aware of the contributing conventions of this repo, so let me know if any changes are needed in the pr.
I've refactored some code from the Spark vectorized reader and added an ArrowReader which is a vectorized reader in Iceberg core.

About the ArrowReader:
1.      I’ve put the ArrowReader in the iceberg-data module because it needed to access the Iceberg table scan. Let me know if the reader needs to be moved.
2.      I had to make a dependency addition of ‘iceberg-arrow’ for the iceberg-data module. Specially for the ArrowReaderTest, I had to add the following. Let me know if there is a better way for doing this.
compileOnly("org.apache.arrow:arrow-vector") {
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
3.      Most of the code in ArrowReader is taken from the spark vectorized reader. I think there is a potential to share ‘BaseArrowFileScanTaskReader’ in both versions, but I did not attempt to do it yet.
4.      ArrowReader returns an iterator of VectorSchemaRoot and the behavior is explained in the Javadoc.
5.      Some small changes were needed in IcebergGenerics to expose the table scan object and VectorizedArrowReader to allocate different timestamp Arrow vectors based on with/without timezone.
6.      All prepush gradle tests pass except one which is still running (and it seems very slow - TestFlinkIcebergSink).
7.      I've not performed any performance tests with the implementation yet. I'm planning to do so this week.

Following are some limitations/questions for this implementation:
1.      The arrow vector type is coupled with the physical data type in the parquet file: When column data contains a constant value, the column is dictionary encoded and the returned Arrow type is int32 irrespective of the Iceberg data type. I think that the Arrow vector type should be consistent with the logical Iceberg data type (and not change due to the physical data type). There is a test ArrowReaderTest.testReadAllWithConstantRecords() that is currently ignored.
2.      Type promotion does not work: In the ArrowReaderTest, the data for column ‘int_promotion’ was written as int, and then type was promoted from int to long, but the Arrow reader still returns IntVector. I think that the Arrow vector type should be consistent with the promoted logical Iceberg data type.
3.      Data type limitations:
a.      Types not tested: UUID, FixedType, DecimalType. In the ArrowReaderTest, the parquet write was failing for these data types (due to a null pointer exception in ParquetMetadataConverter.addRowGroup: columnMetadata.getStatistics() was null). Are there unit tests with these types that write to parquet?
b.      Types not supported: TimeType, ListType, MapType, StructType. What is the path to add Arrow support for these data types?

Thanks,
Mayur

From: Mayur Srivastava <Ma...@twosigma.com>>
Sent: Friday, February 12, 2021 7:41 PM
To: dev@iceberg.apache.org<ma...@iceberg.apache.org>; rblue@netflix.com<ma...@netflix.com>
Subject: RE: Reading data from Iceberg table into Apache Arrow in Java

Thank you Ryan.

I’ll dig into the file scan plan and Spark codebase to learn about the internals of Iceberg vectorized read path. Then, I’ll try to implement the vectorized reader using core components only. I’ll be happy to work with you to contribute it back to the upstream. I’ll get back to you if I’ve any question or need any more pointers.

Thanks,
Mayur

From: Ryan Blue <rb...@netflix.com.INVALID>>
Sent: Friday, February 12, 2021 2:26 PM
To: Iceberg Dev List <de...@iceberg.apache.org>>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

Hi Mayur,

We built the Arrow support with Spark as the first use case, so the best examples of how to use it are in Spark.

The generic reader does two things: it plans a scan and sets up an iterator of file readers to produce generic records. What you want to do is the same thing, but set up the file readers to produce Arrow batches. You can do that by changing the `Parquet.read` call and passing the callback to create an Arrow batch reader rather than generic row reader. I don't think there is a public example of this, but maybe someone else knows about one. This isn't available in Iceberg yet, but if you want to add it we'd be happy to help you get it in.

The Spark read path has a good example, but it also wraps the Arrow batches so Spark can read them. Also, keep in mind that the Arrow integration only supports flat schemas right now, not fully nested schemas. So you'd need to still fall back to the row-based path. (Side note, if you have code to convert generics to Arrow, that's really useful to post somewhere.)

I hope that helps. It would be great to work with you to improve this in a couple of PRs!

rb

On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava <Ma...@twosigma.com>> wrote:
Hi,

We have an existing time series data access service based on Arrow/Flight which uses Apache Arrow format data to perform writes and reads (using time range queries) from a bespoke table-backend based on a S3 compatible storage.

We are trying to replace our bespoke table-backend with Iceberg tables. For integrating with Iceberg, we are using Iceberg core+data+parquet modules directly to write and read data. I would like to note that our service cannot use the Spark route to write or read the data. In our current Iceberg reader integration code, we are using IcebergGenerics.read(table).select(...).where(...).build() to iterate through the data row-by-row. Instead of this (potentially slower) read path which needs conversion between rows and Arrow VectorSchemaRoot, we want to use a vectorized read path which directly returns an Arrow VectorSchemaRoot as a callback or Arrow record batches as the result set.

I have noticed that Iceberg already has an Arrow module https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow. I have also looked into https://github.com/apache/iceberg/issues/9 and https://github.com/apache/iceberg/milestone/2. But, I’m not sure about the current status of the vectorized reader support. I’m also not sure how this Arrow module is being used to perform a vectorized read to execute a query on an Iceberg table in the core/data/parquet library.

I have a few questions regarding the Vectorized reader/Arrow support:
1.      Is it possible to run a vectorized read on an Iceberg table to return data in Arrow format using a non-Spark reader in Java?
2.      Is there an example of reading data in Arrow format from an Iceberg table?
3.      Is the Spark read path completely vectorized? I ask this question to find out if we can borrow from the vectorized Spark reader or we can move code from vectorized Spark reader to the Iceberg core library.

Let me know if you have any questions for me.

Thanks,
Mayur



--
Ryan Blue
Software Engineer
Netflix


Re: Reading data from Iceberg table into Apache Arrow in Java

Posted by Peter Vary <pv...@cloudera.com.INVALID>.
After some more digging, I think there is no solution yet for vectorized
reads with deletes.

    this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles &&
        (allOrcFileScanTasks ||
            (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
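
To spell out what that condition implies, here is a minimal, self-contained sketch of the same decision in plain Java. The `Task` and `Format` types and the method signature are hypothetical stand-ins invented for illustration; they are not Iceberg classes, and the real check in the Spark source may derive these flags differently.

```java
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Iceberg classes.
final class BatchReadDecision {
  enum Format { PARQUET, ORC, AVRO }

  // A simplified scan task: a data file format plus the number of attached delete files.
  static final class Task {
    final Format format;
    final int deleteFileCount;

    Task(Format format, int deleteFileCount) {
      this.format = format;
      this.deleteFileCount = deleteFileCount;
    }
  }

  // Mirrors the quoted condition: any delete file forces the row-based path,
  // and Parquet additionally needs at least one projected column and only primitive types.
  static boolean readUsingBatch(
      boolean batchReadsEnabled, List<Task> tasks, int projectedColumnCount, boolean onlyPrimitives) {
    boolean hasNoDeleteFiles = tasks.stream().allMatch(t -> t.deleteFileCount == 0);
    boolean allOrcFileScanTasks = tasks.stream().allMatch(t -> t.format == Format.ORC);
    boolean allParquetFileScanTasks = tasks.stream().allMatch(t -> t.format == Format.PARQUET);
    boolean atLeastOneColumn = projectedColumnCount > 0;
    return batchReadsEnabled && hasNoDeleteFiles
        && (allOrcFileScanTasks || (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
  }
}
```

Under these assumptions, a single Parquet task with one delete file makes `readUsingBatch` return false even when batch reads are enabled, which matches the observation that deletes are not handled on the vectorized path.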


Mayur Srivastava <Ma...@twosigma.com> wrote (on Tue, Mar 2, 2021,
15:48):

> Hi Peter,
>
>
>
> Good point.
>
>
>
> Most of the ArrowReader code is inspired by Spark’s vectorized
> reader (src:
> https://github.com/apache/iceberg/blob/631efec2f9ce1f0526d6613c81ac8fd0ccb95b5e/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java#L68
> ).
>
>
>
>
>
> We’ll probably need some help from someone who understands the Spark
> vectorized read path.
>
>
>
> But, I’ll read the code to understand the deletes.
>
>
>
> Thanks,
>
> Mayur
>
>
>
> *From:* Peter Vary <pv...@cloudera.com.INVALID>
> *Sent:* Tuesday, March 2, 2021 8:51 AM
> *To:* Iceberg Dev List <de...@iceberg.apache.org>
> *Cc:* rblue@netflix.com
> *Subject:* Re: Reading data from Iceberg table into Apache Arrow in Java
>
>
>
> Hi Mayur,
>
>
>
> I have been playing around with the idea of implementing vectorized reads
> for Hive, so your message came just in time :)
>
>
>
> Took a quick look at the code but I do not really understand how
> vectorized reads handle deletes.
>
>
>
> In non-vectorized code-path I have found this which filters the rows
> one-by-one:
>
>
> https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277
>
>
>
>
>
>           DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema);
>           Schema requiredSchema = deletes.requiredSchema();
>           return deletes.filter(openTask(currentTask, requiredSchema));
>
>
>
> In your code I have found that the delete files encryption keys are
> collected, but not sure how they are used:
>
>
> https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59
>
>
>
>     task.files().stream()
>         .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
>         .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
>
>
>
> Could you please help me with some quick pointers?
>
>
>
> Thanks,
>
> Peter
>
>
>
> On Mar 1, 2021, at 16:17, Mayur Srivastava <Ma...@twosigma.com>
> wrote:
>
>
>
> Hi Ryan,
>
>
>
> I’ve submitted a pr (https://github.com/apache/iceberg/pull/2286) for the
> vectorized arrow reader:
>
>
>
> This is my first Iceberg pull request - I'm not fully aware of the
> contributing conventions of this repo, so let me know if any changes are
> needed in the pr.
>
> I've refactored some code from the Spark vectorized reader and added an
> ArrowReader which is a vectorized reader in Iceberg core.
>
>
>
> About the ArrowReader:
>
> 1.      I’ve put the ArrowReader in the iceberg-data module because it
> needed to access the Iceberg table scan. Let me know if the reader needs to
> be moved.
>
> 2.      I had to add a dependency on ‘iceberg-arrow’ to the
> iceberg-data module. Specifically for the ArrowReaderTest, I had to add the
> following. Let me know if there is a better way to do this.
>
> compileOnly("org.apache.arrow:arrow-vector") {
>
> exclude group: 'io.netty', module: 'netty-buffer'
>
> exclude group: 'com.google.code.findbugs', module: 'jsr305'
>
> }
>
> 3.      Most of the code in ArrowReader is taken from the spark
> vectorized reader. I think there is a potential to share
> ‘BaseArrowFileScanTaskReader’ in both versions, but I did not attempt to do
> it yet.
>
> 4.      ArrowReader returns an iterator of VectorSchemaRoot and the
> behavior is explained in the Javadoc.
>
> 5.      Some small changes were needed in IcebergGenerics to expose the
> table scan object and VectorizedArrowReader to allocate different timestamp
> Arrow vectors based on with/without timezone.
>
> 6.      All prepush gradle tests pass except one which is still running
> (and it seems very slow - TestFlinkIcebergSink).
>
> 7.      I've not performed any performance tests with the implementation
> yet. I'm planning to do so this week.
>
>
>
> Following are some limitations/questions for this implementation:
>
> 1.      The arrow vector type is coupled with the physical data type in
> the parquet file: When column data contains a constant value, the column is
> dictionary encoded and the returned Arrow type is int32 irrespective of the
> Iceberg data type. I think that the Arrow vector type should be consistent
> with the logical Iceberg data type (and not change due to the physical data
> type). There is a test ArrowReaderTest.testReadAllWithConstantRecords()
> that is currently ignored.
>
> 2.      Type promotion does not work: In the ArrowReaderTest, the data
> for column ‘int_promotion’ was written as int, and then type was promoted
> from int to long, but the Arrow reader still returns IntVector. I think
> that the Arrow vector type should be consistent with the promoted logical
> Iceberg data type.
>
> 3.      Data type limitations:
>
> a.      Types not tested: UUID, FixedType, DecimalType. In the
> ArrowReaderTest, the parquet write was failing for these data types (due to
> a null pointer exception in ParquetMetadataConverter.addRowGroup:
> columnMetadata.getStatistics() was null). Are there unit tests with these
> types that write to parquet?
>
> b.      Types not supported: TimeType, ListType, MapType, StructType.
> What is the path to add Arrow support for these data types?
>
>
>
> Thanks,
>
> Mayur
>
>
>
> *From:* Mayur Srivastava <Ma...@twosigma.com>
> *Sent:* Friday, February 12, 2021 7:41 PM
> *To:* dev@iceberg.apache.org; rblue@netflix.com
> *Subject:* RE: Reading data from Iceberg table into Apache Arrow in Java
>
>
>
> Thank you Ryan.
>
>
>
> I’ll dig into the file scan plan and Spark codebase to learn about the
> internals of Iceberg vectorized read path. Then, I’ll try to implement the
> vectorized reader using core components only. I’ll be happy to work with
> you to contribute it back to the upstream. I’ll get back to you if I’ve any
> question or need any more pointers.
>
>
>
> Thanks,
>
> Mayur
>
>
>
> *From:* Ryan Blue <rb...@netflix.com.INVALID>
> *Sent:* Friday, February 12, 2021 2:26 PM
> *To:* Iceberg Dev List <de...@iceberg.apache.org>
> *Subject:* Re: Reading data from Iceberg table into Apache Arrow in Java
>
>
>
> Hi Mayur,
>
>
>
> We built the Arrow support with Spark as the first use case, so the best
> examples of how to use it are in Spark.
>
>
>
> The generic reader does two things: it plans a scan and sets up an
> iterator of file readers to produce generic records. What you want to do is
> the same thing, but set up the file readers to produce Arrow batches. You
> can do that by changing the `Parquet.read` call and passing the callback to
> create an Arrow batch reader rather than generic row reader. I don't think
> there is a public example of this, but maybe someone else knows about one.
> This isn't available in Iceberg yet, but if you want to add it we'd be
> happy to help you get it in.
>
>
>
> The Spark read path has a good example, but it also wraps the Arrow
> batches so Spark can read them. Also, keep in mind that the Arrow
> integration only supports flat schemas right now, not fully nested schemas.
> So you'd need to still fall back to the row-based path. (Side note, if you
> have code to convert generics to Arrow, that's really useful to post
> somewhere.)
>
>
>
> I hope that helps. It would be great to work with you to improve this in a
> couple of PRs!
>
>
>
> rb
>
>
>
> On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava <
> Mayur.Srivastava@twosigma.com> wrote:
>
> Hi,
>
>
>
> We have an existing time series data access service based on Arrow/Flight
> which uses Apache Arrow format data to perform writes and reads (using time
> range queries) from a bespoke table-backend based on a S3 compatible
> storage.
>
>
>
> We are trying to replace our bespoke table-backend with Iceberg tables.
> For integrating with Iceberg, we are using Iceberg core+data+parquet
> modules directly to write and read data. I would like to note that our
> service cannot use the Spark route to write or read the data. In our
> current Iceberg reader integration code, we are using
> IcebergGenerics.read(table).select(...).where(...).build() to iterate
> through the data row-by-row. Instead of this (potentially slower) read path
> which needs conversion between rows and Arrow VectorSchemaRoot, we want to
> use a vectorized read path which directly returns an Arrow VectorSchemaRoot
> as a callback or Arrow record batches as the result set.
>
>
>
> I have noticed that Iceberg already has an Arrow module
> https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow.
> I  have also looked into https://github.com/apache/iceberg/issues/9 and
> https://github.com/apache/iceberg/milestone/2. But, I’m not sure about
> the current status of the vectorized reader support. I’m also not sure how
> this Arrow module is being used to perform a vectorized read to execute a
> query on an Iceberg table in the core/data/parquet library.
>
>
>
> I have a few questions regarding the Vectorized reader/Arrow support:
>
> 1.      Is it possible to run a vectorized read on an Iceberg table to
> return data in Arrow format using a non-Spark reader in Java?
>
> 2.      Is there an example of reading data in Arrow format from an
> Iceberg table?
>
> 3.      Is the Spark read path completely vectorized? I ask this question
> to find out if we can borrow from the vectorized Spark reader or we can
> move code from vectorized Spark reader to the Iceberg core library.
>
>
>
> Let me know if you have any questions for me.
>
>
>
> Thanks,
>
> Mayur
>
>
>
>
>
>
> --
>
> Ryan Blue
>
> Software Engineer
>
> Netflix
>
>
>

RE: Reading data from Iceberg table into Apache Arrow in Java

Posted by Mayur Srivastava <Ma...@twosigma.com>.
Hi Peter,

Good point.

Most of the ArrowReader code is inspired by Spark’s vectorized reader (src: https://github.com/apache/iceberg/blob/631efec2f9ce1f0526d6613c81ac8fd0ccb95b5e/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java#L68).


We’ll probably need some help from someone who understands the Spark vectorized read path.

But, I’ll read the code to understand the deletes.

Thanks,
Mayur

From: Peter Vary <pv...@cloudera.com.INVALID>
Sent: Tuesday, March 2, 2021 8:51 AM
To: Iceberg Dev List <de...@iceberg.apache.org>
Cc: rblue@netflix.com
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

Hi Mayur,

I have been playing around with the idea of implementing vectorized reads for Hive, so your message came just in time :)

Took a quick look at the code but I do not really understand how vectorized reads handle deletes.

In non-vectorized code-path I have found this which filters the rows one-by-one:
https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277

          DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema);
          Schema requiredSchema = deletes.requiredSchema();
          return deletes.filter(openTask(currentTask, requiredSchema));

In your code I have found that the delete files encryption keys are collected, but not sure how they are used:
https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59

    task.files().stream()
        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));

Could you please help me with some quick pointers?

Thanks,
Peter

On Mar 1, 2021, at 16:17, Mayur Srivastava <Ma...@twosigma.com> wrote:

Hi Ryan,

I’ve submitted a pr (https://github.com/apache/iceberg/pull/2286) for the vectorized arrow reader:

This is my first Iceberg pull request - I'm not fully aware of the contributing conventions of this repo, so let me know if any changes are needed in the pr.
I've refactored some code from the Spark vectorized reader and added an ArrowReader which is a vectorized reader in Iceberg core.

About the ArrowReader:
1.      I’ve put the ArrowReader in the iceberg-data module because it needed to access the Iceberg table scan. Let me know if the reader needs to be moved.
2.      I had to add a dependency on ‘iceberg-arrow’ to the iceberg-data module. Specifically for the ArrowReaderTest, I had to add the following. Let me know if there is a better way to do this.
compileOnly("org.apache.arrow:arrow-vector") {
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
3.      Most of the code in ArrowReader is taken from the spark vectorized reader. I think there is a potential to share ‘BaseArrowFileScanTaskReader’ in both versions, but I did not attempt to do it yet.
4.      ArrowReader returns an iterator of VectorSchemaRoot and the behavior is explained in the Javadoc.
5.      Some small changes were needed in IcebergGenerics to expose the table scan object and VectorizedArrowReader to allocate different timestamp Arrow vectors based on with/without timezone.
6.      All prepush gradle tests pass except one which is still running (and it seems very slow - TestFlinkIcebergSink).
7.      I've not performed any performance tests with the implementation yet. I'm planning to do so this week.

Following are some limitations/questions for this implementation:
1.      The arrow vector type is coupled with the physical data type in the parquet file: When column data contains a constant value, the column is dictionary encoded and the returned Arrow type is int32 irrespective of the Iceberg data type. I think that the Arrow vector type should be consistent with the logical Iceberg data type (and not change due to the physical data type). There is a test ArrowReaderTest.testReadAllWithConstantRecords() that is currently ignored.
2.      Type promotion does not work: In the ArrowReaderTest, the data for column ‘int_promotion’ was written as int, and then type was promoted from int to long, but the Arrow reader still returns IntVector. I think that the Arrow vector type should be consistent with the promoted logical Iceberg data type.
3.      Data type limitations:
a.      Types not tested: UUID, FixedType, DecimalType. In the ArrowReaderTest, the parquet write was failing for these data types (due to a null pointer exception in ParquetMetadataConverter.addRowGroup: columnMetadata.getStatistics() was null). Are there unit tests with these types that write to parquet?
b.      Types not supported: TimeType, ListType, MapType, StructType. What is the path to add Arrow support for these data types?

Thanks,
Mayur

From: Mayur Srivastava <Ma...@twosigma.com>
Sent: Friday, February 12, 2021 7:41 PM
To: dev@iceberg.apache.org; rblue@netflix.com
Subject: RE: Reading data from Iceberg table into Apache Arrow in Java

Thank you Ryan.

I’ll dig into the file scan plan and Spark codebase to learn about the internals of Iceberg vectorized read path. Then, I’ll try to implement the vectorized reader using core components only. I’ll be happy to work with you to contribute it back to the upstream. I’ll get back to you if I’ve any question or need any more pointers.

Thanks,
Mayur

From: Ryan Blue <rb...@netflix.com.INVALID>
Sent: Friday, February 12, 2021 2:26 PM
To: Iceberg Dev List <de...@iceberg.apache.org>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

Hi Mayur,

We built the Arrow support with Spark as the first use case, so the best examples of how to use it are in Spark.

The generic reader does two things: it plans a scan and sets up an iterator of file readers to produce generic records. What you want to do is the same thing, but set up the file readers to produce Arrow batches. You can do that by changing the `Parquet.read` call and passing the callback to create an Arrow batch reader rather than generic row reader. I don't think there is a public example of this, but maybe someone else knows about one. This isn't available in Iceberg yet, but if you want to add it we'd be happy to help you get it in.

The Spark read path has a good example, but it also wraps the Arrow batches so Spark can read them. Also, keep in mind that the Arrow integration only supports flat schemas right now, not fully nested schemas. So you'd need to still fall back to the row-based path. (Side note, if you have code to convert generics to Arrow, that's really useful to post somewhere.)

I hope that helps. It would be great to work with you to improve this in a couple of PRs!

rb

On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava <Ma...@twosigma.com> wrote:
Hi,

We have an existing time series data access service based on Arrow/Flight which uses Apache Arrow format data to perform writes and reads (using time range queries) from a bespoke table-backend based on a S3 compatible storage.

We are trying to replace our bespoke table-backend with Iceberg tables. For integrating with Iceberg, we are using Iceberg core+data+parquet modules directly to write and read data. I would like to note that our service cannot use the Spark route to write or read the data. In our current Iceberg reader integration code, we are using IcebergGenerics.read(table).select(...).where(...).build() to iterate through the data row-by-row. Instead of this (potentially slower) read path which needs conversion between rows and Arrow VectorSchemaRoot, we want to use a vectorized read path which directly returns an Arrow VectorSchemaRoot as a callback or Arrow record batches as the result set.

I have noticed that Iceberg already has an Arrow module https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow. I have also looked into https://github.com/apache/iceberg/issues/9 and https://github.com/apache/iceberg/milestone/2. But, I’m not sure about the current status of the vectorized reader support. I’m also not sure how this Arrow module is being used to perform a vectorized read to execute a query on an Iceberg table in the core/data/parquet library.

I have a few questions regarding the Vectorized reader/Arrow support:
1.      Is it possible to run a vectorized read on an Iceberg table to return data in Arrow format using a non-Spark reader in Java?
2.      Is there an example of reading data in Arrow format from an Iceberg table?
3.      Is the Spark read path completely vectorized? I ask this question to find out if we can borrow from the vectorized Spark reader or we can move code from vectorized Spark reader to the Iceberg core library.

Let me know if you have any questions for me.

Thanks,
Mayur



--
Ryan Blue
Software Engineer
Netflix


Re: Reading data from Iceberg table into Apache Arrow in Java

Posted by Peter Vary <pv...@cloudera.com.INVALID>.
Hi Mayur,

I have been playing around with the idea of implementing vectorized reads for Hive, so your message came just in time :)

Took a quick look at the code but I do not really understand how vectorized reads handle deletes.

In non-vectorized code-path I have found this which filters the rows one-by-one:
https://github.com/apache/iceberg/blob/master/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L275-L277

          DeleteFilter deletes = new GenericDeleteFilter(io, currentTask, tableSchema, readSchema);
          Schema requiredSchema = deletes.requiredSchema();
          return deletes.filter(openTask(currentTask, requiredSchema));
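
As a rough illustration of what row-by-row delete filtering means, the sketch below wraps a row iterator and drops rows whose position appears in a set of deleted positions. It is a simplified stand-in written for this note: `RowDeleteFilterSketch` and `FilteringIterator` are hypothetical names, and the sketch covers only position-style deletes, not how Iceberg's DeleteFilter is actually implemented.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Set;

// Hypothetical illustration; not Iceberg's DeleteFilter.
final class RowDeleteFilterSketch {

  // Returns an iterator that skips rows whose 0-based position is in deletedPositions.
  static <T> Iterator<T> filter(Iterator<T> rows, Set<Long> deletedPositions) {
    return new FilteringIterator<>(rows, deletedPositions);
  }

  static final class FilteringIterator<T> implements Iterator<T> {
    private final Iterator<T> rows;
    private final Set<Long> deletedPositions;
    private long position = 0L;   // position of the next row to be pulled from the source
    private T nextRow;
    private boolean ready = false; // true when nextRow holds a row that has not been returned yet

    FilteringIterator(Iterator<T> rows, Set<Long> deletedPositions) {
      this.rows = rows;
      this.deletedPositions = deletedPositions;
    }

    @Override
    public boolean hasNext() {
      // Each row is checked individually, which is the per-row cost a vectorized path would avoid.
      while (!ready && rows.hasNext()) {
        T candidate = rows.next();
        if (!deletedPositions.contains(position++)) {
          nextRow = candidate;
          ready = true;
        }
      }
      return ready;
    }

    @Override
    public T next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      ready = false;
      return nextRow;
    }
  }
}
```

A vectorized equivalent would need to apply the same positions to whole batches (for example by compacting or masking vectors), which is the part that does not appear to exist yet.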

In your code I have found that the delete files encryption keys are collected, but not sure how they are used:
https://github.com/apache/iceberg/pull/2286/files#diff-32bdafecb94e3bfaf90c9bffbdee4ff8c958e00b76bb46f19b2447bfdd0047cbR59

    task.files().stream()
        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));

Could you please help me with some quick pointers?

Thanks,
Peter

> On Mar 1, 2021, at 16:17, Mayur Srivastava <Ma...@twosigma.com> wrote:
> 
> Hi Ryan,
>  
> I’ve submitted a PR (https://github.com/apache/iceberg/pull/2286) for the vectorized Arrow reader:
>  
> This is my first Iceberg pull request - I'm not fully aware of the contributing conventions of this repo, so let me know if any changes are needed in the pr.
> I've refactored some code from the Spark vectorized reader and added an ArrowReader which is a vectorized reader in Iceberg core.
>  
> About the ArrowReader:
> 1.      I’ve put the ArrowReader in the iceberg-data module because it needed to access the Iceberg table scan. Let me know if the reader needs to be moved.
> 2.      I had to add a dependency on ‘iceberg-arrow’ to the iceberg-data module. Specifically for the ArrowReaderTest, I had to add the following. Let me know if there is a better way to do this.
> compileOnly("org.apache.arrow:arrow-vector") {
> exclude group: 'io.netty', module: 'netty-buffer'
> exclude group: 'com.google.code.findbugs', module: 'jsr305'
> }
> 3.      Most of the code in ArrowReader is taken from the spark vectorized reader. I think there is a potential to share ‘BaseArrowFileScanTaskReader’ in both versions, but I did not attempt to do it yet.
> 4.      ArrowReader returns an iterator of VectorSchemaRoot and the behavior is explained in the Javadoc.
> 5.      Some small changes were needed in IcebergGenerics to expose the table scan object and VectorizedArrowReader to allocate different timestamp Arrow vectors based on with/without timezone.
> 6.      All prepush gradle tests pass except one which is still running (and it seems very slow - TestFlinkIcebergSink).
> 7.      I've not performed any performance tests with the implementation yet. I'm planning to do so this week.
>  
> Following are some limitations/questions for this implementation:
> 1.      The arrow vector type is coupled with the physical data type in the parquet file: When column data contains a constant value, the column is dictionary encoded and the returned Arrow type is int32 irrespective of the Iceberg data type. I think that the Arrow vector type should be consistent with the logical Iceberg data type (and not change due to the physical data type). There is a test ArrowReaderTest.testReadAllWithConstantRecords() that is currently ignored.
> 2.      Type promotion does not work: In the ArrowReaderTest, the data for column ‘int_promotion’ was written as int, and then type was promoted from int to long, but the Arrow reader still returns IntVector. I think that the Arrow vector type should be consistent with the promoted logical Iceberg data type.
> 3.      Data type limitations:
> a.      Types not tested: UUID, FixedType, DecimalType. In the ArrowReaderTest, the parquet write was failing for these data types (due to a null pointer exception in ParquetMetadataConverter.addRowGroup: columnMetadata.getStatistics() was null). Are there unit tests with these types that write to parquet?
> b.      Types not supported: TimeType, ListType, MapType, StructType. What is the path to add Arrow support for these data types?
>  
> Thanks,
> Mayur
>  
> From: Mayur Srivastava <Ma...@twosigma.com> 
> Sent: Friday, February 12, 2021 7:41 PM
> To: dev@iceberg.apache.org; rblue@netflix.com
> Subject: RE: Reading data from Iceberg table into Apache Arrow in Java
>  
> Thank you Ryan.
>  
> I’ll dig into the file scan plan and Spark codebase to learn about the internals of Iceberg vectorized read path. Then, I’ll try to implement the vectorized reader using core components only. I’ll be happy to work with you to contribute it back to the upstream. I’ll get back to you if I’ve any question or need any more pointers.
>  
> Thanks,
> Mayur
>  
> From: Ryan Blue <rblue@netflix.com.INVALID>
> Sent: Friday, February 12, 2021 2:26 PM
> To: Iceberg Dev List <dev@iceberg.apache.org>
> Subject: Re: Reading data from Iceberg table into Apache Arrow in Java
>  
> Hi Mayur,
>  
> We built the Arrow support with Spark as the first use case, so the best examples of how to use it are in Spark.
>  
> The generic reader does two things: it plans a scan and sets up an iterator of file readers to produce generic records. What you want to do is the same thing, but set up the file readers to produce Arrow batches. You can do that by changing the `Parquet.read` call and passing the callback to create an Arrow batch reader rather than generic row reader. I don't think there is a public example of this, but maybe someone else knows about one. This isn't available in Iceberg yet, but if you want to add it we'd be happy to help you get it in.
>  
> The Spark read path has a good example, but it also wraps the Arrow batches so Spark can read them. Also, keep in mind that the Arrow integration only supports flat schemas right now, not fully nested schemas. So you'd need to still fall back to the row-based path. (Side note, if you have code to convert generics to Arrow, that's really useful to post somewhere.)
>  
> I hope that helps. It would be great to work with you to improve this in a couple of PRs!
>  
> rb
>  
> On Thu, Feb 11, 2021 at 7:22 AM Mayur Srivastava <Mayur.Srivastava@twosigma.com> wrote:
> Hi,
>  
> We have an existing time series data access service based on Arrow/Flight which uses Apache Arrow format data to perform writes and reads (using time range queries) from a bespoke table-backend based on a S3 compatible storage. 
>  
> We are trying to replace our bespoke table-backend with Iceberg tables. For integrating with Iceberg, we are using Iceberg core+data+parquet modules directly to write and read data. I would like to note that our service cannot use the Spark route to write or read the data. In our current Iceberg reader integration code, we are using IcebergGenerics.read(table).select(...).where(...).build() to iterate through the data row-by-row. Instead of this (potentially slower) read path which needs conversion between rows and Arrow VectorSchemaRoot, we want to use a vectorized read path which directly returns an Arrow VectorSchemaRoot as a callback or Arrow record batches as the result set. 
>  
> I have noticed that Iceberg already has an Arrow module https://github.com/apache/iceberg/tree/master/arrow/src/main/java/org/apache/iceberg/arrow. I have also looked into https://github.com/apache/iceberg/issues/9 and https://github.com/apache/iceberg/milestone/2. But, I’m not sure about the current status of the vectorized reader support. I’m also not sure how this Arrow module is being used to perform a vectorized read to execute a query on an Iceberg table in the core/data/parquet library.
>  
> I have a few questions regarding the Vectorized reader/Arrow support:
> 1.      Is it possible to run a vectorized read on an Iceberg table to return data in Arrow format using a non-Spark reader in Java?
> 2.      Is there an example of reading data in Arrow format from an Iceberg table?
> 3.      Is the Spark read path completely vectorized? I ask this question to find out if we can borrow from the vectorized Spark reader or we can move code from vectorized Spark reader to the Iceberg core library.
>  
> Let me know if you have any questions for me.
>  
> Thanks,
> Mayur
>  
> 
>  
> -- 
> Ryan Blue
> Software Engineer
> Netflix


RE: Reading data from Iceberg table into Apache Arrow in Java

Posted by Mayur Srivastava <Ma...@twosigma.com>.
Hi Ryan,

I’ve submitted a PR (https://github.com/apache/iceberg/pull/2286) for the vectorized Arrow reader:

This is my first Iceberg pull request, and I'm not fully aware of the contributing conventions of this repo, so let me know if any changes are needed in the PR.
I've refactored some code from the Spark vectorized reader and added an ArrowReader, which is a vectorized reader in Iceberg core.

About the ArrowReader:

1.      I’ve put the ArrowReader in the iceberg-data module because it needed to access the Iceberg table scan. Let me know if the reader needs to be moved.

2.      I had to add a dependency on ‘iceberg-arrow’ to the iceberg-data module. Specifically for the ArrowReaderTest, I had to add the following. Let me know if there is a better way to do this.

compileOnly("org.apache.arrow:arrow-vector") {
  exclude group: 'io.netty', module: 'netty-buffer'
  exclude group: 'com.google.code.findbugs', module: 'jsr305'
}

3.      Most of the code in ArrowReader is taken from the Spark vectorized reader. I think there is potential to share ‘BaseArrowFileScanTaskReader’ between both versions, but I have not attempted it yet.

4.      ArrowReader returns an iterator of VectorSchemaRoot and the behavior is explained in the Javadoc.

5.      Some small changes were needed in IcebergGenerics to expose the table scan object and VectorizedArrowReader to allocate different timestamp Arrow vectors based on with/without timezone.

6.      All prepush gradle tests pass except one which is still running (and it seems very slow - TestFlinkIcebergSink).

7.      I've not performed any performance tests with the implementation yet. I'm planning to do so this week.


Following are some limitations/questions for this implementation:

1.      The Arrow vector type is coupled to the physical data type in the Parquet file: when column data contains a constant value, the column is dictionary encoded and the returned Arrow type is int32 irrespective of the Iceberg data type. I think that the Arrow vector type should be consistent with the logical Iceberg data type (and not change due to the physical data type). There is a test ArrowReaderTest.testReadAllWithConstantRecords() that is currently ignored.

2.      Type promotion does not work: in the ArrowReaderTest, the data for column ‘int_promotion’ was written as int, and then the type was promoted from int to long, but the Arrow reader still returns IntVector. I think that the Arrow vector type should be consistent with the promoted logical Iceberg data type.

3.      Data type limitations:

a.      Types not tested: UUID, FixedType, DecimalType. In the ArrowReaderTest, the Parquet write was failing for these data types (due to a null pointer exception in ParquetMetadataConverter.addRowGroup: columnMetadata.getStatistics() was null). Are there unit tests with these types that write to Parquet?

b.      Types not supported: TimeType, ListType, MapType, StructType. What is the path to add Arrow support for these data types?
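
The behavior asked for in limitations 1 and 2 can be illustrated with a toy sketch in plain Java. It does not use Iceberg or Arrow classes: `LogicalType`, `PhysicalColumn` and `LogicalVectors` are hypothetical names, and `int[]` / `long[]` simply stand in for Arrow vectors. The assumption is that the reader resolves dictionary ids and widens int to long based on the requested logical type, so the caller never sees the physical encoding.

```java
import java.util.Arrays;

// Hypothetical logical column types (stand-ins for Iceberg's int and long).
enum LogicalType { INT, LONG }

// Hypothetical physical column as stored in a file: plain int values, or
// dictionary ids plus a dictionary when the column is dictionary encoded.
final class PhysicalColumn {
  final int[] valuesOrIds;
  final int[] dictionary; // null when the column is not dictionary encoded

  PhysicalColumn(int[] valuesOrIds, int[] dictionary) {
    this.valuesOrIds = valuesOrIds;
    this.dictionary = dictionary;
  }
}

final class LogicalVectors {
  private LogicalVectors() {}

  // Resolves dictionary ids to the stored int values; plain columns are copied.
  static int[] decode(PhysicalColumn column) {
    if (column.dictionary == null) {
      return column.valuesOrIds.clone();
    }
    int[] decoded = new int[column.valuesOrIds.length];
    for (int i = 0; i < decoded.length; i++) {
      decoded[i] = column.dictionary[column.valuesOrIds[i]];
    }
    return decoded;
  }

  // Returns int[] for INT and long[] for LONG, regardless of how the
  // column was physically written.
  static Object read(PhysicalColumn column, LogicalType expected) {
    int[] decoded = decode(column);
    switch (expected) {
      case INT:
        return decoded;
      case LONG:
        // Widen int to long, mirroring an int -> long type promotion.
        return Arrays.stream(decoded).asLongStream().toArray();
      default:
        throw new IllegalArgumentException("Unsupported type: " + expected);
    }
  }
}
```

For example, a constant column written as dictionary ids `{0, 0, 0}` with dictionary `{42}` and read as `LogicalType.LONG` comes back as a `long[]` of three 42s rather than as the int ids.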

Thanks,
Mayur

From: Mayur Srivastava <Ma...@twosigma.com>
Sent: Friday, February 12, 2021 7:41 PM
To: dev@iceberg.apache.org; rblue@netflix.com
Subject: RE: Reading data from Iceberg table into Apache Arrow in Java

Thank you Ryan.

I’ll dig into the file scan plan and Spark codebase to learn about the internals of the Iceberg vectorized read path. Then I’ll try to implement the vectorized reader using core components only. I’ll be happy to work with you to contribute it back upstream. I’ll get back to you if I have any questions or need more pointers.

Thanks,
Mayur

From: Ryan Blue <rb...@netflix.com.INVALID>
Sent: Friday, February 12, 2021 2:26 PM
To: Iceberg Dev List <de...@iceberg.apache.org>
Subject: Re: Reading data from Iceberg table into Apache Arrow in Java

Hi Mayur,

We built the Arrow support with Spark as the first use case, so the best examples of how to use it are in Spark.

The generic reader does two things: it plans a scan and sets up an iterator of file readers to produce generic records. What you want to do is the same thing, but set up the file readers to produce Arrow batches. You can do that by changing the `Parquet.read` call and passing the callback to create an Arrow batch reader rather than a generic row reader. I don't think there is a public example of this, but maybe someone else knows about one. This isn't available in Iceberg yet, but if you want to add it we'd be happy to help you get it in.

The Spark read path has a good example, but it also wraps the Arrow batches so Spark can read them. Also, keep in mind that the Arrow integration only supports flat schemas right now, not fully nested schemas. So you'd still need to fall back to the row-based path. (Side note, if you have code to convert generics to Arrow, that's really useful to post somewhere.)

I hope that helps. It would be great to work with you to improve this in a couple of PRs!

rb


RE: Reading data from Iceberg table into Apache Arrow in Java

Posted by Mayur Srivastava <Ma...@twosigma.com>.
Thank you Ryan.

I’ll dig into the file scan plan and Spark codebase to learn about the internals of the Iceberg vectorized read path. Then I’ll try to implement the vectorized reader using core components only. I’ll be happy to work with you to contribute it back upstream. I’ll get back to you if I have any questions or need more pointers.

Thanks,
Mayur


Re: Reading data from Iceberg table into Apache Arrow in Java

Posted by Ryan Blue <rb...@netflix.com.INVALID>.
Hi Mayur,

We built the Arrow support with Spark as the first use case, so the best
examples of how to use it are in Spark.

The generic reader does two things: it plans a scan and sets up an iterator
of file readers to produce generic records. What you want to do is the same
thing, but set up the file readers to produce Arrow batches. You can do
that by changing the `Parquet.read` call and passing the callback to create
an Arrow batch reader rather than a generic row reader. I don't think there
is a public example of this, but maybe someone else knows about one. This
isn't available in Iceberg yet, but if you want to add it we'd be happy to
help you get it in.
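
A rough sketch of the shape described above, in plain Java with no Iceberg or Arrow dependencies. Every name here (`FileTask`, `ScanIterator`, `Readers`) is hypothetical and only stands in for the planned scan tasks and the per-file reader callback; this is not the Iceberg API. The point it tries to show is that the scan iterator stays the same and only the per-file reader function changes, from one that yields rows to one that yields batches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical stand-in for one planned file in a scan; holds that file's rows.
final class FileTask {
  final List<Integer> rows;

  FileTask(Integer... rows) {
    this.rows = Arrays.asList(rows);
  }
}

// Walks the planned tasks and opens one per-file reader at a time.
// The element type T is decided entirely by the reader function passed in.
final class ScanIterator<T> implements Iterator<T> {
  private final Iterator<FileTask> tasks;
  private final Function<FileTask, Iterator<T>> openFile;
  private Iterator<T> current = Collections.emptyIterator();

  ScanIterator(List<FileTask> tasks, Function<FileTask, Iterator<T>> openFile) {
    this.tasks = tasks.iterator();
    this.openFile = openFile;
  }

  @Override
  public boolean hasNext() {
    // Advance to the next file until one has data or the tasks run out.
    while (!current.hasNext() && tasks.hasNext()) {
      current = openFile.apply(tasks.next());
    }
    return current.hasNext();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return current.next();
  }
}

final class Readers {
  private Readers() {}

  // Row-based reader: one element per row.
  static Iterator<Integer> rowReader(FileTask task) {
    return task.rows.iterator();
  }

  // Batch reader: groups the rows of each file into batches of at most batchSize.
  static Function<FileTask, Iterator<List<Integer>>> batchReader(int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    return task -> {
      List<List<Integer>> batches = new ArrayList<>();
      for (int start = 0; start < task.rows.size(); start += batchSize) {
        int end = Math.min(start + batchSize, task.rows.size());
        batches.add(new ArrayList<>(task.rows.subList(start, end)));
      }
      return batches.iterator();
    };
  }

  // Drains an iterator into a list.
  static <T> List<T> drain(Iterator<T> it) {
    List<T> out = new ArrayList<>();
    while (it.hasNext()) {
      out.add(it.next());
    }
    return out;
  }
}
```

Passing `Readers::rowReader` gives an iterator of rows, while passing `Readers.batchReader(n)` to the same `ScanIterator` gives an iterator of batches. In this toy model batches never span files, which is an assumption of the sketch rather than a statement about Iceberg.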

The Spark read path has a good example, but it also wraps the Arrow batches
so Spark can read them. Also, keep in mind that the Arrow integration only
supports flat schemas right now, not fully nested schemas. So you'd still
need to fall back to the row-based path. (Side note, if you have code to
convert generics to Arrow, that's really useful to post somewhere.)
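
The fallback mentioned above could look roughly like the following toy check, written in plain Java. `Field`, `ReadPath` and `ReadPathChooser` are hypothetical names for this sketch and are not Iceberg classes; the only assumption is that a schema counts as flat when no top-level column has child fields.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, minimal schema model: a field is primitive unless it has children.
final class Field {
  final String name;
  final List<Field> children; // empty for primitive fields

  Field(String name, Field... children) {
    this.name = name;
    this.children = Arrays.asList(children);
  }

  boolean isNested() {
    return !children.isEmpty();
  }
}

enum ReadPath { VECTORIZED, ROW_BASED }

final class ReadPathChooser {
  private ReadPathChooser() {}

  // Uses the vectorized path only when every top-level column is primitive;
  // any nested column sends the whole read to the row-based path.
  static ReadPath choose(List<Field> columns) {
    for (Field column : columns) {
      if (column.isNested()) {
        return ReadPath.ROW_BASED;
      }
    }
    return ReadPath.VECTORIZED;
  }
}
```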

I hope that helps. It would be great to work with you to improve this in a
couple of PRs!

rb


-- 
Ryan Blue
Software Engineer
Netflix