You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/10/05 23:04:09 UTC
[iceberg] branch master updated: Hive: Make HiveCatalog based
tables readable from Hive (#1505)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b403009 Hive: Make HiveCatalog based tables readable from Hive (#1505)
b403009 is described below
commit b4030093b222a96a979d2964eecd6cdd3e9f7608
Author: pvary <pv...@cloudera.com>
AuthorDate: Tue Oct 6 01:03:59 2020 +0200
Hive: Make HiveCatalog based tables readable from Hive (#1505)
---
.../java/org/apache/iceberg/TableProperties.java | 3 +
.../apache/iceberg/hadoop/ConfigProperties.java | 28 +++++++
.../apache/iceberg/hive/HiveTableOperations.java | 53 +++++++++++--
.../org/apache/iceberg/hive/HiveTableTest.java | 86 ++++++++++++++++++++++
.../java/org/apache/iceberg/mr/TestHelper.java | 3 +-
...stHiveIcebergStorageHandlerWithHiveCatalog.java | 20 +----
6 files changed, 167 insertions(+), 26 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index ff7e47b..6193b0c 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -126,4 +126,7 @@ public class TableProperties {
public static final String SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled";
public static final boolean SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = false;
+ // If true, Hive-readable metadata (Iceberg storage handler and SerDe) is written to the Hive Metastore table.
+ public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled";
+ public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false;
}
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/ConfigProperties.java b/core/src/main/java/org/apache/iceberg/hadoop/ConfigProperties.java
new file mode 100644
index 0000000..bd0a414
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/hadoop/ConfigProperties.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+public class ConfigProperties {
+ // Holder for Iceberg keys read from a Hadoop Configuration (e.g. hive-site.xml); not instantiable.
+ private ConfigProperties() {
+ }
+ // Consulted by HiveTableOperations only when the table property engine.hive.enabled is not set.
+ public static final String ENGINE_HIVE_ENABLED = "iceberg.engine.hive.enabled";
+}
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 23e98df..ed3c96e 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -45,14 +45,17 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.Schema;
import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.common.DynMethods;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -140,6 +143,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
@Override
protected void doCommit(TableMetadata base, TableMetadata metadata) {
String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+ boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
boolean threw = true;
Optional<Long> lockId = Optional.empty();
@@ -150,7 +154,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
if (base != null) {
LOG.debug("Committing existing table: {}", fullName);
tbl = metaClients.run(client -> client.getTable(database, tableName));
- tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes
+ tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes
} else {
LOG.debug("Committing new table: {}", fullName);
final long currentTimeMillis = System.currentTimeMillis();
@@ -160,7 +164,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
(int) currentTimeMillis / 1000,
(int) currentTimeMillis / 1000,
Integer.MAX_VALUE,
- storageDescriptor(metadata),
+ storageDescriptor(metadata, hiveEngineEnabled),
Collections.emptyList(),
new HashMap<>(),
null,
@@ -169,6 +173,14 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
tbl.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this
}
+ // If needed, set the 'storage_handler' property to enable querying from Hive
+ if (hiveEngineEnabled) {
+ tbl.getParameters().put(hive_metastoreConstants.META_TABLE_STORAGE,
+ "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler");
+ } else {
+ tbl.getParameters().remove(hive_metastoreConstants.META_TABLE_STORAGE);
+ }
+
String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
@@ -236,15 +248,21 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
tbl.setParameters(parameters);
}
- private StorageDescriptor storageDescriptor(TableMetadata metadata) {
+ private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {
final StorageDescriptor storageDescriptor = new StorageDescriptor();
storageDescriptor.setCols(columns(metadata.schema()));
storageDescriptor.setLocation(metadata.location());
- storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileOutputFormat");
- storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
SerDeInfo serDeInfo = new SerDeInfo();
- serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+ if (hiveEngineEnabled) {
+ storageDescriptor.setInputFormat(null);
+ storageDescriptor.setOutputFormat(null);
+ serDeInfo.setSerializationLib("org.apache.iceberg.mr.hive.HiveIcebergSerDe");
+ } else {
+ storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileOutputFormat");
+ storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
+ serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+ }
storageDescriptor.setSerdeInfo(serDeInfo);
return storageDescriptor;
}
@@ -319,4 +337,27 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
NoSuchIcebergTableException.check(table.getParameters().get(METADATA_LOCATION_PROP) != null,
"Not an iceberg table: %s missing %s", fullName, METADATA_LOCATION_PROP);
}
+
+ /**
+ * Returns whether the Hive engine related values should be enabled on the table.
+ * <p>
+ * The decision is made like this:
+ * <ol>
+ * <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED}
+ * <li>If the table property is not set then check the configuration (e.g. hive-site.xml) value
+ * {@link ConfigProperties#ENGINE_HIVE_ENABLED}
+ * <li>If neither of the above is set then use the default value {@link TableProperties#ENGINE_HIVE_ENABLED_DEFAULT}
+ * </ol>
+ * @param metadata Table metadata to use
+ * @param conf The Hadoop/Hive configuration to use
+ * @return whether the Hive engine related values should be enabled
+ */
+ private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration conf) {
+ if (metadata.properties().get(TableProperties.ENGINE_HIVE_ENABLED) != null) {
+ // The property is known to be set here, so the fallback value passed below is never used.
+ return metadata.propertyAsBoolean(TableProperties.ENGINE_HIVE_ENABLED, false);
+ }
+
+ return conf.getBoolean(ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
+ }
}
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index 916c9ef..ae39997 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -22,24 +22,29 @@ package org.apache.iceberg.hive;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Files;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.hadoop.ConfigProperties;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
@@ -325,4 +330,85 @@ public class HiveTableTest extends HiveTableBaseTest {
metastoreClient.dropDatabase(NON_DEFAULT_DATABASE, true, true, true);
}
+ @Test
+ public void testEngineHiveEnabledDefault() throws TException {
+ // Drop the previously created table to make room for the new one
+ catalog.dropTable(TABLE_IDENTIFIER);
+
+ // Make sure the hive-conf key is unset, so with no table property the default value applies
+ hiveConf.unset(ConfigProperties.ENGINE_HIVE_ENABLED);
+
+ catalog.createTable(TABLE_IDENTIFIER, schema, PartitionSpec.unpartitioned());
+ org.apache.hadoop.hive.metastore.api.Table hmsTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+
+ assertHiveEnabled(hmsTable, false);
+ }
+
+ @Test
+ public void testEngineHiveEnabledConfig() throws TException {
+ // Drop the previously created table to make room for the new one
+ catalog.dropTable(TABLE_IDENTIFIER);
+ try {
+ // Enable by hive-conf
+ hiveConf.set(ConfigProperties.ENGINE_HIVE_ENABLED, "true");
+ catalog.createTable(TABLE_IDENTIFIER, schema, PartitionSpec.unpartitioned());
+ org.apache.hadoop.hive.metastore.api.Table hmsTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+ assertHiveEnabled(hmsTable, true);
+
+ catalog.dropTable(TABLE_IDENTIFIER);
+
+ // Disable by hive-conf
+ hiveConf.set(ConfigProperties.ENGINE_HIVE_ENABLED, "false");
+ catalog.createTable(TABLE_IDENTIFIER, schema, PartitionSpec.unpartitioned());
+ hmsTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+ assertHiveEnabled(hmsTable, false);
+ } finally {
+ // hiveConf is shared with the catalog used by other tests; do not leak this setting
+ hiveConf.unset(ConfigProperties.ENGINE_HIVE_ENABLED);
+ }
+ }
+
+ @Test
+ public void testEngineHiveEnabledTableProperty() throws TException {
+ // Drop the previously created table to make room for the new one
+ catalog.dropTable(TABLE_IDENTIFIER);
+ try {
+ // Enabled by table property - also check that the hive-conf is ignored
+ Map<String, String> tableProperties = new HashMap<>();
+ tableProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true");
+ hiveConf.set(ConfigProperties.ENGINE_HIVE_ENABLED, "false");
+ catalog.createTable(TABLE_IDENTIFIER, schema, PartitionSpec.unpartitioned(), tableProperties);
+ org.apache.hadoop.hive.metastore.api.Table hmsTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+ assertHiveEnabled(hmsTable, true);
+
+ catalog.dropTable(TABLE_IDENTIFIER);
+
+ // Disabled by table property - also check that the hive-conf is ignored
+ tableProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "false");
+ hiveConf.set(ConfigProperties.ENGINE_HIVE_ENABLED, "true");
+ catalog.createTable(TABLE_IDENTIFIER, schema, PartitionSpec.unpartitioned(), tableProperties);
+ hmsTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+ assertHiveEnabled(hmsTable, false);
+ } finally {
+ // hiveConf is shared with the catalog used by other tests; do not leave it set to "true"
+ hiveConf.unset(ConfigProperties.ENGINE_HIVE_ENABLED);
+ }
+ }
+
+ private void assertHiveEnabled(org.apache.hadoop.hive.metastore.api.Table hmsTable, boolean expected) {
+ if (expected) { // Hive-readable: Iceberg storage handler and SerDe, no explicit input/output formats
+ Assert.assertEquals("org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+ hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE));
+ Assert.assertEquals("org.apache.iceberg.mr.hive.HiveIcebergSerDe",
+ hmsTable.getSd().getSerdeInfo().getSerializationLib());
+ Assert.assertNull(hmsTable.getSd().getInputFormat());
+ Assert.assertNull(hmsTable.getSd().getOutputFormat());
+ } else { // no storage handler; FileInputFormat/FileOutputFormat with LazySimpleSerDe
+ Assert.assertNull(hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE));
+ Assert.assertEquals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+ hmsTable.getSd().getSerdeInfo().getSerializationLib());
+ Assert.assertEquals("org.apache.hadoop.mapred.FileInputFormat", hmsTable.getSd().getInputFormat());
+ Assert.assertEquals("org.apache.hadoop.mapred.FileOutputFormat", hmsTable.getSd().getOutputFormat());
+ }
+ }
}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
index 91b814b..394b747 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -74,7 +74,8 @@ public class TestHelper {
}
public Map<String, String> properties() {
- return ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
+ return ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name(),
+ TableProperties.ENGINE_HIVE_ENABLED, "true");
}
public Table createTable(Schema theSchema, PartitionSpec theSpec) {
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java
index e3ede72..22e3474 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java
@@ -20,10 +20,6 @@
package org.apache.iceberg.mr.hive;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.thrift.TException;
import org.junit.rules.TemporaryFolder;
public class TestHiveIcebergStorageHandlerWithHiveCatalog extends HiveIcebergStorageHandlerBaseTest {
@@ -36,20 +32,6 @@ public class TestHiveIcebergStorageHandlerWithHiveCatalog extends HiveIcebergSto
@Override
protected void createHiveTable(String tableName, String location) {
// The Hive catalog has already created the Hive table so there's no need to issue another
- // 'CREATE TABLE ...' statement. However, we still need to set up the storage handler properly,
- // which can't be done directly using the Hive DDL so we resort to the HMS API.
- try {
- IMetaStoreClient client = new HiveMetaStoreClient(metastore.hiveConf());
- Table table = client.getTable("default", tableName);
-
- table.getParameters().put("storage_handler", HiveIcebergStorageHandler.class.getName());
- table.getSd().getSerdeInfo().setSerializationLib(HiveIcebergSerDe.class.getName());
- table.getSd().setInputFormat(null);
- table.getSd().setOutputFormat(null);
-
- client.alter_table("default", tableName, table);
- } catch (TException te) {
- throw new RuntimeException(te);
- }
+ // 'CREATE TABLE ...' statement.
}
}