Posted to dev@iceberg.apache.org by vivek B <vi...@gmail.com> on 2021/06/20 09:17:32 UTC

Improving/optimizing the planInputPartitions function in SparkBatchScan

Hey,
If my understanding is correct, the planInputPartitions function in
SparkBatchScan creates the tasks used to read data. Each task contains
files whose block locations we fetch in the ReadTask constructor,
provided *localityPreferred* is true.

https://github.com/apache/iceberg/blob/765ec12476e1c25fa270660b56f1ea063910831d/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java#L274

if (localityPreferred) {
    Table table = tableBroadcast.value();
    // resolves block locations for every file in the task
    this.preferredLocations = Util.blockLocations(table.io(), task);
} else {
    this.preferredLocations = HadoopInputFile.NO_LOCATION_PREFERENCE;
}
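
For context, the per-file cost comes from the block-location lookup itself.
Here is a minimal, hypothetical illustration using the raw Hadoop FileSystem
API (this is not Iceberg's Util.blockLocations implementation, just the
shape of the cost):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// One getFileBlockLocations RPC per file: with 90000 files this loop
// serializes 90000 namenode round trips.
static List<String> preferredHosts(FileSystem fs, List<Path> files) throws IOException {
    List<String> hosts = new ArrayList<>();
    for (Path path : files) {
        BlockLocation[] blocks = fs.getFileBlockLocations(path, 0, Long.MAX_VALUE);
        for (BlockLocation block : blocks) {
            Collections.addAll(hosts, block.getHosts());
        }
    }
    return hosts;
}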


If 900 tasks are created and each task has 100 files, then we will make
90000 fs.getFileBlockLocations RPC calls. If each RPC takes 1 ms, a total
of 90000 * 0.001 s = 90 s is added to the planning time. On top of that,
planInputPartitions gets called repeatedly, so this cost is paid more than
once.

Though we can avoid the fs.getFileBlockLocations RPC calls by passing the
option locality=false when reading through the DataFrameReader APIs, there
is no way to disable locality when we fire a SQL query (like MERGE INTO)
that scans the table.
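
For reference, the existing DataFrameReader escape hatch looks like this (a
sketch assuming the Spark 3 Java API, an existing SparkSession named spark,
and a placeholder table name db.tbl):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Disabling locality through the read option skips the block-location RPCs.
Dataset<Row> df = spark.read()
    .format("iceberg")
    .option("locality", "false")
    .load("db.tbl");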

I am suggesting that we either parallelize the following code block, which
creates the ReadTasks (a sketch of one approach follows the snippet),

https://github.com/apache/iceberg/blob/765ec12476e1c25fa270660b56f1ea063910831d/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java#L128

for (int i = 0; i < scanTasks.size(); i++) {
    // each ReadTask may trigger block-location RPCs in its constructor
    readTasks[i] = new ReadTask(
            scanTasks.get(i), tableBroadcast, expectedSchemaString,
            caseSensitive, localityPreferred);
}
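
For instance, a sketch of the parallel version (the pool size and error
handling are placeholders, and Iceberg may prefer its own worker-pool
utilities over a raw executor):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

ExecutorService pool = Executors.newFixedThreadPool(8);  // placeholder size
try {
    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < scanTasks.size(); i++) {
        final int index = i;
        // Each ReadTask constructor may fetch block locations, so building
        // them concurrently overlaps the per-file RPCs.
        futures.add(pool.submit(() -> readTasks[index] = new ReadTask(
                scanTasks.get(index), tableBroadcast, expectedSchemaString,
                caseSensitive, localityPreferred)));
    }
    for (Future<?> future : futures) {
        future.get();  // propagate worker failures
    }
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException("Failed to create read tasks in parallel", e);
} finally {
    pool.shutdown();
}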


or add a table-level property like table.sql.locality.default = true to
enable/disable the locality preference of the table:

public static boolean isLocalityEnabled(FileIO io, org.apache.iceberg.Table table,
                                        CaseInsensitiveStringMap readOptions) {
    InputFile in = io.newInputFile(table.location());
    if (in instanceof HadoopInputFile) {
        String scheme = ((HadoopInputFile) in).getFileSystem().getScheme();
        boolean localityPreference =
            readOptions.getBoolean("locality", LOCALITY_WHITELIST_FS.contains(scheme));
        return localityPreference &&
            Boolean.parseBoolean(table.properties().getOrDefault("table.sql.locality.default", "true"));
    }
    return false;
}
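
If we went this route, opting a table out of locality would look something
like this (a sketch using the Table.updateProperties() API with the property
name proposed above):

// Disable locality for SQL scans of this table.
table.updateProperties()
    .set("table.sql.locality.default", "false")
    .commit();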

Thanks and Regards,
Vivek

Re: Improving/optimizing the planInputPartitions function in SparkBatchScan

Posted by Suraj Chandran <ch...@gmail.com>.
Also, what Vivek showed happens during plan time on the driver node, not
during the actual execution. Does it make sense to have a separate
locality-disable option for plan-time metadata operations, so people can
control the locality preference independently for planning and execution?

Thanks,
Suraj

Re: Improving/optimizing the planInputPartitions function in SparkBatchScan

Posted by Jack Ye <ye...@gmail.com>.
I was touching this part of the code (isLocalityEnabled) recently. I think
that instead of a table property, this is really a file system property,
since it requires the initialization of a Hadoop file system. So it might
be better to determine the default behavior from a config value in the
Hadoop configuration: add a config key in
https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/hadoop/ConfigProperties.java,
and define the default boolean value in core-site or hdfs-site. Any
thoughts on this?
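
A sketch of that direction (the key name and wiring are assumptions, not
existing code):

// Hypothetical key added to org.apache.iceberg.hadoop.ConfigProperties:
public static final String LOCALITY_ENABLED = "iceberg.hadoop.locality.enabled";

// The default would then come from the Hadoop Configuration (settable in
// core-site.xml or hdfs-site.xml) rather than from a table property:
boolean localityDefault = hadoopConf.getBoolean(
    ConfigProperties.LOCALITY_ENABLED, LOCALITY_WHITELIST_FS.contains(scheme));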

-Jack Ye

Re: Improving/optimizing the planInputPartitions function in SparkBatchScan

Posted by Ryan Blue <bl...@apache.org>.
Vivek,

I think you have a few good ideas here. I think it makes sense to
parallelize the calls to get block locations. And it is also a gap that
there isn’t a way to turn off locality when reading from a table in HDFS
through SparkSQL.

I’m not sure that a table property makes sense, since this is so specific
to Spark and the underlying storage, but I wouldn’t mind adding
read.spark.locality.enabled for this if other people think it would help.
We just want to be clear that this only configures Spark since the
trade-off would be different for other engines.

I also think that this is an area where our SQL extensions could help. I’ve
been considering adding a way to process hints that add read options. That
way you could actually set the locality option, like this:

SELECT /*+ OPTION("locality", false) */ * FROM t;

I think that would be a good extension since there’s definitely a gap in
Spark between what you can do with the DataFrameReader/options and SQL.

Ryan

-- 
Ryan Blue