You are viewing a plain text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/10/05 23:04:09 UTC

[iceberg] branch master updated: Hive: Make HiveCatalog based tables readable from Hive (#1505)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b403009  Hive: Make HiveCatalog based tables readable from Hive (#1505)
b403009 is described below

commit b4030093b222a96a979d2964eecd6cdd3e9f7608
Author: pvary <pv...@cloudera.com>
AuthorDate: Tue Oct 6 01:03:59 2020 +0200

    Hive: Make HiveCatalog based tables readable from Hive (#1505)
---
 .../java/org/apache/iceberg/TableProperties.java   |  3 +
 .../apache/iceberg/hadoop/ConfigProperties.java    | 28 +++++++
 .../apache/iceberg/hive/HiveTableOperations.java   | 53 +++++++++++--
 .../org/apache/iceberg/hive/HiveTableTest.java     | 86 ++++++++++++++++++++++
 .../java/org/apache/iceberg/mr/TestHelper.java     |  3 +-
 ...stHiveIcebergStorageHandlerWithHiveCatalog.java | 20 +----
 6 files changed, 167 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java
index ff7e47b..6193b0c 100644
--- a/core/src/main/java/org/apache/iceberg/TableProperties.java
+++ b/core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -126,4 +126,7 @@ public class TableProperties {
 
   public static final String SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled";
   public static final boolean SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = false;
+
+  public static final String ENGINE_HIVE_ENABLED = "engine.hive.enabled"; // overrides iceberg.engine.hive.enabled
+  public static final boolean ENGINE_HIVE_ENABLED_DEFAULT = false; // used when neither is set
 }
diff --git a/core/src/main/java/org/apache/iceberg/hadoop/ConfigProperties.java b/core/src/main/java/org/apache/iceberg/hadoop/ConfigProperties.java
new file mode 100644
index 0000000..bd0a414
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/hadoop/ConfigProperties.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.hadoop;
+
+/**
+ * Configuration keys that Iceberg reads from the Hadoop {@code Configuration} (for example hive-site.xml).
+ */
+public final class ConfigProperties {
+
+  private ConfigProperties() {
+  }
+
+  /**
+   * Whether the Hive catalog should register tables so that they can be queried from Hive. Only consulted
+   * when the table property {@link org.apache.iceberg.TableProperties#ENGINE_HIVE_ENABLED} is not set.
+   */
+  public static final String ENGINE_HIVE_ENABLED = "iceberg.engine.hive.enabled";
+}
diff --git a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
index 23e98df..ed3c96e 100644
--- a/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
+++ b/hive-metastore/src/main/java/org/apache/iceberg/hive/HiveTableOperations.java
@@ -45,14 +45,17 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.hadoop.ConfigProperties;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -140,6 +143,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
   @Override
   protected void doCommit(TableMetadata base, TableMetadata metadata) {
     String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1);
+    boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
 
     boolean threw = true;
     Optional<Long> lockId = Optional.empty();
@@ -150,7 +154,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
       if (base != null) {
         LOG.debug("Committing existing table: {}", fullName);
         tbl = metaClients.run(client -> client.getTable(database, tableName));
-        tbl.setSd(storageDescriptor(metadata)); // set to pickup any schema changes
+        tbl.setSd(storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes
       } else {
         LOG.debug("Committing new table: {}", fullName);
         final long currentTimeMillis = System.currentTimeMillis();
@@ -160,7 +164,7 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
-            (int) currentTimeMillis / 1000,
-            (int) currentTimeMillis / 1000,
+            (int) (currentTimeMillis / 1000), // divide before casting: epoch millis do not fit in an int
+            (int) (currentTimeMillis / 1000),
             Integer.MAX_VALUE,
-            storageDescriptor(metadata),
+            storageDescriptor(metadata, hiveEngineEnabled),
             Collections.emptyList(),
             new HashMap<>(),
             null,
@@ -169,6 +173,14 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
         tbl.getParameters().put("EXTERNAL", "TRUE"); // using the external table type also requires this
       }
 
+      // Set 'storage_handler' so Hive can query the table; remove it when the Hive engine is disabled
+      if (hiveEngineEnabled) {
+        tbl.getParameters().put(hive_metastoreConstants.META_TABLE_STORAGE,
+            "org.apache.iceberg.mr.hive.HiveIcebergStorageHandler");
+      } else {
+        tbl.getParameters().remove(hive_metastoreConstants.META_TABLE_STORAGE);
+      }
+
       String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
       String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
       if (!Objects.equals(baseMetadataLocation, metadataLocation)) {
@@ -236,15 +248,21 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     tbl.setParameters(parameters);
   }
 
-  private StorageDescriptor storageDescriptor(TableMetadata metadata) {
+  private StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {
 
     final StorageDescriptor storageDescriptor = new StorageDescriptor();
     storageDescriptor.setCols(columns(metadata.schema()));
     storageDescriptor.setLocation(metadata.location());
-    storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileOutputFormat");
-    storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
     SerDeInfo serDeInfo = new SerDeInfo();
-    serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+    if (hiveEngineEnabled) {
+      storageDescriptor.setInputFormat(null);
+      storageDescriptor.setOutputFormat(null);
+      serDeInfo.setSerializationLib("org.apache.iceberg.mr.hive.HiveIcebergSerDe");
+    } else {
+      storageDescriptor.setOutputFormat("org.apache.hadoop.mapred.FileOutputFormat");
+      storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
+      serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
+    }
     storageDescriptor.setSerdeInfo(serDeInfo);
     return storageDescriptor;
   }
@@ -319,4 +337,28 @@ public class HiveTableOperations extends BaseMetastoreTableOperations {
     NoSuchIcebergTableException.check(table.getParameters().get(METADATA_LOCATION_PROP) != null,
         "Not an iceberg table: %s missing %s", fullName, METADATA_LOCATION_PROP);
   }
+
+  /**
+   * Returns whether the Hive engine related values (storage handler, SerDe and input/output formats) should be
+   * set on the HMS table.
+   * <p>
+   * The decision is made like this:
+   * <ol>
+   * <li>Table property value {@link TableProperties#ENGINE_HIVE_ENABLED}
+   * <li>If the table property is not set then check the hive-site.xml property value
+   * {@link ConfigProperties#ENGINE_HIVE_ENABLED}
+   * <li>If none of the above is set then use the default value {@link TableProperties#ENGINE_HIVE_ENABLED_DEFAULT}
+   * </ol>
+   * @param metadata Table metadata to use
+   * @param conf The hive configuration to use
+   * @return {@code true} if the Hive engine related values should be set on the table
+   */
+  private static boolean hiveEngineEnabled(TableMetadata metadata, Configuration conf) {
+    if (metadata.properties().get(TableProperties.ENGINE_HIVE_ENABLED) != null) {
+      // The property is set, so the fallback value passed here is never used
+      return metadata.propertyAsBoolean(TableProperties.ENGINE_HIVE_ENABLED, false);
+    }
+
+    return conf.getBoolean(ConfigProperties.ENGINE_HIVE_ENABLED, TableProperties.ENGINE_HIVE_ENABLED_DEFAULT);
+  }
 }
diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
index 916c9ef..ae39997 100644
--- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
+++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java
@@ -22,24 +22,29 @@ package org.apache.iceberg.hive;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.HasTableOperations;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroSchemaUtil;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.hadoop.ConfigProperties;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
@@ -325,4 +330,100 @@ public class HiveTableTest extends HiveTableBaseTest {
     metastoreClient.dropDatabase(NON_DEFAULT_DATABASE, true, true, true);
   }
 
+  @Test
+  public void testEngineHiveEnabledDefault() throws TException {
+    // Drop the previously created table to make place for the new one
+    catalog.dropTable(TABLE_IDENTIFIER);
+
+    // Unset in hive-conf
+    hiveConf.unset(ConfigProperties.ENGINE_HIVE_ENABLED);
+
+    catalog.createTable(TABLE_IDENTIFIER, schema, PartitionSpec.unpartitioned());
+    org.apache.hadoop.hive.metastore.api.Table hmsTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+
+    assertHiveEnabled(hmsTable, false);
+  }
+
+  @Test
+  public void testEngineHiveEnabledConfig() throws TException {
+    // Drop the previously created table to make place for the new one
+    catalog.dropTable(TABLE_IDENTIFIER);
+
+    try {
+      // Enable by hive-conf
+      hiveConf.set(ConfigProperties.ENGINE_HIVE_ENABLED, "true");
+
+      catalog.createTable(TABLE_IDENTIFIER, schema, PartitionSpec.unpartitioned());
+      org.apache.hadoop.hive.metastore.api.Table hmsTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+
+      assertHiveEnabled(hmsTable, true);
+
+      catalog.dropTable(TABLE_IDENTIFIER);
+
+      // Disable by hive-conf
+      hiveConf.set(ConfigProperties.ENGINE_HIVE_ENABLED, "false");
+
+      catalog.createTable(TABLE_IDENTIFIER, schema, PartitionSpec.unpartitioned());
+      hmsTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+
+      assertHiveEnabled(hmsTable, false);
+    } finally {
+      // hiveConf may be shared with other tests, so do not leak the setting
+      hiveConf.unset(ConfigProperties.ENGINE_HIVE_ENABLED);
+    }
+  }
+
+  @Test
+  public void testEngineHiveEnabledTableProperty() throws TException {
+    // Drop the previously created table to make place for the new one
+    catalog.dropTable(TABLE_IDENTIFIER);
+
+    try {
+      // Enabled by table property - also check that the hive-conf is ignored
+      Map<String, String> tableProperties = new HashMap<>();
+      tableProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true");
+      hiveConf.set(ConfigProperties.ENGINE_HIVE_ENABLED, "false");
+
+      catalog.createTable(TABLE_IDENTIFIER, schema, PartitionSpec.unpartitioned(), tableProperties);
+      org.apache.hadoop.hive.metastore.api.Table hmsTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+
+      assertHiveEnabled(hmsTable, true);
+
+      catalog.dropTable(TABLE_IDENTIFIER);
+
+      // Disabled by table property - also check that the hive-conf is ignored
+      tableProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "false");
+      hiveConf.set(ConfigProperties.ENGINE_HIVE_ENABLED, "true");
+
+      catalog.createTable(TABLE_IDENTIFIER, schema, PartitionSpec.unpartitioned(), tableProperties);
+      hmsTable = metastoreClient.getTable(DB_NAME, TABLE_NAME);
+
+      assertHiveEnabled(hmsTable, false);
+    } finally {
+      // hiveConf may be shared with other tests, so do not leak the setting
+      hiveConf.unset(ConfigProperties.ENGINE_HIVE_ENABLED);
+    }
+  }
+
+  /**
+   * Checks the Hive engine settings of the HMS table. When {@code expected} is true: the storage handler
+   * parameter, the Iceberg SerDe and null input/output formats; otherwise: no storage handler, the
+   * LazySimpleSerDe and the mapred FileInputFormat/FileOutputFormat.
+   */
+  private void assertHiveEnabled(org.apache.hadoop.hive.metastore.api.Table hmsTable, boolean expected) {
+    if (expected) {
+      Assert.assertEquals("org.apache.iceberg.mr.hive.HiveIcebergStorageHandler",
+          hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE));
+      Assert.assertEquals("org.apache.iceberg.mr.hive.HiveIcebergSerDe",
+          hmsTable.getSd().getSerdeInfo().getSerializationLib());
+      Assert.assertNull(hmsTable.getSd().getInputFormat());
+      Assert.assertNull(hmsTable.getSd().getOutputFormat());
+    } else {
+      Assert.assertNull(hmsTable.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE));
+      Assert.assertEquals("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+          hmsTable.getSd().getSerdeInfo().getSerializationLib());
+      Assert.assertEquals("org.apache.hadoop.mapred.FileInputFormat", hmsTable.getSd().getInputFormat());
+      Assert.assertEquals("org.apache.hadoop.mapred.FileOutputFormat", hmsTable.getSd().getOutputFormat());
+    }
+  }
 }
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
index 91b814b..394b747 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -74,7 +74,8 @@ public class TestHelper {
   }
 
   public Map<String, String> properties() {
-    return ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
+    return ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name(),
+        TableProperties.ENGINE_HIVE_ENABLED, "true"); // makes HiveCatalog tables readable from Hive
   }
 
   public Table createTable(Schema theSchema, PartitionSpec theSpec) {
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java
index e3ede72..22e3474 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithHiveCatalog.java
@@ -20,10 +20,6 @@
 package org.apache.iceberg.mr.hive;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.thrift.TException;
 import org.junit.rules.TemporaryFolder;
 
 public class TestHiveIcebergStorageHandlerWithHiveCatalog extends HiveIcebergStorageHandlerBaseTest {
@@ -36,20 +32,6 @@ public class TestHiveIcebergStorageHandlerWithHiveCatalog extends HiveIcebergSto
   @Override
   protected void createHiveTable(String tableName, String location) {
     // The Hive catalog has already created the Hive table so there's no need to issue another
-    // 'CREATE TABLE ...' statement. However, we still need to set up the storage handler properly,
-    // which can't be done directly using the Hive DDL so we resort to the HMS API.
-    try {
-      IMetaStoreClient client = new HiveMetaStoreClient(metastore.hiveConf());
-      Table table = client.getTable("default", tableName);
-
-      table.getParameters().put("storage_handler", HiveIcebergStorageHandler.class.getName());
-      table.getSd().getSerdeInfo().setSerializationLib(HiveIcebergSerDe.class.getName());
-      table.getSd().setInputFormat(null);
-      table.getSd().setOutputFormat(null);
-
-      client.alter_table("default", tableName, table);
-    } catch (TException te) {
-      throw new RuntimeException(te);
-    }
+    // 'CREATE TABLE ...' statement; the storage handler is set by the catalog (engine.hive.enabled).
   }
 }