Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/04/12 07:36:34 UTC

[iceberg] branch master updated: Hive: Configure catalog type on table level. (#2129)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new db8248c  Hive: Configure catalog type on table level. (#2129)
db8248c is described below

commit db8248c16e99c435ff7eed8fa86bc3913af2756a
Author: László Pintér <47...@users.noreply.github.com>
AuthorDate: Mon Apr 12 09:36:22 2021 +0200

    Hive: Configure catalog type on table level. (#2129)
---
 .../java/org/apache/iceberg/mr/CatalogLoader.java  |  33 -----
 .../main/java/org/apache/iceberg/mr/Catalogs.java  | 158 ++++++++++++++-------
 .../org/apache/iceberg/mr/InputFormatConfig.java   |  11 +-
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       |  10 +-
 .../mr/hive/HiveIcebergOutputCommitter.java        |  10 +-
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  23 ++-
 .../java/org/apache/iceberg/mr/TestCatalogs.java   | 120 ++++++++++++----
 .../apache/iceberg/mr/TestIcebergInputFormats.java |  24 ++--
 .../iceberg/mr/TestInputFormatReaderDeletes.java   |   8 ++
 .../hive/HiveIcebergStorageHandlerTestUtils.java   |  15 +-
 .../mr/hive/TestHiveIcebergOutputCommitter.java    |  10 +-
 .../iceberg/mr/hive/TestHiveIcebergSerDe.java      |   3 +
 .../TestHiveIcebergStorageHandlerLocalScan.java    |  13 +-
 .../hive/TestHiveIcebergStorageHandlerNoScan.java  | 136 ++++++++++++------
 ...eIcebergStorageHandlerWithMultipleCatalogs.java | 143 +++++++++++++++++++
 .../org/apache/iceberg/mr/hive/TestTables.java     |  97 ++++++++-----
 16 files changed, 590 insertions(+), 224 deletions(-)
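
In practice, the per-table catalog selection introduced here goes through the
controlling properties. A minimal sketch, assuming a catalog registered under
the hypothetical name "prod" and a Hive Metastore reachable from the Hadoop
configuration (the table and catalog names are illustrative only):

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.mr.Catalogs;
    import org.apache.iceberg.mr.InputFormatConfig;

    public class TableLevelCatalogSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Register a named catalog in the global config: iceberg.catalog.<name>.type
        conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, "prod"), "hive");

        // Select the catalog on the table level through the controlling properties
        Properties props = new Properties();
        props.setProperty(Catalogs.NAME, "default.customers");     // table identifier
        props.setProperty(InputFormatConfig.CATALOG_NAME, "prod"); // catalog to resolve it in

        Table table = Catalogs.loadTable(conf, props);
        System.out.println(table.location());
      }
    }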

diff --git a/mr/src/main/java/org/apache/iceberg/mr/CatalogLoader.java b/mr/src/main/java/org/apache/iceberg/mr/CatalogLoader.java
deleted file mode 100644
index 0d15083..0000000
--- a/mr/src/main/java/org/apache/iceberg/mr/CatalogLoader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.mr;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-
-public interface CatalogLoader {
-
-  /**
-   * Load and return a {@link Catalog} specified by the configuration.
-   * @param conf a Hadoop conf
-   * @return a {@link Catalog} instance
-   */
-  Catalog load(Configuration conf);
-}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java b/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
index c60bc47..0ff9b09 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -24,7 +24,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
@@ -33,17 +35,12 @@ import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.common.DynConstructors;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
-import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 
 /**
  * Class for catalog resolution and accessing the common functions for {@link Catalog} API.
@@ -56,16 +53,20 @@ import org.slf4j.LoggerFactory;
  * </ol>
  */
 public final class Catalogs {
-  private static final Logger LOG = LoggerFactory.getLogger(Catalogs.class);
 
-  private static final String HADOOP = "hadoop";
-  private static final String HIVE = "hive";
+  public static final String ICEBERG_DEFAULT_CATALOG_NAME = "default_iceberg";
+  public static final String ICEBERG_HADOOP_TABLE_NAME = "location_based_table";
+
+  private static final String HIVE_CATALOG_TYPE = "hive";
+  private static final String HADOOP_CATALOG_TYPE = "hadoop";
+  private static final String NO_CATALOG_TYPE = "no catalog";
 
   public static final String NAME = "name";
   public static final String LOCATION = "location";
 
   private static final Set<String> PROPERTIES_TO_REMOVE =
-      ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME);
+      ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME,
+              InputFormatConfig.CATALOG_NAME);
 
   private Catalogs() {
   }
@@ -76,14 +77,15 @@ public final class Catalogs {
    * @return an Iceberg table
    */
   public static Table loadTable(Configuration conf) {
-    return loadTable(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER), conf.get(InputFormatConfig.TABLE_LOCATION));
+    return loadTable(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER), conf.get(InputFormatConfig.TABLE_LOCATION),
+            conf.get(InputFormatConfig.CATALOG_NAME));
   }
 
   /**
    * Load an Iceberg table using the catalog specified by the configuration.
    * <p>
-   * The table identifier ({@link Catalogs#NAME}) or table path ({@link Catalogs#LOCATION}) should be specified by
-   * the controlling properties.
+   * The table identifier ({@link Catalogs#NAME}) and the catalog name ({@link InputFormatConfig#CATALOG_NAME}),
+   * or table path ({@link Catalogs#LOCATION}) should be specified by the controlling properties.
    * <p>
    * Used by HiveIcebergSerDe and HiveIcebergStorageHandler
   * @param conf a Hadoop conf
@@ -91,11 +93,13 @@ public final class Catalogs {
    * @return an Iceberg table
    */
   public static Table loadTable(Configuration conf, Properties props) {
-    return loadTable(conf, props.getProperty(NAME), props.getProperty(LOCATION));
+    return loadTable(conf, props.getProperty(NAME), props.getProperty(LOCATION),
+            props.getProperty(InputFormatConfig.CATALOG_NAME));
   }
 
-  private static Table loadTable(Configuration conf, String tableIdentifier, String tableLocation) {
-    Optional<Catalog> catalog = loadCatalog(conf);
+  private static Table loadTable(Configuration conf, String tableIdentifier, String tableLocation,
+                                 String catalogName) {
+    Optional<Catalog> catalog = loadCatalog(conf, catalogName);
 
     if (catalog.isPresent()) {
       Preconditions.checkArgument(tableIdentifier != null, "Table identifier not set");
@@ -134,6 +138,7 @@ public final class Catalogs {
     }
 
     String location = props.getProperty(LOCATION);
+    String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
 
     // Create a table property map without the controlling properties
     Map<String, String> map = new HashMap<>(props.size());
@@ -143,7 +148,7 @@ public final class Catalogs {
       }
     }
 
-    Optional<Catalog> catalog = loadCatalog(conf);
+    Optional<Catalog> catalog = loadCatalog(conf, catalogName);
 
     if (catalog.isPresent()) {
       String name = props.getProperty(NAME);
@@ -166,8 +171,9 @@ public final class Catalogs {
    */
   public static boolean dropTable(Configuration conf, Properties props) {
     String location = props.getProperty(LOCATION);
+    String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
 
-    Optional<Catalog> catalog = loadCatalog(conf);
+    Optional<Catalog> catalog = loadCatalog(conf, catalogName);
 
     if (catalog.isPresent()) {
       String name = props.getProperty(NAME);
@@ -182,48 +188,100 @@ public final class Catalogs {
   /**
    * Returns true if HiveCatalog is used
    * @param conf a Hadoop conf
+   * @param props the controlling properties
    * @return true if the Catalog is HiveCatalog
    */
-  public static boolean hiveCatalog(Configuration conf) {
-    return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+  public static boolean hiveCatalog(Configuration conf, Properties props) {
+    String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
+    return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(getCatalogType(conf, catalogName));
   }
 
   @VisibleForTesting
-  static Optional<Catalog> loadCatalog(Configuration conf) {
-    String catalogLoaderClass = conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
-    if (catalogLoaderClass != null) {
-      CatalogLoader loader = (CatalogLoader) DynConstructors.builder(CatalogLoader.class)
-              .impl(catalogLoaderClass)
-              .build()
-              .newInstance();
-      Catalog catalog = loader.load(conf);
-      LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
-      return Optional.of(catalog);
+  static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) {
+    String catalogType = getCatalogType(conf, catalogName);
+    if (catalogType == null) {
+      throw new NoSuchNamespaceException("Catalog definition for %s is not found.", catalogName);
+    }
+
+    if (NO_CATALOG_TYPE.equalsIgnoreCase(catalogType)) {
+      return Optional.empty();
+    } else {
+      String name = catalogName == null ? ICEBERG_DEFAULT_CATALOG_NAME : catalogName;
+      return Optional.of(CatalogUtil.buildIcebergCatalog(name,
+              getCatalogProperties(conf, name, catalogType), conf));
     }
+  }
+
+  /**
+   * Collect all the catalog-specific configuration from the global Hive configuration.
+   * @param conf a Hadoop configuration
+   * @param catalogName name of the catalog
+   * @param catalogType type of the catalog
+   * @return complete map of catalog properties
+   */
+  private static Map<String, String> getCatalogProperties(Configuration conf, String catalogName, String catalogType) {
+    String keyPrefix = InputFormatConfig.CATALOG_CONFIG_PREFIX + catalogName;
+    Map<String, String> catalogProperties = Streams.stream(conf.iterator())
+            .filter(e -> e.getKey().startsWith(keyPrefix))
+            .collect(Collectors.toMap(e -> e.getKey().substring(keyPrefix.length() + 1), Map.Entry::getValue));
+    return addCatalogPropertiesIfMissing(conf, catalogType, catalogProperties);
+  }
 
-    String catalogName = conf.get(InputFormatConfig.CATALOG);
+  /**
+   * This method is used for backward-compatible catalog configuration.
+   * Collect all the catalog-specific configuration from the global Hive configuration.
+   * Note: this should be removed when the old catalog configuration is deprecated.
+   * @param conf global Hive configuration
+   * @param catalogType type of the catalog
+   * @param catalogProperties pre-populated catalog properties
+   * @return complete map of catalog properties
+   */
+  private static Map<String, String> addCatalogPropertiesIfMissing(Configuration conf, String catalogType,
+                                                                   Map<String, String> catalogProperties) {
+    catalogProperties.putIfAbsent(CatalogUtil.ICEBERG_CATALOG_TYPE, catalogType);
+    if (catalogType.equalsIgnoreCase(HADOOP_CATALOG_TYPE)) {
+      catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION,
+              conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION));
+    }
+    return catalogProperties;
+  }
 
+  /**
+   * Return the catalog type based on the catalog name.
+   * <p>
+   * If the catalog name is provided, get the catalog type from the 'iceberg.catalog.<code>catalogName</code>.type' config.
+   * If the value of this property is null, return no catalog definition (Hadoop Table).
+   * </p>
+   * <p>
+   * If the catalog name is null, check the global conf for the 'iceberg.mr.catalog' property. If the value of the property is:
+   * <ul>
+   *     <li>null/hive -> Hive Catalog</li>
+   *     <li>location -> Hadoop Table</li>
+   *     <li>hadoop -> Hadoop Catalog</li>
+   *     <li>any other value -> Custom Catalog</li>
+   * </ul>
+   * </p>
+   * @param conf global Hive configuration
+   * @param catalogName name of the catalog
+   * @return type of the catalog, can be null
+   */
+  private static String getCatalogType(Configuration conf, String catalogName) {
     if (catalogName != null) {
-      Catalog catalog;
-      switch (catalogName.toLowerCase()) {
-        case HADOOP:
-          String warehouseLocation = conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
-
-          catalog = (warehouseLocation != null) ? new HadoopCatalog(conf, warehouseLocation) : new HadoopCatalog(conf);
-          LOG.info("Loaded Hadoop catalog {}", catalog);
-          return Optional.of(catalog);
-        case HIVE:
-          catalog = CatalogUtil.loadCatalog(HiveCatalog.class.getName(), CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
-                  ImmutableMap.of(), conf);
-          LOG.info("Loaded Hive Metastore catalog {}", catalog);
-          return Optional.of(catalog);
-        default:
-          throw new NoSuchNamespaceException("Catalog %s is not supported.", catalogName);
+      String catalogType = conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName));
+      if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME) || catalogType == null) {
+        return NO_CATALOG_TYPE;
+      } else {
+        return catalogType;
+      }
+    } else {
+      String catalogType = conf.get(InputFormatConfig.CATALOG);
+      if (catalogType == null) {
+        return HIVE_CATALOG_TYPE;
+      } else if (catalogType.equals(LOCATION)) {
+        return NO_CATALOG_TYPE;
+      } else {
+        return catalogType;
       }
     }
-
-    LOG.info("Catalog is not configured");
-    return Optional.empty();
   }
 }
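
For illustration, here is a standalone sketch of the prefix stripping that
getCatalogProperties() performs, assuming a hypothetical catalog named "prod"
and using plain java.util.stream in place of the relocated Guava Streams
helper; every 'iceberg.catalog.prod.*' key loses its prefix before the map is
handed to CatalogUtil.buildIcebergCatalog():

    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.StreamSupport;

    import org.apache.hadoop.conf.Configuration;

    public class CatalogPropertiesSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("iceberg.catalog.prod.type", "hadoop");
        conf.set("iceberg.catalog.prod.warehouse", "file:/tmp/warehouse");

        String keyPrefix = "iceberg.catalog.prod";
        // Configuration is an Iterable of its entries, so it can be streamed directly
        Map<String, String> catalogProperties = StreamSupport.stream(conf.spliterator(), false)
            .filter(e -> e.getKey().startsWith(keyPrefix))
            .collect(Collectors.toMap(
                e -> e.getKey().substring(keyPrefix.length() + 1), // drop "iceberg.catalog.prod."
                Map.Entry::getValue));

        // Two entries remain: type=hadoop and warehouse=file:/tmp/warehouse
        System.out.println(catalogProperties);
      }
    }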
diff --git a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index 38e6606..9521bf0 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -47,6 +47,7 @@ public class InputFormatConfig {
   public static final String TABLE_SCHEMA = "iceberg.mr.table.schema";
   public static final String PARTITION_SPEC = "iceberg.mr.table.partition.spec";
   public static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
+  public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
   public static final String LOCALITY = "iceberg.mr.locality";
   public static final String CATALOG = "iceberg.mr.catalog";
   public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = "iceberg.mr.catalog.hadoop.warehouse.location";
@@ -72,6 +73,11 @@ public class InputFormatConfig {
   public static final String SNAPSHOT_TABLE = "iceberg.snapshots.table";
   public static final String SNAPSHOT_TABLE_SUFFIX = "__snapshots";
 
+  public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
+  public static final String CATALOG_TYPE_TEMPLATE = "iceberg.catalog.%s.type";
+  public static final String CATALOG_WAREHOUSE_TEMPLATE = "iceberg.catalog.%s.warehouse";
+  public static final String CATALOG_CLASS_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
+
   public enum InMemoryDataModel {
     PIG,
     HIVE,
@@ -162,11 +168,6 @@ public class InputFormatConfig {
       return this;
     }
 
-    public ConfigBuilder catalogLoader(Class<? extends CatalogLoader> catalogLoader) {
-      conf.setClass(CATALOG_LOADER_CLASS, catalogLoader, CatalogLoader.class);
-      return this;
-    }
-
     public ConfigBuilder useHiveRows() {
       conf.set(IN_MEMORY_DATA_MODEL, InMemoryDataModel.HIVE.name());
       return this;
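
The new key templates above make catalog registration a couple of conf.set()
calls. A short sketch, where the catalog name "my_catalog" and the
implementation class are purely hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.mr.InputFormatConfig;

    public class CatalogConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        String name = "my_catalog"; // hypothetical catalog name

        // Built-in types ("hive" or "hadoop"), plus a warehouse location for Hadoop catalogs
        conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, name), "hadoop");
        conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, name), "file:/tmp/warehouse");

        // Alternatively, a custom Catalog implementation resolved by class name
        conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, name), "com.example.MyCatalog");
      }
    }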
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 4ade847..438ca87 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -80,7 +80,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
     hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
         BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());
 
-    if (!Catalogs.hiveCatalog(conf)) {
+    if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
       // For non-HiveCatalog tables too, we should set the input and output format
       // so that the table can be read by other engines like Impala
       hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
@@ -125,7 +125,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
     }
 
     // If the table is not managed by Hive catalog then the location should be set
-    if (!Catalogs.hiveCatalog(conf)) {
+    if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
       Preconditions.checkArgument(hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null,
           "Table location not set");
     }
@@ -142,7 +142,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
   @Override
   public void commitCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
     if (icebergTable == null) {
-      if (Catalogs.hiveCatalog(conf)) {
+      if (Catalogs.hiveCatalog(conf, catalogProperties)) {
         catalogProperties.put(TableProperties.ENGINE_HIVE_ENABLED, true);
       }
 
@@ -156,7 +156,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
     this.deleteIcebergTable = hmsTable.getParameters() != null &&
         "TRUE".equalsIgnoreCase(hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
 
-    if (deleteIcebergTable && Catalogs.hiveCatalog(conf)) {
+    if (deleteIcebergTable && Catalogs.hiveCatalog(conf, catalogProperties)) {
       // Store the metadata and the id for deleting the actual table data
       String metadataLocation = hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
       this.deleteIo = Catalogs.loadTable(conf, catalogProperties).io();
@@ -173,7 +173,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
   public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, boolean deleteData) {
     if (deleteData && deleteIcebergTable) {
       try {
-        if (!Catalogs.hiveCatalog(conf)) {
+        if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
           LOG.info("Dropping with purge all the data for table {}.{}", hmsTable.getDbName(), hmsTable.getTableName());
           Catalogs.dropTable(conf, catalogProperties);
         } else {
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 3479b36..bf298ad 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -170,8 +170,9 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
           .executeWith(tableExecutor)
           .run(output -> {
             Table table = HiveIcebergStorageHandler.table(jobConf, output);
+            String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
             jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
-            commitTable(table.io(), fileExecutor, jobContext, output, table.location());
+            commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
           });
     } finally {
       fileExecutor.shutdown();
@@ -245,12 +246,17 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
    * @param jobContext The job context
    * @param name The name of the table used for loading from the catalog
    * @param location The location of the table used for loading from the catalog
+   * @param catalogName The name of the catalog that contains the table
    */
-  private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location) {
+  private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location,
+                           String catalogName) {
     JobConf conf = jobContext.getJobConf();
     Properties catalogProperties = new Properties();
     catalogProperties.put(Catalogs.NAME, name);
     catalogProperties.put(Catalogs.LOCATION, location);
+    if (catalogName != null) {
+      catalogProperties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+    }
     Table table = Catalogs.loadTable(conf, catalogProperties);
 
     long startTime = System.currentTimeMillis();
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 015bcb0..f51259e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -113,13 +113,18 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
     if (tableDesc != null && tableDesc.getProperties() != null &&
         tableDesc.getProperties().get(WRITE_KEY) != null) {
-      Preconditions.checkArgument(!tableDesc.getTableName().contains(TABLE_NAME_SEPARATOR),
-          "Can not handle table " + tableDesc.getTableName() + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'");
+      String tableName = tableDesc.getTableName();
+      Preconditions.checkArgument(!tableName.contains(TABLE_NAME_SEPARATOR),
+          "Can not handle table " + tableName + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'");
       String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES);
-      tables = tables == null ? tableDesc.getTableName() : tables + TABLE_NAME_SEPARATOR + tableDesc.getTableName();
-
+      tables = tables == null ? tableName : tables + TABLE_NAME_SEPARATOR + tableName;
       jobConf.set("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName());
       jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables);
+
+      String catalogName = tableDesc.getProperties().getProperty(InputFormatConfig.CATALOG_NAME);
+      if (catalogName != null) {
+        jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName, catalogName);
+      }
     }
   }
 
@@ -172,6 +177,16 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   }
 
   /**
+   * Returns the catalog name serialized to the configuration.
+   * @param config The configuration used to get the data from
+   * @param name The name of the table we need, as returned by TableDesc.getTableName()
+   * @return catalog name
+   */
+  public static String catalogName(Configuration config, String name) {
+    return config.get(InputFormatConfig.TABLE_CATALOG_PREFIX + name);
+  }
+
+  /**
    * Returns the Table Schema serialized to the configuration.
    * @param config The configuration used to get the data from
    * @return The Table Schema object
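
The per-table key written by configureJobConf() and read back by catalogName()
round-trips as follows (the table and catalog names are hypothetical):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.iceberg.mr.InputFormatConfig;
    import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;

    public class CatalogNameRoundTrip {
      public static void main(String[] args) {
        JobConf jobConf = new JobConf();
        // configureJobConf() stores the catalog name under iceberg.mr.table.catalog.<tableName>
        jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + "default.customers", "prod");

        // The output committer reads it back through the new helper
        String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, "default.customers");
        System.out.println(catalogName); // prints "prod"
      }
    }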
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java b/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
index 3412ad5..04168eb 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
@@ -25,6 +25,7 @@ import java.util.Optional;
 import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
@@ -32,7 +33,6 @@ import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
@@ -63,6 +63,7 @@ public class TestCatalogs {
 
   @Test
   public void testLoadTableFromLocation() throws IOException {
+    conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
     AssertHelpers.assertThrows(
             "Should complain about table location not set", IllegalArgumentException.class,
             "location not set", () -> Catalogs.loadTable(conf));
@@ -77,14 +78,15 @@ public class TestCatalogs {
 
   @Test
   public void testLoadTableFromCatalog() throws IOException {
-    conf.set("warehouse.location", temp.newFolder("hadoop", "warehouse").toString());
-    conf.setClass(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalogLoader.class, CatalogLoader.class);
+    String defaultCatalogName = "default";
+    String warehouseLocation = temp.newFolder("hadoop", "warehouse").toString();
+    setCustomCatalogProperties(defaultCatalogName, warehouseLocation);
 
     AssertHelpers.assertThrows(
             "Should complain about table identifier not set", IllegalArgumentException.class,
             "identifier not set", () -> Catalogs.loadTable(conf));
 
-    HadoopCatalog catalog = new CustomHadoopCatalog(conf);
+    HadoopCatalog catalog = new CustomHadoopCatalog(conf, warehouseLocation);
     Table hadoopCatalogTable = catalog.createTable(TableIdentifier.of("table"), SCHEMA);
 
     conf.set(InputFormatConfig.TABLE_IDENTIFIER, "table");
@@ -100,6 +102,7 @@ public class TestCatalogs {
         "Should complain about table schema not set", NullPointerException.class,
         "schema not set", () -> Catalogs.createTable(conf, missingSchema));
 
+    conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
     Properties missingLocation = new Properties();
     missingLocation.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(SCHEMA));
     AssertHelpers.assertThrows(
@@ -138,18 +141,21 @@ public class TestCatalogs {
   @Test
   public void testCreateDropTableToCatalog() throws IOException {
     TableIdentifier identifier = TableIdentifier.of("test", "table");
+    String defaultCatalogName = "default";
+    String warehouseLocation = temp.newFolder("hadoop", "warehouse").toString();
 
-    conf.set("warehouse.location", temp.newFolder("hadoop", "warehouse").toString());
-    conf.setClass(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalogLoader.class, CatalogLoader.class);
+    setCustomCatalogProperties(defaultCatalogName, warehouseLocation);
 
     Properties missingSchema = new Properties();
     missingSchema.put("name", identifier.toString());
+    missingSchema.put(InputFormatConfig.CATALOG_NAME, defaultCatalogName);
     AssertHelpers.assertThrows(
         "Should complain about table schema not set", NullPointerException.class,
         "schema not set", () -> Catalogs.createTable(conf, missingSchema));
 
     Properties missingIdentifier = new Properties();
     missingIdentifier.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(SCHEMA));
+    missingIdentifier.put(InputFormatConfig.CATALOG_NAME, defaultCatalogName);
     AssertHelpers.assertThrows(
         "Should complain about table identifier not set", NullPointerException.class,
         "identifier not set", () -> Catalogs.createTable(conf, missingIdentifier));
@@ -159,10 +165,11 @@ public class TestCatalogs {
     properties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(SCHEMA));
     properties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC));
     properties.put("dummy", "test");
+    properties.put(InputFormatConfig.CATALOG_NAME, defaultCatalogName);
 
     Catalogs.createTable(conf, properties);
 
-    HadoopCatalog catalog = new CustomHadoopCatalog(conf);
+    HadoopCatalog catalog = new CustomHadoopCatalog(conf, warehouseLocation);
     Table table = catalog.loadTable(identifier);
 
     Assert.assertEquals(SchemaParser.toJson(SCHEMA), SchemaParser.toJson(table.schema()));
@@ -175,6 +182,7 @@ public class TestCatalogs {
 
     Properties dropProperties = new Properties();
     dropProperties.put("name", identifier.toString());
+    dropProperties.put(InputFormatConfig.CATALOG_NAME, defaultCatalogName);
     Catalogs.dropTable(conf, dropProperties);
 
     AssertHelpers.assertThrows(
@@ -184,50 +192,108 @@ public class TestCatalogs {
 
   @Test
   public void testLoadCatalog() throws IOException {
-    Assert.assertFalse(Catalogs.loadCatalog(conf).isPresent());
+    conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
+    Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent());
 
-    conf.set(InputFormatConfig.CATALOG, "foo");
+    String nonExistentCatalogType = "fooType";
+
+    conf.set(InputFormatConfig.CATALOG, nonExistentCatalogType);
     AssertHelpers.assertThrows(
-            "Should complain about catalog not supported", NoSuchNamespaceException.class,
-            "is not supported", () -> Catalogs.loadCatalog(conf));
+            "should complain about catalog not supported", UnsupportedOperationException.class,
+            "Unknown catalog type", () -> Catalogs.loadCatalog(conf, null));
 
-    conf.set(InputFormatConfig.CATALOG, "hadoop");
-    Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf);
+    conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, "/tmp/mylocation");
+    Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, null);
 
     Assert.assertTrue(hadoopCatalog.isPresent());
     Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
 
-    conf.set(InputFormatConfig.CATALOG, "hive");
-    Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf);
+    conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
+    Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, null);
 
     Assert.assertTrue(hiveCatalog.isPresent());
     Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
 
-    conf.set("warehouse.location", temp.newFolder("hadoop", "warehouse").toString());
-    conf.setClass(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalogLoader.class, CatalogLoader.class);
-    Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf);
+    conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
+    Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent());
+
+    // arbitrary catalog name with non existent catalog type
+    String catalogName = "barCatalog";
+    conf.unset(InputFormatConfig.CATALOG);
+    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), nonExistentCatalogType);
+    AssertHelpers.assertThrows(
+            "should complain about catalog not supported", UnsupportedOperationException.class,
+            "Unknown catalog type:", () -> Catalogs.loadCatalog(conf, catalogName));
+
+    // arbitrary catalog name with hadoop catalog type and default warehouse location
+    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName),
+            CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
+
+    Assert.assertTrue(hadoopCatalog.isPresent());
+    Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
+
+    // arbitrary catalog name with hadoop catalog type and provided warehouse location
+    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName),
+            CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalogName), "/tmp/mylocation");
+    hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
+
+    Assert.assertTrue(hadoopCatalog.isPresent());
+    Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
+    Assert.assertEquals("HadoopCatalog{name=barCatalog, location=/tmp/mylocation}", hadoopCatalog.get().toString());
+
+    // arbitrary catalog name with hive catalog type
+    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName),
+            CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
+    hiveCatalog = Catalogs.loadCatalog(conf, catalogName);
+
+    Assert.assertTrue(hiveCatalog.isPresent());
+    Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
+
+    // arbitrary catalog name with custom catalog type without specific classloader
+    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), "custom");
+    AssertHelpers.assertThrows(
+            "should complain about catalog not supported", UnsupportedOperationException.class,
+            "Unknown catalog type:", () -> Catalogs.loadCatalog(conf, catalogName));
+
+    // arbitrary catalog name with custom catalog type and provided classloader
+    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), "custom");
+    conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalogName), CustomHadoopCatalog.class.getName());
+    Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
 
     Assert.assertTrue(customHadoopCatalog.isPresent());
     Assert.assertTrue(customHadoopCatalog.get() instanceof CustomHadoopCatalog);
+
+    // arbitrary catalog name with location catalog type
+    conf.unset(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName));
+    Assert.assertFalse(Catalogs.loadCatalog(conf, catalogName).isPresent());
+
+    // default catalog configuration
+    conf.unset(InputFormatConfig.CATALOG);
+    hiveCatalog = Catalogs.loadCatalog(conf, null);
+
+    Assert.assertTrue(hiveCatalog.isPresent());
+    Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
   }
 
   public static class CustomHadoopCatalog extends HadoopCatalog {
 
-    public static final String WAREHOUSE_LOCATION = "warehouse.location";
+    public CustomHadoopCatalog() {
+
+    }
 
     public CustomHadoopCatalog(Configuration conf, String warehouseLocation) {
       super(conf, warehouseLocation);
     }
 
-    public CustomHadoopCatalog(Configuration conf) {
-      this(conf, conf.get(WAREHOUSE_LOCATION));
-    }
   }
 
-  public static class CustomHadoopCatalogLoader implements CatalogLoader {
-    @Override
-    public Catalog load(Configuration conf) {
-      return new CustomHadoopCatalog(conf);
-    }
+  private void setCustomCatalogProperties(String catalogName, String warehouseLocation) {
+    conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalogName), warehouseLocation);
+    conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalogName), CustomHadoopCatalog.class.getName());
+    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), "custom");
+    conf.set(InputFormatConfig.CATALOG_NAME, catalogName);
   }
 }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
index 4804435..6f3cbee 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
@@ -103,6 +104,7 @@ public class TestIcebergInputFormats {
   @Before
   public void before() throws IOException {
     conf = new Configuration();
+    conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
     HadoopTables tables = new HadoopTables(conf);
 
     File location = temp.newFolder(testInputFormat.name(), fileFormat.name());
@@ -344,18 +346,17 @@ public class TestIcebergInputFormats {
     }
   }
 
-  public static class HadoopCatalogLoader implements CatalogLoader {
-    @Override
-    public Catalog load(Configuration conf) {
-      return new HadoopCatalog(conf, conf.get("warehouse.location"));
-    }
-  }
-
   @Test
   public void testCustomCatalog() throws IOException {
-    conf.set("warehouse.location", temp.newFolder("hadoop_catalog").getAbsolutePath());
-
-    Catalog catalog = new HadoopCatalogLoader().load(conf);
+    String warehouseLocation = temp.newFolder("hadoop_catalog").getAbsolutePath();
+    conf.set("warehouse.location", warehouseLocation);
+    conf.set(InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
+    conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME),
+            CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME),
+            warehouseLocation);
+
+    Catalog catalog = new HadoopCatalog(conf, conf.get("warehouse.location"));
     TableIdentifier identifier = TableIdentifier.of("db", "t");
     Table table = catalog.createTable(identifier, SCHEMA, SPEC, helper.properties());
     helper.setTable(table);
@@ -364,8 +365,7 @@ public class TestIcebergInputFormats {
     expectedRecords.get(0).set(2, "2020-03-20");
     helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords);
 
-    builder.catalogLoader(HadoopCatalogLoader.class)
-           .readFrom(identifier);
+    builder.readFrom(identifier);
 
     testInputFormat.create(builder.conf()).validate(expectedRecords);
   }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
index d41acb7..ef964f8 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.data.DeleteReadTests;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.util.StructLikeSet;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -60,6 +61,13 @@ public class TestInputFormatReaderDeletes extends DeleteReadTests {
     };
   }
 
+  @Before
+  @Override
+  public void writeTestDataFile() throws IOException {
+    conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
+    super.writeTestDataFile();
+  }
+
   public TestInputFormatReaderDeletes(String inputFormat, FileFormat fileFormat) {
     this.inputFormat = inputFormat;
     this.fileFormat = fileFormat;
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
index 6a2db8b..b08673c 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
@@ -20,11 +20,13 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.TestHelper;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.OrcConf;
@@ -59,9 +61,14 @@ public class HiveIcebergStorageHandlerTestUtils {
   }
 
   static TestHiveShell shell() {
+    return shell(Collections.emptyMap());
+  }
+
+  static TestHiveShell shell(Map<String, String> configs) {
     TestHiveShell shell = new TestHiveShell();
     shell.setHiveConfValue("hive.notification.event.poll.interval", "-1");
     shell.setHiveConfValue("hive.tez.exec.print.summary", "true");
+    configs.forEach((k, v) -> shell.setHiveConfValue(k, v));
     // We would like to make sure that ORC reading overrides this config, so reading Iceberg tables could work in
     // systems (like Hive 3.2 and higher) where this value is set to true explicitly.
     shell.setHiveConfValue(OrcConf.FORCE_POSITIONAL_EVOLUTION.getHiveConfName(), "true");
@@ -70,9 +77,13 @@ public class HiveIcebergStorageHandlerTestUtils {
   }
 
   static TestTables testTables(TestHiveShell shell, TestTables.TestTableType testTableType, TemporaryFolder temp)
-      throws IOException {
+          throws IOException {
+    return testTables(shell, testTableType, temp, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
+  }
 
-    return testTableType.instance(shell.metastore().hiveConf(), temp);
+  static TestTables testTables(TestHiveShell shell, TestTables.TestTableType testTableType, TemporaryFolder temp,
+                               String catalogName) throws IOException {
+    return testTableType.instance(shell.metastore().hiveConf(), temp, catalogName);
   }
 
   static void init(TestHiveShell shell, TestTables testTables, TemporaryFolder temp, String engine) {
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 1f5466b..af8c383 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.TestHelper;
 import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SerializationUtil;
@@ -108,6 +109,7 @@ public class TestHiveIcebergOutputCommitter {
   public void testSuccessfulUnpartitionedWrite() throws IOException {
     HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
     Table table = table(temp.getRoot().getPath(), false);
+
     JobConf conf = jobConf(table, 1);
     List<Record> expected = writeRecords(table.name(), 1, 0, true, false, conf);
     committer.commitJob(new JobContextImpl(conf, JOB_ID));
@@ -217,7 +219,9 @@ public class TestHiveIcebergOutputCommitter {
 
   private Table table(String location, boolean partitioned) {
     HadoopTables tables = new HadoopTables();
-    return tables.create(CUSTOMER_SCHEMA, partitioned ? PARTITIONED_SPEC : PartitionSpec.unpartitioned(), location);
+
+    return tables.create(CUSTOMER_SCHEMA, partitioned ? PARTITIONED_SPEC : PartitionSpec.unpartitioned(),
+            ImmutableMap.of(InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_HADOOP_TABLE_NAME), location);
   }
 
   private JobConf jobConf(Table table, int taskNum) {
@@ -226,6 +230,8 @@ public class TestHiveIcebergOutputCommitter {
     conf.setNumReduceTasks(0);
     conf.set(HiveConf.ConfVars.HIVEQUERYID.varname, QUERY_ID);
     conf.set(InputFormatConfig.OUTPUT_TABLES, table.name());
+    conf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + table.name(),
+            table.properties().get(InputFormatConfig.CATALOG_NAME));
     conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + table.name(), SerializationUtil.serializeToBase64(table));
 
     Map<String, String> propMap = Maps.newHashMap();
@@ -233,6 +239,8 @@ public class TestHiveIcebergOutputCommitter {
     tableDesc.setProperties(new Properties());
     tableDesc.getProperties().setProperty(Catalogs.NAME, table.name());
     tableDesc.getProperties().setProperty(Catalogs.LOCATION, table.location());
+    tableDesc.getProperties().setProperty(InputFormatConfig.CATALOG_NAME, table.properties()
+            .get(InputFormatConfig.CATALOG_NAME));
     HiveIcebergStorageHandler.overlayTableProperties(conf, tableDesc, propMap);
     propMap.forEach((key, value) -> conf.set(key, value));
     return conf;
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSerDe.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSerDe.java
index d47c1d6..7e043a8 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSerDe.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSerDe.java
@@ -28,6 +28,8 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.types.Types;
@@ -54,6 +56,7 @@ public class TestHiveIcebergSerDe {
 
     Properties properties = new Properties();
     properties.setProperty("location", location.toString());
+    properties.setProperty(InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_HADOOP_TABLE_NAME);
 
     HadoopTables tables = new HadoopTables(conf);
     tables.create(schema, location.toString());
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
index d6d4244..97c2c31 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.TestHelpers.Row;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.TestHelper;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -250,7 +251,8 @@ public class TestHiveIcebergStorageHandlerLocalScan {
         " (customer_id BIGINT, first_name STRING COMMENT 'This is first name', " +
         "last_name STRING COMMENT 'This is last name')" +
         " STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier);
+        testTables.locationForCreateTableSQL(identifier) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of());
     runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
         PartitionSpec.unpartitioned(), data);
   }
@@ -268,7 +270,8 @@ public class TestHiveIcebergStorageHandlerLocalScan {
         " (customer_id BIGINT, first_name STRING COMMENT 'This is first name') " +
         "PARTITIONED BY (last_name STRING COMMENT 'This is last name') STORED BY " +
          "'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier);
+        testTables.locationForCreateTableSQL(identifier) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of());
     runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, data);
   }
 
@@ -286,7 +289,8 @@ public class TestHiveIcebergStorageHandlerLocalScan {
         testTables.locationForCreateTableSQL(identifier) +
         "TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " +
         "'" + InputFormatConfig.TABLE_SCHEMA + "'='" +
-        SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "')";
+        SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', "  +
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')";
     runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, data);
   }
 
@@ -303,7 +307,8 @@ public class TestHiveIcebergStorageHandlerLocalScan {
         "PARTITIONED BY (first_name STRING COMMENT 'This is first name', " +
         "last_name STRING COMMENT 'This is last name') " +
         "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier);
+        testTables.locationForCreateTableSQL(identifier) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of());
     runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, data);
   }
 
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 94e1e29..9ace489 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
@@ -164,7 +165,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
         SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
         "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
         PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " +
-        "'dummy'='test')");
+        "'dummy'='test', " +
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
 
     // Check the Iceberg table data
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -172,7 +174,12 @@ public class TestHiveIcebergStorageHandlerNoScan {
         icebergTable.schema().asStruct());
     Assert.assertEquals(PartitionSpec.unpartitioned(), icebergTable.spec());
 
-    if (!Catalogs.hiveCatalog(shell.getHiveConf())) {
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
+    Properties tableProperties = new Properties();
+    hmsTable.getParameters().entrySet().stream()
+              .filter(e -> !IGNORED_PARAMS.contains(e.getKey()))
+              .forEach(e -> tableProperties.put(e.getKey(), e.getValue()));
+    if (!Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
       shell.executeStatement("DROP TABLE customers");
 
       // Check if the table was really dropped even from the Catalog
@@ -182,7 +189,6 @@ public class TestHiveIcebergStorageHandlerNoScan {
           }
       );
     } else {
-      org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
       Path hmsTableLocation = new Path(hmsTable.getSd().getLocation());
 
       // Drop the table
@@ -207,6 +213,33 @@ public class TestHiveIcebergStorageHandlerNoScan {
   }
 
   @Test
+  public void testCreateDropTableNonDefaultCatalog() throws TException, InterruptedException {
+    TableIdentifier identifier = TableIdentifier.of("default", "customers");
+    String catalogName = "nondefaultcatalog";
+    testTables.properties().entrySet()
+        .forEach(e -> shell.setHiveSessionValue(e.getKey().replace(testTables.catalog, catalogName), e.getValue()));
+    String createSql = "CREATE EXTERNAL TABLE " + identifier +
+        " (customer_id BIGINT, first_name STRING COMMENT 'This is first name'," +
+        " last_name STRING COMMENT 'This is last name')" +
+        " STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
+        testTables.locationForCreateTableSQL(identifier) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of());
+    shell.executeStatement(createSql);
+
+    Table icebergTable = testTables.loadTable(identifier);
+    Assert.assertEquals(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA.asStruct(),
+        icebergTable.schema().asStruct());
+
+    shell.executeStatement("DROP TABLE default.customers");
+    // Check if the table was really dropped even from the Catalog
+    AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class,
+        "Table does not exist", () -> {
+          testTables.loadTable(identifier);
+        }
+    );
+  }
+
+  @Test
   public void testCreateTableWithoutSpec() {
     TableIdentifier identifier = TableIdentifier.of("default", "customers");
 
@@ -214,7 +247,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
         "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
         testTables.locationForCreateTableSQL(identifier) +
         "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
-        SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "')");
+        SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" +
+        InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
 
     // Check the Iceberg table partition data
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -224,7 +258,6 @@ public class TestHiveIcebergStorageHandlerNoScan {
   @Test
   public void testCreateTableWithUnpartitionedSpec() {
     TableIdentifier identifier = TableIdentifier.of("default", "customers");
-
     // We need the location for HadoopTable based tests only
     shell.executeStatement("CREATE EXTERNAL TABLE customers " +
         "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
@@ -232,7 +265,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
         "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
         SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
         "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
-        PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "')");
+        PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " +
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
 
     // Check the Iceberg table partition data
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -248,16 +282,21 @@ public class TestHiveIcebergStorageHandlerNoScan {
         testTables.locationForCreateTableSQL(identifier) +
         "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
         SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
-        "'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE')");
+        "'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE', " +
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
 
-    if (!Catalogs.hiveCatalog(shell.getHiveConf())) {
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
+    Properties tableProperties = new Properties();
+    hmsTable.getParameters().entrySet().stream()
+            .filter(e -> !IGNORED_PARAMS.contains(e.getKey()))
+            .forEach(e -> tableProperties.put(e.getKey(), e.getValue()));
+    if (!Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
       shell.executeStatement("DROP TABLE customers");
 
       // Check if the table remains
       testTables.loadTable(identifier);
     } else {
       // Check the HMS table parameters
-      org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
       Path hmsTableLocation = new Path(hmsTable.getSd().getLocation());
 
       // Drop the table
@@ -287,7 +326,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
           shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
               "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
               testTables.locationForCreateTableSQL(identifier) +
-              "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='WrongSchema')");
+              "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='WrongSchema'" +
+              ",'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
         }
     );
 
@@ -296,7 +336,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
         "Please provide ", () -> {
           shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
               "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-              testTables.locationForCreateTableSQL(identifier));
+              testTables.locationForCreateTableSQL(identifier) +
+              testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
         }
     );
 
@@ -307,7 +348,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
             shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
                 "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
                 "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
-                SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "')");
+                SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" +
+                InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
           }
       );
     }
@@ -319,21 +361,23 @@ public class TestHiveIcebergStorageHandlerNoScan {
     testTables.createIcebergTable(shell.getHiveConf(), "customers", COMPLEX_SCHEMA, FileFormat.PARQUET,
         Collections.emptyList());
 
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
+    if (testTableType == TestTables.TestTableType.HIVE_CATALOG) {
      // In HiveCatalog we just expect an exception since the table already exists
       AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
           "customers already exists", () -> {
             shell.executeStatement("CREATE EXTERNAL TABLE customers " +
                 "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
                 "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
-                SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "')");
+                SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" +
+                InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
           }
       );
     } else {
       // With other catalogs, table creation should succeed
       shell.executeStatement("CREATE EXTERNAL TABLE customers " +
           "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-          testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")));
+          testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")) +
+          testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
     }
   }
 
@@ -366,7 +410,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
         "current_address STRUCT < street_address: STRUCT " +
         "<street_number: INT, street_name: STRING, street_type: STRING>, country: STRING, postal_code: STRING >) " +
         "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier));
+        testTables.locationForCreateTableSQL(identifier) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
 
     // Check the Iceberg table data
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -394,7 +439,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
         "t_Float FLOaT, t_dOuble DOUBLE, t_boolean BOOLEAN, t_int INT, t_bigint BIGINT, t_binary BINARY, " +
         "t_string STRING, t_timestamp TIMESTAMP, t_date DATE, t_decimal DECIMAL(3,2)) " +
         "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier));
+        testTables.locationForCreateTableSQL(identifier) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
 
     // Check the Iceberg table data
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -417,7 +463,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
             shell.executeStatement("CREATE EXTERNAL TABLE not_supported_types " +
                 "(not_supported " + notSupportedType + ") " +
                 "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-                testTables.locationForCreateTableSQL(identifier));
+                testTables.locationForCreateTableSQL(identifier) +
+                testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
           }
       );
     }
@@ -438,7 +485,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
     for (String notSupportedType : notSupportedTypes.keySet()) {
       shell.executeStatement("CREATE EXTERNAL TABLE not_supported_types (not_supported " + notSupportedType + ") " +
               "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-              testTables.locationForCreateTableSQL(identifier));
+              testTables.locationForCreateTableSQL(identifier) +
+              testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
 
       org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
       Assert.assertEquals(notSupportedTypes.get(notSupportedType), icebergTable.schema().columns().get(0).type());
@@ -453,7 +501,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
         "t_int INT COMMENT 'int column',  " +
         "t_string STRING COMMENT 'string column') " +
         "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-        testTables.locationForCreateTableSQL(identifier));
+        testTables.locationForCreateTableSQL(identifier) +
+        testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
 
     List<Object[]> rows = shell.executeStatement("DESCRIBE default.comment_table");
@@ -472,7 +521,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
             "t_int INT,  " +
             "t_string STRING) " +
             "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
-            testTables.locationForCreateTableSQL(identifier));
+            testTables.locationForCreateTableSQL(identifier) +
+            testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
     org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
 
     List<Object[]> rows = shell.executeStatement("DESCRIBE default.without_comment_table");
@@ -491,11 +541,12 @@ public class TestHiveIcebergStorageHandlerNoScan {
 
     shell.executeStatement(String.format("CREATE EXTERNAL TABLE default.customers " +
         "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' %s" +
-        "TBLPROPERTIES ('%s'='%s', '%s'='%s', '%s'='%s')",
+        "TBLPROPERTIES ('%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s')",
         testTables.locationForCreateTableSQL(identifier), // we need the location for HadoopTable based tests only
         InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA),
         InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC),
-        "custom_property", "initial_val"));
+        "custom_property", "initial_val",
+        InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME));
 
 
     // Check the Iceberg table parameters
@@ -505,7 +556,17 @@ public class TestHiveIcebergStorageHandlerNoScan {
     expectedIcebergProperties.put("custom_property", "initial_val");
     expectedIcebergProperties.put("EXTERNAL", "TRUE");
     expectedIcebergProperties.put("storage_handler", HiveIcebergStorageHandler.class.getName());
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
+
+    // Check the HMS table parameters
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
+    Map<String, String> hmsParams = hmsTable.getParameters()
+            .entrySet().stream()
+            .filter(e -> !IGNORED_PARAMS.contains(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    Properties tableProperties = new Properties();
+    tableProperties.putAll(hmsParams);
+
+    if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
       expectedIcebergProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true");
     }
     if (MetastoreUtil.hive3PresentOnClasspath()) {
@@ -513,15 +574,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
     }
     Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());
 
-    // Check the HMS table parameters
-    org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
-    Map<String, String> hmsParams = hmsTable.getParameters()
-        .entrySet().stream()
-        .filter(e -> !IGNORED_PARAMS.contains(e.getKey()))
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
-      Assert.assertEquals(9, hmsParams.size());
+    if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
+      Assert.assertEquals(10, hmsParams.size());
       Assert.assertEquals("initial_val", hmsParams.get("custom_property"));
       Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
       Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
@@ -534,8 +588,9 @@ public class TestHiveIcebergStorageHandlerNoScan {
               getCurrentSnapshotForHiveCatalogTable(icebergTable));
       Assert.assertNull(hmsParams.get(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP));
       Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME));
+      Assert.assertNotNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC));
     } else {
-      Assert.assertEquals(7, hmsParams.size());
+      Assert.assertEquals(8, hmsParams.size());
       Assert.assertNull(hmsParams.get(TableProperties.ENGINE_HIVE_ENABLED));
     }
 
@@ -557,8 +612,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
         .filter(e -> !IGNORED_PARAMS.contains(e.getKey()))
         .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
-      Assert.assertEquals(12, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
+    if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
+      Assert.assertEquals(13, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
       Assert.assertEquals("true", hmsParams.get("new_prop_1"));
       Assert.assertEquals("false", hmsParams.get("new_prop_2"));
       Assert.assertEquals("new_val", hmsParams.get("custom_property"));
@@ -568,11 +623,11 @@ public class TestHiveIcebergStorageHandlerNoScan {
       Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP), prevSnapshot);
       Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP), newSnapshot);
     } else {
-      Assert.assertEquals(7, hmsParams.size());
+      Assert.assertEquals(8, hmsParams.size());
     }
 
     // Remove some Iceberg props and see if they're removed from HMS table props as well
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
+    if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
       icebergTable.updateProperties()
           .remove("custom_property")
           .remove("new_prop_1")
@@ -584,7 +639,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
     }
 
     // append some data and check whether HMS stats are aligned with snapshot summary
-    if (Catalogs.hiveCatalog(shell.getHiveConf())) {
+    if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
       List<Record> records = HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS;
       testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, FileFormat.PARQUET, null, records);
       hmsParams = shell.metastore().getTable("default", "customers").getParameters();
@@ -642,7 +697,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
 
   @Test
   public void testDropHiveTableWithoutUnderlyingTable() throws IOException {
-    Assume.assumeFalse("Not relevant for HiveCatalog", Catalogs.hiveCatalog(shell.getHiveConf()));
+    Assume.assumeFalse("Not relevant for HiveCatalog",
+            testTableType.equals(TestTables.TestTableType.HIVE_CATALOG));
 
     TableIdentifier identifier = TableIdentifier.of("default", "customers");
     // Create the Iceberg table in non-HiveCatalog
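The recurring change in the hunks above is that every CREATE TABLE statement issued by the tests now names the catalog it targets. A minimal sketch of the pattern, built only from constants that appear in this diff (the key expands to 'iceberg.catalog' according to the TestTables javadoc further below; the concrete value behind Catalogs.ICEBERG_DEFAULT_CATALOG_NAME is not shown in this commit):

    shell.executeStatement("CREATE EXTERNAL TABLE customers " +
        "STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
        "TBLPROPERTIES ('" + InputFormatConfig.CATALOG_NAME + "'='" +
        Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");

Tables that should still get a catalog without spelling it out append testTables.propertiesForCreateTableSQL(ImmutableMap.of()) instead, a helper that injects the instance's catalog name automatically.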
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
new file mode 100644
index 0000000..e1c23f3
--- /dev/null
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestHiveIcebergStorageHandlerWithMultipleCatalogs {
+
+  private static final String[] EXECUTION_ENGINES = new String[] { "tez", "mr" };
+  private static final String HIVE_CATALOG_NAME = "table1_catalog";
+  private static final String OTHER_CATALOG_NAME = "table2_catalog";
+  private static TestHiveShell shell;
+
+  @Parameterized.Parameter(0)
+  public FileFormat fileFormat1;
+  @Parameterized.Parameter(1)
+  public FileFormat fileFormat2;
+  @Parameterized.Parameter(2)
+  public String executionEngine;
+  @Parameterized.Parameter(3)
+  public TestTables.TestTableType testTableType1;
+  @Parameterized.Parameter(4)
+  public String table1CatalogName;
+  @Parameterized.Parameter(5)
+  public TestTables.TestTableType testTableType2;
+  @Parameterized.Parameter(6)
+  public String table2CatalogName;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+  private TestTables testTables1;
+  private TestTables testTables2;
+
+  @Parameterized.Parameters(name = "fileFormat1={0}, fileFormat2={1}, engine={2}, tableType1={3}, catalogName1={4}, " +
+          "tableType2={5}, catalogName2={6}")
+  public static Collection<Object[]> parameters() {
+    Collection<Object[]> testParams = new ArrayList<>();
+    String javaVersion = System.getProperty("java.specification.version");
+
+    // Run tests with PARQUET and ORC file formats for the two catalogs
+    for (String engine : EXECUTION_ENGINES) {
+      // include Tez tests only for Java 8
+      if (javaVersion.equals("1.8") || "mr".equals(engine)) {
+        for (TestTables.TestTableType testTableType : TestTables.ALL_TABLE_TYPES) {
+          if (!TestTables.TestTableType.HIVE_CATALOG.equals(testTableType)) {
+            testParams.add(new Object[]{FileFormat.PARQUET, FileFormat.ORC, engine,
+                TestTables.TestTableType.HIVE_CATALOG, HIVE_CATALOG_NAME, testTableType, OTHER_CATALOG_NAME});
+          }
+        }
+      }
+    }
+    return testParams;
+  }
+
+  @BeforeClass
+  public static void beforeClass() {
+    shell = HiveIcebergStorageHandlerTestUtils.shell();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    shell.stop();
+  }
+
+  @Before
+  public void before() throws IOException {
+    testTables1 = HiveIcebergStorageHandlerTestUtils.testTables(shell, testTableType1, temp, table1CatalogName);
+    HiveIcebergStorageHandlerTestUtils.init(shell, testTables1, temp, executionEngine);
+    testTables1.properties().entrySet().forEach(e -> shell.setHiveSessionValue(e.getKey(), e.getValue()));
+
+    testTables2 = HiveIcebergStorageHandlerTestUtils.testTables(shell, testTableType2, temp, table2CatalogName);
+    testTables2.properties().entrySet().forEach(e -> shell.setHiveSessionValue(e.getKey(), e.getValue()));
+  }
+
+  @After
+  public void after() throws Exception {
+    HiveIcebergStorageHandlerTestUtils.close(shell);
+  }
+
+  @Test
+  public void testJoinTablesFromDifferentCatalogs() throws IOException {
+    createAndAddRecords(testTables1, fileFormat1, TableIdentifier.of("default", "customers1"), table1CatalogName,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+    createAndAddRecords(testTables2, fileFormat2, TableIdentifier.of("default", "customers2"), table2CatalogName,
+        HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+    List<Object[]> rows = shell.executeStatement("SELECT c2.customer_id, c2.first_name, c2.last_name " +
+            "FROM default.customers2 c2 JOIN default.customers1 c1 ON c2.customer_id = c1.customer_id " +
+            "ORDER BY c2.customer_id");
+    Assert.assertEquals(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.size(), rows.size());
+    HiveIcebergTestUtils.validateData(new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS),
+            HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, rows), 0);
+  }
+
+  private void createAndAddRecords(TestTables testTables, FileFormat fileFormat, TableIdentifier identifier,
+                                   String catalogName, List<Record> records) throws IOException {
+    String createSql =
+        "CREATE EXTERNAL TABLE " + identifier + " (customer_id BIGINT, first_name STRING, last_name STRING)" +
+            " STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
+            testTables.locationForCreateTableSQL(identifier) +
+            " TBLPROPERTIES ('" + InputFormatConfig.CATALOG_NAME + "'='" + catalogName + "')";
+    shell.executeStatement(createSql);
+    Table icebergTable = testTables.loadTable(identifier);
+    testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, null, records);
+  }
+
+}
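The class above is the heart of the change: two differently-typed catalogs live in one Hive session and a join reads across them. A sketch of how such a session could be wired up, assuming the template constants from InputFormatConfig expand to distinct per-catalog keys (the expanded key strings and the warehouse path below are illustrative, not taken from this diff):

    // register a Hive catalog under the name "table1_catalog"
    shell.setHiveSessionValue(
        String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, "table1_catalog"), "hive");
    // register a Hadoop catalog under the name "table2_catalog" with its own warehouse
    shell.setHiveSessionValue(
        String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, "table2_catalog"), "hadoop");
    shell.setHiveSessionValue(
        String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, "table2_catalog"), "/tmp/iceberg/warehouse");

In the test itself this wiring is done generically in before(), by publishing testTables1.properties() and testTables2.properties() as Hive session values.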
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 55c7b58..4d3a60e 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -26,6 +26,7 @@ import java.sql.Timestamp;
 import java.time.LocalDateTime;
 import java.time.OffsetDateTime;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -48,6 +49,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.hive.HiveCatalog;
 import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.mr.TestCatalogs;
 import org.apache.iceberg.mr.TestHelper;
@@ -71,14 +73,16 @@ abstract class TestTables {
 
   private final Tables tables;
   protected final TemporaryFolder temp;
+  protected final String catalog;
 
-  protected TestTables(Tables tables, TemporaryFolder temp) {
+  protected TestTables(Tables tables, TemporaryFolder temp, String catalogName) {
     this.tables = tables;
     this.temp = temp;
+    this.catalog = catalogName;
   }
 
-  protected TestTables(Catalog catalog, TemporaryFolder temp) {
-    this(new CatalogToTables(catalog), temp);
+  protected TestTables(Catalog catalog, TemporaryFolder temp, String catalogName) {
+    this(new CatalogToTables(catalog), temp, catalogName);
   }
 
   public Map<String, String> properties() {
@@ -103,6 +107,20 @@ abstract class TestTables {
   public abstract String locationForCreateTableSQL(TableIdentifier identifier);
 
   /**
+   * The table properties string needed for the CREATE TABLE ... commands,
+   * like "TBLPROPERTIES ('iceberg.catalog'='mycatalog')". The catalog name of this
+   * TestTables instance is added automatically when the given properties do not set it.
+   * @param tableProperties the table properties to include in the TBLPROPERTIES clause
+   * @return the TBLPROPERTIES clause, starting with a leading space
+   */
+  public String propertiesForCreateTableSQL(Map<String, String> tableProperties) {
+    Map<String, String> properties = new HashMap<>(tableProperties);
+    properties.putIfAbsent(InputFormatConfig.CATALOG_NAME, catalog);
+    String props = properties.entrySet().stream()
+            .map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue()))
+            .collect(Collectors.joining(","));
+    return " TBLPROPERTIES (" + props + ")";
+  }
+
+  /**
    * If an independent Hive table creation is needed for the given Catalog then this should return the Hive SQL
    * string which we have to execute. Overridden for HiveCatalog where the Hive table is immediately created
    * during the Iceberg table creation so no extra sql execution is required.
@@ -113,15 +131,9 @@ abstract class TestTables {
   public String createHiveTableSQL(TableIdentifier identifier, Map<String, String> tableProps) {
     Preconditions.checkArgument(!identifier.namespace().isEmpty(), "Namespace should not be empty");
     Preconditions.checkArgument(identifier.namespace().levels().length == 1, "Namespace should be single level");
-    String sql = String.format("CREATE TABLE %s.%s STORED BY '%s' %s", identifier.namespace(), identifier.name(),
-        HiveIcebergStorageHandler.class.getName(), locationForCreateTableSQL(identifier));
-    if (tableProps != null && !tableProps.isEmpty()) {
-      String props = tableProps.entrySet().stream()
-          .map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue()))
-          .collect(Collectors.joining(","));
-      sql += " TBLPROPERTIES (" + props + ")";
-    }
-    return sql;
+    return String.format("CREATE TABLE %s.%s STORED BY '%s' %s %s", identifier.namespace(), identifier.name(),
+        HiveIcebergStorageHandler.class.getName(), locationForCreateTableSQL(identifier),
+        propertiesForCreateTableSQL(tableProps));
   }
 
   /**
@@ -179,7 +191,8 @@ abstract class TestTables {
         SchemaParser.toJson(schema) + "', " +
         "'" + InputFormatConfig.PARTITION_SPEC + "'='" +
         PartitionSpecParser.toJson(spec) + "', " +
-        "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "')");
+        "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "', " +
+        "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
 
     if (records != null && !records.isEmpty()) {
       StringBuilder query = new StringBuilder().append("INSERT INTO " + identifier + " VALUES ");
@@ -296,21 +309,24 @@ abstract class TestTables {
 
     private final String warehouseLocation;
 
-    CustomCatalogTestTables(Configuration conf, TemporaryFolder temp) throws IOException {
+    CustomCatalogTestTables(Configuration conf, TemporaryFolder temp, String catalogName) throws IOException {
       this(conf, temp, (MetastoreUtil.hive3PresentOnClasspath() ? "file:" : "") +
-          temp.newFolder("custom", "warehouse").toString());
+          temp.newFolder("custom", "warehouse").toString(), catalogName);
     }
 
-    CustomCatalogTestTables(Configuration conf, TemporaryFolder temp, String warehouseLocation) {
-      super(new TestCatalogs.CustomHadoopCatalog(conf, warehouseLocation), temp);
+    CustomCatalogTestTables(Configuration conf, TemporaryFolder temp, String warehouseLocation, String catalogName) {
+      super(new TestCatalogs.CustomHadoopCatalog(conf, warehouseLocation), temp, catalogName);
       this.warehouseLocation = warehouseLocation;
     }
 
     @Override
     public Map<String, String> properties() {
       return ImmutableMap.of(
-              InputFormatConfig.CATALOG_LOADER_CLASS, TestCatalogs.CustomHadoopCatalogLoader.class.getName(),
-              TestCatalogs.CustomHadoopCatalog.WAREHOUSE_LOCATION, warehouseLocation
+              String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), "custom",
+              String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalog),
+              TestCatalogs.CustomHadoopCatalog.class.getName(),
+              String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalog),
+              warehouseLocation
       );
     }
 
@@ -325,21 +341,21 @@ abstract class TestTables {
 
     private final String warehouseLocation;
 
-    HadoopCatalogTestTables(Configuration conf, TemporaryFolder temp) throws IOException {
+    HadoopCatalogTestTables(Configuration conf, TemporaryFolder temp, String catalogName) throws IOException {
       this(conf, temp, (MetastoreUtil.hive3PresentOnClasspath() ? "file:" : "") +
-          temp.newFolder("hadoop", "warehouse").toString());
+          temp.newFolder("hadoop", "warehouse").toString(), catalogName);
     }
 
-    HadoopCatalogTestTables(Configuration conf, TemporaryFolder temp, String warehouseLocation) {
-      super(new HadoopCatalog(conf, warehouseLocation), temp);
+    HadoopCatalogTestTables(Configuration conf, TemporaryFolder temp, String warehouseLocation, String catalogName) {
+      super(new HadoopCatalog(conf, warehouseLocation), temp, catalogName);
       this.warehouseLocation = warehouseLocation;
     }
 
     @Override
     public Map<String, String> properties() {
       return ImmutableMap.of(
-              InputFormatConfig.CATALOG, "hadoop",
-              InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, warehouseLocation
+              String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), "hadoop",
+              String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalog), warehouseLocation
       );
     }
 
@@ -349,8 +365,8 @@ abstract class TestTables {
   }
 
   static class HadoopTestTables extends TestTables {
-    HadoopTestTables(Configuration conf, TemporaryFolder temp) {
-      super(new HadoopTables(conf), temp);
+    HadoopTestTables(Configuration conf, TemporaryFolder temp, String catalogName) {
+      super(new HadoopTables(conf), temp, catalogName);
     }
 
     @Override
@@ -382,14 +398,14 @@ abstract class TestTables {
 
   static class HiveTestTables extends TestTables {
 
-    HiveTestTables(Configuration conf, TemporaryFolder temp) {
+    HiveTestTables(Configuration conf, TemporaryFolder temp, String catalogName) {
       super(CatalogUtil.loadCatalog(HiveCatalog.class.getName(), CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
-              ImmutableMap.of(), conf), temp);
+              ImmutableMap.of(), conf), temp, catalogName);
     }
 
     @Override
     public Map<String, String> properties() {
-      return ImmutableMap.of(InputFormatConfig.CATALOG, "hive");
+      return ImmutableMap.of(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), "hive");
     }
 
     @Override
@@ -423,26 +439,29 @@ abstract class TestTables {
 
   enum TestTableType {
     HADOOP_TABLE {
-      public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) {
-        return new HadoopTestTables(conf, temporaryFolder);
+      public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder, String catalogName) {
+        return new HadoopTestTables(conf, temporaryFolder, catalogName);
       }
     },
     HADOOP_CATALOG {
-      public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException {
-        return new HadoopCatalogTestTables(conf, temporaryFolder);
+      public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder, String catalogName)
+          throws IOException {
+        return new HadoopCatalogTestTables(conf, temporaryFolder, catalogName);
       }
     },
     CUSTOM_CATALOG {
-      public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException {
-        return new CustomCatalogTestTables(conf, temporaryFolder);
+      public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder, String catalogName)
+          throws IOException {
+        return new CustomCatalogTestTables(conf, temporaryFolder, catalogName);
       }
     },
     HIVE_CATALOG {
-      public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) {
-        return new HiveTestTables(conf, temporaryFolder);
+      public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder, String catalogName) {
+        return new HiveTestTables(conf, temporaryFolder, catalogName);
       }
     };
 
-    public abstract TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException;
+    public abstract TestTables instance(Configuration conf, TemporaryFolder temporaryFolder, String catalogName)
+        throws IOException;
   }
 }
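Taken together, the TestTables rework means every test table is created against an explicitly named catalog. A hypothetical end-to-end usage, relying only on the signatures visible in this diff ("my_catalog" is an arbitrary example name):

    // build a Hadoop-catalog-backed TestTables bound to the name "my_catalog"
    TestTables tables = TestTables.TestTableType.HADOOP_CATALOG
        .instance(shell.getHiveConf(), temp, "my_catalog");
    // publish the per-catalog session properties, e.g. the catalog type and warehouse
    tables.properties().forEach((k, v) -> shell.setHiveSessionValue(k, v));
    // produces a clause along the lines of " TBLPROPERTIES ('iceberg.catalog'='my_catalog')",
    // using the 'iceberg.catalog' key shown in the propertiesForCreateTableSQL javadoc
    String propsClause = tables.propertiesForCreateTableSQL(ImmutableMap.of());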