Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2023/01/19 14:01:56 UTC

[GitHub] [iceberg] kingeasternsun opened a new pull request, #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

kingeasternsun opened a new pull request, #6624:
URL: https://github.com/apache/iceberg/pull/6624

   …, SnapshotTable.
   
   Signed-off-by: kingeasternsun <ki...@gmail.com>
   
   Based on `Speed up TableMigration By collect the DafaFile In parallel` (#3876), this PR:
   - adds a parallelism parameter to `add_files` and the related functions.
   - keeps the original methods for compatibility; each one calls the new overload with a default parallelism of 1.
   - adds a parallelism parameter to MigrateTableProcedure.
   - adds a parallelism parameter to SnapshotTableProcedure.
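
The compatibility approach in the list above (the old signature delegating to a new overload with a parallelism of 1) can be sketched roughly as follows. This is an illustration only, not the code in the PR: `OverloadCompatSketch`, `fileSizes`, and the string-length stand-in for reading file metadata are all hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class OverloadCompatSketch {
  /** Original signature, kept so existing callers compile and behave as before. */
  static List<Integer> fileSizes(List<String> paths) {
    return fileSizes(paths, 1);
  }

  /** New overload; parallelism is the number of worker threads used to index files. */
  static List<Integer> fileSizes(List<String> paths, int parallelism) {
    if (parallelism < 1) {
      throw new IllegalArgumentException("Parallelism must be positive: " + parallelism);
    }
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<Callable<Integer>> tasks = new ArrayList<>();
      for (String path : paths) {
        // Hypothetical placeholder for reading one file's metadata.
        tasks.add(path::length);
      }
      List<Integer> sizes = new ArrayList<>();
      // invokeAll returns futures in the same order as the submitted tasks.
      for (Future<Integer> future : pool.invokeAll(tasks)) {
        sizes.add(future.get());
      }
      return sizes;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while indexing files", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("Failed to index a file", e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<String> paths = List.of("a.parquet", "bb.parquet");
    // Both calls should produce the same list regardless of thread count.
    System.out.println(fileSizes(paths).equals(fileSizes(paths, 4)));
  }
}
```

Because results are collected in task order, the output does not depend on the thread count, which is what lets the old signature delegate safely.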


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] kingeasternsun commented on pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#issuecomment-1411736076

   Hi, should I squash the commits before merging, @RussellSpitzer @jackye1995 @amogh-jahagirdar?




[GitHub] [iceberg] amogh-jahagirdar commented on pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#issuecomment-1397701478

   Left a review, thanks for the contribution @kingeasternsun ! 




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1084333280


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {
+    throw new UnsupportedOperationException(

Review Comment:
   I think unsupported is right here, since we are basically saying that the method is unsupported, not that there isn't a default.
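
A minimal sketch of the two ideas in this thread, assuming a simplified interface rather than the real `MigrateTable`: the interface-level default method throws because an implementation may not support the option at all, while a supporting implementation supplies the actual default value (1) through a field initializer. `DefaultMethodSketch`, `MigrateAction`, `LegacyAction`, `ParallelAction`, and `plannedReaders` are hypothetical names.

```java
final class DefaultMethodSketch {
  /** Hypothetical action interface, loosely modeled on the diff above. */
  interface MigrateAction {
    /** Implementations that do not override this simply do not support the option. */
    default MigrateAction withParallelReads(int numReaders) {
      throw new UnsupportedOperationException("Setting parallel reads is not supported");
    }

    /** Number of readers the action would use when executed. */
    int plannedReaders();
  }

  /** Implementation written before the option existed; it inherits the throwing default. */
  static final class LegacyAction implements MigrateAction {
    @Override
    public int plannedReaders() {
      return 1;
    }
  }

  /** Implementation that supports the option; the field initializer is the default value. */
  static final class ParallelAction implements MigrateAction {
    private int numReaders = 1;

    @Override
    public MigrateAction withParallelReads(int numReaders) {
      if (numReaders < 1) {
        throw new IllegalArgumentException("Number of readers must be positive: " + numReaders);
      }
      this.numReaders = numReaders;
      return this;
    }

    @Override
    public int plannedReaders() {
      return numReaders;
    }
  }

  public static void main(String[] args) {
    System.out.println(new ParallelAction().plannedReaders());
    System.out.println(new ParallelAction().withParallelReads(4).plannedReaders());
  }
}
```

With this split, adding the method to the interface does not break existing implementations, and callers that never set the option still get single-reader behavior from implementations that support it.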





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083331517


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {

Review Comment:
   Thanks. I see two places where the naming can be made consistent. The first is here, where we can just use `default MigrateTable withParallelism(int parallelism)` instead of a different name that introduces a `reader` concept.
   
   The other is in the procedure, where you have `ProcedureParameter.optional("max_concurrent_read_datafiles", DataTypes.IntegerType)`; we can name that option `parallelism` as well.
   
   What do you think?
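
To make the naming suggestion concrete, here is a rough sketch in which one name, `parallelism`, is used for both the fluent method and the optional procedure argument. It assumes a plain `Map` in place of Spark's procedure argument handling; `NamingSketch`, `SnapshotAction`, and `fromProcedureArgs` are hypothetical and not part of Iceberg.

```java
import java.util.Map;

final class NamingSketch {
  /** Single name shared by the fluent method and the procedure option. */
  static final String PARALLELISM = "parallelism";

  /** Hypothetical action with a fluent setter named after the same concept. */
  static final class SnapshotAction {
    private int parallelism = 1;

    SnapshotAction withParallelism(int parallelism) {
      if (parallelism < 1) {
        throw new IllegalArgumentException("Parallelism must be positive: " + parallelism);
      }
      this.parallelism = parallelism;
      return this;
    }

    int parallelism() {
      return parallelism;
    }
  }

  /** Builds an action from named arguments; the option is optional and may be absent. */
  static SnapshotAction fromProcedureArgs(Map<String, Integer> args) {
    SnapshotAction action = new SnapshotAction();
    Integer value = args.get(PARALLELISM);
    if (value != null) {
      action.withParallelism(value);
    }
    return action;
  }

  public static void main(String[] args) {
    System.out.println(fromProcedureArgs(Map.of()).parallelism());
    System.out.println(fromProcedureArgs(Map.of(PARALLELISM, 8)).parallelism());
  }
}
```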





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083428466


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {
+    throw new UnsupportedOperationException(

Review Comment:
   > I think we can default to 1?
   
   LGTM, but I'm not familiar with Java. Could you give more detail about how to set this default to 1? Many thanks, @jackye1995.





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1088593764


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -442,14 +444,51 @@ public static void importSparkTable(
             "Cannot find any partitions in table %s",
             sourceTableIdent);
         importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+            spark,
+            sourceTablePartitions,
+            targetTable,
+            spec,
+            stagingDir,
+            checkDuplicateFiles,
+            parallelism);
       }
     } catch (AnalysisException e) {
       throw SparkExceptionUtil.toUncheckedException(
           e, "Unable to get partition spec for table: %s", sourceTableIdentWithDB);
     }
   }
 
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param partitionFilter only import partitions whose values match those in the map, can be
+   *     partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      Map<String, String> partitionFilter,
+      boolean checkDuplicateFiles) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        Collections.emptyMap(),
+        checkDuplicateFiles,
+        1);

Review Comment:
   OK, I'll fix it.





[GitHub] [iceberg] kingeasternsun commented on pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by GitBox <gi...@apache.org>.
kingeasternsun commented on PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#issuecomment-1398154903

   > Left a review, thanks for the contribution @kingeasternsun ! Also looks like spotless checks are failing which you can fix by running `./gradlew :iceberg-api:spotlessJavaCheck`
   
   Thanks for your advice, I have fixed them. Could you merge this, please?




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1084340425


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -442,14 +444,51 @@ public static void importSparkTable(
             "Cannot find any partitions in table %s",
             sourceTableIdent);
         importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+            spark,
+            sourceTablePartitions,
+            targetTable,
+            spec,
+            stagingDir,
+            checkDuplicateFiles,
+            parallelism);
       }
     } catch (AnalysisException e) {
       throw SparkExceptionUtil.toUncheckedException(
           e, "Unable to get partition spec for table: %s", sourceTableIdentWithDB);
     }
   }
 
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param partitionFilter only import partitions whose values match those in the map, can be
+   *     partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      Map<String, String> partitionFilter,
+      boolean checkDuplicateFiles) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        Collections.emptyMap(),
+        checkDuplicateFiles,
+        1);

Review Comment:
   Can we set the default here to our worker pool default thread count?
   https://github.com/kingeasternsun/iceberg/blob/feature/add-parallelism-add-files/core/src/main/java/org/apache/iceberg/util/ThreadPools.java#L38
   
   I think this is probably safe.
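
As a hedged illustration of what a "worker pool default thread count" could look like, the sketch below derives a default from a system property and falls back to the number of available processors. The property name and the fallback rule are assumptions made for this example; the actual constant is defined in the `ThreadPools` class linked above and may be computed differently.

```java
final class DefaultParallelismSketch {
  // Hypothetical property name used only in this sketch.
  static final String POOL_SIZE_PROP = "example.worker.num-threads";

  /** Returns the configured pool size, or a processor-based fallback of at least 2. */
  static int defaultPoolSize() {
    int fallback = Math.max(2, Runtime.getRuntime().availableProcessors());
    String value = System.getProperty(POOL_SIZE_PROP);
    if (value == null) {
      return fallback;
    }
    try {
      int parsed = Integer.parseInt(value.trim());
      // Ignore non-positive values rather than creating an unusable pool.
      return parsed > 0 ? parsed : fallback;
    } catch (NumberFormatException e) {
      return fallback;
    }
  }

  public static void main(String[] args) {
    System.out.println("Default parallelism: " + defaultPoolSize());
  }
}
```

A legacy overload could then pass `defaultPoolSize()` instead of the literal `1`, which changes how fast the import runs but not what it produces.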
   





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1089592709


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java:
##########
@@ -39,7 +39,8 @@ class MigrateTableProcedure extends BaseProcedure {
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
         ProcedureParameter.optional("properties", STRING_MAP),
-        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
+        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType),
+        ProcedureParameter.optional("max_concurrent_read_datafiles", DataTypes.IntegerType)

Review Comment:
   @amogh-jahagirdar, thanks for your review. I have fixed it.





[GitHub] [iceberg] kingeasternsun commented on pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#issuecomment-1730727152

   @jackye1995 Thanks for your review. 
   
   >  If not we can open a new PR and mark you as coauthor.
   
   Yes, I agree with that




[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083145300


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {

Review Comment:
   Can we simplify and unify all the naming? We can just call it `parallelism` and use this name across the board. It also matches the naming in `TableMigrationUtil`.





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by GitBox <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1082057002


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java:
##########
@@ -39,7 +39,8 @@ class MigrateTableProcedure extends BaseProcedure {
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
         ProcedureParameter.optional("properties", STRING_MAP),
-        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
+        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType),
+        ProcedureParameter.optional("max_concurrent_read_datafiles", DataTypes.IntegerType)

Review Comment:
   Thanks for your review. It is named this way because of this discussion: https://github.com/apache/iceberg/pull/3973#discussion_r794150006





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1089605123


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -442,14 +444,51 @@ public static void importSparkTable(
             "Cannot find any partitions in table %s",
             sourceTableIdent);
         importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+            spark,
+            sourceTablePartitions,
+            targetTable,
+            spec,
+            stagingDir,
+            checkDuplicateFiles,
+            parallelism);
       }
     } catch (AnalysisException e) {
       throw SparkExceptionUtil.toUncheckedException(
           e, "Unable to get partition spec for table: %s", sourceTableIdentWithDB);
     }
   }
 
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param partitionFilter only import partitions whose values match those in the map, can be
+   *     partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      Map<String, String> partitionFilter,
+      boolean checkDuplicateFiles) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        Collections.emptyMap(),
+        checkDuplicateFiles,
+        1);

Review Comment:
   @RussellSpitzer, I have fixed it.





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1143822056


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/SnapshotTableSparkAction.java:
##########
@@ -54,6 +55,8 @@ public class SnapshotTableSparkAction extends BaseTableCreationSparkAction<Snaps
   private StagingTableCatalog destCatalog;
   private Identifier destTableIdent;
   private String destTableLocation = null;
+  // Max number of concurrent files to read per partition while indexing table
+  private int readDatafileParallelism = 1;

Review Comment:
   Same comment on default parallelism
   





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1087772776


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -442,14 +444,51 @@ public static void importSparkTable(
             "Cannot find any partitions in table %s",
             sourceTableIdent);
         importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+            spark,
+            sourceTablePartitions,
+            targetTable,
+            spec,
+            stagingDir,
+            checkDuplicateFiles,
+            parallelism);
       }
     } catch (AnalysisException e) {
       throw SparkExceptionUtil.toUncheckedException(
           e, "Unable to get partition spec for table: %s", sourceTableIdentWithDB);
     }
   }
 
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param partitionFilter only import partitions whose values match those in the map, can be
+   *     partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      Map<String, String> partitionFilter,
+      boolean checkDuplicateFiles) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        Collections.emptyMap(),
+        checkDuplicateFiles,
+        1);

Review Comment:
   LGTM.
   My concern is that if we set the default here to our worker pool default thread count, the change might break compatibility, because the older version is single-threaded.
   





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1087917320


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -442,14 +444,51 @@ public static void importSparkTable(
             "Cannot find any partitions in table %s",
             sourceTableIdent);
         importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+            spark,
+            sourceTablePartitions,
+            targetTable,
+            spec,
+            stagingDir,
+            checkDuplicateFiles,
+            parallelism);
       }
     } catch (AnalysisException e) {
       throw SparkExceptionUtil.toUncheckedException(
           e, "Unable to get partition spec for table: %s", sourceTableIdentWithDB);
     }
   }
 
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param partitionFilter only import partitions whose values match those in the map, can be
+   *     partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      Map<String, String> partitionFilter,
+      boolean checkDuplicateFiles) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        Collections.emptyMap(),
+        checkDuplicateFiles,
+        1);

Review Comment:
   This is just a performance change. Unless we think this will break somehow, we can change the implementation of APIs, just not the APIs themselves or their outputs.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -442,14 +444,51 @@ public static void importSparkTable(
             "Cannot find any partitions in table %s",
             sourceTableIdent);
         importSparkPartitions(
-            spark, sourceTablePartitions, targetTable, spec, stagingDir, checkDuplicateFiles);
+            spark,
+            sourceTablePartitions,
+            targetTable,
+            spec,
+            stagingDir,
+            checkDuplicateFiles,
+            parallelism);
       }
     } catch (AnalysisException e) {
       throw SparkExceptionUtil.toUncheckedException(
           e, "Unable to get partition spec for table: %s", sourceTableIdentWithDB);
     }
   }
 
+  /**
+   * Import files from an existing Spark table to an Iceberg table.
+   *
+   * <p>The import uses the Spark session to get table metadata. It assumes no operation is going on
+   * the original and target table and thus is not thread-safe.
+   *
+   * @param spark a Spark session
+   * @param sourceTableIdent an identifier of the source Spark table
+   * @param targetTable an Iceberg table where to import the data
+   * @param stagingDir a staging directory to store temporary manifest files
+   * @param partitionFilter only import partitions whose values match those in the map, can be
+   *     partially defined
+   * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
+   */
+  public static void importSparkTable(
+      SparkSession spark,
+      TableIdentifier sourceTableIdent,
+      Table targetTable,
+      String stagingDir,
+      Map<String, String> partitionFilter,
+      boolean checkDuplicateFiles) {
+    importSparkTable(
+        spark,
+        sourceTableIdent,
+        targetTable,
+        stagingDir,
+        Collections.emptyMap(),
+        checkDuplicateFiles,
+        1);

Review Comment:
   This is just a performance change. Unless we think this will break something, we can change the implementation of APIs, just not the APIs themselves or their outputs.





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083145098


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {
+    throw new UnsupportedOperationException(

Review Comment:
   I think we can default to 1?





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083440870


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java:
##########
@@ -39,7 +39,8 @@ class MigrateTableProcedure extends BaseProcedure {
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
         ProcedureParameter.optional("properties", STRING_MAP),
-        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
+        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType),
+        ProcedureParameter.optional("max_concurrent_read_datafiles", DataTypes.IntegerType)

Review Comment:
   @amogh-jahagirdar Another reason is that `importSparkPartitions` (https://github.com/kingeasternsun/iceberg/blob/feature/add-parallelism-add-files/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java#L620-L630) already has a local variable named `parallelism`.





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1143830032


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java:
##########
@@ -57,6 +58,8 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
   private final StagingTableCatalog destCatalog;
   private final Identifier destTableIdent;
   private final Identifier backupIdent;
+  // Max number of concurrent files to read per partition while indexing table
+  private int readDatafileParallelism = 1;

Review Comment:
   Oh, good catch. It seems the worker pool size is used in the procedures but not in the action. That should be fixed.





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1087744708


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {
+    throw new UnsupportedOperationException(

Review Comment:
   Thanks again for your review, @RussellSpitzer. I will keep it as it is and throw `UnsupportedOperationException`.
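
The trade-off discussed in this thread can be sketched as follows. This is a minimal illustration, not the Iceberg code: the `Migrate` interface and both implementations are hypothetical stand-ins. It shows why adding the option as a default method that throws keeps older implementations compiling, while implementations that support the option override it.

```java
public class DefaultMethodSketch {
  // Hypothetical stand-in for an action interface that gains a new option.
  interface Migrate {
    String execute();

    // Added as a default method, so implementations written earlier still compile.
    default Migrate withParallelism(int parallelism) {
      throw new UnsupportedOperationException("Setting parallelism is not supported");
    }
  }

  // An older implementation that predates the option and does not override it.
  static class LegacyMigrate implements Migrate {
    @Override
    public String execute() {
      return "serial";
    }
  }

  // An implementation that supports the option.
  static class ParallelMigrate implements Migrate {
    private int parallelism = 1;

    @Override
    public Migrate withParallelism(int parallelism) {
      if (parallelism < 1) {
        throw new IllegalArgumentException("parallelism must be >= 1");
      }
      this.parallelism = parallelism;
      return this; // returned for method chaining
    }

    @Override
    public String execute() {
      return "parallelism=" + parallelism;
    }
  }
}
```

Throwing by default makes an unsupported option fail loudly instead of being silently ignored, which is the behaviour the existing `dropBackup()` default in the diff also follows.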





[GitHub] [iceberg] jackye1995 commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083331517


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {

Review Comment:
   Thanks, I see you changed some places. There are two more places that I think could be made consistent. The first is here, where we can just use `default MigrateTable withParallelism(int parallelism)` instead of a different name that introduces a `reader` concept.
   
   The other is in the procedure, where you have `ProcedureParameter.optional("max_concurrent_read_datafiles", DataTypes.IntegerType)`; we can name that option `parallelism` as well.
   
   What do you think?





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1089591796


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {

Review Comment:
   @jackye1995  Ok, I have fixed it.





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1089605163


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -119,8 +120,15 @@ public InternalRow[] call(InternalRow args) {
       checkDuplicateFiles = args.getBoolean(3);
     }
 
+    int parallelism;

Review Comment:
   @RussellSpitzer, thanks for your review. I have changed the default to `ThreadPools.WORKER_THREAD_POOL_SIZE`.





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1224007730


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {

Review Comment:
   Thank you very much for your reviews, @jackye1995. Unfortunately, I am not able to keep updating this PR in a timely manner, so I agree with your suggestion to "open a new PR". Best wishes.





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1087967571


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -119,8 +120,15 @@ public InternalRow[] call(InternalRow args) {
       checkDuplicateFiles = args.getBoolean(3);
     }
 
+    int parallelism;

Review Comment:
   I'm not sure how we ended up with this pattern here. In the other procedures we do
   ```java
   // i is the argument's ordinal; defaultValue is used when the argument is not set
   int value = args.isNullAt(i) ? defaultValue : args.getInt(i);
   ```
   There is probably no reason to change this now, though. I do think a default of 1 is still probably too low. It feels like a `gotcha` default that is basically never correct.
   





[GitHub] [iceberg] sweetpythoncode commented on pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "sweetpythoncode (via GitHub)" <gi...@apache.org>.
sweetpythoncode commented on PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#issuecomment-1463761535

   @RussellSpitzer Could you please take a look at this when you have time? The current method is very slow on tables with a large number of partitions.




[GitHub] [iceberg] jackye1995 commented on pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#issuecomment-1529922237

   Looks like there has been no update. @kingeasternsun, do you still plan to move this forward? If not, we can open a new PR and mark you as co-author.




[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1143821714


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/MigrateTableSparkAction.java:
##########
@@ -57,6 +58,8 @@ public class MigrateTableSparkAction extends BaseTableCreationSparkAction<Migrat
   private final StagingTableCatalog destCatalog;
   private final Identifier destTableIdent;
   private final Identifier backupIdent;
+  // Max number of concurrent files to read per partition while indexing table
+  private int readDatafileParallelism = 1;

Review Comment:
   This differs from the default in SparkTableUtil. Is that intentional? I thought we were going to use the worker pool size everywhere.
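
One way to keep the default consistent is to have every entry point fall back to a single shared constant. The sketch below is only an illustration under stated assumptions: `DEFAULT_PARALLELISM`, `readFileMetrics`, and `indexFiles` are hypothetical names, and the constant merely stands in for a worker pool size. It reads a list of files on a fixed-size thread pool and returns the results in input order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedDefaultSketch {
  // Hypothetical shared default, standing in for a worker-pool-size constant.
  static final int DEFAULT_PARALLELISM =
      Math.max(2, Runtime.getRuntime().availableProcessors());

  // Hypothetical stand-in for reading one data file's metrics.
  static long readFileMetrics(String path) {
    return path.length();
  }

  // Reads all files with at most `parallelism` concurrent reads.
  static List<Long> indexFiles(List<String> paths, int parallelism) {
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<Future<Long>> futures = new ArrayList<>();
      for (String path : paths) {
        Callable<Long> task = () -> readFileMetrics(path);
        futures.add(pool.submit(task));
      }
      // Collect in submission order so results line up with the input list.
      List<Long> results = new ArrayList<>();
      for (Future<Long> future : futures) {
        results.add(future.get());
      }
      return results;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  // Every caller that does not specify a value gets the same default.
  static List<Long> indexFiles(List<String> paths) {
    return indexFiles(paths, DEFAULT_PARALLELISM);
  }
}
```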





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1084335076


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {
+    throw new UnsupportedOperationException(

Review Comment:
   Also, I think our default should probably be greater than 1.
   





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by GitBox <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1081932750


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java:
##########
@@ -93,10 +94,20 @@ public InternalRow[] call(InternalRow args) {
               });
     }
 
+    int parallelism;
+    if (!args.isNullAt(4)) {
+      parallelism = args.getInt(4);
+    } else {
+      parallelism = 1;
+    }

Review Comment:
   Nit: I think it's a bit cleaner to use a ternary here:
   
   `int parallelism = args.isNullAt(4) ? 1 : args.getInt(4);`
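
A self-contained sketch of the ternary form, for illustration only. The `Args` interface here is a hypothetical minimal stand-in for the procedure's argument row, and `DEFAULT_PARALLELISM` is an assumed value. Since a primitive `int` accessor cannot return null, the presence check goes through `isNullAt`.

```java
public class ProcedureArgSketch {
  // Hypothetical minimal view of a row of procedure arguments.
  interface Args {
    boolean isNullAt(int ordinal);

    int getInt(int ordinal);
  }

  // Assumed default for this sketch; the thread debates what the real default should be.
  static final int DEFAULT_PARALLELISM = 4;

  // Returns the argument at `ordinal`, or the default when it was not provided.
  static int parallelism(Args args, int ordinal) {
    return args.isNullAt(ordinal) ? DEFAULT_PARALLELISM : args.getInt(ordinal);
  }

  // Builds an Args backed by boxed values, where null means "not provided".
  static Args of(Integer... values) {
    return new Args() {
      @Override
      public boolean isNullAt(int ordinal) {
        return values[ordinal] == null;
      }

      @Override
      public int getInt(int ordinal) {
        return values[ordinal];
      }
    };
  }
}
```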



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java:
##########
@@ -99,7 +100,13 @@ public InternalRow[] call(InternalRow args) {
     if (dropBackup) {
       result = migrateTableSparkAction.dropBackup().execute();
     } else {
-      result = migrateTableSparkAction.execute();
+      int parallelism;
+      if (!args.isNullAt(3)) {
+        parallelism = args.getInt(3);
+      } else {
+        parallelism = 1;
+      }

Review Comment:
   Same nit as above: I think a ternary assignment works well here.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java:
##########
@@ -39,7 +39,8 @@ class MigrateTableProcedure extends BaseProcedure {
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
         ProcedureParameter.optional("properties", STRING_MAP),
-        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
+        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType),
+        ProcedureParameter.optional("max_concurrent_read_datafiles", DataTypes.IntegerType)

Review Comment:
   Any reason we can't just call this parameter "parallelism"? I think only data files will be involved.





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083237847


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {

Review Comment:
   /LGTM. @RussellSpitzer @aokolnychyi @szehon-ho @jackye1995 when you get a chance, could you please take a look? Thanks.





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1084336485


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -405,14 +405,16 @@ private static Iterator<ManifestFile> buildManifest(
    * @param partitionFilter only import partitions whose values match those in the map, can be
    *     partially defined
    * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
+   * @param parallelism Controls max concurrency of file reads per partition
    */
   public static void importSparkTable(

Review Comment:
   Unfortunately, this is a public API. We should either add a new signature and deprecate the old one, or add a new signature and keep the old one. In this case, keeping the old one without deprecation is fine, but we should not break binary compatibility unless we have to.
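
The compatibility-preserving approach described here can be sketched as below. This is a hedged illustration with hypothetical names (`importTable`, `DEFAULT_PARALLELISM`), not the `SparkTableUtil` code: the original signature stays in place and simply delegates to the new one, so callers compiled against the old method keep working.

```java
public class ImportOverloadSketch {
  // Assumed default for this sketch only.
  static final int DEFAULT_PARALLELISM = 1;

  // Original signature, kept so existing callers stay binary compatible.
  public static int importTable(String source, boolean checkDuplicateFiles) {
    return importTable(source, checkDuplicateFiles, DEFAULT_PARALLELISM);
  }

  // New signature carrying the extra parameter.
  public static int importTable(String source, boolean checkDuplicateFiles, int parallelism) {
    if (parallelism < 1) {
      throw new IllegalArgumentException("parallelism must be >= 1");
    }
    // A real import would list and register files here; this sketch only
    // reports the parallelism that would be used.
    return parallelism;
  }
}
```

Changing the parameter list of the existing method instead would alter its descriptor, and previously compiled callers would fail at link time with `NoSuchMethodError`.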





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1087750558


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -405,14 +405,16 @@ private static Iterator<ManifestFile> buildManifest(
    * @param partitionFilter only import partitions whose values match those in the map, can be
    *     partially defined
    * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
+   * @param parallelism Controls max concurrency of file reads per partition
    */
   public static void importSparkTable(

Review Comment:
   Thanks again for your reviews. This PR's changes are almost the same as those in the older #3973 :)





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1088594167


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/AddFilesProcedure.java:
##########
@@ -119,8 +120,15 @@ public InternalRow[] call(InternalRow args) {
       checkDuplicateFiles = args.getBoolean(3);
     }
 
+    int parallelism;

Review Comment:
   So should we also set the default here to the worker pool's default thread count?





[GitHub] [iceberg] jackye1995 commented on pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "jackye1995 (via GitHub)" <gi...@apache.org>.
jackye1995 commented on PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#issuecomment-1494597170

   This PR looks ready to go, @kingeasternsun could you rebase it? 




[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by GitBox <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1082061029


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java:
##########
@@ -93,10 +94,20 @@ public InternalRow[] call(InternalRow args) {
               });
     }
 
+    int parallelism;
+    if (!args.isNullAt(4)) {
+      parallelism = args.getInt(4);
+    } else {
+      parallelism = 1;
+    }

Review Comment:
   Thanks for your advice, I will fix this.



##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java:
##########
@@ -99,7 +100,13 @@ public InternalRow[] call(InternalRow args) {
     if (dropBackup) {
       result = migrateTableSparkAction.dropBackup().execute();
     } else {
-      result = migrateTableSparkAction.execute();
+      int parallelism;
+      if (!args.isNullAt(3)) {
+        parallelism = args.getInt(3);
+      } else {
+        parallelism = 1;
+      }

Review Comment:
   Thanks for your advice, I will fix this.





[GitHub] [iceberg] RussellSpitzer commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "RussellSpitzer (via GitHub)" <gi...@apache.org>.
RussellSpitzer commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1084337092


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java:
##########
@@ -405,14 +405,16 @@ private static Iterator<ManifestFile> buildManifest(
    * @param partitionFilter only import partitions whose values match those in the map, can be
    *     partially defined
    * @param checkDuplicateFiles if true, throw exception if import results in a duplicate data file
+   * @param parallelism Controls max concurrency of file reads per partition
    */
   public static void importSparkTable(

Review Comment:
   Oops nvm, I see the original below. This is fine :)





[GitHub] [iceberg] hazelnutsgz commented on pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "hazelnutsgz (via GitHub)" <gi...@apache.org>.
hazelnutsgz commented on PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#issuecomment-1542845862

   I feel the number of concurrent threads should be determined by the number of files in the partition. Say partition A has 10000 files and partition B has 10 files. The number of threads for each partition should be

   ```number of files in the partition * configurable_factor```, where configurable_factor is between 0 and 1.
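
A minimal sketch of this suggestion, assuming a hypothetical helper `threadsFor`. The upper bound `maxThreads` and the floor of one thread are additions for illustration and are not part of the proposal above or of the PR.

```java
// Hypothetical sketch: scale the thread count with the partition's file count.
final class ThreadCountSketch {
  private ThreadCountSketch() {}

  static int threadsFor(int fileCount, double factor, int maxThreads) {
    if (factor <= 0.0 || factor > 1.0) {
      throw new IllegalArgumentException("factor must be in (0, 1]: " + factor);
    }
    // number of files in the partition * configurable_factor, rounded up
    int scaled = (int) Math.ceil(fileCount * factor);
    // Assumption: keep at least one thread and never exceed maxThreads.
    return Math.max(1, Math.min(scaled, maxThreads));
  }

  public static void main(String[] args) {
    System.out.println(threadsFor(10_000, 0.5, 64)); // prints 64 (capped)
    System.out.println(threadsFor(10, 0.5, 64));     // prints 5
  }
}
```

Without some cap, a partition with 10000 files and a factor near 1 would request thousands of threads, which is why the sketch bounds the result.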




[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by GitBox <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1082057002


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java:
##########
@@ -39,7 +39,8 @@ class MigrateTableProcedure extends BaseProcedure {
       new ProcedureParameter[] {
         ProcedureParameter.required("table", DataTypes.StringType),
         ProcedureParameter.optional("properties", STRING_MAP),
-        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType)
+        ProcedureParameter.optional("drop_backup", DataTypes.BooleanType),
+        ProcedureParameter.optional("max_concurrent_read_datafiles", DataTypes.IntegerType)

Review Comment:
   Thanks for your review. It is this way because of https://github.com/apache/iceberg/pull/3973#discussion_r794150006 @RussellSpitzer





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083420860


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/MigrateTableProcedure.java:
##########
@@ -99,7 +100,13 @@ public InternalRow[] call(InternalRow args) {
     if (dropBackup) {
       result = migrateTableSparkAction.dropBackup().execute();
     } else {
-      result = migrateTableSparkAction.execute();
+      int parallelism;
+      if (!args.isNullAt(3)) {
+        parallelism = args.getInt(3);
+      } else {
+        parallelism = 1;
+      }

Review Comment:
   > Same nit as below; I think ternary assignment works well here
   
   Thanks, I have fixed it @amogh-jahagirdar.





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083420820


##########
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SnapshotTableProcedure.java:
##########
@@ -93,10 +94,20 @@ public InternalRow[] call(InternalRow args) {
               });
     }
 
+    int parallelism;
+    if (!args.isNullAt(4)) {
+      parallelism = args.getInt(4);
+    } else {
+      parallelism = 1;
+    }

Review Comment:
   > Nit: I think it's a bit cleaner to use ternary here
   > 
   > `int parallelism = args.getInt(4) != null ? args.getInt(4) : 1`
   
   Thanks, I have fixed it.
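
One caveat on the quoted one-liner: `getInt` returns a primitive `int`, so comparing it with `null` would not compile. A ternary built on `isNullAt`, as in the original `if`/`else`, might look like the sketch below. The `Row` interface and `rowOf` helper are hypothetical stand-ins for the two `InternalRow` accessors used in the diff, included only so the example runs without Spark on the classpath.

```java
// Hypothetical sketch: Row mimics only the two accessors used in the diff.
final class ParallelismArgSketch {
  private ParallelismArgSketch() {}

  interface Row {
    boolean isNullAt(int ordinal);

    int getInt(int ordinal);
  }

  // Ternary form of the if/else in the diff: default to 1 when the
  // optional argument was not supplied.
  static int parallelismOrDefault(Row args, int ordinal) {
    return args.isNullAt(ordinal) ? 1 : args.getInt(ordinal);
  }

  // Test helper that builds a Row from boxed values (null means "not set").
  static Row rowOf(Integer... values) {
    return new Row() {
      @Override
      public boolean isNullAt(int ordinal) {
        return values[ordinal] == null;
      }

      @Override
      public int getInt(int ordinal) {
        return values[ordinal];
      }
    };
  }

  public static void main(String[] args) {
    System.out.println(parallelismOrDefault(rowOf(null, null, null, null, 8), 4));    // prints 8
    System.out.println(parallelismOrDefault(rowOf(null, null, null, null, null), 4)); // prints 1
  }
}
```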
   
   





[GitHub] [iceberg] kingeasternsun commented on a diff in pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "kingeasternsun (via GitHub)" <gi...@apache.org>.
kingeasternsun commented on code in PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#discussion_r1083441291


##########
api/src/main/java/org/apache/iceberg/actions/MigrateTable.java:
##########
@@ -50,6 +50,15 @@ default MigrateTable dropBackup() {
     throw new UnsupportedOperationException("Dropping a backup is not supported");
   }
 
+  /**
+   * @param numReaders the number of concurrent file read operations to use per partition
+   * @return this for method chaining
+   */
+  default MigrateTable withParallelReads(int numReaders) {

Review Comment:
   Thanks for your advice. It looks good to me, but the function `importSparkPartitions` (https://github.com/kingeasternsun/iceberg/blob/feature/add-parallelism-add-files/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java#L620-L630) already has a local variable named `parallelism`.





[GitHub] [iceberg] amogh-jahagirdar commented on pull request #6624: 🎨 Add "parallelism" parameter to "add_files" syscall and MigrateTable, SnapshotTable.

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on PR #6624:
URL: https://github.com/apache/iceberg/pull/6624#issuecomment-1399009685

   @kingeasternsun A maintainer should take a look when they get a chance. @RussellSpitzer @aokolnychyi @szehon-ho @jackye1995  when you get a chance could you take a look? Thanks! 

