Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/13 00:20:47 UTC

[GitHub] [iceberg] kbendick commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

kbendick commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r944967219


##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog (the source catalog) to another (the target catalog).
+   *
+   * <p>Supports bulk migrations with multi-threaded execution. Once a table is migrated
+   * successfully, it is dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables to be migrated. If
+   *     null or empty, all tables are migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used to migrate tables (if set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+    }

Review Comment:
   You'll probably want to add an option that lets the user pass in an executor service, as is done in several other actions.
   
   If this is just a temporary utility, or rather one that is never intended to be used from Flink (which was the motivation for allowing executor services to be passed in), then it might not be an issue.
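One way to act on this suggestion would be to overload the method so that callers can hand in their own ExecutorService, keeping the int variant as a convenience. The sketch below is deliberately independent of Iceberg and is not the API in PR #5492: the class name MigrateWithExecutor, both migrateTables overloads, and the Predicate that stands in for migrating a single table are all hypothetical, and Executors.newFixedThreadPool is used in place of Iceberg's ThreadPools.newWorkerPool.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

// Hypothetical sketch, not the PR's code: shows how a caller-supplied
// ExecutorService could sit alongside the int-based overload from the diff.
public class MigrateWithExecutor {

  // Hypothetical overload: the caller supplies the ExecutorService (null means
  // "run on the calling thread"). The caller owns the pool and shuts it down,
  // so an engine can hand in threads it already manages.
  public static <T> Collection<T> migrateTables(
      List<T> identifiers, Predicate<T> migrateOne, ExecutorService executor) {
    List<T> migrated = new ArrayList<>();
    if (executor == null) {
      for (T id : identifiers) {
        if (tryMigrate(migrateOne, id)) {
          migrated.add(id);
        }
      }
      return migrated;
    }

    // Submit one task per table, then collect results in input order.
    List<Future<Boolean>> futures = new ArrayList<>();
    for (T id : identifiers) {
      futures.add(executor.submit(() -> tryMigrate(migrateOne, id)));
    }
    for (int i = 0; i < futures.size(); i++) {
      try {
        if (futures.get(i).get()) {
          migrated.add(identifiers.get(i));
        }
      } catch (ExecutionException e) {
        // The task itself failed; leave the table out of the result.
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while migrating tables", e);
      }
    }
    return migrated;
  }

  // Convenience overload mirroring the int parameter in the diff: it creates
  // a pool only when maxConcurrentMigrates > 0 and always shuts that pool down.
  public static <T> Collection<T> migrateTables(
      List<T> identifiers, Predicate<T> migrateOne, int maxConcurrentMigrates) {
    if (maxConcurrentMigrates <= 0) {
      return migrateTables(identifiers, migrateOne, (ExecutorService) null);
    }
    ExecutorService pool = Executors.newFixedThreadPool(maxConcurrentMigrates);
    try {
      return migrateTables(identifiers, migrateOne, pool);
    } finally {
      pool.shutdown();
    }
  }

  // A runtime failure while migrating one table is reported as "not migrated".
  private static <T> boolean tryMigrate(Predicate<T> migrateOne, T id) {
    try {
      return migrateOne.test(id);
    } catch (RuntimeException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    List<String> tables = List.of("db.a", "db.b", "db.c");
    // Stand-in for a real migration: every table except "db.b" succeeds.
    Predicate<String> fakeMigrate = t -> !t.equals("db.b");

    ExecutorService callerPool = Executors.newFixedThreadPool(2);
    try {
      System.out.println(migrateTables(tables, fakeMigrate, callerPool)); // [db.a, db.c]
    } finally {
      callerPool.shutdown();
    }
    System.out.println(migrateTables(tables, fakeMigrate, 0)); // [db.a, db.c]
  }
}
```

The main design point is ownership: the overload that receives an executor never shuts it down, while the int overload shuts down only the pool it created itself.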



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

