Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/08/10 17:51:09 UTC

[GitHub] [iceberg] ajantha-bhat opened a new pull request, #5492: Core: Support bulk table migration from one catalog to another

ajantha-bhat opened a new pull request, #5492:
URL: https://github.com/apache/iceberg/pull/5492

   To support seamless migration of tables from one catalog to another, we have already added #5037 and #4946.
   Bulk migration is still hard, though: `registerTable()` expects the table identifier and metadata location of each table, so the API user has to write extra code for every table.
   Hence, this PR adds a `CatalogUtil` API that helps with bulk migration of tables from one catalog to another.
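
   The per-table flow described above (look up the metadata location in the source, register it in the target, then drop it from the source) can be sketched roughly as follows. This is only an illustration, not the PR's implementation: `BulkMigrateSketch` and `InMemoryCatalog` are hypothetical stand-ins that model a catalog as a map from identifier to metadata location. The loop is sequential, whereas the PR uses a thread pool with retries, and the PR's special case for `HadoopCatalog` is not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BulkMigrateSketch {

  /** Hypothetical stand-in for a catalog: table identifier -> metadata file location. */
  public static final class InMemoryCatalog {
    private final Map<String, String> tables = new LinkedHashMap<>();

    public void registerTable(String identifier, String metadataLocation) {
      if (tables.putIfAbsent(identifier, metadataLocation) != null) {
        throw new IllegalStateException("Table already exists: " + identifier);
      }
    }

    public String metadataLocation(String identifier) {
      String location = tables.get(identifier);
      if (location == null) {
        throw new IllegalArgumentException("No such table: " + identifier);
      }
      return location;
    }

    public boolean dropTable(String identifier) {
      return tables.remove(identifier) != null;
    }

    public List<String> listTables() {
      return new ArrayList<>(tables.keySet());
    }
  }

  /**
   * Migrates the given tables, or every table in the source when the list is null or empty.
   * Returns the identifiers that were migrated; failures are logged and skipped.
   */
  public static List<String> migrateTables(
      List<String> identifiers, InMemoryCatalog source, InMemoryCatalog target) {
    List<String> toMigrate =
        (identifiers == null || identifiers.isEmpty()) ? source.listTables() : identifiers;

    List<String> migrated = new ArrayList<>();
    for (String identifier : toMigrate) {
      try {
        // Register in the target first, so a failure leaves the source entry untouched.
        target.registerTable(identifier, source.metadataLocation(identifier));
        source.dropTable(identifier);
        migrated.add(identifier);
      } catch (RuntimeException e) {
        System.err.println("Unable to migrate table " + identifier + ": " + e.getMessage());
      }
    }
    return migrated;
  }
}
```

   Registering before dropping means a table that fails to register (for example, because the identifier already exists in the target) stays in the source and is simply left out of the returned list.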


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r993019893


##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+    }
+
+    try {
+      Collection<TableIdentifier> migratedTableIdentifiers = new ConcurrentLinkedQueue<>();
+      Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+          .retry(3)
+          .stopRetryOn(NoSuchTableException.class, NoSuchNamespaceException.class)
+          .suppressFailureWhenFinished()
+          .executeWith(executorService)
+          .onFailure(
+              (tableIdentifier, exc) ->
+                  LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+          .run(
+              tableIdentifier -> {
+                migrate(sourceCatalog, targetCatalog, tableIdentifier);
+                migratedTableIdentifiers.add(tableIdentifier);
+              });
+      return migratedTableIdentifiers;
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }
+  }
+
+  private static void validate(
+      Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates) {
+    Preconditions.checkArgument(
+        maxConcurrentMigrates >= 0,
+        "maxConcurrentMigrates should have value >= 0,  value: " + maxConcurrentMigrates);
+    Preconditions.checkArgument(sourceCatalog != null, "source catalog should not be null");
+    Preconditions.checkArgument(targetCatalog != null, "target catalog should not be null");
+    Preconditions.checkArgument(
+        !targetCatalog.equals(sourceCatalog), "target catalog is same as source catalog");
+  }
+
+  private static void migrate(
+      Catalog sourceCatalog, Catalog targetCatalog, TableIdentifier tableIdentifier) {
+    // register the table to the target catalog
+    TableOperations ops =
+        ((HasTableOperations) sourceCatalog.loadTable(tableIdentifier)).operations();
+    targetCatalog.registerTable(tableIdentifier, ops.current().metadataFileLocation());

Review Comment:
   I think we need to educate the users. I have updated the doc.
   
   Or do you mean adding a flag on the source catalog so that it throws an error for new requests while a migration is in progress?





[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r996225629


##########
core/src/main/java/org/apache/iceberg/CatalogMigrateUtil.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CatalogMigrateUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogMigrateUtil.class);
+
+  private CatalogMigrateUtil() {}
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * <p>Users must make sure that no in-progress commits on the tables of source catalog during
+   * migration.

Review Comment:
   Yeah. Many users don't read the Javadoc first :)
   
   What is your opinion on having a flag in the catalog? When it is set, the catalog can throw an error saying that a migration is in progress and discard in-progress commits.





[GitHub] [iceberg] ajantha-bhat commented on pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by "ajantha-bhat (via GitHub)" <gi...@apache.org>.
ajantha-bhat commented on PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#issuecomment-1442843915

   Will handle it as a separate project as discussed. 




[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r945087226


##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+    }

Review Comment:
   Just like the other `CatalogUtil` APIs, this migrate API is intended to work without depending on any engine.
   If we need an engine-specific implementation, we can add a Spark action or a Flink action later on.





[GitHub] [iceberg] nastra commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by GitBox <gi...@apache.org>.
nastra commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r945806266


##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+    }
+
+    try {
+      Collection<TableIdentifier> migratedTableIdentifiers = new ConcurrentLinkedQueue<>();
+      Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+          .retry(3)
+          .stopRetryOn(NoSuchTableException.class, NoSuchNamespaceException.class)
+          .suppressFailureWhenFinished()
+          .executeWith(executorService)
+          .onFailure(
+              (tableIdentifier, exc) ->
+                  LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+          .run(
+              tableIdentifier -> {
+                migrate(sourceCatalog, targetCatalog, tableIdentifier);
+                migratedTableIdentifiers.add(tableIdentifier);
+              });
+      return migratedTableIdentifiers;
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }
+  }
+
+  private static void validate(
+      Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates) {
+    Preconditions.checkArgument(

Review Comment:
   I think there should be tests for all those validation checks



##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+    }
+
+    try {
+      Collection<TableIdentifier> migratedTableIdentifiers = new ConcurrentLinkedQueue<>();
+      Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+          .retry(3)
+          .stopRetryOn(NoSuchTableException.class, NoSuchNamespaceException.class)
+          .suppressFailureWhenFinished()
+          .executeWith(executorService)
+          .onFailure(
+              (tableIdentifier, exc) ->
+                  LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+          .run(
+              tableIdentifier -> {
+                migrate(sourceCatalog, targetCatalog, tableIdentifier);
+                migratedTableIdentifiers.add(tableIdentifier);
+              });
+      return migratedTableIdentifiers;
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }
+  }
+
+  private static void validate(
+      Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates) {
+    Preconditions.checkArgument(
+        maxConcurrentMigrates >= 0,
+        "maxConcurrentMigrates should have value >= 0,  value: " + maxConcurrentMigrates);
+    Preconditions.checkArgument(sourceCatalog != null, "source catalog should not be null");

Review Comment:
   I think it would be better to update this to `Invalid source catalog: null` in order to align with other error messages. Same for the target catalog check



##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(

Review Comment:
   I would suggest moving this into its own class rather than adding more to `CatalogUtil`. The class can then be unit-tested independently.



##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+    }
+
+    try {
+      Collection<TableIdentifier> migratedTableIdentifiers = new ConcurrentLinkedQueue<>();
+      Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+          .retry(3)
+          .stopRetryOn(NoSuchTableException.class, NoSuchNamespaceException.class)
+          .suppressFailureWhenFinished()
+          .executeWith(executorService)
+          .onFailure(
+              (tableIdentifier, exc) ->
+                  LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+          .run(
+              tableIdentifier -> {
+                migrate(sourceCatalog, targetCatalog, tableIdentifier);
+                migratedTableIdentifiers.add(tableIdentifier);
+              });
+      return migratedTableIdentifiers;
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }
+  }
+
+  private static void validate(
+      Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates) {
+    Preconditions.checkArgument(
+        maxConcurrentMigrates >= 0,
+        "maxConcurrentMigrates should have value >= 0,  value: " + maxConcurrentMigrates);
+    Preconditions.checkArgument(sourceCatalog != null, "source catalog should not be null");
+    Preconditions.checkArgument(targetCatalog != null, "target catalog should not be null");
+    Preconditions.checkArgument(
+        !targetCatalog.equals(sourceCatalog), "target catalog is same as source catalog");
+  }
+
+  private static void migrate(
+      Catalog sourceCatalog, Catalog targetCatalog, TableIdentifier tableIdentifier) {
+    // register the table to the target catalog
+    TableOperations ops =
+        ((HasTableOperations) sourceCatalog.loadTable(tableIdentifier)).operations();
+    targetCatalog.registerTable(tableIdentifier, ops.current().metadataFileLocation());

Review Comment:
   What would happen if tables were modified in parallel?
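
   One interleaving worth considering, given that `migrate` reads the current metadata location and only then registers it: a commit that lands on the source between those two steps is not visible in the target. The sketch below replays that ordering deterministically. It rests on assumptions: catalogs are modeled as plain maps, and the class name, table name, and metadata paths are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleMetadataSketch {

  /**
   * Replays one possible interleaving of a migration and a concurrent writer.
   * Returns the metadata location that the target catalog ends up pointing to.
   */
  public static String replayInterleaving() {
    // Hypothetical stand-ins for catalogs: table identifier -> current metadata location.
    Map<String, String> source = new HashMap<>();
    Map<String, String> target = new HashMap<>();
    source.put("db.tbl", "metadata/v1.metadata.json");

    // Step 1: the migration reads the table's current metadata location.
    String observed = source.get("db.tbl");

    // Step 2: a concurrent writer commits to the source, producing a newer metadata file.
    source.put("db.tbl", "metadata/v2.metadata.json");

    // Step 3: the migration registers the location it observed in step 1.
    target.put("db.tbl", observed);

    // Step 4: the migration drops the table from the source catalog.
    source.remove("db.tbl");

    return target.get("db.tbl");
  }

  public static void main(String[] args) {
    // The target references v1, so the v2 commit is not reachable through it.
    System.out.println(replayInterleaving());
  }
}
```

   In this ordering the target ends up referencing the older metadata file, and the newer one is no longer referenced by either catalog entry.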



##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+    }
+
+    try {
+      Collection<TableIdentifier> migratedTableIdentifiers = new ConcurrentLinkedQueue<>();
+      Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+          .retry(3)
+          .stopRetryOn(NoSuchTableException.class, NoSuchNamespaceException.class)
+          .suppressFailureWhenFinished()
+          .executeWith(executorService)
+          .onFailure(
+              (tableIdentifier, exc) ->
+                  LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+          .run(
+              tableIdentifier -> {
+                migrate(sourceCatalog, targetCatalog, tableIdentifier);
+                migratedTableIdentifiers.add(tableIdentifier);
+              });
+      return migratedTableIdentifiers;
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }
+  }
+
+  private static void validate(
+      Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates) {
+    Preconditions.checkArgument(
+        maxConcurrentMigrates >= 0,
+        "maxConcurrentMigrates should have value >= 0,  value: " + maxConcurrentMigrates);
+    Preconditions.checkArgument(sourceCatalog != null, "source catalog should not be null");
+    Preconditions.checkArgument(targetCatalog != null, "target catalog should not be null");
+    Preconditions.checkArgument(
+        !targetCatalog.equals(sourceCatalog), "target catalog is same as source catalog");
+  }
+
+  private static void migrate(
+      Catalog sourceCatalog, Catalog targetCatalog, TableIdentifier tableIdentifier) {
+    // register the table to the target catalog
+    TableOperations ops =
+        ((HasTableOperations) sourceCatalog.loadTable(tableIdentifier)).operations();
+    targetCatalog.registerTable(tableIdentifier, ops.current().metadataFileLocation());
+
+    // drop the table from source catalog
+    if (!(sourceCatalog instanceof HadoopCatalog)) {
+      // HadoopCatalog dropTable will delete the table files completely even when purge is false.
+      // So, skip dropTable for HadoopCatalog.
+      sourceCatalog.dropTable(tableIdentifier, false);

Review Comment:
   I'm not sure we'd want to always drop the table from the source catalog by default after it has been migrated



##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+    }
+
+    try {
+      Collection<TableIdentifier> migratedTableIdentifiers = new ConcurrentLinkedQueue<>();
+      Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+          .retry(3)
+          .stopRetryOn(NoSuchTableException.class, NoSuchNamespaceException.class)
+          .suppressFailureWhenFinished()
+          .executeWith(executorService)
+          .onFailure(
+              (tableIdentifier, exc) ->
+                  LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+          .run(
+              tableIdentifier -> {
+                migrate(sourceCatalog, targetCatalog, tableIdentifier);
+                migratedTableIdentifiers.add(tableIdentifier);
+              });
+      return migratedTableIdentifiers;
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }
+  }
+
+  private static void validate(
+      Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates) {
+    Preconditions.checkArgument(
+        maxConcurrentMigrates >= 0,
+        "maxConcurrentMigrates should have value >= 0,  value: " + maxConcurrentMigrates);

Review Comment:
   migrates -> migrations maybe?





[GitHub] [iceberg] kbendick commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by GitBox <gi...@apache.org>.
kbendick commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r944967219


##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+    }

Review Comment:
   You'll probably want to add an option that lets the user pass in an executor service, as is done in several other actions.
   
   If this is just a temporary utility, or rather one that isn't intended to ever be used from Flink (which was the motivator for allowing executor services to be passed in), then it might not be an issue.
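
   A rough sketch of what accepting a caller-supplied executor could look like. `ExecutorOverloadSketch`, `migrateAll`, and the `Consumer<String>` that stands in for the per-table migration are hypothetical names for illustration; they are not part of this PR or of the Iceberg API, and table identifiers are plain strings. The point is that the caller owns the executor, so the utility never shuts it down.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Consumer;

// Hypothetical stand-in for the utility; identifiers are plain strings here.
final class ExecutorOverloadSketch {
  private ExecutorOverloadSketch() {}

  /**
   * Runs migrateOne for every identifier. With a null executor the work runs on the calling
   * thread; otherwise it is submitted to the caller-supplied executor, which is NOT shut down
   * here because the caller owns its lifecycle.
   */
  static Collection<String> migrateAll(
      List<String> identifiers, Consumer<String> migrateOne, ExecutorService executor) {
    Collection<String> migrated = new ConcurrentLinkedQueue<>();
    if (executor == null) {
      for (String id : identifiers) {
        tryMigrate(id, migrateOne, migrated);
      }
      return migrated;
    }

    List<Future<?>> futures = new ArrayList<>();
    for (String id : identifiers) {
      futures.add(executor.submit(() -> tryMigrate(id, migrateOne, migrated)));
    }

    for (Future<?> future : futures) {
      try {
        future.get(); // wait for each task; per-table failures were already swallowed
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        break;
      } catch (ExecutionException e) {
        throw new IllegalStateException(e.getCause());
      }
    }
    return migrated;
  }

  // Records the identifier only if its migration did not throw.
  private static void tryMigrate(
      String id, Consumer<String> migrateOne, Collection<String> migrated) {
    try {
      migrateOne.accept(id);
      migrated.add(id);
    } catch (RuntimeException e) {
      // a failed table is skipped so the remaining tables still get processed
    }
  }
}
```

   An `int`-based overload could still exist and simply build its own pool before delegating to this method, shutting down only the pool it created.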





[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r993018318


##########
core/src/main/java/org/apache/iceberg/CatalogUtil.java:
##########
@@ -372,4 +385,94 @@ public static void configureHadoopConf(Object maybeConfigurable, Object conf) {
 
     setConf.invoke(conf);
   }
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrates Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrates) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrates);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrates > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrates);
+    }
+
+    try {
+      Collection<TableIdentifier> migratedTableIdentifiers = new ConcurrentLinkedQueue<>();
+      Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+          .retry(3)
+          .stopRetryOn(NoSuchTableException.class, NoSuchNamespaceException.class)
+          .suppressFailureWhenFinished()
+          .executeWith(executorService)
+          .onFailure(
+              (tableIdentifier, exc) ->
+                  LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+          .run(
+              tableIdentifier -> {
+                migrate(sourceCatalog, targetCatalog, tableIdentifier);
+                migratedTableIdentifiers.add(tableIdentifier);
+              });
+      return migratedTableIdentifiers;
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }
+  }
+
+  private static void validate(
+      Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrates) {
+    Preconditions.checkArgument(
+        maxConcurrentMigrates >= 0,
+        "maxConcurrentMigrates should have value >= 0,  value: " + maxConcurrentMigrates);
+    Preconditions.checkArgument(sourceCatalog != null, "source catalog should not be null");
+    Preconditions.checkArgument(targetCatalog != null, "target catalog should not be null");
+    Preconditions.checkArgument(
+        !targetCatalog.equals(sourceCatalog), "target catalog is same as source catalog");
+  }
+
+  private static void migrate(
+      Catalog sourceCatalog, Catalog targetCatalog, TableIdentifier tableIdentifier) {
+    // register the table to the target catalog
+    TableOperations ops =
+        ((HasTableOperations) sourceCatalog.loadTable(tableIdentifier)).operations();
+    targetCatalog.registerTable(tableIdentifier, ops.current().metadataFileLocation());
+
+    // drop the table from source catalog
+    if (!(sourceCatalog instanceof HadoopCatalog)) {
+      // HadoopCatalog dropTable will delete the table files completely even when purge is false.
+      // So, skip dropTable for HadoopCatalog.
+      sourceCatalog.dropTable(tableIdentifier, false);

Review Comment:
   Added the flag now





[GitHub] [iceberg] rdblue commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r996140631


##########
core/src/main/java/org/apache/iceberg/CatalogMigrateUtil.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CatalogMigrateUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogMigrateUtil.class);
+
+  private CatalogMigrateUtil() {}
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * <p>Users must make sure that no in-progress commits on the tables of source catalog during
+   * migration.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrations Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @param deleteEntriesFromSourceCatalog If set to true, after successful migration, delete the
+   *     table entry from source catalog. This field is not applicable for HadoopCatalog.
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrations,
+      boolean deleteEntriesFromSourceCatalog) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrations);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrations > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrations);
+    }
+
+    try {
+      Collection<TableIdentifier> migratedTableIdentifiers = new ConcurrentLinkedQueue<>();
+      Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+          .retry(3)
+          .stopRetryOn(NoSuchTableException.class, NoSuchNamespaceException.class)
+          .suppressFailureWhenFinished()
+          .executeWith(executorService)
+          .onFailure(
+              (tableIdentifier, exc) ->
+                  LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+          .run(
+              tableIdentifier -> {
+                migrate(
+                    sourceCatalog, targetCatalog, tableIdentifier, deleteEntriesFromSourceCatalog);
+                migratedTableIdentifiers.add(tableIdentifier);
+              });
+      return migratedTableIdentifiers;
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }
+  }
+
+  private static void validate(
+      Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrations) {
+    Preconditions.checkArgument(
+        maxConcurrentMigrations >= 0,
+        "maxConcurrentMigrations should have value >= 0,  value: " + maxConcurrentMigrations);
+    Preconditions.checkArgument(sourceCatalog != null, "Invalid source catalog: null");
+    Preconditions.checkArgument(targetCatalog != null, "Invalid target catalog: null");
+    Preconditions.checkArgument(
+        !targetCatalog.equals(sourceCatalog), "target catalog is same as source catalog");
+  }
+
+  private static void migrate(
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      TableIdentifier tableIdentifier,
+      boolean deleteEntriesFromSourceCatalog) {
+    // register the table to the target catalog
+    TableOperations ops =
+        ((HasTableOperations) sourceCatalog.loadTable(tableIdentifier)).operations();

Review Comment:
   I think it's usually better to use `BaseTable` than to use `HasTableOperations`. That way you're not trying to move a metadata table.
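
   To illustrate the difference, here is a small sketch built on stand-in types. The nested `Table`, `HasTableOperations`, `BaseTable`, and `MetadataTableStandIn` types are hypothetical and only mirror the shape of the Iceberg types; the sketch assumes, following the comment above, that a metadata table would also pass a `HasTableOperations` cast.

```java
import java.util.Optional;

// Hypothetical stand-ins; none of these nested types are the real Iceberg classes.
final class BaseTableCheckSketch {
  private BaseTableCheckSketch() {}

  interface Table {}

  interface HasTableOperations {
    String metadataFileLocation();
  }

  // Stand-in for a regular table backed by a metadata file.
  static final class BaseTable implements Table, HasTableOperations {
    private final String location;

    BaseTable(String location) {
      this.location = location;
    }

    @Override
    public String metadataFileLocation() {
      return location;
    }
  }

  // Stand-in for a metadata table: it exposes operations too, so a cast to
  // HasTableOperations alone would not tell it apart from a regular table.
  static final class MetadataTableStandIn implements Table, HasTableOperations {
    private final String location;

    MetadataTableStandIn(String location) {
      this.location = location;
    }

    @Override
    public String metadataFileLocation() {
      return location;
    }
  }

  /** Returns a location to register only for regular tables; anything else is skipped. */
  static Optional<String> locationToRegister(Table table) {
    if (table instanceof BaseTable) {
      return Optional.of(((BaseTable) table).metadataFileLocation());
    }
    return Optional.empty();
  }
}
```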





[GitHub] [iceberg] rdblue commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r996140333


##########
core/src/main/java/org/apache/iceberg/CatalogMigrateUtil.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CatalogMigrateUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogMigrateUtil.class);
+
+  private CatalogMigrateUtil() {}
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * <p>Users must make sure that no in-progress commits on the tables of source catalog during
+   * migration.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrations Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @param deleteEntriesFromSourceCatalog If set to true, after successful migration, delete the
+   *     table entry from source catalog. This field is not applicable for HadoopCatalog.
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrations,
+      boolean deleteEntriesFromSourceCatalog) {
+    validate(sourceCatalog, targetCatalog, maxConcurrentMigrations);
+
+    List<TableIdentifier> identifiers;
+    if (tableIdentifiers == null || tableIdentifiers.isEmpty()) {
+      // fetch all the table identifiers from all the namespaces.
+      List<Namespace> namespaces =
+          (sourceCatalog instanceof SupportsNamespaces)
+              ? ((SupportsNamespaces) sourceCatalog).listNamespaces()
+              : ImmutableList.of(Namespace.empty());
+      identifiers =
+          namespaces.stream()
+              .flatMap(namespace -> sourceCatalog.listTables(namespace).stream())
+              .collect(Collectors.toList());
+    } else {
+      identifiers = tableIdentifiers;
+    }
+
+    ExecutorService executorService = null;
+    if (maxConcurrentMigrations > 0) {
+      executorService = ThreadPools.newWorkerPool("migrate-tables", maxConcurrentMigrations);
+    }
+
+    try {
+      Collection<TableIdentifier> migratedTableIdentifiers = new ConcurrentLinkedQueue<>();
+      Tasks.foreach(identifiers.stream().filter(Objects::nonNull))
+          .retry(3)
+          .stopRetryOn(NoSuchTableException.class, NoSuchNamespaceException.class)
+          .suppressFailureWhenFinished()
+          .executeWith(executorService)
+          .onFailure(
+              (tableIdentifier, exc) ->
+                  LOG.warn("Unable to migrate table {}", tableIdentifier, exc))
+          .run(
+              tableIdentifier -> {
+                migrate(
+                    sourceCatalog, targetCatalog, tableIdentifier, deleteEntriesFromSourceCatalog);
+                migratedTableIdentifiers.add(tableIdentifier);
+              });
+      return migratedTableIdentifiers;
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }
+  }
+
+  private static void validate(
+      Catalog sourceCatalog, Catalog targetCatalog, int maxConcurrentMigrations) {
+    Preconditions.checkArgument(
+        maxConcurrentMigrations >= 0,
+        "maxConcurrentMigrations should have value >= 0,  value: " + maxConcurrentMigrations);
+    Preconditions.checkArgument(sourceCatalog != null, "Invalid source catalog: null");
+    Preconditions.checkArgument(targetCatalog != null, "Invalid target catalog: null");
+    Preconditions.checkArgument(
+        !targetCatalog.equals(sourceCatalog), "target catalog is same as source catalog");
+  }
+
+  private static void migrate(
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      TableIdentifier tableIdentifier,

Review Comment:
   This uses a different argument order than `migrateTables`. I generally prefer consistency.





[GitHub] [iceberg] ajantha-bhat closed pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by "ajantha-bhat (via GitHub)" <gi...@apache.org>.
ajantha-bhat closed pull request #5492: Core: Support bulk table migration from one catalog to another
URL: https://github.com/apache/iceberg/pull/5492




[GitHub] [iceberg] ajantha-bhat commented on pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#issuecomment-1213039273

   Hopefully we can later call this API from the Python CLI and make the migration easier.




[GitHub] [iceberg] rdblue commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r996138197


##########
core/src/main/java/org/apache/iceberg/CatalogMigrateUtil.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CatalogMigrateUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogMigrateUtil.class);
+
+  private CatalogMigrateUtil() {}
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * <p>Users must make sure that no in-progress commits on the tables of source catalog during
+   * migration.

Review Comment:
   This seems like a major issue. How many people are going to call this without first shutting down all of the running processes?
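
   One interleaving that shows why this matters, as a deterministic sketch. The two maps are hypothetical stand-ins for catalogs (table identifier to current metadata location), and the identifier and paths are made up for the example; no real catalog calls are involved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation; maps stand in for the source and target catalogs.
final class InProgressCommitSketch {
  private InProgressCommitSketch() {}

  /** Returns the metadata location the target ends up with after the interleaving below. */
  static String targetLocationAfterRace() {
    Map<String, String> source = new HashMap<>();
    Map<String, String> target = new HashMap<>();
    source.put("db.tbl", "s3://bucket/db/tbl/metadata/v1.metadata.json");

    // Step 1: the migration reads the current metadata location from the source.
    String observed = source.get("db.tbl");

    // Step 2: a writer that was never stopped commits to the source, producing v2.
    source.put("db.tbl", "s3://bucket/db/tbl/metadata/v2.metadata.json");

    // Step 3: the migration registers the location it observed, then drops the source entry.
    target.put("db.tbl", observed);
    source.remove("db.tbl");

    // The target points at v1 and no catalog references v2 any more.
    return target.get("db.tbl");
  }
}
```

   In this sequence the writer's commit succeeded from its own point of view, yet the migrated table no longer reflects it.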





[GitHub] [iceberg] rdblue commented on a diff in pull request #5492: Core: Support bulk table migration from one catalog to another

Posted by GitBox <gi...@apache.org>.
rdblue commented on code in PR #5492:
URL: https://github.com/apache/iceberg/pull/5492#discussion_r996139220


##########
core/src/main/java/org/apache/iceberg/CatalogMigrateUtil.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CatalogMigrateUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(CatalogMigrateUtil.class);
+
+  private CatalogMigrateUtil() {}
+
+  /**
+   * Migrates tables from one catalog(source catalog) to another catalog(target catalog).
+   *
+   * <p>Supports bulk migrations with a multi-thread execution. Once the migration is success, table
+   * would be dropped from the source catalog.
+   *
+   * <p>Users must make sure that no in-progress commits on the tables of source catalog during
+   * migration.
+   *
+   * @param tableIdentifiers a list of {@link TableIdentifier} for the tables required to be
+   *     migrated. If not specified, all the tables would be migrated.
+   * @param sourceCatalog Source {@link Catalog} from which the tables are chosen
+   * @param targetCatalog Target {@link Catalog} to which the tables need to be migrated
+   * @param maxConcurrentMigrations Size of the thread pool used for migrate tables (If set to 0, no
+   *     thread pool is used)
+   * @param deleteEntriesFromSourceCatalog If set to true, after successful migration, delete the
+   *     table entry from source catalog. This field is not applicable for HadoopCatalog.
+   * @return Collection of table identifiers for successfully migrated tables
+   */
+  public static Collection<TableIdentifier> migrateTables(
+      List<TableIdentifier> tableIdentifiers,
+      Catalog sourceCatalog,
+      Catalog targetCatalog,
+      int maxConcurrentMigrations,
+      boolean deleteEntriesFromSourceCatalog) {

Review Comment:
   We generally avoid adding boolean arguments to public methods. It's usually better to use different verbs, like `migrate` (with delete) and `register` (copy).
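
A minimal sketch of what that split could look like. This is an illustration under stated assumptions, not the PR's code or Iceberg's API: `CatalogMigrationSketch`, `InMemoryCatalog`, `registerTables`, and `copyTables` are hypothetical names, and the in-memory map only stands in for a real catalog. The boolean survives only as a private detail, so callers pick behavior by method name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: these names are not taken from the PR or from Iceberg.
class CatalogMigrationSketch {

  // Stand-in for a catalog: maps a table name to its metadata file location.
  static final class InMemoryCatalog {
    private final Map<String, String> tables = new ConcurrentHashMap<>();

    void registerTable(String name, String metadataLocation) {
      if (tables.putIfAbsent(name, metadataLocation) != null) {
        throw new IllegalStateException("Table already exists: " + name);
      }
    }

    String metadataLocation(String name) {
      return tables.get(name);
    }

    boolean dropTable(String name) {
      return tables.remove(name) != null;
    }

    boolean tableExists(String name) {
      return tables.containsKey(name);
    }
  }

  private CatalogMigrationSketch() {}

  // "register" (copy): the source catalog keeps its entries.
  static List<String> registerTables(
      List<String> names, InMemoryCatalog source, InMemoryCatalog target) {
    return copyTables(names, source, target, false);
  }

  // "migrate" (with delete): entries are dropped from the source after registration.
  static List<String> migrateTables(
      List<String> names, InMemoryCatalog source, InMemoryCatalog target) {
    return copyTables(names, source, target, true);
  }

  // The flag is an implementation detail and never appears in the public signatures.
  private static List<String> copyTables(
      List<String> names, InMemoryCatalog source, InMemoryCatalog target, boolean dropFromSource) {
    List<String> processed = new ArrayList<>();
    for (String name : names) {
      String location = source.metadataLocation(name);
      if (location == null) {
        continue; // table is not in the source catalog, so skip it
      }
      target.registerTable(name, location);
      if (dropFromSource) {
        source.dropTable(name);
      }
      processed.add(name);
    }
    return processed;
  }
}
```

With this shape a call site reads as `migrateTables(ids, source, target)` or `registerTables(ids, source, target)`, and the reader does not have to look up what a trailing `true` or `false` means.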


