Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/13 05:34:08 UTC

[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

ajantha-bhat commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r945087226


##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog (source catalog) to another catalog (target catalog).
+   *
+   * <p>Supports bulk migration with multi-threaded execution. Once a table is migrated
+   * successfully, it is dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables to be migrated. If
+   *     null or empty, all the tables are migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used to migrate tables (if set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+    }

Review Comment:
   Like the other `CatalogUtil` APIs, this migrate API is intended to work without depending on any engine.
   If we need an engine-specific implementation, we can add a Spark action or a Flink action later on.
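   To illustrate the engine-agnostic approach described in the comment, here is a minimal sketch that depends only on a catalog abstraction and plain `java.util.concurrent`, with no Spark or Flink involved. `SimpleCatalog`, `InMemoryCatalog`, and `MigrationSketch` are hypothetical stand-ins, not Iceberg's `Catalog` API, and the per-table "register in target, then drop from source" strategy is an assumption rather than necessarily what this PR does. Mirroring the Javadoc in the diff, a null or empty identifier list means "all tables", and `maxConcurrentMigrates == 0` runs every migration on the calling thread.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical engine-agnostic catalog abstraction (NOT Iceberg's Catalog interface).
interface SimpleCatalog {
  List<String> listTables();
  String loadMetadataLocation(String table); // null if the table does not exist
  void registerTable(String table, String metadataLocation);
  boolean dropTable(String table); // removes the catalog entry only, not data files
}

// Hypothetical in-memory catalog, thread-safe so it can be used from a worker pool.
class InMemoryCatalog implements SimpleCatalog {
  private final Map<String, String> tables = new ConcurrentHashMap<>();

  public List<String> listTables() { return new ArrayList<>(tables.keySet()); }

  public String loadMetadataLocation(String table) { return tables.get(table); }

  public void registerTable(String table, String metadataLocation) {
    if (tables.putIfAbsent(table, metadataLocation) != null) {
      throw new IllegalStateException("Table already exists: " + table);
    }
  }

  public boolean dropTable(String table) { return tables.remove(table) != null; }
}

final class MigrationSketch {
  private MigrationSketch() {}

  /** Returns the identifiers of tables that were migrated successfully. */
  static Collection<String> migrateTables(
      List<String> requested, SimpleCatalog source, SimpleCatalog target, int maxConcurrentMigrates) {
    Objects.requireNonNull(source, "source catalog is null");
    Objects.requireNonNull(target, "target catalog is null");
    if (maxConcurrentMigrates < 0) {
      throw new IllegalArgumentException("maxConcurrentMigrates must be >= 0");
    }
    // Null or empty list means "migrate every table in the source catalog".
    List<String> ids = (requested == null || requested.isEmpty()) ? source.listTables() : requested;
    Collection<String> migrated = new ConcurrentLinkedQueue<>();

    if (maxConcurrentMigrates == 0) {
      // No thread pool: migrate sequentially on the calling thread.
      for (String id : ids) {
        if (migrateOne(id, source, target)) migrated.add(id);
      }
      return migrated;
    }

    ExecutorService pool = Executors.newFixedThreadPool(maxConcurrentMigrates);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (String id : ids) {
        futures.add(pool.submit(() -> {
          if (migrateOne(id, source, target)) migrated.add(id);
        }));
      }
      for (Future<?> future : futures) {
        try {
          future.get(); // wait for every task before returning
        } catch (ExecutionException e) {
          // migrateOne already swallows per-table failures; nothing to record here
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while migrating tables", e);
    } finally {
      pool.shutdown();
    }
    return migrated;
  }

  // Register the table's current metadata in the target, then drop it from the source.
  private static boolean migrateOne(String id, SimpleCatalog source, SimpleCatalog target) {
    try {
      String location = source.loadMetadataLocation(id);
      if (location == null) return false; // unknown table: skip it
      target.registerTable(id, location);
      return source.dropTable(id);
    } catch (RuntimeException e) {
      return false; // e.g. table already exists in the target
    }
  }
}
```

   Because only the catalog abstraction and a standard thread pool are used, the same logic works regardless of which engine later reads the migrated tables; engine-specific variants (Spark or Flink actions) could wrap it rather than replace it.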



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
