Posted to commits@hudi.apache.org by vb...@apache.org on 2019/09/21 15:04:40 UTC

[incubator-hudi] branch master updated: HUDI-180 : Adding support for hive registration using metastore along with JDBC

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1104f95  HUDI-180 : Adding support for hive registration using metastore along with JDBC
1104f95 is described below

commit 1104f9526fcd05270ed5cd084228f023f1a01c46
Author: Nishith Agarwal <na...@uber.com>
AuthorDate: Fri Aug 30 12:30:56 2019 -0700

    HUDI-180 : Adding support for hive registration using metastore along with JDBC
---
 .../java/org/apache/hudi/hive/HiveSyncConfig.java  |  40 ++---
 .../org/apache/hudi/hive/HoodieHiveClient.java     | 161 ++++++++++++++++-----
 .../org/apache/hudi/hive/HiveSyncToolTest.java     |  23 +++
 3 files changed, 173 insertions(+), 51 deletions(-)
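
For readers skimming the patch, a hypothetical usage sketch (not part of this commit) of the new flag follows; field names come from HiveSyncConfig in the diff below, while the path, database, table name, and the hiveConf/fs variables are placeholders assumed to be initialized elsewhere:

    // Hypothetical sketch: a config that opts out of JDBC and relies on the
    // metastore client / Hive Driver path added by this change.
    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.basePath = "hdfs://namenode/tmp/hoodie_dataset";   // placeholder dataset path
    cfg.databaseName = "default";                          // placeholder database
    cfg.tableName = "hoodie_table";                        // placeholder table
    cfg.jdbcUrl = "jdbc:hive2://hiveserver:10000";         // only used when useJdbc is true
    cfg.useJdbc = false;                                   // new flag introduced by this change
    HoodieHiveClient hiveClient = new HoodieHiveClient(cfg, hiveConf, fs);
    // With useJdbc = false, the schema is served via the metastore client path.
    java.util.Map<String, String> schema = hiveClient.getTableSchema();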

diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 8296163..2fc3a2e 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -48,11 +48,10 @@ public class HiveSyncConfig implements Serializable {
       "--base-path"}, description = "Basepath of hoodie dataset to sync", required = true)
   public String basePath;
 
-  @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by",
-      required = false)
+  @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
   public List<String> partitionFields = new ArrayList<>();
 
-  @Parameter(names = "-partition-value-extractor", description = "Class which implements "
+  @Parameter(names = "--partition-value-extractor", description = "Class which implements "
       + "PartitionValueExtractor "
       + "to extract the partition "
       + "values from HDFS path")
@@ -74,9 +73,27 @@ public class HiveSyncConfig implements Serializable {
       + "org.apache.hudi input format.")
   public Boolean usePreApacheInputFormat = false;
 
+  @Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
+  public Boolean useJdbc = true;
+
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;
 
+  public static HiveSyncConfig copy(HiveSyncConfig cfg) {
+    HiveSyncConfig newConfig = new HiveSyncConfig();
+    newConfig.basePath = cfg.basePath;
+    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
+    newConfig.databaseName = cfg.databaseName;
+    newConfig.hivePass = cfg.hivePass;
+    newConfig.hiveUser = cfg.hiveUser;
+    newConfig.partitionFields = cfg.partitionFields;
+    newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
+    newConfig.jdbcUrl = cfg.jdbcUrl;
+    newConfig.tableName = cfg.tableName;
+    newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
+    return newConfig;
+  }
+
   @Override
   public String toString() {
     return "HiveSyncConfig{"
@@ -89,22 +106,9 @@ public class HiveSyncConfig implements Serializable {
         + ", partitionFields=" + partitionFields
         + ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
         + ", assumeDatePartitioning=" + assumeDatePartitioning
+        + ", usePreApacheInputFormat=" + usePreApacheInputFormat
+        + ", useJdbc=" + useJdbc
         + ", help=" + help
         + '}';
   }
-
-  public static HiveSyncConfig copy(HiveSyncConfig cfg) {
-    HiveSyncConfig newConfig = new HiveSyncConfig();
-    newConfig.basePath = cfg.basePath;
-    newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
-    newConfig.databaseName = cfg.databaseName;
-    newConfig.hivePass = cfg.hivePass;
-    newConfig.hiveUser = cfg.hiveUser;
-    newConfig.partitionFields = cfg.partitionFields;
-    newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
-    newConfig.jdbcUrl = cfg.jdbcUrl;
-    newConfig.tableName = cfg.tableName;
-    newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
-    return newConfig;
-  }
 }
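
One plausible use of the relocated copy() helper is deriving a second config that differs only in table name (for example a separate realtime view). A hypothetical sketch, not taken from this commit, where baseConfig is assumed to exist:

    // Hypothetical sketch: clone a base config and override the table name.
    // Note that copy() as written above does not carry over the new useJdbc flag,
    // so it is propagated explicitly here.
    HiveSyncConfig rtConfig = HiveSyncConfig.copy(baseConfig);
    rtConfig.tableName = baseConfig.tableName + "_rt";   // illustrative suffix
    rtConfig.useJdbc = baseConfig.useJdbc;
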
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 533c29b..771f45a 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -29,7 +29,9 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -40,6 +42,8 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.jdbc.HiveDriver;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -86,6 +90,7 @@ public class HoodieHiveClient {
   private FileSystem fs;
   private Connection connection;
   private HoodieTimeline activeTimeline;
+  private HiveConf configuration;
 
   public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
     this.syncConfig = cfg;
@@ -93,8 +98,13 @@ public class HoodieHiveClient {
     this.metaClient = new HoodieTableMetaClient(fs.getConf(), cfg.basePath, true);
     this.tableType = metaClient.getTableType();
 
-    LOG.info("Creating hive connection " + cfg.jdbcUrl);
-    createHiveConnection();
+    this.configuration = configuration;
+    // Support both JDBC and metastore-based implementations for backwards compatibility. Future users should
+    // disable JDBC and depend on the metastore client for all Hive registrations.
+    if (cfg.useJdbc) {
+      LOG.info("Creating hive connection " + cfg.jdbcUrl);
+      createHiveConnection();
+    }
     try {
       this.client = new HiveMetaStoreClient(configuration);
     } catch (MetaException e) {
@@ -269,32 +279,59 @@ public class HoodieHiveClient {
    * Get the table schema
    */
   public Map<String, String> getTableSchema() {
-    if (!doesTableExist()) {
-      throw new IllegalArgumentException(
-          "Failed to get schema for table " + syncConfig.tableName + " does not exist");
-    }
-    Map<String, String> schema = Maps.newHashMap();
-    ResultSet result = null;
-    try {
-      DatabaseMetaData databaseMetaData = connection.getMetaData();
-      result = databaseMetaData
-          .getColumns(null, syncConfig.databaseName, syncConfig.tableName, null);
-      while (result.next()) {
-        String columnName = result.getString(4);
-        String columnType = result.getString(6);
-        if ("DECIMAL".equals(columnType)) {
-          int columnSize = result.getInt("COLUMN_SIZE");
-          int decimalDigits = result.getInt("DECIMAL_DIGITS");
-          columnType += String.format("(%s,%s)", columnSize, decimalDigits);
+    if (syncConfig.useJdbc) {
+      if (!doesTableExist()) {
+        throw new IllegalArgumentException(
+            "Failed to get schema for table " + syncConfig.tableName + " does not exist");
+      }
+      Map<String, String> schema = Maps.newHashMap();
+      ResultSet result = null;
+      try {
+        DatabaseMetaData databaseMetaData = connection.getMetaData();
+        result = databaseMetaData
+            .getColumns(null, syncConfig.databaseName, syncConfig.tableName, null);
+        while (result.next()) {
+          String columnName = result.getString(4);
+          String columnType = result.getString(6);
+          if ("DECIMAL".equals(columnType)) {
+            int columnSize = result.getInt("COLUMN_SIZE");
+            int decimalDigits = result.getInt("DECIMAL_DIGITS");
+            columnType += String.format("(%s,%s)", columnSize, decimalDigits);
+          }
+          schema.put(columnName, columnType);
         }
-        schema.put(columnName, columnType);
+        return schema;
+      } catch (SQLException e) {
+        throw new HoodieHiveSyncException("Failed to get table schema for " + syncConfig.tableName,
+            e);
+      } finally {
+        closeQuietly(result, null);
       }
+    } else {
+      return getTableSchemaUsingMetastoreClient();
+    }
+  }
+
+  public Map<String, String> getTableSchemaUsingMetastoreClient() {
+    try {
+      // HiveMetaStoreClient returns partition keys separately from columns, hence get both and merge them
+      // to obtain the schema of the table.
+      final long start = System.currentTimeMillis();
+      Table table = this.client.getTable(syncConfig.databaseName, syncConfig.tableName);
+      Map<String, String> partitionKeysMap = table.getPartitionKeys().stream()
+          .collect(Collectors.toMap(f -> f.getName(), f -> f.getType().toUpperCase()));
+
+      Map<String, String> columnsMap = table.getSd().getCols().stream()
+          .collect(Collectors.toMap(f -> f.getName(), f -> f.getType().toUpperCase()));
+
+      Map<String, String> schema = new HashMap<>();
+      schema.putAll(columnsMap);
+      schema.putAll(partitionKeysMap);
+      final long end = System.currentTimeMillis();
+      LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
       return schema;
-    } catch (SQLException e) {
-      throw new HoodieHiveSyncException("Failed to get table schema for " + syncConfig.tableName,
-          e);
-    } finally {
-      closeQuietly(result, null);
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed to get table schema for : " + syncConfig.tableName, e);
     }
   }
 
@@ -455,19 +492,71 @@ public class HoodieHiveClient {
    * @param s SQL to execute
    */
   public void updateHiveSQL(String s) {
-    Statement stmt = null;
+    if (syncConfig.useJdbc) {
+      Statement stmt = null;
+      try {
+        stmt = connection.createStatement();
+        LOG.info("Executing SQL " + s);
+        stmt.execute(s);
+      } catch (SQLException e) {
+        throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
+      } finally {
+        closeQuietly(null, stmt);
+      }
+    } else {
+      updateHiveSQLUsingHiveDriver(s);
+    }
+  }
+
+  /**
+   * Execute an update in Hive using the Hive Driver.
+   *
+   * @param sql SQL statement to execute
+   */
+  public CommandProcessorResponse updateHiveSQLUsingHiveDriver(String sql) throws HoodieHiveSyncException {
+    List<CommandProcessorResponse> responses = updateHiveSQLs(Arrays.asList(sql));
+    return responses.get(responses.size() - 1);
+  }
+
+  private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) throws HoodieHiveSyncException {
+    SessionState ss = null;
+    org.apache.hadoop.hive.ql.Driver hiveDriver = null;
+    List<CommandProcessorResponse> responses = new ArrayList<>();
     try {
-      stmt = connection.createStatement();
-      LOG.info("Executing SQL " + s);
-      stmt.execute(s);
-    } catch (SQLException e) {
-      throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
+      final long startTime = System.currentTimeMillis();
+      ss = SessionState.start(configuration);
+      hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
+      final long endTime = System.currentTimeMillis();
+      LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
+      for (String sql : sqls) {
+        final long start = System.currentTimeMillis();
+        responses.add(hiveDriver.run(sql));
+        final long end = System.currentTimeMillis();
+        LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));
+      }
+    } catch (Exception e) {
+      throw new HoodieHiveSyncException("Failed in executing SQL", e);
     } finally {
-      closeQuietly(null, stmt);
+      if (ss != null) {
+        try {
+          ss.close();
+        } catch (IOException ie) {
+          LOG.error("Error while closing SessionState", ie);
+        }
+      }
+      if (hiveDriver != null) {
+        try {
+          hiveDriver.close();
+        } catch (Exception e) {
+          LOG.error("Error while closing hiveDriver", e);
+        }
+      }
     }
+    return responses;
   }
 
 
+
   private void createHiveConnection() {
     if (connection == null) {
       try {
@@ -505,6 +594,11 @@ public class HoodieHiveClient {
       if (stmt != null) {
         stmt.close();
       }
+    } catch (SQLException e) {
+      LOG.error("Could not close the statement opened ", e);
+    }
+
+    try {
       if (resultSet != null) {
         resultSet.close();
       }
@@ -544,6 +638,7 @@ public class HoodieHiveClient {
       }
       if (client != null) {
         client.close();
+        client = null;
       }
     } catch (SQLException e) {
       LOG.error("Could not close connection ", e);
@@ -622,4 +717,4 @@ public class HoodieHiveClient {
       return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
     }
   }
-}
+}
\ No newline at end of file
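
Callers of updateHiveSQL do not need to know which backend is active; routing to JDBC or the embedded Hive Driver happens inside the client, as the diff above shows. A minimal, hypothetical sketch of issuing DDL through it, where cfg, hiveConf, and fs are assumed to be set up and the statement and identifiers are placeholders:

    // Hypothetical sketch: the same call works with useJdbc = true (HiveServer2 JDBC)
    // or useJdbc = false (org.apache.hadoop.hive.ql.Driver backed by the metastore).
    HoodieHiveClient hiveClient = new HoodieHiveClient(cfg, hiveConf, fs);
    hiveClient.updateHiveSQL("ALTER TABLE `default`.`hoodie_table` ADD COLUMNS (`new_col` string)");
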
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java b/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java
index 2947d8b..c1ef756 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hudi.common.util.Option;
@@ -39,10 +41,25 @@ import org.apache.parquet.schema.Types;
 import org.joda.time.DateTime;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 @SuppressWarnings("ConstantConditions")
+@RunWith(Parameterized.class)
 public class HiveSyncToolTest {
 
+  // Test sync tool using both jdbc and metastore client
+  private boolean useJdbc;
+
+  public HiveSyncToolTest(Boolean useJdbc) {
+    this.useJdbc = useJdbc;
+  }
+
+  @Parameterized.Parameters(name = "UseJdbc")
+  public static Collection<Boolean[]> data() {
+    return Arrays.asList(new Boolean[][]{{false}, {true}});
+  }
+
   @Before
   public void setUp() throws IOException, InterruptedException, URISyntaxException {
     TestUtil.setUp();
@@ -146,6 +163,7 @@ public class HiveSyncToolTest {
 
   @Test
   public void testBasicSync() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime = "100";
     TestUtil.createCOWDataset(commitTime, 5);
     HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -168,6 +186,7 @@ public class HiveSyncToolTest {
 
   @Test
   public void testSyncIncremental() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime1 = "100";
     TestUtil.createCOWDataset(commitTime1, 5);
     HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -211,6 +230,7 @@ public class HiveSyncToolTest {
 
   @Test
   public void testSyncIncrementalWithSchemaEvolution() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime1 = "100";
     TestUtil.createCOWDataset(commitTime1, 5);
     HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -247,6 +267,7 @@ public class HiveSyncToolTest {
 
   @Test
   public void testSyncMergeOnRead() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime = "100";
     String deltaCommitTime = "101";
     TestUtil.createMORDataset(commitTime, deltaCommitTime, 5);
@@ -295,6 +316,7 @@ public class HiveSyncToolTest {
   @Test
   public void testSyncMergeOnReadRT()
       throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime = "100";
     String deltaCommitTime = "101";
     String roTablename = TestUtil.hiveSyncConfig.tableName;
@@ -350,6 +372,7 @@ public class HiveSyncToolTest {
   @Test
   public void testMultiPartitionKeySync()
       throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String commitTime = "100";
     TestUtil.createCOWDataset(commitTime, 5);