You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by pv...@apache.org on 2021/04/12 07:36:34 UTC
[iceberg] branch master updated: Hive: Configure catalog type on
table level. (#2129)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new db8248c Hive: Configure catalog type on table level. (#2129)
db8248c is described below
commit db8248c16e99c435ff7eed8fa86bc3913af2756a
Author: László Pintér <47...@users.noreply.github.com>
AuthorDate: Mon Apr 12 09:36:22 2021 +0200
Hive: Configure catalog type on table level. (#2129)
---
.../java/org/apache/iceberg/mr/CatalogLoader.java | 33 -----
.../main/java/org/apache/iceberg/mr/Catalogs.java | 158 ++++++++++++++-------
.../org/apache/iceberg/mr/InputFormatConfig.java | 11 +-
.../iceberg/mr/hive/HiveIcebergMetaHook.java | 10 +-
.../mr/hive/HiveIcebergOutputCommitter.java | 10 +-
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 23 ++-
.../java/org/apache/iceberg/mr/TestCatalogs.java | 120 ++++++++++++----
.../apache/iceberg/mr/TestIcebergInputFormats.java | 24 ++--
.../iceberg/mr/TestInputFormatReaderDeletes.java | 8 ++
.../hive/HiveIcebergStorageHandlerTestUtils.java | 15 +-
.../mr/hive/TestHiveIcebergOutputCommitter.java | 10 +-
.../iceberg/mr/hive/TestHiveIcebergSerDe.java | 3 +
.../TestHiveIcebergStorageHandlerLocalScan.java | 13 +-
.../hive/TestHiveIcebergStorageHandlerNoScan.java | 136 ++++++++++++------
...eIcebergStorageHandlerWithMultipleCatalogs.java | 143 +++++++++++++++++++
.../org/apache/iceberg/mr/hive/TestTables.java | 97 ++++++++-----
16 files changed, 590 insertions(+), 224 deletions(-)
diff --git a/mr/src/main/java/org/apache/iceberg/mr/CatalogLoader.java b/mr/src/main/java/org/apache/iceberg/mr/CatalogLoader.java
deleted file mode 100644
index 0d15083..0000000
--- a/mr/src/main/java/org/apache/iceberg/mr/CatalogLoader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.mr;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.catalog.Catalog;
-
-public interface CatalogLoader {
-
- /**
- * Load and return a {@link Catalog} specified by the configuration.
- * @param conf a Hadoop conf
- * @return a {@link Catalog} instance
- */
- Catalog load(Configuration conf);
-}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java b/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
index c60bc47..0ff9b09 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/Catalogs.java
@@ -24,7 +24,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
@@ -33,17 +35,12 @@ import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
-import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
/**
* Class for catalog resolution and accessing the common functions for {@link Catalog} API.
@@ -56,16 +53,20 @@ import org.slf4j.LoggerFactory;
* </ol>
*/
public final class Catalogs {
- private static final Logger LOG = LoggerFactory.getLogger(Catalogs.class);
- private static final String HADOOP = "hadoop";
- private static final String HIVE = "hive";
+ public static final String ICEBERG_DEFAULT_CATALOG_NAME = "default_iceberg";
+ public static final String ICEBERG_HADOOP_TABLE_NAME = "location_based_table";
+
+ private static final String HIVE_CATALOG_TYPE = "hive";
+ private static final String HADOOP_CATALOG_TYPE = "hadoop";
+ private static final String NO_CATALOG_TYPE = "no catalog";
public static final String NAME = "name";
public static final String LOCATION = "location";
private static final Set<String> PROPERTIES_TO_REMOVE =
- ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME);
+ ImmutableSet.of(InputFormatConfig.TABLE_SCHEMA, InputFormatConfig.PARTITION_SPEC, LOCATION, NAME,
+ InputFormatConfig.CATALOG_NAME);
private Catalogs() {
}
@@ -76,14 +77,15 @@ public final class Catalogs {
* @return an Iceberg table
*/
public static Table loadTable(Configuration conf) {
- return loadTable(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER), conf.get(InputFormatConfig.TABLE_LOCATION));
+ return loadTable(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER), conf.get(InputFormatConfig.TABLE_LOCATION),
+ conf.get(InputFormatConfig.CATALOG_NAME));
}
/**
* Load an Iceberg table using the catalog specified by the configuration.
* <p>
- * The table identifier ({@link Catalogs#NAME}) or table path ({@link Catalogs#LOCATION}) should be specified by
- * the controlling properties.
+ * The table identifier ({@link Catalogs#NAME}) and the catalog name ({@link InputFormatConfig#CATALOG_NAME}),
+ * or table path ({@link Catalogs#LOCATION}) should be specified by the controlling properties.
* <p>
* Used by HiveIcebergSerDe and HiveIcebergStorageHandler
* @param conf a Hadoop
@@ -91,11 +93,13 @@ public final class Catalogs {
* @return an Iceberg table
*/
public static Table loadTable(Configuration conf, Properties props) {
- return loadTable(conf, props.getProperty(NAME), props.getProperty(LOCATION));
+ return loadTable(conf, props.getProperty(NAME), props.getProperty(LOCATION),
+ props.getProperty(InputFormatConfig.CATALOG_NAME));
}
- private static Table loadTable(Configuration conf, String tableIdentifier, String tableLocation) {
- Optional<Catalog> catalog = loadCatalog(conf);
+ private static Table loadTable(Configuration conf, String tableIdentifier, String tableLocation,
+ String catalogName) {
+ Optional<Catalog> catalog = loadCatalog(conf, catalogName);
if (catalog.isPresent()) {
Preconditions.checkArgument(tableIdentifier != null, "Table identifier not set");
@@ -134,6 +138,7 @@ public final class Catalogs {
}
String location = props.getProperty(LOCATION);
+ String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
// Create a table property map without the controlling properties
Map<String, String> map = new HashMap<>(props.size());
@@ -143,7 +148,7 @@ public final class Catalogs {
}
}
- Optional<Catalog> catalog = loadCatalog(conf);
+ Optional<Catalog> catalog = loadCatalog(conf, catalogName);
if (catalog.isPresent()) {
String name = props.getProperty(NAME);
@@ -166,8 +171,9 @@ public final class Catalogs {
*/
public static boolean dropTable(Configuration conf, Properties props) {
String location = props.getProperty(LOCATION);
+ String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
- Optional<Catalog> catalog = loadCatalog(conf);
+ Optional<Catalog> catalog = loadCatalog(conf, catalogName);
if (catalog.isPresent()) {
String name = props.getProperty(NAME);
@@ -182,48 +188,100 @@ public final class Catalogs {
/**
* Returns true if HiveCatalog is used
* @param conf a Hadoop conf
+ * @param props the controlling properties
* @return true if the Catalog is HiveCatalog
*/
- public static boolean hiveCatalog(Configuration conf) {
- return HIVE.equalsIgnoreCase(conf.get(InputFormatConfig.CATALOG));
+ public static boolean hiveCatalog(Configuration conf, Properties props) {
+ String catalogName = props.getProperty(InputFormatConfig.CATALOG_NAME);
+ return CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE.equalsIgnoreCase(getCatalogType(conf, catalogName));
}
@VisibleForTesting
- static Optional<Catalog> loadCatalog(Configuration conf) {
- String catalogLoaderClass = conf.get(InputFormatConfig.CATALOG_LOADER_CLASS);
-
- if (catalogLoaderClass != null) {
- CatalogLoader loader = (CatalogLoader) DynConstructors.builder(CatalogLoader.class)
- .impl(catalogLoaderClass)
- .build()
- .newInstance();
- Catalog catalog = loader.load(conf);
- LOG.info("Loaded catalog {} using {}", catalog, catalogLoaderClass);
- return Optional.of(catalog);
+ static Optional<Catalog> loadCatalog(Configuration conf, String catalogName) {
+ String catalogType = getCatalogType(conf, catalogName);
+ if (catalogType == null) {
+ throw new NoSuchNamespaceException("Catalog definition for %s is not found.", catalogName);
+ }
+
+ if (NO_CATALOG_TYPE.equalsIgnoreCase(catalogType)) {
+ return Optional.empty();
+ } else {
+ String name = catalogName == null ? ICEBERG_DEFAULT_CATALOG_NAME : catalogName;
+ return Optional.of(CatalogUtil.buildIcebergCatalog(name,
+ getCatalogProperties(conf, name, catalogType), conf));
}
+ }
+
+ /**
+ * Collect all the catalog specific configuration from the global hive configuration.
+ * @param conf a Hadoop configuration
+ * @param catalogName name of the catalog
+ * @param catalogType type of the catalog
+ * @return complete map of catalog properties
+ */
+ private static Map<String, String> getCatalogProperties(Configuration conf, String catalogName, String catalogType) {
+ String keyPrefix = InputFormatConfig.CATALOG_CONFIG_PREFIX + catalogName;
+ Map<String, String> catalogProperties = Streams.stream(conf.iterator())
+ .filter(e -> e.getKey().startsWith(keyPrefix))
+ .collect(Collectors.toMap(e -> e.getKey().substring(keyPrefix.length() + 1), Map.Entry::getValue));
+ return addCatalogPropertiesIfMissing(conf, catalogType, catalogProperties);
+ }
- String catalogName = conf.get(InputFormatConfig.CATALOG);
+ /**
+ * This method is used for backward-compatible catalog configuration.
+ * Collect all the catalog specific configuration from the global hive configuration.
+ * Note: this should be removed when the old catalog configuration is deprecated.
+ * @param conf global hive configuration
+ * @param catalogType type of the catalog
+ * @param catalogProperties pre-populated catalog properties
+ * @return complete map of catalog properties
+ */
+ private static Map<String, String> addCatalogPropertiesIfMissing(Configuration conf, String catalogType,
+ Map<String, String> catalogProperties) {
+ catalogProperties.putIfAbsent(CatalogUtil.ICEBERG_CATALOG_TYPE, catalogType);
+ if (catalogType.equalsIgnoreCase(HADOOP_CATALOG_TYPE)) {
+ catalogProperties.putIfAbsent(CatalogProperties.WAREHOUSE_LOCATION,
+ conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION));
+ }
+ return catalogProperties;
+ }
+ /**
+ * Return the catalog type based on the catalog name.
+ * <p>
+ * If the catalog name is provided get the catalog type from 'iceberg.catalog.<code>catalogName</code>.type' config.
+ * In case the value of this property is null, return with no catalog definition (Hadoop Table)
+ * </p>
+ * <p>
+ * If catalog name is null, check the global conf for 'iceberg.mr.catalog' property. If the value of the property is:
+ * <ul>
+ * <li>null/hive -> Hive Catalog</li>
+ * <li>location -> Hadoop Table</li>
+ * <li>hadoop -> Hadoop Catalog</li>
+ * <li>any other value -> Custom Catalog</li>
+ * </ul>
+ * </p>
+ * @param conf global hive configuration
+ * @param catalogName name of the catalog
+ * @return type of the catalog; never null ('no catalog' is returned for location based Hadoop Tables)
+ */
+ private static String getCatalogType(Configuration conf, String catalogName) {
if (catalogName != null) {
- Catalog catalog;
- switch (catalogName.toLowerCase()) {
- case HADOOP:
- String warehouseLocation = conf.get(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION);
-
- catalog = (warehouseLocation != null) ? new HadoopCatalog(conf, warehouseLocation) : new HadoopCatalog(conf);
- LOG.info("Loaded Hadoop catalog {}", catalog);
- return Optional.of(catalog);
- case HIVE:
- catalog = CatalogUtil.loadCatalog(HiveCatalog.class.getName(), CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
- ImmutableMap.of(), conf);
- LOG.info("Loaded Hive Metastore catalog {}", catalog);
- return Optional.of(catalog);
- default:
- throw new NoSuchNamespaceException("Catalog %s is not supported.", catalogName);
+ String catalogType = conf.get(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName));
+ if (catalogName.equals(ICEBERG_HADOOP_TABLE_NAME) || catalogType == null) {
+ return NO_CATALOG_TYPE;
+ } else {
+ return catalogType;
+ }
+ } else {
+ String catalogType = conf.get(InputFormatConfig.CATALOG);
+ if (catalogType == null) {
+ return HIVE_CATALOG_TYPE;
+ } else if (catalogType.equals(LOCATION)) {
+ return NO_CATALOG_TYPE;
+ } else {
+ return catalogType;
}
}
-
- LOG.info("Catalog is not configured");
- return Optional.empty();
}
}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index 38e6606..9521bf0 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -47,6 +47,7 @@ public class InputFormatConfig {
public static final String TABLE_SCHEMA = "iceberg.mr.table.schema";
public static final String PARTITION_SPEC = "iceberg.mr.table.partition.spec";
public static final String SERIALIZED_TABLE_PREFIX = "iceberg.mr.serialized.table.";
+ public static final String TABLE_CATALOG_PREFIX = "iceberg.mr.table.catalog.";
public static final String LOCALITY = "iceberg.mr.locality";
public static final String CATALOG = "iceberg.mr.catalog";
public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = "iceberg.mr.catalog.hadoop.warehouse.location";
@@ -72,6 +73,11 @@ public class InputFormatConfig {
public static final String SNAPSHOT_TABLE = "iceberg.snapshots.table";
public static final String SNAPSHOT_TABLE_SUFFIX = "__snapshots";
+ public static final String CATALOG_CONFIG_PREFIX = "iceberg.catalog.";
+ public static final String CATALOG_TYPE_TEMPLATE = "iceberg.catalog.%s.type";
+ public static final String CATALOG_WAREHOUSE_TEMPLATE = "iceberg.catalog.%s.warehouse";
+ public static final String CATALOG_CLASS_TEMPLATE = "iceberg.catalog.%s.catalog-impl";
+
public enum InMemoryDataModel {
PIG,
HIVE,
@@ -162,11 +168,6 @@ public class InputFormatConfig {
return this;
}
- public ConfigBuilder catalogLoader(Class<? extends CatalogLoader> catalogLoader) {
- conf.setClass(CATALOG_LOADER_CLASS, catalogLoader, CatalogLoader.class);
- return this;
- }
-
public ConfigBuilder useHiveRows() {
conf.set(IN_MEMORY_DATA_MODEL, InMemoryDataModel.HIVE.name());
return this;
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index 4ade847..438ca87 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -80,7 +80,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
hmsTable.getParameters().put(BaseMetastoreTableOperations.TABLE_TYPE_PROP,
BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE.toUpperCase());
- if (!Catalogs.hiveCatalog(conf)) {
+ if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
// For non-HiveCatalog tables too, we should set the input and output format
// so that the table can be read by other engines like Impala
hmsTable.getSd().setInputFormat(HiveIcebergInputFormat.class.getCanonicalName());
@@ -125,7 +125,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
}
// If the table is not managed by Hive catalog then the location should be set
- if (!Catalogs.hiveCatalog(conf)) {
+ if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
Preconditions.checkArgument(hmsTable.getSd() != null && hmsTable.getSd().getLocation() != null,
"Table location not set");
}
@@ -142,7 +142,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
@Override
public void commitCreateTable(org.apache.hadoop.hive.metastore.api.Table hmsTable) {
if (icebergTable == null) {
- if (Catalogs.hiveCatalog(conf)) {
+ if (Catalogs.hiveCatalog(conf, catalogProperties)) {
catalogProperties.put(TableProperties.ENGINE_HIVE_ENABLED, true);
}
@@ -156,7 +156,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
this.deleteIcebergTable = hmsTable.getParameters() != null &&
"TRUE".equalsIgnoreCase(hmsTable.getParameters().get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
- if (deleteIcebergTable && Catalogs.hiveCatalog(conf)) {
+ if (deleteIcebergTable && Catalogs.hiveCatalog(conf, catalogProperties)) {
// Store the metadata and the id for deleting the actual table data
String metadataLocation = hmsTable.getParameters().get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP);
this.deleteIo = Catalogs.loadTable(conf, catalogProperties).io();
@@ -173,7 +173,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
public void commitDropTable(org.apache.hadoop.hive.metastore.api.Table hmsTable, boolean deleteData) {
if (deleteData && deleteIcebergTable) {
try {
- if (!Catalogs.hiveCatalog(conf)) {
+ if (!Catalogs.hiveCatalog(conf, catalogProperties)) {
LOG.info("Dropping with purge all the data for table {}.{}", hmsTable.getDbName(), hmsTable.getTableName());
Catalogs.dropTable(conf, catalogProperties);
} else {
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 3479b36..bf298ad 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -170,8 +170,9 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
.executeWith(tableExecutor)
.run(output -> {
Table table = HiveIcebergStorageHandler.table(jobConf, output);
+ String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
- commitTable(table.io(), fileExecutor, jobContext, output, table.location());
+ commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
});
} finally {
fileExecutor.shutdown();
@@ -245,12 +246,17 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
* @param jobContext The job context
* @param name The name of the table used for loading from the catalog
* @param location The location of the table used for loading from the catalog
+ * @param catalogName The name of the catalog that contains the table
*/
- private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location) {
+ private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location,
+ String catalogName) {
JobConf conf = jobContext.getJobConf();
Properties catalogProperties = new Properties();
catalogProperties.put(Catalogs.NAME, name);
catalogProperties.put(Catalogs.LOCATION, location);
+ if (catalogName != null) {
+ catalogProperties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+ }
Table table = Catalogs.loadTable(conf, catalogProperties);
long startTime = System.currentTimeMillis();
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 015bcb0..f51259e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -113,13 +113,18 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
if (tableDesc != null && tableDesc.getProperties() != null &&
tableDesc.getProperties().get(WRITE_KEY) != null) {
- Preconditions.checkArgument(!tableDesc.getTableName().contains(TABLE_NAME_SEPARATOR),
- "Can not handle table " + tableDesc.getTableName() + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'");
+ String tableName = tableDesc.getTableName();
+ Preconditions.checkArgument(!tableName.contains(TABLE_NAME_SEPARATOR),
+ "Can not handle table " + tableName + ". Its name contains '" + TABLE_NAME_SEPARATOR + "'");
String tables = jobConf.get(InputFormatConfig.OUTPUT_TABLES);
- tables = tables == null ? tableDesc.getTableName() : tables + TABLE_NAME_SEPARATOR + tableDesc.getTableName();
-
+ tables = tables == null ? tableName : tables + TABLE_NAME_SEPARATOR + tableName;
jobConf.set("mapred.output.committer.class", HiveIcebergOutputCommitter.class.getName());
jobConf.set(InputFormatConfig.OUTPUT_TABLES, tables);
+
+ String catalogName = tableDesc.getProperties().getProperty(InputFormatConfig.CATALOG_NAME);
+ if (catalogName != null) {
+ jobConf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + tableName, catalogName);
+ }
}
}
@@ -172,6 +177,16 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
}
/**
+ * Returns the catalog name serialized to the configuration.
+ * @param config The configuration used to get the data from
+ * @param name The name of the table we need as returned by TableDesc.getTableName()
+ * @return catalog name
+ */
+ public static String catalogName(Configuration config, String name) {
+ return config.get(InputFormatConfig.TABLE_CATALOG_PREFIX + name);
+ }
+
+ /**
* Returns the Table Schema serialized to the configuration.
* @param config The configuration used to get the data from
* @return The Table Schema object
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java b/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
index 3412ad5..04168eb 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestCatalogs.java
@@ -25,6 +25,7 @@ import java.util.Optional;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
@@ -32,7 +33,6 @@ import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -63,6 +63,7 @@ public class TestCatalogs {
@Test
public void testLoadTableFromLocation() throws IOException {
+ conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
AssertHelpers.assertThrows(
"Should complain about table location not set", IllegalArgumentException.class,
"location not set", () -> Catalogs.loadTable(conf));
@@ -77,14 +78,15 @@ public class TestCatalogs {
@Test
public void testLoadTableFromCatalog() throws IOException {
- conf.set("warehouse.location", temp.newFolder("hadoop", "warehouse").toString());
- conf.setClass(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalogLoader.class, CatalogLoader.class);
+ String defaultCatalogName = "default";
+ String warehouseLocation = temp.newFolder("hadoop", "warehouse").toString();
+ setCustomCatalogProperties(defaultCatalogName, warehouseLocation);
AssertHelpers.assertThrows(
"Should complain about table identifier not set", IllegalArgumentException.class,
"identifier not set", () -> Catalogs.loadTable(conf));
- HadoopCatalog catalog = new CustomHadoopCatalog(conf);
+ HadoopCatalog catalog = new CustomHadoopCatalog(conf, warehouseLocation);
Table hadoopCatalogTable = catalog.createTable(TableIdentifier.of("table"), SCHEMA);
conf.set(InputFormatConfig.TABLE_IDENTIFIER, "table");
@@ -100,6 +102,7 @@ public class TestCatalogs {
"Should complain about table schema not set", NullPointerException.class,
"schema not set", () -> Catalogs.createTable(conf, missingSchema));
+ conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
Properties missingLocation = new Properties();
missingLocation.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(SCHEMA));
AssertHelpers.assertThrows(
@@ -138,18 +141,21 @@ public class TestCatalogs {
@Test
public void testCreateDropTableToCatalog() throws IOException {
TableIdentifier identifier = TableIdentifier.of("test", "table");
+ String defaultCatalogName = "default";
+ String warehouseLocation = temp.newFolder("hadoop", "warehouse").toString();
- conf.set("warehouse.location", temp.newFolder("hadoop", "warehouse").toString());
- conf.setClass(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalogLoader.class, CatalogLoader.class);
+ setCustomCatalogProperties(defaultCatalogName, warehouseLocation);
Properties missingSchema = new Properties();
missingSchema.put("name", identifier.toString());
+ missingSchema.put(InputFormatConfig.CATALOG_NAME, defaultCatalogName);
AssertHelpers.assertThrows(
"Should complain about table schema not set", NullPointerException.class,
"schema not set", () -> Catalogs.createTable(conf, missingSchema));
Properties missingIdentifier = new Properties();
missingIdentifier.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(SCHEMA));
+ missingIdentifier.put(InputFormatConfig.CATALOG_NAME, defaultCatalogName);
AssertHelpers.assertThrows(
"Should complain about table identifier not set", NullPointerException.class,
"identifier not set", () -> Catalogs.createTable(conf, missingIdentifier));
@@ -159,10 +165,11 @@ public class TestCatalogs {
properties.put(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(SCHEMA));
properties.put(InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC));
properties.put("dummy", "test");
+ properties.put(InputFormatConfig.CATALOG_NAME, defaultCatalogName);
Catalogs.createTable(conf, properties);
- HadoopCatalog catalog = new CustomHadoopCatalog(conf);
+ HadoopCatalog catalog = new CustomHadoopCatalog(conf, warehouseLocation);
Table table = catalog.loadTable(identifier);
Assert.assertEquals(SchemaParser.toJson(SCHEMA), SchemaParser.toJson(table.schema()));
@@ -175,6 +182,7 @@ public class TestCatalogs {
Properties dropProperties = new Properties();
dropProperties.put("name", identifier.toString());
+ dropProperties.put(InputFormatConfig.CATALOG_NAME, defaultCatalogName);
Catalogs.dropTable(conf, dropProperties);
AssertHelpers.assertThrows(
@@ -184,50 +192,108 @@ public class TestCatalogs {
@Test
public void testLoadCatalog() throws IOException {
- Assert.assertFalse(Catalogs.loadCatalog(conf).isPresent());
+ conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
+ Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent());
- conf.set(InputFormatConfig.CATALOG, "foo");
+ String nonExistentCatalogType = "fooType";
+
+ conf.set(InputFormatConfig.CATALOG, nonExistentCatalogType);
AssertHelpers.assertThrows(
- "Should complain about catalog not supported", NoSuchNamespaceException.class,
- "is not supported", () -> Catalogs.loadCatalog(conf));
+ "should complain about catalog not supported", UnsupportedOperationException.class,
+ "Unknown catalog type", () -> Catalogs.loadCatalog(conf, null));
- conf.set(InputFormatConfig.CATALOG, "hadoop");
- Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf);
+ conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ conf.set(InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, "/tmp/mylocation");
+ Optional<Catalog> hadoopCatalog = Catalogs.loadCatalog(conf, null);
Assert.assertTrue(hadoopCatalog.isPresent());
Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
- conf.set(InputFormatConfig.CATALOG, "hive");
- Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf);
+ conf.set(InputFormatConfig.CATALOG, CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
+ Optional<Catalog> hiveCatalog = Catalogs.loadCatalog(conf, null);
Assert.assertTrue(hiveCatalog.isPresent());
Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
- conf.set("warehouse.location", temp.newFolder("hadoop", "warehouse").toString());
- conf.setClass(InputFormatConfig.CATALOG_LOADER_CLASS, CustomHadoopCatalogLoader.class, CatalogLoader.class);
- Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf);
+ conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
+ Assert.assertFalse(Catalogs.loadCatalog(conf, null).isPresent());
+
+ // arbitrary catalog name with non existent catalog type
+ String catalogName = "barCatalog";
+ conf.unset(InputFormatConfig.CATALOG);
+ conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), nonExistentCatalogType);
+ AssertHelpers.assertThrows(
+ "should complain about catalog not supported", UnsupportedOperationException.class,
+ "Unknown catalog type:", () -> Catalogs.loadCatalog(conf, catalogName));
+
+ // arbitrary catalog name with hadoop catalog type and default warehouse location
+ conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
+
+ Assert.assertTrue(hadoopCatalog.isPresent());
+ Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
+
+ // arbitrary catalog name with hadoop catalog type and provided warehouse location
+ conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalogName), "/tmp/mylocation");
+ hadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
+
+ Assert.assertTrue(hadoopCatalog.isPresent());
+ Assert.assertTrue(hadoopCatalog.get() instanceof HadoopCatalog);
+ Assert.assertEquals("HadoopCatalog{name=barCatalog, location=/tmp/mylocation}", hadoopCatalog.get().toString());
+
+ // arbitrary catalog name with hive catalog type
+ conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE);
+ hiveCatalog = Catalogs.loadCatalog(conf, catalogName);
+
+ Assert.assertTrue(hiveCatalog.isPresent());
+ Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
+
+ // arbitrary catalog name with custom catalog type without a catalog implementation class (catalog-impl)
+ conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), "custom");
+ AssertHelpers.assertThrows(
+ "should complain about catalog not supported", UnsupportedOperationException.class,
+ "Unknown catalog type:", () -> Catalogs.loadCatalog(conf, catalogName));
+
+ // arbitrary catalog name with custom catalog type and a provided catalog implementation class (catalog-impl)
+ conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), "custom");
+ conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalogName), CustomHadoopCatalog.class.getName());
+ Optional<Catalog> customHadoopCatalog = Catalogs.loadCatalog(conf, catalogName);
Assert.assertTrue(customHadoopCatalog.isPresent());
Assert.assertTrue(customHadoopCatalog.get() instanceof CustomHadoopCatalog);
+
+ // arbitrary catalog name with location catalog type
+ conf.unset(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName));
+ Assert.assertFalse(Catalogs.loadCatalog(conf, catalogName).isPresent());
+
+ // default catalog configuration
+ conf.unset(InputFormatConfig.CATALOG);
+ hiveCatalog = Catalogs.loadCatalog(conf, null);
+
+ Assert.assertTrue(hiveCatalog.isPresent());
+ Assert.assertTrue(hiveCatalog.get() instanceof HiveCatalog);
}
public static class CustomHadoopCatalog extends HadoopCatalog {
- public static final String WAREHOUSE_LOCATION = "warehouse.location";
+ public CustomHadoopCatalog() {
+
+ }
public CustomHadoopCatalog(Configuration conf, String warehouseLocation) {
super(conf, warehouseLocation);
}
- public CustomHadoopCatalog(Configuration conf) {
- this(conf, conf.get(WAREHOUSE_LOCATION));
- }
}
- public static class CustomHadoopCatalogLoader implements CatalogLoader {
- @Override
- public Catalog load(Configuration conf) {
- return new CustomHadoopCatalog(conf);
- }
+ private void setCustomCatalogProperties(String catalogName, String warehouseLocation) {
+ conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalogName), warehouseLocation);
+ conf.set(String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalogName), CustomHadoopCatalog.class.getName());
+ conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalogName), "custom");
+ conf.set(InputFormatConfig.CATALOG_NAME, catalogName);
}
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
index 4804435..6f3cbee 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
@@ -103,6 +104,7 @@ public class TestIcebergInputFormats {
@Before
public void before() throws IOException {
conf = new Configuration();
+ conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
HadoopTables tables = new HadoopTables(conf);
File location = temp.newFolder(testInputFormat.name(), fileFormat.name());
@@ -344,18 +346,17 @@ public class TestIcebergInputFormats {
}
}
- public static class HadoopCatalogLoader implements CatalogLoader {
- @Override
- public Catalog load(Configuration conf) {
- return new HadoopCatalog(conf, conf.get("warehouse.location"));
- }
- }
-
@Test
public void testCustomCatalog() throws IOException {
- conf.set("warehouse.location", temp.newFolder("hadoop_catalog").getAbsolutePath());
-
- Catalog catalog = new HadoopCatalogLoader().load(conf);
+ String warehouseLocation = temp.newFolder("hadoop_catalog").getAbsolutePath();
+ conf.set("warehouse.location", warehouseLocation);
+ conf.set(InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
+ conf.set(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME),
+ CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+ conf.set(String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME),
+ warehouseLocation);
+
+ Catalog catalog = new HadoopCatalog(conf, conf.get("warehouse.location"));
TableIdentifier identifier = TableIdentifier.of("db", "t");
Table table = catalog.createTable(identifier, SCHEMA, SPEC, helper.properties());
helper.setTable(table);
@@ -364,8 +365,7 @@ public class TestIcebergInputFormats {
expectedRecords.get(0).set(2, "2020-03-20");
helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords);
- builder.catalogLoader(HadoopCatalogLoader.class)
- .readFrom(identifier);
+ builder.readFrom(identifier);
testInputFormat.create(builder.conf()).validate(expectedRecords);
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
index d41acb7..ef964f8 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestInputFormatReaderDeletes.java
@@ -35,6 +35,7 @@ import org.apache.iceberg.data.DeleteReadTests;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.util.StructLikeSet;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -60,6 +61,13 @@ public class TestInputFormatReaderDeletes extends DeleteReadTests {
};
}
+ @Before
+ @Override
+ public void writeTestDataFile() throws IOException {
+ conf.set(InputFormatConfig.CATALOG, Catalogs.LOCATION);
+ super.writeTestDataFile();
+ }
+
public TestInputFormatReaderDeletes(String inputFormat, FileFormat fileFormat) {
this.inputFormat = inputFormat;
this.fileFormat = fileFormat;
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
index 6a2db8b..b08673c 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandlerTestUtils.java
@@ -20,11 +20,13 @@
package org.apache.iceberg.mr.hive;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.types.Types;
import org.apache.orc.OrcConf;
@@ -59,9 +61,14 @@ public class HiveIcebergStorageHandlerTestUtils {
}
static TestHiveShell shell() {
+ return shell(Collections.emptyMap());
+ }
+
+ static TestHiveShell shell(Map<String, String> configs) {
TestHiveShell shell = new TestHiveShell();
shell.setHiveConfValue("hive.notification.event.poll.interval", "-1");
shell.setHiveConfValue("hive.tez.exec.print.summary", "true");
+ configs.forEach((k, v) -> shell.setHiveConfValue(k, v));
// We would like to make sure that ORC reading overrides this config, so reading Iceberg tables could work in
// systems (like Hive 3.2 and higher) where this value is set to true explicitly.
shell.setHiveConfValue(OrcConf.FORCE_POSITIONAL_EVOLUTION.getHiveConfName(), "true");
@@ -70,9 +77,13 @@ public class HiveIcebergStorageHandlerTestUtils {
}
static TestTables testTables(TestHiveShell shell, TestTables.TestTableType testTableType, TemporaryFolder temp)
- throws IOException {
+ throws IOException {
+ return testTables(shell, testTableType, temp, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME);
+ }
- return testTableType.instance(shell.metastore().hiveConf(), temp);
+ static TestTables testTables(TestHiveShell shell, TestTables.TestTableType testTableType, TemporaryFolder temp,
+ String catalogName) throws IOException {
+ return testTableType.instance(shell.metastore().hiveConf(), temp, catalogName);
}
static void init(TestHiveShell shell, TestTables testTables, TemporaryFolder temp, String engine) {
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 1f5466b..af8c383 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -50,6 +50,7 @@ import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;
@@ -108,6 +109,7 @@ public class TestHiveIcebergOutputCommitter {
public void testSuccessfulUnpartitionedWrite() throws IOException {
HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
Table table = table(temp.getRoot().getPath(), false);
+
JobConf conf = jobConf(table, 1);
List<Record> expected = writeRecords(table.name(), 1, 0, true, false, conf);
committer.commitJob(new JobContextImpl(conf, JOB_ID));
@@ -217,7 +219,9 @@ public class TestHiveIcebergOutputCommitter {
private Table table(String location, boolean partitioned) {
HadoopTables tables = new HadoopTables();
- return tables.create(CUSTOMER_SCHEMA, partitioned ? PARTITIONED_SPEC : PartitionSpec.unpartitioned(), location);
+
+ return tables.create(CUSTOMER_SCHEMA, partitioned ? PARTITIONED_SPEC : PartitionSpec.unpartitioned(),
+ ImmutableMap.of(InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_HADOOP_TABLE_NAME), location);
}
private JobConf jobConf(Table table, int taskNum) {
@@ -226,6 +230,8 @@ public class TestHiveIcebergOutputCommitter {
conf.setNumReduceTasks(0);
conf.set(HiveConf.ConfVars.HIVEQUERYID.varname, QUERY_ID);
conf.set(InputFormatConfig.OUTPUT_TABLES, table.name());
+ conf.set(InputFormatConfig.TABLE_CATALOG_PREFIX + table.name(),
+ table.properties().get(InputFormatConfig.CATALOG_NAME));
conf.set(InputFormatConfig.SERIALIZED_TABLE_PREFIX + table.name(), SerializationUtil.serializeToBase64(table));
Map<String, String> propMap = Maps.newHashMap();
@@ -233,6 +239,8 @@ public class TestHiveIcebergOutputCommitter {
tableDesc.setProperties(new Properties());
tableDesc.getProperties().setProperty(Catalogs.NAME, table.name());
tableDesc.getProperties().setProperty(Catalogs.LOCATION, table.location());
+ tableDesc.getProperties().setProperty(InputFormatConfig.CATALOG_NAME, table.properties()
+ .get(InputFormatConfig.CATALOG_NAME));
HiveIcebergStorageHandler.overlayTableProperties(conf, tableDesc, propMap);
propMap.forEach((key, value) -> conf.set(key, value));
return conf;
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSerDe.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSerDe.java
index d47c1d6..7e043a8 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSerDe.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergSerDe.java
@@ -28,6 +28,8 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.serde.objectinspector.IcebergObjectInspector;
import org.apache.iceberg.mr.mapred.Container;
import org.apache.iceberg.types.Types;
@@ -54,6 +56,7 @@ public class TestHiveIcebergSerDe {
Properties properties = new Properties();
properties.setProperty("location", location.toString());
+ properties.setProperty(InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_HADOOP_TABLE_NAME);
HadoopTables tables = new HadoopTables(conf);
tables.create(schema, location.toString());
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
index d6d4244..97c2c31 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerLocalScan.java
@@ -37,6 +37,7 @@ import org.apache.iceberg.TestHelpers.Row;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestHelper;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -250,7 +251,8 @@ public class TestHiveIcebergStorageHandlerLocalScan {
" (customer_id BIGINT, first_name STRING COMMENT 'This is first name', " +
"last_name STRING COMMENT 'This is last name')" +
" STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
- testTables.locationForCreateTableSQL(identifier);
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of());
runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
PartitionSpec.unpartitioned(), data);
}
@@ -268,7 +270,8 @@ public class TestHiveIcebergStorageHandlerLocalScan {
" (customer_id BIGINT, first_name STRING COMMENT 'This is first name') " +
"PARTITIONED BY (last_name STRING COMMENT 'This is last name') STORED BY " +
"'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
- testTables.locationForCreateTableSQL(identifier);
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of());
runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, data);
}
@@ -286,7 +289,8 @@ public class TestHiveIcebergStorageHandlerLocalScan {
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.PARTITION_SPEC + "'='" + PartitionSpecParser.toJson(spec) + "', " +
"'" + InputFormatConfig.TABLE_SCHEMA + "'='" +
- SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "')";
+ SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')";
runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, data);
}
@@ -303,7 +307,8 @@ public class TestHiveIcebergStorageHandlerLocalScan {
"PARTITIONED BY (first_name STRING COMMENT 'This is first name', " +
"last_name STRING COMMENT 'This is last name') " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
- testTables.locationForCreateTableSQL(identifier);
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of());
runCreateAndReadTest(identifier, createSql, HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, spec, data);
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
index 94e1e29..9ace489 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerNoScan.java
@@ -42,6 +42,7 @@ import org.apache.iceberg.PartitionSpecParser;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
@@ -164,7 +165,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
"'" + InputFormatConfig.PARTITION_SPEC + "'='" +
PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " +
- "'dummy'='test')");
+ "'dummy'='test', " +
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
// Check the Iceberg table data
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -172,7 +174,12 @@ public class TestHiveIcebergStorageHandlerNoScan {
icebergTable.schema().asStruct());
Assert.assertEquals(PartitionSpec.unpartitioned(), icebergTable.spec());
- if (!Catalogs.hiveCatalog(shell.getHiveConf())) {
+ org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
+ Properties tableProperties = new Properties();
+ hmsTable.getParameters().entrySet().stream()
+ .filter(e -> !IGNORED_PARAMS.contains(e.getKey()))
+ .forEach(e -> tableProperties.put(e.getKey(), e.getValue()));
+ if (!Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
shell.executeStatement("DROP TABLE customers");
// Check if the table was really dropped even from the Catalog
@@ -182,7 +189,6 @@ public class TestHiveIcebergStorageHandlerNoScan {
}
);
} else {
- org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
Path hmsTableLocation = new Path(hmsTable.getSd().getLocation());
// Drop the table
@@ -207,6 +213,33 @@ public class TestHiveIcebergStorageHandlerNoScan {
}
@Test
+ public void testCreateDropTableNonDefaultCatalog() throws TException, InterruptedException {
+ TableIdentifier identifier = TableIdentifier.of("default", "customers");
+ String catalogName = "nondefaultcatalog";
+ testTables.properties().entrySet()
+ .forEach(e -> shell.setHiveSessionValue(e.getKey().replace(testTables.catalog, catalogName), e.getValue()));
+ String createSql = "CREATE EXTERNAL TABLE " + identifier +
+ " (customer_id BIGINT, first_name STRING COMMENT 'This is first name'," +
+ " last_name STRING COMMENT 'This is last name')" +
+ " STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of());
+ shell.executeStatement(createSql);
+
+ Table icebergTable = testTables.loadTable(identifier);
+ Assert.assertEquals(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA.asStruct(),
+ icebergTable.schema().asStruct());
+
+ shell.executeStatement("DROP TABLE default.customers");
+ // Check if the table was really dropped even from the Catalog
+ AssertHelpers.assertThrows("should throw exception", NoSuchTableException.class,
+ "Table does not exist", () -> {
+ testTables.loadTable(identifier);
+ }
+ );
+ }
+
+ @Test
public void testCreateTableWithoutSpec() {
TableIdentifier identifier = TableIdentifier.of("default", "customers");
@@ -214,7 +247,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
- SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "')");
+ SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" +
+ InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
// Check the Iceberg table partition data
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -224,7 +258,6 @@ public class TestHiveIcebergStorageHandlerNoScan {
@Test
public void testCreateTableWithUnpartitionedSpec() {
TableIdentifier identifier = TableIdentifier.of("default", "customers");
-
// We need the location for HadoopTable based tests only
shell.executeStatement("CREATE EXTERNAL TABLE customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
@@ -232,7 +265,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
"'" + InputFormatConfig.PARTITION_SPEC + "'='" +
- PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "')");
+ PartitionSpecParser.toJson(PartitionSpec.unpartitioned()) + "', " +
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
// Check the Iceberg table partition data
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -248,16 +282,21 @@ public class TestHiveIcebergStorageHandlerNoScan {
testTables.locationForCreateTableSQL(identifier) +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "', " +
- "'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE')");
+ "'" + InputFormatConfig.EXTERNAL_TABLE_PURGE + "'='FALSE', " +
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
- if (!Catalogs.hiveCatalog(shell.getHiveConf())) {
+ org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
+ Properties tableProperties = new Properties();
+ hmsTable.getParameters().entrySet().stream()
+ .filter(e -> !IGNORED_PARAMS.contains(e.getKey()))
+ .forEach(e -> tableProperties.put(e.getKey(), e.getValue()));
+ if (!Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
shell.executeStatement("DROP TABLE customers");
// Check if the table remains
testTables.loadTable(identifier);
} else {
// Check the HMS table parameters
- org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
Path hmsTableLocation = new Path(hmsTable.getSd().getLocation());
// Drop the table
@@ -287,7 +326,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
testTables.locationForCreateTableSQL(identifier) +
- "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='WrongSchema')");
+ "TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='WrongSchema'" +
+ ",'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
}
);
@@ -296,7 +336,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
"Please provide ", () -> {
shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
- testTables.locationForCreateTableSQL(identifier));
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
}
);
@@ -307,7 +348,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
shell.executeStatement("CREATE EXTERNAL TABLE withShell2 " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
- SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "')");
+ SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" +
+ InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
}
);
}
@@ -319,21 +361,23 @@ public class TestHiveIcebergStorageHandlerNoScan {
testTables.createIcebergTable(shell.getHiveConf(), "customers", COMPLEX_SCHEMA, FileFormat.PARQUET,
Collections.emptyList());
- if (Catalogs.hiveCatalog(shell.getHiveConf())) {
+ if (testTableType == TestTables.TestTableType.HIVE_CATALOG) {
// In HiveCatalog we just expect an exception since the table already exists
AssertHelpers.assertThrows("should throw exception", IllegalArgumentException.class,
"customers already exists", () -> {
shell.executeStatement("CREATE EXTERNAL TABLE customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
"TBLPROPERTIES ('" + InputFormatConfig.TABLE_SCHEMA + "'='" +
- SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "')");
+ SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA) + "','" +
+ InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
}
);
} else {
// With other catalogs, table creation should succeed
shell.executeStatement("CREATE EXTERNAL TABLE customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
- testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")));
+ testTables.locationForCreateTableSQL(TableIdentifier.of("default", "customers")) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
}
}
@@ -366,7 +410,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
"current_address STRUCT < street_address: STRUCT " +
"<street_number: INT, street_name: STRING, street_type: STRING>, country: STRING, postal_code: STRING >) " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
- testTables.locationForCreateTableSQL(identifier));
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
// Check the Iceberg table data
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -394,7 +439,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
"t_Float FLOaT, t_dOuble DOUBLE, t_boolean BOOLEAN, t_int INT, t_bigint BIGINT, t_binary BINARY, " +
"t_string STRING, t_timestamp TIMESTAMP, t_date DATE, t_decimal DECIMAL(3,2)) " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
- testTables.locationForCreateTableSQL(identifier));
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
// Check the Iceberg table data
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
@@ -417,7 +463,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
shell.executeStatement("CREATE EXTERNAL TABLE not_supported_types " +
"(not_supported " + notSupportedType + ") " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
- testTables.locationForCreateTableSQL(identifier));
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
}
);
}
@@ -438,7 +485,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
for (String notSupportedType : notSupportedTypes.keySet()) {
shell.executeStatement("CREATE EXTERNAL TABLE not_supported_types (not_supported " + notSupportedType + ") " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
- testTables.locationForCreateTableSQL(identifier));
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
Assert.assertEquals(notSupportedTypes.get(notSupportedType), icebergTable.schema().columns().get(0).type());
@@ -453,7 +501,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
"t_int INT COMMENT 'int column', " +
"t_string STRING COMMENT 'string column') " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
- testTables.locationForCreateTableSQL(identifier));
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
List<Object[]> rows = shell.executeStatement("DESCRIBE default.comment_table");
@@ -472,7 +521,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
"t_int INT, " +
"t_string STRING) " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
- testTables.locationForCreateTableSQL(identifier));
+ testTables.locationForCreateTableSQL(identifier) +
+ testTables.propertiesForCreateTableSQL(ImmutableMap.of()));
org.apache.iceberg.Table icebergTable = testTables.loadTable(identifier);
List<Object[]> rows = shell.executeStatement("DESCRIBE default.without_comment_table");
@@ -491,11 +541,12 @@ public class TestHiveIcebergStorageHandlerNoScan {
shell.executeStatement(String.format("CREATE EXTERNAL TABLE default.customers " +
"STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' %s" +
- "TBLPROPERTIES ('%s'='%s', '%s'='%s', '%s'='%s')",
+ "TBLPROPERTIES ('%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s')",
testTables.locationForCreateTableSQL(identifier), // we need the location for HadoopTable based tests only
InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA),
InputFormatConfig.PARTITION_SPEC, PartitionSpecParser.toJson(SPEC),
- "custom_property", "initial_val"));
+ "custom_property", "initial_val",
+ InputFormatConfig.CATALOG_NAME, Catalogs.ICEBERG_DEFAULT_CATALOG_NAME));
// Check the Iceberg table parameters
@@ -505,7 +556,17 @@ public class TestHiveIcebergStorageHandlerNoScan {
expectedIcebergProperties.put("custom_property", "initial_val");
expectedIcebergProperties.put("EXTERNAL", "TRUE");
expectedIcebergProperties.put("storage_handler", HiveIcebergStorageHandler.class.getName());
- if (Catalogs.hiveCatalog(shell.getHiveConf())) {
+
+ // Check the HMS table parameters
+ org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
+ Map<String, String> hmsParams = hmsTable.getParameters()
+ .entrySet().stream()
+ .filter(e -> !IGNORED_PARAMS.contains(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ Properties tableProperties = new Properties();
+ tableProperties.putAll(hmsParams);
+
+ if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
expectedIcebergProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true");
}
if (MetastoreUtil.hive3PresentOnClasspath()) {
@@ -513,15 +574,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
}
Assert.assertEquals(expectedIcebergProperties, icebergTable.properties());
- // Check the HMS table parameters
- org.apache.hadoop.hive.metastore.api.Table hmsTable = shell.metastore().getTable("default", "customers");
- Map<String, String> hmsParams = hmsTable.getParameters()
- .entrySet().stream()
- .filter(e -> !IGNORED_PARAMS.contains(e.getKey()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-
- if (Catalogs.hiveCatalog(shell.getHiveConf())) {
- Assert.assertEquals(9, hmsParams.size());
+ if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
+ Assert.assertEquals(10, hmsParams.size());
Assert.assertEquals("initial_val", hmsParams.get("custom_property"));
Assert.assertEquals("TRUE", hmsParams.get(InputFormatConfig.EXTERNAL_TABLE_PURGE));
Assert.assertEquals("TRUE", hmsParams.get("EXTERNAL"));
@@ -534,8 +588,9 @@ public class TestHiveIcebergStorageHandlerNoScan {
getCurrentSnapshotForHiveCatalogTable(icebergTable));
Assert.assertNull(hmsParams.get(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP));
Assert.assertNotNull(hmsParams.get(hive_metastoreConstants.DDL_TIME));
+ Assert.assertNotNull(hmsParams.get(InputFormatConfig.PARTITION_SPEC));
} else {
- Assert.assertEquals(7, hmsParams.size());
+ Assert.assertEquals(8, hmsParams.size());
Assert.assertNull(hmsParams.get(TableProperties.ENGINE_HIVE_ENABLED));
}
@@ -557,8 +612,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
.filter(e -> !IGNORED_PARAMS.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- if (Catalogs.hiveCatalog(shell.getHiveConf())) {
- Assert.assertEquals(12, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
+ if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
+ Assert.assertEquals(13, hmsParams.size()); // 2 newly-added properties + previous_metadata_location prop
Assert.assertEquals("true", hmsParams.get("new_prop_1"));
Assert.assertEquals("false", hmsParams.get("new_prop_2"));
Assert.assertEquals("new_val", hmsParams.get("custom_property"));
@@ -568,11 +623,11 @@ public class TestHiveIcebergStorageHandlerNoScan {
Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP), prevSnapshot);
Assert.assertEquals(hmsParams.get(BaseMetastoreTableOperations.METADATA_LOCATION_PROP), newSnapshot);
} else {
- Assert.assertEquals(7, hmsParams.size());
+ Assert.assertEquals(8, hmsParams.size());
}
// Remove some Iceberg props and see if they're removed from HMS table props as well
- if (Catalogs.hiveCatalog(shell.getHiveConf())) {
+ if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
icebergTable.updateProperties()
.remove("custom_property")
.remove("new_prop_1")
@@ -584,7 +639,7 @@ public class TestHiveIcebergStorageHandlerNoScan {
}
// append some data and check whether HMS stats are aligned with snapshot summary
- if (Catalogs.hiveCatalog(shell.getHiveConf())) {
+ if (Catalogs.hiveCatalog(shell.getHiveConf(), tableProperties)) {
List<Record> records = HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS;
testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, FileFormat.PARQUET, null, records);
hmsParams = shell.metastore().getTable("default", "customers").getParameters();
@@ -642,7 +697,8 @@ public class TestHiveIcebergStorageHandlerNoScan {
@Test
public void testDropHiveTableWithoutUnderlyingTable() throws IOException {
- Assume.assumeFalse("Not relevant for HiveCatalog", Catalogs.hiveCatalog(shell.getHiveConf()));
+ Assume.assumeFalse("Not relevant for HiveCatalog",
+ testTableType.equals(TestTables.TestTableType.HIVE_CATALOG));
TableIdentifier identifier = TableIdentifier.of("default", "customers");
// Create the Iceberg table in non-HiveCatalog
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
new file mode 100644
index 0000000..e1c23f3
--- /dev/null
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithMultipleCatalogs.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestHiveIcebergStorageHandlerWithMultipleCatalogs {
+
+ private static final String[] EXECUTION_ENGINES = new String[] { "tez", "mr" };
+ private static final String HIVECATALOGNAME = "table1_catalog";
+ private static final String OTHERCATALOGNAME = "table2_catalog";
+ private static TestHiveShell shell;
+
+ @Parameterized.Parameter(0)
+ public FileFormat fileFormat1;
+ @Parameterized.Parameter(1)
+ public FileFormat fileFormat2;
+ @Parameterized.Parameter(2)
+ public String executionEngine;
+ @Parameterized.Parameter(3)
+ public TestTables.TestTableType testTableType1;
+ @Parameterized.Parameter(4)
+ public String table1CatalogName;
+ @Parameterized.Parameter(5)
+ public TestTables.TestTableType testTableType2;
+ @Parameterized.Parameter(6)
+ public String table2CatalogName;
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+ private TestTables testTables1;
+ private TestTables testTables2;
+
+ @Parameterized.Parameters(name = "fileFormat1={0}, fileFormat2={1}, engine={2}, tableType1={3}, catalogName1={4}, " +
+ "tableType2={5}, catalogName2={6}")
+ public static Collection<Object[]> parameters() {
+ Collection<Object[]> testParams = new ArrayList<>();
+ String javaVersion = System.getProperty("java.specification.version");
+
+ // Run tests with PARQUET and ORC file formats for two Catalogs
+ for (String engine : EXECUTION_ENGINES) {
+ // include Tez tests only for Java 8
+ if (javaVersion.equals("1.8") || "mr".equals(engine)) {
+ for (TestTables.TestTableType testTableType : TestTables.ALL_TABLE_TYPES) {
+ if (!TestTables.TestTableType.HIVE_CATALOG.equals(testTableType)) {
+ testParams.add(new Object[]{FileFormat.PARQUET, FileFormat.ORC, engine,
+ TestTables.TestTableType.HIVE_CATALOG, HIVECATALOGNAME, testTableType, OTHERCATALOGNAME});
+ }
+ }
+ }
+ }
+ return testParams;
+ }
+
+ @BeforeClass
+ public static void beforeClass() {
+ shell = HiveIcebergStorageHandlerTestUtils.shell();
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ shell.stop();
+ }
+
+ @Before
+ public void before() throws IOException {
+ testTables1 = HiveIcebergStorageHandlerTestUtils.testTables(shell, testTableType1, temp, table1CatalogName);
+ HiveIcebergStorageHandlerTestUtils.init(shell, testTables1, temp, executionEngine);
+ testTables1.properties().entrySet().forEach(e -> shell.setHiveSessionValue(e.getKey(), e.getValue()));
+
+ testTables2 = HiveIcebergStorageHandlerTestUtils.testTables(shell, testTableType2, temp, table2CatalogName);
+ testTables2.properties().entrySet().forEach(e -> shell.setHiveSessionValue(e.getKey(), e.getValue()));
+ }
+
+ @After
+ public void after() throws Exception {
+ HiveIcebergStorageHandlerTestUtils.close(shell);
+ }
+
+ @Test
+ public void testJoinTablesFromDifferentCatalogs() throws IOException {
+ createAndAddRecords(testTables1, fileFormat1, TableIdentifier.of("default", "customers1"), table1CatalogName,
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+ createAndAddRecords(testTables2, fileFormat2, TableIdentifier.of("default", "customers2"), table2CatalogName,
+ HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS);
+
+ List<Object[]> rows = shell.executeStatement("SELECT c2.customer_id, c2.first_name, c2.last_name " +
+ "FROM default.customers2 c2 JOIN default.customers1 c1 ON c2.customer_id = c1.customer_id " +
+ "ORDER BY c2.customer_id");
+ Assert.assertEquals(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS.size(), rows.size());
+ HiveIcebergTestUtils.validateData(new ArrayList<>(HiveIcebergStorageHandlerTestUtils.CUSTOMER_RECORDS),
+ HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, rows), 0);
+ }
+
+ private void createAndAddRecords(TestTables testTables, FileFormat fileFormat, TableIdentifier identifier,
+ String catalogName, List<Record> records) throws IOException {
+ String createSql =
+ "CREATE EXTERNAL TABLE " + identifier + " (customer_id BIGINT, first_name STRING, last_name STRING)" +
+ " STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' " +
+ testTables.locationForCreateTableSQL(identifier) +
+ " TBLPROPERTIES ('" + InputFormatConfig.CATALOG_NAME + "'='" + catalogName + "')";
+ shell.executeStatement(createSql);
+ Table icebergTable = testTables.loadTable(identifier);
+ testTables.appendIcebergTable(shell.getHiveConf(), icebergTable, fileFormat, null, records);
+ }
+
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index 55c7b58..4d3a60e 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -26,6 +26,7 @@ import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -48,6 +49,7 @@ import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.MetastoreUtil;
+import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.TestCatalogs;
import org.apache.iceberg.mr.TestHelper;
@@ -71,14 +73,16 @@ abstract class TestTables {
private final Tables tables;
protected final TemporaryFolder temp;
+ protected final String catalog;
- protected TestTables(Tables tables, TemporaryFolder temp) {
+ protected TestTables(Tables tables, TemporaryFolder temp, String catalogName) {
this.tables = tables;
this.temp = temp;
+ this.catalog = catalogName;
}
- protected TestTables(Catalog catalog, TemporaryFolder temp) {
- this(new CatalogToTables(catalog), temp);
+ protected TestTables(Catalog catalog, TemporaryFolder temp, String catalogName) {
+ this(new CatalogToTables(catalog), temp, catalogName);
}
public Map<String, String> properties() {
@@ -103,6 +107,20 @@ abstract class TestTables {
public abstract String locationForCreateTableSQL(TableIdentifier identifier);
/**
+ * The table properties string needed for the CREATE TABLE ... commands,
+ * like "TBLPROPERTIES('iceberg.catalog'='mycatalog')"
+ * @return the TBLPROPERTIES clause to append to the CREATE TABLE statement
+ */
+ public String propertiesForCreateTableSQL(Map<String, String> tableProperties) {
+ Map<String, String> properties = new HashMap<>(tableProperties);
+ properties.putIfAbsent(InputFormatConfig.CATALOG_NAME, catalog);
+ String props = properties.entrySet().stream()
+ .map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue()))
+ .collect(Collectors.joining(","));
+ return " TBLPROPERTIES (" + props + ")";
+ }
+
+ /**
* If an independent Hive table creation is needed for the given Catalog then this should return the Hive SQL
* string which we have to execute. Overridden for HiveCatalog where the Hive table is immediately created
* during the Iceberg table creation so no extra sql execution is required.
@@ -113,15 +131,9 @@ abstract class TestTables {
public String createHiveTableSQL(TableIdentifier identifier, Map<String, String> tableProps) {
Preconditions.checkArgument(!identifier.namespace().isEmpty(), "Namespace should not be empty");
Preconditions.checkArgument(identifier.namespace().levels().length == 1, "Namespace should be single level");
- String sql = String.format("CREATE TABLE %s.%s STORED BY '%s' %s", identifier.namespace(), identifier.name(),
- HiveIcebergStorageHandler.class.getName(), locationForCreateTableSQL(identifier));
- if (tableProps != null && !tableProps.isEmpty()) {
- String props = tableProps.entrySet().stream()
- .map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue()))
- .collect(Collectors.joining(","));
- sql += " TBLPROPERTIES (" + props + ")";
- }
- return sql;
+ return String.format("CREATE TABLE %s.%s STORED BY '%s' %s %s", identifier.namespace(), identifier.name(),
+ HiveIcebergStorageHandler.class.getName(), locationForCreateTableSQL(identifier),
+ propertiesForCreateTableSQL(tableProps));
}
/**
@@ -179,7 +191,8 @@ abstract class TestTables {
SchemaParser.toJson(schema) + "', " +
"'" + InputFormatConfig.PARTITION_SPEC + "'='" +
PartitionSpecParser.toJson(spec) + "', " +
- "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "')");
+ "'" + TableProperties.DEFAULT_FILE_FORMAT + "'='" + fileFormat + "', " +
+ "'" + InputFormatConfig.CATALOG_NAME + "'='" + Catalogs.ICEBERG_DEFAULT_CATALOG_NAME + "')");
if (records != null && !records.isEmpty()) {
StringBuilder query = new StringBuilder().append("INSERT INTO " + identifier + " VALUES ");
@@ -296,21 +309,24 @@ abstract class TestTables {
private final String warehouseLocation;
- CustomCatalogTestTables(Configuration conf, TemporaryFolder temp) throws IOException {
+ CustomCatalogTestTables(Configuration conf, TemporaryFolder temp, String catalogName) throws IOException {
this(conf, temp, (MetastoreUtil.hive3PresentOnClasspath() ? "file:" : "") +
- temp.newFolder("custom", "warehouse").toString());
+ temp.newFolder("custom", "warehouse").toString(), catalogName);
}
- CustomCatalogTestTables(Configuration conf, TemporaryFolder temp, String warehouseLocation) {
- super(new TestCatalogs.CustomHadoopCatalog(conf, warehouseLocation), temp);
+ CustomCatalogTestTables(Configuration conf, TemporaryFolder temp, String warehouseLocation, String catalogName) {
+ super(new TestCatalogs.CustomHadoopCatalog(conf, warehouseLocation), temp, catalogName);
this.warehouseLocation = warehouseLocation;
}
@Override
public Map<String, String> properties() {
return ImmutableMap.of(
- InputFormatConfig.CATALOG_LOADER_CLASS, TestCatalogs.CustomHadoopCatalogLoader.class.getName(),
- TestCatalogs.CustomHadoopCatalog.WAREHOUSE_LOCATION, warehouseLocation
+ String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), "custom",
+ String.format(InputFormatConfig.CATALOG_CLASS_TEMPLATE, catalog),
+ TestCatalogs.CustomHadoopCatalog.class.getName(),
+ String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalog),
+ warehouseLocation
);
}
@@ -325,21 +341,21 @@ abstract class TestTables {
private final String warehouseLocation;
- HadoopCatalogTestTables(Configuration conf, TemporaryFolder temp) throws IOException {
+ HadoopCatalogTestTables(Configuration conf, TemporaryFolder temp, String catalogName) throws IOException {
this(conf, temp, (MetastoreUtil.hive3PresentOnClasspath() ? "file:" : "") +
- temp.newFolder("hadoop", "warehouse").toString());
+ temp.newFolder("hadoop", "warehouse").toString(), catalogName);
}
- HadoopCatalogTestTables(Configuration conf, TemporaryFolder temp, String warehouseLocation) {
- super(new HadoopCatalog(conf, warehouseLocation), temp);
+ HadoopCatalogTestTables(Configuration conf, TemporaryFolder temp, String warehouseLocation, String catalogName) {
+ super(new HadoopCatalog(conf, warehouseLocation), temp, catalogName);
this.warehouseLocation = warehouseLocation;
}
@Override
public Map<String, String> properties() {
return ImmutableMap.of(
- InputFormatConfig.CATALOG, "hadoop",
- InputFormatConfig.HADOOP_CATALOG_WAREHOUSE_LOCATION, warehouseLocation
+ String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), "hadoop",
+ String.format(InputFormatConfig.CATALOG_WAREHOUSE_TEMPLATE, catalog), warehouseLocation
);
}
@@ -349,8 +365,8 @@ abstract class TestTables {
}
static class HadoopTestTables extends TestTables {
- HadoopTestTables(Configuration conf, TemporaryFolder temp) {
- super(new HadoopTables(conf), temp);
+ HadoopTestTables(Configuration conf, TemporaryFolder temp, String catalogName) {
+ super(new HadoopTables(conf), temp, catalogName);
}
@Override
@@ -382,14 +398,14 @@ abstract class TestTables {
static class HiveTestTables extends TestTables {
- HiveTestTables(Configuration conf, TemporaryFolder temp) {
+ HiveTestTables(Configuration conf, TemporaryFolder temp, String catalogName) {
super(CatalogUtil.loadCatalog(HiveCatalog.class.getName(), CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
- ImmutableMap.of(), conf), temp);
+ ImmutableMap.of(), conf), temp, catalogName);
}
@Override
public Map<String, String> properties() {
- return ImmutableMap.of(InputFormatConfig.CATALOG, "hive");
+ return ImmutableMap.of(String.format(InputFormatConfig.CATALOG_TYPE_TEMPLATE, catalog), "hive");
}
@Override
@@ -423,26 +439,29 @@ abstract class TestTables {
enum TestTableType {
HADOOP_TABLE {
- public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) {
- return new HadoopTestTables(conf, temporaryFolder);
+ public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder, String catalogName) {
+ return new HadoopTestTables(conf, temporaryFolder, catalogName);
}
},
HADOOP_CATALOG {
- public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException {
- return new HadoopCatalogTestTables(conf, temporaryFolder);
+ public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder, String catalogName)
+ throws IOException {
+ return new HadoopCatalogTestTables(conf, temporaryFolder, catalogName);
}
},
CUSTOM_CATALOG {
- public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException {
- return new CustomCatalogTestTables(conf, temporaryFolder);
+ public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder, String catalogName)
+ throws IOException {
+ return new CustomCatalogTestTables(conf, temporaryFolder, catalogName);
}
},
HIVE_CATALOG {
- public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) {
- return new HiveTestTables(conf, temporaryFolder);
+ public TestTables instance(Configuration conf, TemporaryFolder temporaryFolder, String catalogName) {
+ return new HiveTestTables(conf, temporaryFolder, catalogName);
}
};
- public abstract TestTables instance(Configuration conf, TemporaryFolder temporaryFolder) throws IOException;
+ public abstract TestTables instance(Configuration conf, TemporaryFolder temporaryFolder, String catalogName)
+ throws IOException;
}
}