You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/07/12 01:06:43 UTC

[GitHub] [iceberg] kbendick commented on a change in pull request #2803: Spark: Add Multi-thread to construct ReadTask

kbendick commented on a change in pull request #2803:
URL: https://github.com/apache/iceberg/pull/2803#discussion_r667564758



##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -193,6 +202,7 @@ public StructType readSchema() {
   /**
    * This is called in the Spark Driver when data is to be materialized into {@link ColumnarBatch}
    */
+  @SuppressWarnings({"checkstyle:LocalVariableName", "checkstyle:RegexpSinglelineJava"})

Review comment:
       What variable names are causing these checkstyle issues? Would it be possible to simply change them, instead of suppressing the warning for the whole function?

##########
File path: spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
##########
@@ -205,35 +215,62 @@ public StructType readSchema() {
     // broadcast the table metadata as input partitions will be sent to executors
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
-    List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
-    for (CombinedScanTask task : tasks()) {
-      readTasks.add(new ReadTask<>(
-          task, tableBroadcast, expectedSchemaString, caseSensitive,
-          localityPreferred, new BatchReaderFactory(batchSize)));
+    int taskSize = tasks().size();
+    InputPartition<ColumnarBatch>[] readTasks = new InputPartition[taskSize];
+    Long startTime = System.currentTimeMillis();
+    try {
+      pool.submit(() -> IntStream.range(0, taskSize).parallel()
+              .mapToObj(taskId -> {
+                LOG.trace("The size of scanTasks is {},  current taskId is {}, current thread id is {}",
+                        taskSize, taskId, Thread.currentThread().getName());
+                readTasks[taskId] = new ReadTask<>(
+                        tasks().get(taskId), tableBroadcast, expectedSchemaString, caseSensitive,
+                        localityPreferred, new BatchReaderFactory(batchSize));
+                return true;
+              }).collect(Collectors.toList())).get();
+    } catch (Exception e) {
+      e.printStackTrace();

Review comment:
       Instead of simply dumping the stack trace, I think it would be helpful to users to use the logging function to log some information about the exception. Also, does `printStackTrace` respect the users logging configuration (or even desired log output) the same way that using the normal logger would? Seems to me like this call to printStackTrace could easily get swallowed / go unseen if users are logging to a file and have a logging pipeline to access logs (assuming this call prints to stdout or similar).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org