You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/04/03 23:23:11 UTC

[incubator-hudi] branch master updated: [HUDI-717] Fixed usage of HiveDriver for DDL statements. (#1416)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6808559  [HUDI-717] Fixed usage of HiveDriver for DDL statements. (#1416)
6808559 is described below

commit 6808559b018366b4bc6d47b40dbbe362f48f65d7
Author: Prashant Wason <pw...@uber.com>
AuthorDate: Fri Apr 3 16:23:05 2020 -0700

    [HUDI-717] Fixed usage of HiveDriver for DDL statements. (#1416)
    
    When using HiveDriver mode in HudiHiveClient, Hive 2.x DDL operations like ALTER PARTITION may fail. This is because Hive 2.x doesn't like `db`.`table_name` for operations. In this fix, we set the name of the database in the SessionState create for the Driver.
---
 .../org/apache/hudi/hive/HoodieHiveClient.java     |  4 +-
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 91 +++++++++++++++++++++-
 .../test/java/org/apache/hudi/hive/TestUtil.java   | 16 ++--
 3 files changed, 101 insertions(+), 10 deletions(-)

diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 1bfbe20..55a4968 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -198,7 +198,8 @@ public class HoodieHiveClient {
     for (String partition : partitions) {
       String partitionClause = getPartitionClause(partition);
       Path partitionPath = FSUtils.getPartitionPath(syncConfig.basePath, partition);
-      String fullPartitionPath = partitionPath.toUri().getScheme().equals(StorageSchemes.HDFS.getScheme())
+      String partitionScheme = partitionPath.toUri().getScheme();
+      String fullPartitionPath = StorageSchemes.HDFS.getScheme().equals(partitionScheme)
               ? FSUtils.getDFSFullPartitionPath(fs, partitionPath) : partitionPath.toString();
       String changePartition =
           alterTable + " PARTITION (" + partitionClause + ") SET LOCATION '" + fullPartitionPath + "'";
@@ -505,6 +506,7 @@ public class HoodieHiveClient {
     try {
       final long startTime = System.currentTimeMillis();
       ss = SessionState.start(configuration);
+      ss.setCurrentDatabase(syncConfig.databaseName);
       hiveDriver = new org.apache.hadoop.hive.ql.Driver(configuration);
       final long endTime = System.currentTimeMillis();
       LOG.info(String.format("Time taken to start SessionState and create Driver: %s ms", (endTime - startTime)));
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index f804219..449c7f3 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -168,6 +168,47 @@ public class TestHiveSyncTool {
         hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
     assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", instantTime,
         hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
+
+    // Adding of new partitions
+    List<String> newPartition = Arrays.asList("2050/01/01");
+    hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
+    assertEquals("No new partition should be added", 5,
+        hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+    hiveClient.addPartitionsToTable(TestUtil.hiveSyncConfig.tableName, newPartition);
+    assertEquals("New partition should be added", 6,
+        hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+
+    // Update partitions
+    hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName, Arrays.asList());
+    assertEquals("Partition count should remain the same", 6,
+        hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+    hiveClient.updatePartitionsToTable(TestUtil.hiveSyncConfig.tableName,  newPartition);
+    assertEquals("Partition count should remain the same", 6,
+        hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName).size());
+
+    // Alter partitions
+    // Manually change a hive partition location to check if the sync will detect
+    // it and generage a partition update event for it.
+    hiveClient.updateHiveSQL("ALTER TABLE `" + TestUtil.hiveSyncConfig.tableName
+        + "` PARTITION (`datestr`='2050-01-01') SET LOCATION '/some/new/location'");
+
+    hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+    List<Partition> hivePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
+    List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.empty());
+    writtenPartitionsSince.add(newPartition.get(0));
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+    assertEquals("There should be only one paritition event", 1, partitionEvents.size());
+    assertEquals("The one partition event must of type UPDATE", PartitionEventType.UPDATE,
+        partitionEvents.iterator().next().eventType);
+
+    tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+    tool.syncHoodieTable();
+    // Sync should update the changed partition to correct path
+    List<Partition> tablePartitions = hiveClient.scanTablePartitions(TestUtil.hiveSyncConfig.tableName);
+    assertEquals("The one partition we wrote should be added to hive", 6, tablePartitions.size());
+    assertEquals("The last commit that was sycned should be 100", instantTime,
+        hiveClient.getLastCommitTimeSynced(TestUtil.hiveSyncConfig.tableName).get());
+
   }
 
   @Test
@@ -250,7 +291,7 @@ public class TestHiveSyncTool {
     TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
     String instantTime = "100";
     String deltaCommitTime = "101";
-    TestUtil.createMORTable(instantTime, deltaCommitTime, 5);
+    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
 
     String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
     HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -294,7 +335,7 @@ public class TestHiveSyncTool {
     String instantTime = "100";
     String deltaCommitTime = "101";
     String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
-    TestUtil.createMORTable(instantTime, deltaCommitTime, 5);
+    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
     HoodieHiveClient hiveClientRT =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
 
@@ -363,4 +404,50 @@ public class TestHiveSyncTool {
     assertEquals("The last commit that was sycned should be updated in the TBLPROPERTIES", instantTime,
         hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get());
   }
+
+  @Test
+  public void testReadSchemaForMOR() throws Exception {
+    TestUtil.hiveSyncConfig.useJdbc = this.useJdbc;
+    String commitTime = "100";
+    String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
+    TestUtil.createMORTable(commitTime, "", 5, false);
+    HoodieHiveClient hiveClientRT =
+        new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+
+    assertFalse("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+        + " should not exist initially", hiveClientRT.doesTableExist(snapshotTableName));
+
+    // Lets do the sync
+    HiveSyncTool tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    assertTrue("Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+        + " should exist after sync completes", hiveClientRT.doesTableExist(snapshotTableName));
+
+    // Schema being read from compacted base files
+    assertEquals("Hive Schema should match the table schema + partition field", hiveClientRT.getTableSchema(snapshotTableName).size(),
+        SchemaTestUtil.getSimpleSchema().getFields().size() + 1);
+    assertEquals("Table partitions should match the number of partitions we wrote", 5,
+        hiveClientRT.scanTablePartitions(snapshotTableName).size());
+
+    // Now lets create more partitions and these are the only ones which needs to be synced
+    DateTime dateTime = DateTime.now().plusDays(6);
+    String commitTime2 = "102";
+    String deltaCommitTime2 = "103";
+
+    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+    // Lets do the sync
+    tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+    tool.syncHoodieTable();
+    hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
+
+    // Schema being read from the log files
+    assertEquals("Hive Schema should match the evolved table schema + partition field",
+        hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1);
+    // Sync should add the one partition
+    assertEquals("The 1 partition we wrote should be added to hive", 6, hiveClientRT.scanTablePartitions(snapshotTableName).size());
+    assertEquals("The last commit that was sycned should be 103", deltaCommitTime2,
+        hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get());
+  }
+
 }
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
index df5d9ea..3c0f551 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
@@ -107,7 +107,6 @@ public class TestUtil {
 
     hiveSyncConfig = new HiveSyncConfig();
     hiveSyncConfig.jdbcUrl = "jdbc:hive2://127.0.0.1:9999/";
-    hiveSyncConfig.databaseName = "hdrone_test";
     hiveSyncConfig.hiveUser = "";
     hiveSyncConfig.hivePass = "";
     hiveSyncConfig.databaseName = "testdb";
@@ -167,7 +166,8 @@ public class TestUtil {
     createCommitFile(commitMetadata, instantTime);
   }
 
-  static void createMORTable(String instantTime, String deltaCommitTime, int numberOfPartitions)
+  static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
+                             boolean createDeltaCommit)
       throws IOException, InitializationError, URISyntaxException, InterruptedException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
@@ -177,17 +177,19 @@ public class TestUtil {
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
     DateTime dateTime = DateTime.now();
-    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, instantTime);
+    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
     createdTablesSet
         .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
     HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
     commitMetadata.getPartitionToWriteStats()
         .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
-    createCompactionCommitFile(compactionMetadata, instantTime);
-    // Write a delta commit
-    HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
-    createDeltaCommitFile(deltaMetadata, deltaCommitTime);
+    createCompactionCommitFile(compactionMetadata, commitTime);
+    if (createDeltaCommit) {
+      // Write a delta commit
+      HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
+      createDeltaCommitFile(deltaMetadata, deltaCommitTime);
+    }
   }
 
   static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom,