You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/04/03 23:23:11 UTC
[incubator-hudi] branch master updated: [HUDI-717] Fixed usage of
HiveDriver for DDL statements. (#1416)
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6808559 [HUDI-717] Fixed usage of HiveDriver for DDL statements. (#1416)
6808559 is described below
commit 6808559b018366b4bc6d47b40dbbe362f48f65d7
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Fri Apr 3 16:23:05 2020 -0700
[HUDI-717] Fixed usage of HiveDriver for DDL statements. (#1416)
When using HiveDriver mode in HoodieHiveClient, Hive 2.x DDL operations like ALTER PARTITION may fail. This is because Hive 2.x does not accept the qualified `db`.`table_name` form for these operations. In this fix, we set the name of the database in the SessionState created for the Driver.
---
.../org/apache/hudi/hive/HoodieHiveClient.java | 4 +-
.../org/apache/hudi/hive/TestHiveSyncTool.java | 91 +++++++++++++++++++++-
.../test/java/org/apache/hudi/hive/TestUtil.java | 16 ++--
3 files changed, 101 insertions(+), 10 deletions(-)
diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 1bfbe20..55a4968 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -198,7 +198,8 @@ public class HoodieHiveClient {
for (String partition : partitions) {
String partitionClause = getPartitionClause(partition);
Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition);
- String fullPartitionPath = partitionPath.toUri().getScheme().equals(StorageSchemes.HDFS.getScheme())
+ String partitionScheme = partitionPath.toUri().getScheme();
+ String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
String changePartition =
alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'";
@@ -505,6 +506,7 @@ public class HoodieHiveClient {
try {
final long startTime = System.currentTimeMillis();
ss = SessionState.start(configuration);
+ ss.setCurrentDatabase(syncConfig.databaseName);
hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
final long endTime = System.currentTimeMillis();
LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index f804219..449c7f3 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -168,6 +168,47 @@ public class TestHiveSyncTool {
hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", instantTime,
hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
+
+ // Adding of new partitions
+ List<String> newPartition = Arrays.asList("2050/01/01");
+ hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
+ assertEquals("No new partition should be added", 5,
+ hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+ hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
+ assertEquals("New partition should be added", 6,
+ hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+
+ // Update partitions
+ hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
+ assertEquals("Partition count should remain the same", 6,
+ hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+ hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
+ assertEquals("Partition count should remain the same", 6,
+ hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+
+ // Alter partitions
+ // Manually change a hive partition location to check that the sync detects
+ // the change and generates a partition update event for it.
+ hiveClient.updateHiveSQL("ALTER TABLE `" + TestUtil.hiveSyncConfig.tableName
+ + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");
+
+ hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+ List<Partition> hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
+ List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
+ writtenPartitionsSince.add(newPartition.get(0));
+ List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+ assertEquals("There should be only one paritition event", 1, partitionEvents.size());
+ assertEquals("The one partition event must of type UPDATE", PartitionEventType.UPDATE,
+ partitionEvents.iterator().next().eventType);
+
+ tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+ tool.syncHoodieTable();
+ // Sync should update the changed partition to correct path
+ List<Partition> tablePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
+ assertEquals("The one partition we wrote should be added to hive", 6, tablePartitions.size());
+ assertEquals("The last commit that was sycned should be 100", instantTime,
+ hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
+
}
@Test
@@ -250,7 +291,7 @@ public class TestHiveSyncTool {
TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
String instantTime = "100";
String deltaCommitTime = "101";
- TestUtil.createMORTable(instantTime, deltaCommitTime, 5);
+ TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -294,7 +335,7 @@ public class TestHiveSyncTool {
String instantTime = "100";
String deltaCommitTime = "101";
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
- TestUtil.createMORTable(instantTime, deltaCommitTime, 5);
+ TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
HoodieHiveClient hiveClientRT =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -363,4 +404,50 @@ public class TestHiveSyncTool {
assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", instantTime,
hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get());
}
+
+ @Test
+ public void testReadSchemaForMOR() throws Exception {
+ TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
+ String commitTime = "100";
+ String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+ TestUtil.createMORTable(commitTime, "", 5, false);
+ HoodieHiveClient hiveClientRT =
+ new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+
+ assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ + " should not exist initially", hiveClientRT.doesTableExist(snapshotTableName));
+
+ // Lets do the sync
+ HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+ tool.syncHoodieTable();
+
+ assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ + " should exist after sync completes", hiveClientRT.doesTableExist(snapshotTableName));
+
+ // Schema being read from compacted base files
+ assertEquals("Hive Schema should match the table schema + partition field", hiveClientRT.getTableSchema(snapshotTableName).size(),
+ SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
+ assertEquals("Table partitions should match the number of partitions we wrote", 5,
+ hiveClientRT.scanTablePartitions(snapshotTableName).size());
+
+ // Now let's create more partitions; these are the only ones which need to be synced
+ DateTime dateTime = DateTime.now().plusDays(6);
+ String commitTime2 = "102";
+ String deltaCommitTime2 = "103";
+
+ TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+ // Lets do the sync
+ tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+ tool.syncHoodieTable();
+ hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+
+ // Schema being read from the log files
+ assertEquals("Hive Schema should match the evolved table schema + partition field",
+ hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
+ // Sync should add the one partition
+ assertEquals("The 1 partition we wrote should be added to hive", 6, hiveClientRT.scanTablePartitions(snapshotTableName).size());
+ assertEquals("The last commit that was sycned should be 103", deltaCommitTime2,
+ hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get());
+ }
+
}
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
index df5d9ea..3c0f551 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
@@ -107,7 +107,6 @@ public class TestUtil {
hiveSyncConfig = new HiveSyncConfig();
hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
- hiveSyncConfig.databaseName = "hdrone_test";
hiveSyncConfig.hiveUser = "";
hiveSyncConfig.hivePass = "";
hiveSyncConfig.databaseName = "testdb";
@@ -167,7 +166,8 @@ public class TestUtil {
createCommitFile(commitMetadata, instantTime);
}
- static void createMORTable(String instantTime, String deltaCommitTime, int numberOfPartitions)
+ static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
+ boolean createDeltaCommit)
throws IOException, InitializationError, URISyntaxException, InterruptedException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
@@ -177,17 +177,19 @@ public class TestUtil {
boolean result = fileSystem.mkdirs(path);
checkResult(result);
DateTime dateTime = DateTime.now();
- HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, instantTime);
+ HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
- createCompactionCommitFile(compactionMetadata, instantTime);
- // Write a delta commit
- HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
- createDeltaCommitFile(deltaMetadata, deltaCommitTime);
+ createCompactionCommitFile(compactionMetadata, commitTime);
+ if (createDeltaCommit) {
+ // Write a delta commit
+ HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
+ createDeltaCommitFile(deltaMetadata, deltaCommitTime);
+ }
}
static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom,