Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/09 15:39:12 UTC

[GitHub] [iceberg] jshmchenxi opened a new pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

jshmchenxi opened a new pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800


   Follows #2577: use a thread pool to initialize readTasks when Spark locality is preferred.
   Before this change, the Spark planning phase could be slow because it used a single thread to obtain the block locations of all files in the scan.
   More information can be found in [this comment](https://github.com/apache/iceberg/pull/2577#issue-637698205).
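
For illustration only, a minimal sketch of the idea described above, using plain `java.util.concurrent` rather than Iceberg's `Tasks` utility. The class `ParallelPlanSketch` and the method `lookupLocation` are hypothetical stand-ins for the real per-file block location lookup, and the pool size is an arbitrary assumption:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelPlanSketch {
  // Hypothetical stand-in for a slow per-file call, such as asking HDFS for block locations.
  static String lookupLocation(String file) {
    return "host-for-" + file;
  }

  // Submits one lookup per file to a pool and collects the results in input order.
  static List<String> planParallel(List<String> files, int threads) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
      List<Future<String>> futures = new ArrayList<>();
      for (String file : files) {
        futures.add(pool.submit(() -> lookupLocation(file)));
      }
      List<String> locations = new ArrayList<>();
      for (Future<String> future : futures) {
        locations.add(future.get()); // blocks until this lookup has finished
      }
      return locations;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException e) {
      throw new IllegalStateException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }
}
```

With a single thread the total time is roughly the sum of all lookups; with a pool the lookups overlap, which is where the planning-time saving would come from.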


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667424528



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -206,11 +219,17 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, new BatchReaderFactory(batchSize)));
-    }
+    Tasks.foreach(tasks())
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(task -> {
+          InputPartition<ColumnarBatch> readTask = new ReadTask<>(
+              task, tableBroadcast, expectedSchemaString, caseSensitive,
+              localityPreferred, new BatchReaderFactory(batchSize));
+          synchronized (readTasks) {

Review comment:
       I changed the code to use arrays like in spark3 to avoid synchronization.
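
A rough sketch of why the array variant needs no lock: each worker writes only to its own index, so no two threads touch the same slot. This uses a parallel `IntStream` in place of Iceberg's `Tasks.range(...)`; `ArraySlotSketch` and `buildReadTask` are hypothetical names, and returning the array via `Arrays.asList` is an assumption about how the list is produced:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;

final class ArraySlotSketch {
  // Hypothetical stand-in for constructing one read task from one scan task.
  static String buildReadTask(String scanTask) {
    return "read(" + scanTask + ")";
  }

  static List<String> buildAll(List<String> scanTasks) {
    String[] readTasks = new String[scanTasks.size()];
    // Each index is visited exactly once, so every slot has a single writer.
    IntStream.range(0, readTasks.length)
        .parallel()
        .forEach(index -> readTasks[index] = buildReadTask(scanTasks.get(index)));
    // The result keeps the input order regardless of which thread ran which index.
    return Arrays.asList(readTasks);
  }
}
```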






[GitHub] [iceberg] rdblue commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667554624



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -95,6 +104,7 @@
   private Filter[] pushedFilters = NO_FILTERS;
   private final boolean localityPreferred;
   private final int batchSize;
+  private ExecutorService readTasksInitExecutorService = DEFAULT_READTASKS_INIT_EXECUTOR_SERVICE;

Review comment:
       I don't think that there is a need for this field and I'd prefer not to add mutable state. Can you refactor this to call `executeWith(ThreadPools.getWorkerPool())`  instead? You can pass `null` to that method so it can also check `localityPreferred` inline:
   
   ```
       .executeWith(localityPreferred ? ThreadPools.getWorkerPool() : null)
   ```
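
As a sketch of the pattern suggested above, assuming (as the comment implies) that a `null` executor means "run on the calling thread". The helper `forRange` below is a hypothetical stand-in, not Iceberg's `Tasks`, and the fixed pool stands in for the shared worker pool:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;

final class OptionalExecutorSketch {
  // Hypothetical helper: runs body for each index in [0, size).
  // A null pool means "run inline on the calling thread".
  static void forRange(int size, ExecutorService pool, IntConsumer body) {
    if (pool == null) {
      for (int i = 0; i < size; i++) {
        body.accept(i);
      }
      return;
    }
    List<Future<?>> futures = new ArrayList<>();
    for (int i = 0; i < size; i++) {
      final int index = i;
      futures.add(pool.submit(() -> body.accept(index)));
    }
    for (Future<?> future : futures) {
      try {
        future.get(); // wait, and surface any failure from the worker
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      } catch (ExecutionException e) {
        throw new IllegalStateException(e.getCause());
      }
    }
  }

  static long sumOfSquares(int size, boolean localityPreferred) {
    ExecutorService workerPool = Executors.newFixedThreadPool(4);
    AtomicLong sum = new AtomicLong();
    try {
      // The executor is chosen inline at the call site, so no mutable field is needed.
      forRange(size, localityPreferred ? workerPool : null, i -> sum.addAndGet((long) i * i));
    } finally {
      workerPool.shutdown();
    }
    return sum.get();
  }
}
```

Both settings of the flag produce the same result; only the thread that does the work differs.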






[GitHub] [iceberg] openinx commented on pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
openinx commented on pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#issuecomment-877922114


   Thanks @jshmchenxi for the optimization work for Spark on HDFS (I just noticed this and #2577; it's impressive).
   
   Do we also need to parallelize [SparkMicroBatchStream#planInputPartitions](https://github.com/apache/iceberg/blob/a100d2d5a06153674cdf97940039f3df68b5f9b8/spark3/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java#L140) in this PR?




[GitHub] [iceberg] southernriver commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
southernriver commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667416653



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -227,11 +246,17 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
-    }
+    Tasks.foreach(tasks())
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(task -> {
+          InputPartition<InternalRow> readTask = new ReadTask<>(
+              task, tableBroadcast, expectedSchemaString, caseSensitive,
+              localityPreferred, InternalRowReaderFactory.INSTANCE);
+          synchronized (readTasks) {
+            readTasks.add(readTask);

Review comment:
       Yep! Another reason is that we can keep the style consistent with the spark3 module, which applies a small optimization using an array, as in [:R153](https://github.com/apache/iceberg/pull/2800/files#diff-47b9fbed0d32a8f8ec128a4f78cd93f238e65c38f93e9a10d257eb83838563ffR153). What do you think?
   






[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667411811



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -227,11 +246,17 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
-    }
+    Tasks.foreach(tasks())
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(task -> {
+          InputPartition<InternalRow> readTask = new ReadTask<>(
+              task, tableBroadcast, expectedSchemaString, caseSensitive,
+              localityPreferred, InternalRowReaderFactory.INSTANCE);
+          synchronized (readTasks) {
+            readTasks.add(readTask);

Review comment:
       Yes, using an array can help avoid the lock, but the time spent holding the lock should be insignificant compared to the block location lookup. The return value is a list, and I didn't want to do all the conversion.






[GitHub] [iceberg] southernriver commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
southernriver commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667355630



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -227,11 +246,17 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
-    }
+    Tasks.foreach(tasks())
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(task -> {
+          InputPartition<InternalRow> readTask = new ReadTask<>(
+              task, tableBroadcast, expectedSchemaString, caseSensitive,
+              localityPreferred, InternalRowReaderFactory.INSTANCE);
+          synchronized (readTasks) {
+            readTasks.add(readTask);

Review comment:
       Maybe it's better to use an array instead of a list, so that we can avoid adding a lock or a temporary variable?






[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667413292



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -206,11 +219,17 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, new BatchReaderFactory(batchSize)));
-    }
+    Tasks.foreach(tasks())
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(task -> {
+          InputPartition<ColumnarBatch> readTask = new ReadTask<>(
+              task, tableBroadcast, expectedSchemaString, caseSensitive,
+              localityPreferred, new BatchReaderFactory(batchSize));
+          synchronized (readTasks) {

Review comment:
       Or have I fallen into premature optimization? 😂






[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667684197



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -95,6 +104,7 @@
   private Filter[] pushedFilters = NO_FILTERS;
   private final boolean localityPreferred;
   private final int batchSize;
+  private ExecutorService readTasksInitExecutorService = DEFAULT_READTASKS_INIT_EXECUTOR_SERVICE;

Review comment:
       That really simplifies the code. Thanks @rdblue 






[GitHub] [iceberg] rdblue commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667395881



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -206,11 +219,17 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, new BatchReaderFactory(batchSize)));
-    }
+    Tasks.foreach(tasks())
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(task -> {
+          InputPartition<ColumnarBatch> readTask = new ReadTask<>(
+              task, tableBroadcast, expectedSchemaString, caseSensitive,
+              localityPreferred, new BatchReaderFactory(batchSize));
+          synchronized (readTasks) {

Review comment:
       Why not use a concurrent list?








[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667412218



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -206,11 +219,17 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, new BatchReaderFactory(batchSize)));
-    }
+    Tasks.foreach(tasks())
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(task -> {
+          InputPartition<ColumnarBatch> readTask = new ReadTask<>(
+              task, tableBroadcast, expectedSchemaString, caseSensitive,
+              localityPreferred, new BatchReaderFactory(batchSize));
+          synchronized (readTasks) {

Review comment:
       At first I wanted to use a concurrent list. But this would change the return value from a normal list to a concurrent list, and subsequent operations on this list should be read-only, so it might affect performance.
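
One way to address that concern, sketched under assumptions: collect into a thread-safe collection during the parallel phase, then copy once into a plain list before returning, so later readers pay no concurrency overhead. This is not what the PR ended up doing; `CollectThenCopySketch` is a hypothetical name, and a parallel `IntStream` stands in for Iceberg's `Tasks`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.IntStream;

final class CollectThenCopySketch {
  static List<Integer> collect(int count) {
    // Thread-safe only while workers are adding results.
    Queue<Integer> results = new ConcurrentLinkedQueue<>();
    IntStream.range(0, count).parallel().forEach(i -> results.add(i * 10));
    // Copy once into an ordinary list; arrival order is not deterministic.
    return Collections.unmodifiableList(new ArrayList<>(results));
  }
}
```

The trade-off is one extra copy and the loss of input ordering, which the index-based array approach avoids.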






[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667326911



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -206,11 +219,14 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, new BatchReaderFactory(batchSize)));
-    }
+    Tasks.foreach(tasks())
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(task -> {
+          readTasks.add(new ReadTask<>(

Review comment:
       Oh, that was a mistake. Fixed it!






[GitHub] [iceberg] rdblue merged pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
rdblue merged pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800


   




[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667640843



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -205,15 +219,20 @@ public StructType readSchema() {
     // broadcast the table metadata as input partitions will be sent to executors
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
-    List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, new BatchReaderFactory(batchSize)));
-    }
-    LOG.info("Batching input partitions with {} tasks.", readTasks.size());
+    List<CombinedScanTask> scanTasks = tasks();
+    InputPartition<ColumnarBatch>[] readTasks = new InputPartition[scanTasks.size()];
 
-    return readTasks;
+    Tasks.range(readTasks.length)
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(index -> {

Review comment:
       Done






[GitHub] [iceberg] jshmchenxi commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667424616



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -227,11 +246,17 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, InternalRowReaderFactory.INSTANCE));
-    }
+    Tasks.foreach(tasks())
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(task -> {
+          InputPartition<InternalRow> readTask = new ReadTask<>(
+              task, tableBroadcast, expectedSchemaString, caseSensitive,
+              localityPreferred, InternalRowReaderFactory.INSTANCE);
+          synchronized (readTasks) {
+            readTasks.add(readTask);

Review comment:
       Okay!






[GitHub] [iceberg] southernriver commented on pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
southernriver commented on pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#issuecomment-877658957


   Oops, I also fixed this a few days ago and created another PR, [#2803](https://github.com/apache/iceberg/pull/2803), just a moment ago.




[GitHub] [iceberg] rdblue commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667554221



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -205,15 +219,20 @@ public StructType readSchema() {
     // broadcast the table metadata as input partitions will be sent to executors
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
-    List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, new BatchReaderFactory(batchSize)));
-    }
-    LOG.info("Batching input partitions with {} tasks.", readTasks.size());
+    List<CombinedScanTask> scanTasks = tasks();
+    InputPartition<ColumnarBatch>[] readTasks = new InputPartition[scanTasks.size()];
 
-    return readTasks;
+    Tasks.range(readTasks.length)
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(index -> {

Review comment:
       The block is unnecessary because there is only one expression. Can you remove it?






[GitHub] [iceberg] openinx commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
openinx commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667586471



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -95,6 +104,7 @@
   private Filter[] pushedFilters = NO_FILTERS;
   private final boolean localityPreferred;
   private final int batchSize;
+  private ExecutorService readTasksInitExecutorService = DEFAULT_READTASKS_INIT_EXECUTOR_SERVICE;

Review comment:
       I like this comment because it really simplifies the code path without introducing any static or local variables.






[GitHub] [iceberg] jshmchenxi commented on pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
jshmchenxi commented on pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#issuecomment-878037179


   @openinx Thanks for the reminder. I've added parallelization to SparkMicroBatchStream#planInputPartitions in this PR.




[GitHub] [iceberg] rdblue commented on pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#issuecomment-878457411


   Thanks, @jshmchenxi! I merged this.




[GitHub] [iceberg] rdblue commented on a change in pull request #2800: Spark: Parallelize initializing readTasks when localityPreferred is true

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #2800:
URL: https://github.com/apache/iceberg/pull/2800#discussion_r667179215



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -206,11 +219,14 @@ public StructType readSchema() {
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, new BatchReaderFactory(batchSize)));
-    }
+    Tasks.foreach(tasks())
+        .stopOnFailure()
+        .executeWith(readTasksInitExecutorService)
+        .run(task -> {
+          readTasks.add(new ReadTask<>(

Review comment:
       This can't alter `readTasks` from multiple threads because array lists aren't thread-safe.
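
A minimal sketch of the smallest fix for this, mirroring the `synchronized (readTasks)` guard that appears in a later revision of the diff. Without the guard, concurrent `ArrayList.add` calls can lose elements or throw. `GuardedAddSketch` is a hypothetical name, and a parallel `IntStream` is used here in place of Iceberg's `Tasks.foreach(...)`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

final class GuardedAddSketch {
  static int addFromManyThreads(int count) {
    List<Integer> readTasks = new ArrayList<>();
    IntStream.range(0, count).parallel().forEach(i -> {
      Integer task = i; // build the element outside the lock
      synchronized (readTasks) {
        readTasks.add(task); // only the short add runs under the lock
      }
    });
    return readTasks.size();
  }
}
```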



