Posted to commits@hudi.apache.org by vb...@apache.org on 2019/09/21 15:04:40 UTC
[incubator-hudi] branch master updated: HUDI-180 : Adding support for hive registration using metastore along with JDBC
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1104f95 HUDI-180 : Adding support for hive registration using metastore along with JDBC
1104f95 is described below
commit 1104f9526fcd05270ed5cd084228f023f1a01c46
Author: Nishith Agarwal <na...@uber.com>
AuthorDate: Fri Aug 30 12:30:56 2019 -0700
HUDI-180 : Adding support for hive registration using metastore along with JDBC
---
.../java/org/apache/hudi/hive/HiveSyncConfig.java | 40 ++---
.../org/apache/hudi/hive/HoodieHiveClient.java | 161 ++++++++++++++++-----
.../org/apache/hudi/hive/HiveSyncToolTest.java | 23 +++
3 files changed, 173 insertions(+), 51 deletions(-)
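
At a glance, the change gates Hive operations behind a new --use-jdbc flag (defaulting to true, which preserves the existing JDBC behaviour) and adds a metastore-client/Hive-driver path for when it is disabled. Below is a minimal sketch of toggling the new flag programmatically, using only the public fields visible in this diff; the path, database, table, and URL values are illustrative placeholders, not part of the commit:

import org.apache.hudi.hive.HiveSyncConfig;

public class SyncConfigSketch {
  public static void main(String[] args) {
    HiveSyncConfig cfg = new HiveSyncConfig();
    cfg.basePath = "hdfs:///tmp/hoodie_dataset";  // placeholder dataset path
    cfg.databaseName = "default";                 // placeholder database
    cfg.tableName = "hoodie_table";               // placeholder table
    cfg.jdbcUrl = "jdbc:hive2://localhost:10000"; // only used when useJdbc is true
    // New in this commit: route Hive registrations through the
    // metastore client and Hive driver instead of JDBC.
    cfg.useJdbc = false;
    System.out.println(cfg); // toString() now reports useJdbc as well
  }
}
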
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
index 8296163..2fc3a2e 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HiveSyncConfig.java
@@ -48,11 +48,10 @@ public class HiveSyncConfig implements Serializable {
"--base-path"}, description = "Basepath of hoodie dataset to sync", required = true)
public String basePath;
- @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by",
- required = false)
+ @Parameter(names = "--partitioned-by", description = "Fields in the schema partitioned by")
public List<String> partitionFields = new ArrayList<>();
- @Parameter(names = "-partition-value-extractor", description = "Class which implements "
+ @Parameter(names = "--partition-value-extractor", description = "Class which implements "
+ "PartitionValueExtractor "
+ "to extract the partition "
+ "values from HDFS path")
@@ -74,9 +73,27 @@ public class HiveSyncConfig implements Serializable {
+ "org.apache.hudi input format.")
public Boolean usePreApacheInputFormat = false;
+ @Parameter(names = {"--use-jdbc"}, description = "Hive jdbc connect url")
+ public Boolean useJdbc = true;
+
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
+ public static HiveSyncConfig copy(HiveSyncConfig cfg) {
+ HiveSyncConfig newConfig = new HiveSyncConfig();
+ newConfig.basePath = cfg.basePath;
+ newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
+ newConfig.databaseName = cfg.databaseName;
+ newConfig.hivePass = cfg.hivePass;
+ newConfig.hiveUser = cfg.hiveUser;
+ newConfig.partitionFields = cfg.partitionFields;
+ newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
+ newConfig.jdbcUrl = cfg.jdbcUrl;
+ newConfig.tableName = cfg.tableName;
+ newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
+ return newConfig;
+ }
+
@Override
public String toString() {
return "HiveSyncConfig{"
@@ -89,22 +106,9 @@ public class HiveSyncConfig implements Serializable {
+ ", partitionFields=" + partitionFields
+ ", partitionValueExtractorClass='" + partitionValueExtractorClass + '\''
+ ", assumeDatePartitioning=" + assumeDatePartitioning
+ + ", usePreApacheInputFormat=" + usePreApacheInputFormat
+ + ", useJdbc=" + useJdbc
+ ", help=" + help
+ '}';
}
-
- public static HiveSyncConfig copy(HiveSyncConfig cfg) {
- HiveSyncConfig newConfig = new HiveSyncConfig();
- newConfig.basePath = cfg.basePath;
- newConfig.assumeDatePartitioning = cfg.assumeDatePartitioning;
- newConfig.databaseName = cfg.databaseName;
- newConfig.hivePass = cfg.hivePass;
- newConfig.hiveUser = cfg.hiveUser;
- newConfig.partitionFields = cfg.partitionFields;
- newConfig.partitionValueExtractorClass = cfg.partitionValueExtractorClass;
- newConfig.jdbcUrl = cfg.jdbcUrl;
- newConfig.tableName = cfg.tableName;
- newConfig.usePreApacheInputFormat = cfg.usePreApacheInputFormat;
- return newConfig;
- }
}
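
The copy(...) helper (relocated above toString() in this hunk) duplicates the existing fields but, as the hunk shows, does not carry over the new useJdbc flag. A short sketch of deriving a config for a second table from an existing one; the "_rt" suffix is a hypothetical naming choice for illustration:

import org.apache.hudi.hive.HiveSyncConfig;

public class CopyConfigSketch {
  public static void main(String[] args) {
    HiveSyncConfig roCfg = new HiveSyncConfig();
    roCfg.basePath = "hdfs:///tmp/hoodie_dataset"; // placeholder
    roCfg.tableName = "hoodie_table";              // placeholder
    roCfg.useJdbc = false;

    HiveSyncConfig rtCfg = HiveSyncConfig.copy(roCfg);
    rtCfg.tableName = roCfg.tableName + "_rt"; // hypothetical second table name
    rtCfg.useJdbc = roCfg.useJdbc;             // copy() does not carry useJdbc over
    System.out.println(rtCfg);
  }
}
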
diff --git a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 533c29b..771f45a 100644
--- a/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -29,7 +29,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -40,6 +42,8 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.jdbc.HiveDriver;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -86,6 +90,7 @@ public class HoodieHiveClient {
private FileSystem fs;
private Connection connection;
private HoodieTimeline activeTimeline;
+ private HiveConf configuration;
public HoodieHiveClient(HiveSyncConfig cfg, HiveConf configuration, FileSystem fs) {
this.syncConfig = cfg;
@@ -93,8 +98,13 @@ public class HoodieHiveClient {
this.metaClient = new HoodieTableMetaClient(fs.getConf(), cfg.basePath, true);
this.tableType = metaClient.getTableType();
- LOG.info("Creating hive connection " + cfg.jdbcUrl);
- createHiveConnection();
+ this.configuration = configuration;
+ // Support both JDBC and metastore-based implementations for backwards compatibility. Future users should
+ // disable JDBC and depend on the metastore client for all Hive registrations
+ if (cfg.useJdbc) {
+ LOG.info("Creating hive connection " + cfg.jdbcUrl);
+ createHiveConnection();
+ }
try {
this.client = new HiveMetaStoreClient(configuration);
} catch (MetaException e) {
@@ -269,32 +279,59 @@ public class HoodieHiveClient {
* Get the table schema
*/
public Map<String, String> getTableSchema() {
- if (!doesTableExist()) {
- throw new IllegalArgumentException(
- "Failed to get schema for table " + syncConfig.tableName + " does not exist");
- }
- Map<String, String> schema = Maps.newHashMap();
- ResultSet result = null;
- try {
- DatabaseMetaData databaseMetaData = connection.getMetaData();
- result = databaseMetaData
- .getColumns(null, syncConfig.databaseName, syncConfig.tableName, null);
- while (result.next()) {
- String columnName = result.getString(4);
- String columnType = result.getString(6);
- if ("DECIMAL".equals(columnType)) {
- int columnSize = result.getInt("COLUMN_SIZE");
- int decimalDigits = result.getInt("DECIMAL_DIGITS");
- columnType += String.format("(%s,%s)", columnSize, decimalDigits);
+ if (syncConfig.useJdbc) {
+ if (!doesTableExist()) {
+ throw new IllegalArgumentException(
+ "Failed to get schema for table " + syncConfig.tableName + " does not exist");
+ }
+ Map<String, String> schema = Maps.newHashMap();
+ ResultSet result = null;
+ try {
+ DatabaseMetaData databaseMetaData = connection.getMetaData();
+ result = databaseMetaData
+ .getColumns(null, syncConfig.databaseName, syncConfig.tableName, null);
+ while (result.next()) {
+ String columnName = result.getString(4);
+ String columnType = result.getString(6);
+ if ("DECIMAL".equals(columnType)) {
+ int columnSize = result.getInt("COLUMN_SIZE");
+ int decimalDigits = result.getInt("DECIMAL_DIGITS");
+ columnType += String.format("(%s,%s)", columnSize, decimalDigits);
+ }
+ schema.put(columnName, columnType);
}
- schema.put(columnName, columnType);
+ return schema;
+ } catch (SQLException e) {
+ throw new HoodieHiveSyncException("Failed to get table schema for " + syncConfig.tableName,
+ e);
+ } finally {
+ closeQuietly(result, null);
}
+ } else {
+ return getTableSchemaUsingMetastoreClient();
+ }
+ }
+
+ public Map<String, String> getTableSchemaUsingMetastoreClient() {
+ try {
+ // HiveMetaStoreClient returns partition keys separately from the columns, hence get both and merge to
+ // get the schema of the table.
+ final long start = System.currentTimeMillis();
+ Table table = this.client.getTable(syncConfig.databaseName, syncConfig.tableName);
+ Map<String, String> partitionKeysMap = table.getPartitionKeys().stream()
+ .collect(Collectors.toMap(f -> f.getName(), f -> f.getType().toUpperCase()));
+
+ Map<String, String> columnsMap = table.getSd().getCols().stream()
+ .collect(Collectors.toMap(f -> f.getName(), f -> f.getType().toUpperCase()));
+
+ Map<String, String> schema = new HashMap<>();
+ schema.putAll(columnsMap);
+ schema.putAll(partitionKeysMap);
+ final long end = System.currentTimeMillis();
+ LOG.info(String.format("Time taken to getTableSchema: %s ms", (end - start)));
return schema;
- } catch (SQLException e) {
- throw new HoodieHiveSyncException("Failed to get table schema for " + syncConfig.tableName,
- e);
- } finally {
- closeQuietly(result, null);
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed to get table schema for : " + syncConfig.tableName, e);
}
}
@@ -455,19 +492,71 @@ public class HoodieHiveClient {
* @param s SQL to execute
*/
public void updateHiveSQL(String s) {
- Statement stmt = null;
+ if (syncConfig.useJdbc) {
+ Statement stmt = null;
+ try {
+ stmt = connection.createStatement();
+ LOG.info("Executing SQL " + s);
+ stmt.execute(s);
+ } catch (SQLException e) {
+ throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
+ } finally {
+ closeQuietly(null, stmt);
+ }
+ } else {
+ updateHiveSQLUsingHiveDriver(s);
+ }
+ }
+
+ /**
+ * Execute an update in Hive using the Hive Driver.
+ *
+ * @param sql SQL statement to execute
+ */
+ public CommandProcessorResponse updateHiveSQLUsingHiveDriver(String sql) throws HoodieHiveSyncException {
+ List<CommandProcessorResponse> responses = updateHiveSQLs(Arrays.asList(sql));
+ return responses.get(responses.size() - 1);
+ }
+
+ private List<CommandProcessorResponse> updateHiveSQLs(List<String> sqls) throws HoodieHiveSyncException {
+ SessionState ss = null;
+ org.apache.hadoop.hive.ql.Driver hiveDriver = null;
+ List<CommandProcessorResponse> responses = new ArrayList<>();
try {
- stmt = connection.createStatement();
- LOG.info("Executing SQL " + s);
- stmt.execute(s);
- } catch (SQLException e) {
- throw new HoodieHiveSyncException("Failed in executing SQL " + s, e);
+ final long startTime = System.currentTimeMillis();
+ ss = SessionState.start(configuration);
+ hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
+ final long endTime = System.currentTimeMillis();
+ LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
+ for (String sql : sqls) {
+ final long start = System.currentTimeMillis();
+ responses.add(hiveDriver.run(sql));
+ final long end = System.currentTimeMillis();
+ LOG.info(String.format("Time taken to execute [%s]: %s ms", sql, (end - start)));
+ }
+ } catch (Exception e) {
+ throw new HoodieHiveSyncException("Failed in executing SQL", e);
} finally {
- closeQuietly(null, stmt);
+ if (ss != null) {
+ try {
+ ss.close();
+ } catch (IOException ie) {
+ LOG.error("Error while closing SessionState", ie);
+ }
+ }
+ if (hiveDriver != null) {
+ try {
+ hiveDriver.close();
+ } catch (Exception e) {
+ LOG.error("Error while closing hiveDriver", e);
+ }
+ }
}
+ return responses;
}
+
private void createHiveConnection() {
if (connection == null) {
try {
@@ -505,6 +594,11 @@ public class HoodieHiveClient {
if (stmt != null) {
stmt.close();
}
+ } catch (SQLException e) {
+ LOG.error("Could not close the statement opened ", e);
+ }
+
+ try {
if (resultSet != null) {
resultSet.close();
}
@@ -544,6 +638,7 @@ public class HoodieHiveClient {
}
if (client != null) {
client.close();
+ client = null;
}
} catch (SQLException e) {
LOG.error("Could not close connection ", e);
@@ -622,4 +717,4 @@ public class HoodieHiveClient {
return new PartitionEvent(PartitionEventType.UPDATE, storagePartition);
}
}
-}
+}
\ No newline at end of file
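
The metastore path above merges partition keys into the column map because HiveMetaStoreClient reports them separately from the storage-descriptor columns. A standalone sketch of the same lookup, assuming a reachable metastore configured via hive-site.xml on the classpath; the database and table names are placeholders:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;

public class MetastoreSchemaSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
    try {
      Table table = client.getTable("default", "hoodie_table"); // placeholders
      Map<String, String> schema = new HashMap<>();
      // Regular columns first, then partition keys, mirroring the merge above.
      table.getSd().getCols()
          .forEach(f -> schema.put(f.getName(), f.getType().toUpperCase()));
      table.getPartitionKeys()
          .forEach(f -> schema.put(f.getName(), f.getType().toUpperCase()));
      schema.forEach((name, type) -> System.out.println(name + " : " + type));
    } finally {
      client.close();
    }
  }
}
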
diff --git a/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java b/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java
index 2947d8b..c1ef756 100644
--- a/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java
+++ b/hudi-hive/src/test/java/org/apache/hudi/hive/HiveSyncToolTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hudi.common.util.Option;
@@ -39,10 +41,25 @@ import org.apache.parquet.schema.Types;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
@SuppressWarnings("ConstantConditions")
+@RunWith(Parameterized.class)
public class HiveSyncToolTest {
+ // Test the sync tool using both the JDBC and metastore client code paths
+ private boolean useJdbc;
+
+ public HiveSyncToolTest(Boolean useJdbc) {
+ this.useJdbc = useJdbc;
+ }
+
+ @Parameterized.Parameters(name = "UseJdbc")
+ public static Collection<Boolean[]> data() {
+ return Arrays.asList(new Boolean[][]{{false}, {true}});
+ }
+
@Before
public void setUp() throws IOException, InterruptedException, URISyntaxException {
TestUtil.setUp();
@@ -146,6 +163,7 @@ public class HiveSyncToolTest {
@Test
public void testBasicSync() throws Exception {
+ TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100";
TestUtil.createCOWDataset(commitTime, 5);
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -168,6 +186,7 @@ public class HiveSyncToolTest {
@Test
public void testSyncIncremental() throws Exception {
+ TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime1 = "100";
TestUtil.createCOWDataset(commitTime1, 5);
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -211,6 +230,7 @@ public class HiveSyncToolTest {
@Test
public void testSyncIncrementalWithSchemaEvolution() throws Exception {
+ TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime1 = "100";
TestUtil.createCOWDataset(commitTime1, 5);
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig,
@@ -247,6 +267,7 @@ public class HiveSyncToolTest {
@Test
public void testSyncMergeOnRead() throws Exception {
+ TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100";
String deltaCommitTime = "101";
TestUtil.createMORDataset(commitTime, deltaCommitTime, 5);
@@ -295,6 +316,7 @@ public class HiveSyncToolTest {
@Test
public void testSyncMergeOnReadRT()
throws Exception {
+ TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100";
String deltaCommitTime = "101";
String roTablename = TestUtil.hiveSyncConfig.tableName;
@@ -350,6 +372,7 @@ public class HiveSyncToolTest {
@Test
public void testMultiPartitionKeySync()
throws Exception {
+ TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String commitTime = "100";
TestUtil.createCOWDataset(commitTime, 5);
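
Each test above now runs twice, once per useJdbc value. When the flag is false, the sync goes through the SessionState/Driver path added in HoodieHiveClient. A minimal standalone sketch of that pattern, assuming hive-site.xml on the classpath points at a usable (e.g. embedded) metastore; the SHOW TABLES statement is just a placeholder:

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;

public class HiveDriverSketch {
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();
    // Start a session and create an in-process driver, as the new
    // updateHiveSQLs(...) does, then clean both up when done.
    SessionState ss = SessionState.start(conf);
    Driver driver = new Driver(conf);
    try {
      CommandProcessorResponse resp = driver.run("SHOW TABLES");
      System.out.println("Response code: " + resp.getResponseCode());
    } finally {
      driver.close();
      ss.close();
    }
  }
}
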