You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/05/10 14:07:14 UTC

[GitHub] [iceberg] jshmchenxi opened a new pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

jshmchenxi opened a new pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577


   …e locality for spark-sql
   
   When reading an Iceberg table by spark-sql, I couldn't find a way to disable locality as it is defined as a Spark read option. In our busy cluster, a table of about 8k files took about 15s to get block locations from NN. Both QueryExecution$.prepareForExecution() and QueryExecution.sparkPlan() will get block locations and that's 30 seconds...
   
   So I try to add locality as a table property that it can be disable for spark-sql.
   
   Maybe there is a way to disable locality in spark-sql session? Please inform me, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r636788729



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -227,15 +233,58 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
-    }
+
+    initializeReadTasks(readTasks, tableBroadcast, expectedSchemaString, () -> InternalRowReaderFactory.INSTANCE);
 
     return readTasks;
   }
 
+  /**
+   * Initialize ReadTasks with multi threads as get block locations can be slow
+   *
+   * @param readTasks Result list to return
+   */
+  private <T> void initializeReadTasks(List<InputPartition<T>> readTasks,
+      Broadcast<Table> tableBroadcast, String expectedSchemaString, Supplier<ReaderFactory<T>> supplier) {
+    int taskInitThreads = Math.max(1, PropertyUtil.propertyAsInt(table.properties(), LOCALITY_TASK_INITIALIZE_THREADS,
+        LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT));
+
+    if (!localityPreferred || taskInitThreads == 1) {
+      for (CombinedScanTask task : tasks()) {
+        readTasks.add(new ReadTask<>(
+            task, tableBroadcast, expectedSchemaString, caseSensitive,
+            localityPreferred, supplier.get()));
+      }
+      return;
+    }
+
+    List<Future<ReadTask<T>>> futures = new ArrayList<>();
+
+    final ExecutorService pool = Executors.newFixedThreadPool(
+        taskInitThreads,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("Init-ReadTask-%d")
+            .build());
+
+    List<CombinedScanTask> scanTasks = tasks();
+    for (int i = 0; i < scanTasks.size(); i++) {
+      final int curIndex = i;
+      futures.add(pool.submit(() -> new ReadTask<>(scanTasks.get(curIndex), tableBroadcast,
+          expectedSchemaString, caseSensitive, true, supplier.get())));
+    }

Review comment:
       Thanks! I will try to use iceberg utils in another PR and this will focus on adding the ability to disable locality.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r637331639



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       I'm a big fan of hints (not that I was asked). But they're a great SQL first solution to a number of problems.
   
   Assuming you mean like
   ```
   select /* read.locality.enabled=false */ a, b, c from table iceberg_table t
   ```
   
   We have encountered a few situations internally where updating `spark.sql.partitions` would make a job unreasonably slow (due to possibly a large filter prior to the write), so a `COALESCE` hint would be really helpful in helping with file sizes (which I assume is naturally supported from spark 3.x hints, unless Iceberg operations get in the way, e.g maybe anything to do with merging or deletes). But I have admittedly not tried it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r635661142



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       I think that locality should be enabled by default, but only relevant if the file system is white-listed. That would make the logic here `localityEnabled && LOCALITY_WHITELIST_FS.contains(fsScheme)`. That's a bit simpler than using the table property to override and we don't want to check locality for file systems that don't support it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick commented on pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#issuecomment-840856172


   I think that this can be configured via `spark.locality.wait`. I think if you set it to zero, it will just automatically give up looking for a data local node. At least that's what I've done when reading from S3 with yarn (which is by definition not local).
   
   ```
   Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node. The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any). It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc. You should increase this setting if your tasks are long and see poor locality, but the default usually works well.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r637331639



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       I'm a big fan of hints (not that I was asked). But they're a great SQL first solution to a number of problems.
   
   Assuming you mean like
   ```
   select /* read.locality=false */ a, b, c from table iceberg_table t
   ```.
   
   We have encountered a few situations internally where updating `spark.sql.partitions` would make a job unreasonably slow (due to possibly a large filter prior to the write), so a `COALESCE` hint would be really helpful (which I assume is naturally supported unless Iceberg operations get in the way, e.g maybe anything to do with merging or deletes). But I have admittedly not tried it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r636806924



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       OK. I am using spark-sql and didn't find a way to set `locality` as a spark read option. I will try to add it as a user-defined spark configuration so that it can be set per session.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi edited a comment on pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi edited a comment on pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#issuecomment-841934851


   @openinx  @rdblue  Hi, this could be a performance bottleneck when reading iceberg table with spark-sql. Would you have a look, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r636802401



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
##########
@@ -48,6 +48,9 @@ private SparkReadOptions() {
   // Overrides the table's read.parquet.vectorization.batch-size
   public static final String VECTORIZATION_BATCH_SIZE = "batch-size";
 
+  // Overrides the table's read.locality.enabled
+  public static final String LOCALITY_ENABLED = "locality";

Review comment:
       The locality was passed by spark read option ["locality"](https://github.com/apache/iceberg/blob/master/spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java#L497). Maybe someone is already using it?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick edited a comment on pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick edited a comment on pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#issuecomment-840856172


   I think that this can be configured via `spark.locality.wait`. I think if you set it to zero, it will just automatically give up looking for a data local node. At least that's what I've done when reading from S3 with yarn (which is by definition not local).
   
   ```
   Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node.
   The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any).
   It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc.
   You should increase this setting if your tasks are long and see poor locality, but the default usually works well.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r635661316



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -227,15 +233,58 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
-    }
+
+    initializeReadTasks(readTasks, tableBroadcast, expectedSchemaString, () -> InternalRowReaderFactory.INSTANCE);
 
     return readTasks;
   }
 
+  /**
+   * Initialize ReadTasks with multi threads as get block locations can be slow
+   *
+   * @param readTasks Result list to return
+   */
+  private <T> void initializeReadTasks(List<InputPartition<T>> readTasks,
+      Broadcast<Table> tableBroadcast, String expectedSchemaString, Supplier<ReaderFactory<T>> supplier) {
+    int taskInitThreads = Math.max(1, PropertyUtil.propertyAsInt(table.properties(), LOCALITY_TASK_INITIALIZE_THREADS,
+        LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT));
+
+    if (!localityPreferred || taskInitThreads == 1) {
+      for (CombinedScanTask task : tasks()) {
+        readTasks.add(new ReadTask<>(
+            task, tableBroadcast, expectedSchemaString, caseSensitive,
+            localityPreferred, supplier.get()));
+      }
+      return;
+    }
+
+    List<Future<ReadTask<T>>> futures = new ArrayList<>();
+
+    final ExecutorService pool = Executors.newFixedThreadPool(

Review comment:
       Let's remove these changes and consider them in a separate PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick commented on pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#issuecomment-840861832


   Also, if we introduced this, would we be removing the possibility of all of the different locality levels that are provided by Yarn?
   
   https://spark.apache.org/docs/latest/tuning.html#data-locality
   
   From the scheduling configs, you can see that the options provided by spark are much more complex:
   https://spark.apache.org/docs/latest/configuration.html#scheduling
   
   spark.locality.wait.node
   spark.locality.wait.process
   spark.locality.wait.rack
   
   I think that the 30s upper limit you are seeing is potentially being derived from `spark.scheduler.maxRegisteredResourcesWaitingTime`, which is by default 30s. The docs for that state:
   ```
   Maximum amount of time to wait for resources to register before scheduling begins.
   ```
   
   So I'm guessing that is where the 30s thing came into play (before spark started scheduling tasks).
   
   Can you try setting spark.locallity.wait.rack or .node and see if that helps? I've only tried this on S3, so I'm not 100% sure if this will help you.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r636791805



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -227,15 +233,58 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
-    }
+
+    initializeReadTasks(readTasks, tableBroadcast, expectedSchemaString, () -> InternalRowReaderFactory.INSTANCE);
 
     return readTasks;
   }
 
+  /**
+   * Initialize ReadTasks with multi threads as get block locations can be slow
+   *
+   * @param readTasks Result list to return
+   */
+  private <T> void initializeReadTasks(List<InputPartition<T>> readTasks,
+      Broadcast<Table> tableBroadcast, String expectedSchemaString, Supplier<ReaderFactory<T>> supplier) {
+    int taskInitThreads = Math.max(1, PropertyUtil.propertyAsInt(table.properties(), LOCALITY_TASK_INITIALIZE_THREADS,
+        LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT));
+
+    if (!localityPreferred || taskInitThreads == 1) {
+      for (CombinedScanTask task : tasks()) {
+        readTasks.add(new ReadTask<>(
+            task, tableBroadcast, expectedSchemaString, caseSensitive,
+            localityPreferred, supplier.get()));
+      }
+      return;
+    }
+
+    List<Future<ReadTask<T>>> futures = new ArrayList<>();
+
+    final ExecutorService pool = Executors.newFixedThreadPool(
+        taskInitThreads,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("Init-ReadTask-%d")
+            .build());

Review comment:
       > Seems to me like how parallel and whether or not you want to wait for locality would be specific to the cluster that the job is running on.
   I aggre with that. And this property is useful only with spark. Maybe it's better we define it as a user-defined spark configuration that can be set per session?

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -227,15 +233,58 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
-    }
+
+    initializeReadTasks(readTasks, tableBroadcast, expectedSchemaString, () -> InternalRowReaderFactory.INSTANCE);
 
     return readTasks;
   }
 
+  /**
+   * Initialize ReadTasks with multi threads as get block locations can be slow
+   *
+   * @param readTasks Result list to return
+   */
+  private <T> void initializeReadTasks(List<InputPartition<T>> readTasks,
+      Broadcast<Table> tableBroadcast, String expectedSchemaString, Supplier<ReaderFactory<T>> supplier) {
+    int taskInitThreads = Math.max(1, PropertyUtil.propertyAsInt(table.properties(), LOCALITY_TASK_INITIALIZE_THREADS,
+        LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT));
+
+    if (!localityPreferred || taskInitThreads == 1) {
+      for (CombinedScanTask task : tasks()) {
+        readTasks.add(new ReadTask<>(
+            task, tableBroadcast, expectedSchemaString, caseSensitive,
+            localityPreferred, supplier.get()));
+      }
+      return;
+    }
+
+    List<Future<ReadTask<T>>> futures = new ArrayList<>();
+
+    final ExecutorService pool = Executors.newFixedThreadPool(
+        taskInitThreads,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("Init-ReadTask-%d")
+            .build());

Review comment:
       > Seems to me like how parallel and whether or not you want to wait for locality would be specific to the cluster that the job is running on.
   
   I aggre with that. And this property is useful only with spark. Maybe it's better we define it as a user-defined spark configuration that can be set per session?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r636796098



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -90,6 +90,12 @@ private TableProperties() {
   public static final String ORC_VECTORIZATION_ENABLED = "read.orc.vectorization.enabled";
   public static final boolean ORC_VECTORIZATION_ENABLED_DEFAULT = false;
 
+  public static final String LOCALITY_ENABLED = "read.locality.enabled";
+  public static final String LOCALITY_ENABLED_DEFAULT = null;

Review comment:
       > Is that inconsistent across uses and that's why this is null?
   
   Yes. The current default comes from `LOCALITY_WHITELIST_FS.contains(scheme)`, which is true on `hdfs` and false otherwise.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick removed a comment on pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick removed a comment on pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#issuecomment-840856930


   > I think that this can be configured via `spark.locality.wait`. I think if you set it to zero, it will just automatically give up looking for a data local node. At least that's what I've done when reading from S3 with yarn (which is by definition not local).
   > 
   > ```
   > Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node.
   > The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any).
   > It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc.
   > You should increase this setting if your tasks are long and see poor locality, but the default usually works well.
   > ```
   
   Given that you say it takes 30seconds, that would align with the default value of `3000` (appears to be in milliseconds).
   
   If it's possible to leave this as a spark property, maybe it's not something we really need defined on the table level? I'm open to discuss on that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick commented on pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#issuecomment-842716939


   > @kbendick Hi! Thank you for your kind suggestion!
   > I looked through the problem and found that the delay happens during spark-sql planning phase (eg. before the spark job was started).
   > The `spark.locality.wait.*` configurations should take effect during the spark job lifetime, so I'm afraid it cannot help with the problem.
   > I've added multi-threaded mechanism for ReadTask initialize to solve this. Would you have another look? Thanks!
   
   Ah ok. That makes sense re: the current spark.locality.wait.* entries. I will try to take a look at this later today, but I might not be the best person to be reviewing this portion of the code. Will give it a look though.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r637259082



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
##########
@@ -48,6 +48,9 @@ private SparkReadOptions() {
   // Overrides the table's read.parquet.vectorization.batch-size
   public static final String VECTORIZATION_BATCH_SIZE = "batch-size";
 
+  // Overrides the table's read.locality.enabled
+  public static final String LOCALITY_ENABLED = "locality";

Review comment:
       Okay, this was an existing setting? If so we don't need to rename it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r635660323



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
##########
@@ -48,6 +48,9 @@ private SparkReadOptions() {
   // Overrides the table's read.parquet.vectorization.batch-size
   public static final String VECTORIZATION_BATCH_SIZE = "batch-size";
 
+  // Overrides the table's read.locality.enabled
+  public static final String LOCALITY_ENABLED = "locality";

Review comment:
       What about `locality-enabled`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r637332124



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       Yeah, I'm not a fan either, but the read and write options probably won't be possible through SQL otherwise. Maybe that's not what we want to do for things that would ideally have SQL clauses (like `AS OF TIMESTAMP` or `AS OF VERSION`) but hints like `locality=true` seem like a reasonable path to me. We may even be able to get that in upstream Spark.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi commented on pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#issuecomment-841934851


   @rdblue Hi, this could be a performance bottleneck when reading iceberg table with spark-sql. Would you have a look, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] RussellSpitzer commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
RussellSpitzer commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r637332680



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       I thought for time travel and such I'm a bigger fan of just adding more special table naming, just so we have less reliance on catalyst. That said I have no problem with doing some more hints for things like locality




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r637332216



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       Lastly, I think hints are much easier to explain to end users and have them remember, as there are A LOT of spark configurations (by necessity) and even people who are very well acquainted with spark can get them mixed up.... which leads to support requests etc.
   
   If the hint gets placed in the query (SQL file, notebook), developers are much more likely to remember what needs to be updated to change it to suit their current needs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi commented on pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#issuecomment-841925758


   @kbendick Hi! Thank you for your kind suggestion!
   I looked through the problem and found that the delay happens during spark-sql planning phase (eg. before the spark job was started).
   The `spark.locality.wait.*` configurations should take effect during the spark job lifetime, so I'm afraid it cannot help with the problem.
   I've added multi-threaded mechanism for ReadTask initialize to solve this. Would you have another look? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r637331639



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       I'm a big fan of hints (not that I was asked). But they're a great SQL first solution to a number of problems.
   
   Assuming you mean like
   ```
   select /* read.locality=false */ a, b, c from table iceberg_table t
   ```
   
   We have encountered a few situations internally where updating `spark.sql.partitions` would make a job unreasonably slow (due to possibly a large filter prior to the write), so a `COALESCE` hint would be really helpful in helping with file sizes (which I assume is naturally supported from spark 3.x hints, unless Iceberg operations get in the way, e.g maybe anything to do with merging or deletes). But I have admittedly not tried it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick commented on pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#issuecomment-840856930


   > I think that this can be configured via `spark.locality.wait`. I think if you set it to zero, it will just automatically give up looking for a data local node. At least that's what I've done when reading from S3 with yarn (which is by definition not local).
   > 
   > ```
   > Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node.
   > The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any).
   > It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc.
   > You should increase this setting if your tasks are long and see poor locality, but the default usually works well.
   > ```
   
   Given that you say it takes 30seconds, that would align with the default value of `3000` (appears to be in milliseconds).
   
   If it's possible to leave this as a spark property, maybe it's not something we really need defined on the table level? I'm open to discuss on that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi commented on pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#issuecomment-841925758






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r636807507



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -227,15 +233,58 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
-    }
+
+    initializeReadTasks(readTasks, tableBroadcast, expectedSchemaString, () -> InternalRowReaderFactory.INSTANCE);
 
     return readTasks;
   }
 
+  /**
+   * Initialize ReadTasks with multi threads as get block locations can be slow
+   *
+   * @param readTasks Result list to return
+   */
+  private <T> void initializeReadTasks(List<InputPartition<T>> readTasks,
+      Broadcast<Table> tableBroadcast, String expectedSchemaString, Supplier<ReaderFactory<T>> supplier) {
+    int taskInitThreads = Math.max(1, PropertyUtil.propertyAsInt(table.properties(), LOCALITY_TASK_INITIALIZE_THREADS,
+        LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT));
+
+    if (!localityPreferred || taskInitThreads == 1) {
+      for (CombinedScanTask task : tasks()) {
+        readTasks.add(new ReadTask<>(
+            task, tableBroadcast, expectedSchemaString, caseSensitive,
+            localityPreferred, supplier.get()));
+      }
+      return;
+    }
+
+    List<Future<ReadTask<T>>> futures = new ArrayList<>();
+
+    final ExecutorService pool = Executors.newFixedThreadPool(

Review comment:
       Thanks! I will open another PR for this feature.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r637259416



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       We should add the ability to set read options in hints via Spark extensions. Being able to use hints would be great because we could use it for this and a few other things, like time travel in SQL. (@RussellSpitzer, what do you think?)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r637332216



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       Lastly, I think hints are much easier to explain to end users and have them remember, as there are A LOT of spark configurations (by necessity - not even including iceberg spark properties which are much fewer) and even people who are very well acquainted with spark can get them mixed up.... which leads to support requests etc.
   
   If the hint gets placed in the query (SQL file, notebook), developers are much more likely to remember what needs to be updated to change it to suit their current needs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r635662268



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -90,6 +90,12 @@ private TableProperties() {
   public static final String ORC_VECTORIZATION_ENABLED = "read.orc.vectorization.enabled";
   public static final boolean ORC_VECTORIZATION_ENABLED_DEFAULT = false;
 
+  public static final String LOCALITY_ENABLED = "read.locality.enabled";
+  public static final String LOCALITY_ENABLED_DEFAULT = null;
+
+  public static final String LOCALITY_TASK_INITIALIZE_THREADS = "read.locality.task.initialize.threads";
+  public static final int LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT = 1;

Review comment:
       If this PR is adding the ability to disable locality, I think it should remain focused on that task and should not include a separate feature.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] rdblue commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r635660096



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -90,6 +90,12 @@ private TableProperties() {
   public static final String ORC_VECTORIZATION_ENABLED = "read.orc.vectorization.enabled";
   public static final boolean ORC_VECTORIZATION_ENABLED_DEFAULT = false;
 
+  public static final String LOCALITY_ENABLED = "read.locality.enabled";
+  public static final String LOCALITY_ENABLED_DEFAULT = null;

Review comment:
       What is the current default? Is that inconsistent across uses and that's why this is null?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi edited a comment on pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi edited a comment on pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#issuecomment-841934851


   @openinx  @rdblue  Hi, this could be a performance bottleneck when reading iceberg table with spark-sql. Would you have a look, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r636796553



##########
File path: core/src/main/java/org/apache/iceberg/TableProperties.java
##########
@@ -90,6 +90,12 @@ private TableProperties() {
   public static final String ORC_VECTORIZATION_ENABLED = "read.orc.vectorization.enabled";
   public static final boolean ORC_VECTORIZATION_ENABLED_DEFAULT = false;
 
+  public static final String LOCALITY_ENABLED = "read.locality.enabled";
+  public static final String LOCALITY_ENABLED_DEFAULT = null;
+
+  public static final String LOCALITY_TASK_INITIALIZE_THREADS = "read.locality.task.initialize.threads";
+  public static final int LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT = 1;

Review comment:
       Got it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r637331802



##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       I think hints could solve a number of problems that people want to create table level properties for, for which table level properties are likely not the most appropriate as they're dependent on seasonal changes in dataset size and also cluster resources when the job is started.

##########
File path: spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
##########
@@ -100,4 +107,11 @@ public static void validatePartitionTransforms(PartitionSpec spec) {
       }
     }
   }
+
+  public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
+    String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
+        TableProperties.LOCALITY_ENABLED_DEFAULT);
+    return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
+        Boolean.parseBoolean(tableLocalityProp);

Review comment:
       I'm a big fan of hints (not that I was asked). But they're a great SQL first solution to a number of problems.
   
   Assuming you mean like
   ```
   select /* read.locality=false */ a, b, c from table iceberg_table t
   ```
   
   We have encountered a few situations internally where updating `spark.sql.partitions` would make a job unreasonably slow (due to possibly a large filter prior to the write), so a `COALESCE` hint would be really helpful (which I assume is naturally supported unless Iceberg operations get in the way, e.g maybe anything to do with merging or deletes). But I have admittedly not tried it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kbendick commented on a change in pull request #2577: Spark: Add read.locality.enabled to TableProperties to support disabl…

Posted by GitBox <gi...@apache.org>.
kbendick commented on a change in pull request #2577:
URL: https://github.com/apache/iceberg/pull/2577#discussion_r633937711



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -227,15 +233,58 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
-    }
+
+    initializeReadTasks(readTasks, tableBroadcast, expectedSchemaString, () -> InternalRowReaderFactory.INSTANCE);
 
     return readTasks;
   }
 
+  /**
+   * Initialize ReadTasks with multi threads as get block locations can be slow
+   *
+   * @param readTasks Result list to return
+   */
+  private <T> void initializeReadTasks(List<InputPartition<T>> readTasks,
+      Broadcast<Table> tableBroadcast, String expectedSchemaString, Supplier<ReaderFactory<T>> supplier) {
+    int taskInitThreads = Math.max(1, PropertyUtil.propertyAsInt(table.properties(), LOCALITY_TASK_INITIALIZE_THREADS,
+        LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT));
+
+    if (!localityPreferred || taskInitThreads == 1) {
+      for (CombinedScanTask task : tasks()) {
+        readTasks.add(new ReadTask<>(
+            task, tableBroadcast, expectedSchemaString, caseSensitive,
+            localityPreferred, supplier.get()));
+      }
+      return;
+    }
+
+    List<Future<ReadTask<T>>> futures = new ArrayList<>();
+
+    final ExecutorService pool = Executors.newFixedThreadPool(
+        taskInitThreads,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("Init-ReadTask-%d")
+            .build());
+
+    List<CombinedScanTask> scanTasks = tasks();
+    for (int i = 0; i < scanTasks.size(); i++) {
+      final int curIndex = i;
+      futures.add(pool.submit(() -> new ReadTask<>(scanTasks.get(curIndex), tableBroadcast,
+          expectedSchemaString, caseSensitive, true, supplier.get())));
+    }

Review comment:
       You might be able to use `org.apache.iceberg.util.ParallelIterable` here to help simplify some of this code that takes care of submitting tasks. 

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -227,15 +233,58 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
-    }
+
+    initializeReadTasks(readTasks, tableBroadcast, expectedSchemaString, () -> InternalRowReaderFactory.INSTANCE);
 
     return readTasks;
   }
 
+  /**
+   * Initialize ReadTasks with multi threads as get block locations can be slow
+   *
+   * @param readTasks Result list to return
+   */
+  private <T> void initializeReadTasks(List<InputPartition<T>> readTasks,
+      Broadcast<Table> tableBroadcast, String expectedSchemaString, Supplier<ReaderFactory<T>> supplier) {
+    int taskInitThreads = Math.max(1, PropertyUtil.propertyAsInt(table.properties(), LOCALITY_TASK_INITIALIZE_THREADS,
+        LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT));
+
+    if (!localityPreferred || taskInitThreads == 1) {
+      for (CombinedScanTask task : tasks()) {
+        readTasks.add(new ReadTask<>(
+            task, tableBroadcast, expectedSchemaString, caseSensitive,
+            localityPreferred, supplier.get()));
+      }
+      return;
+    }
+
+    List<Future<ReadTask<T>>> futures = new ArrayList<>();
+
+    final ExecutorService pool = Executors.newFixedThreadPool(
+        taskInitThreads,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("Init-ReadTask-%d")
+            .build());

Review comment:
       Have a look at `org.apache.iceberg.util.ThreadPools`, which can help handle this for you. It would also allow for the pool size to be set as a system property (as well as providing space for a default).
   
   I'm not too sure how I feel about the default coming as a table level property though. Seems to me like how parallel and whether or not you want to wait for locality would be specific to the cluster that the job is running on. But I don't have a strong opinion on that.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org