Posted to commits@hudi.apache.org by si...@apache.org on 2021/01/06 12:49:57 UTC

[hudi] branch master updated: [HUDI-1383] Fixing sorting of partition vals for hive sync computation (#2402)

This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new da2919a  [HUDI-1383] Fixing sorting of partition vals for hive sync computation (#2402)
da2919a is described below

commit da2919a75f564be6c3d731a2c503959e416ebe71
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Wed Jan 6 07:49:44 2021 -0500

    [HUDI-1383] Fixing sorting of partition vals for hive sync computation (#2402)
---
 .../org/apache/hudi/hive/HoodieHiveClient.java     |  2 -
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 66 ++++++++++++++++++----
 .../apache/hudi/hive/testutils/HiveTestUtil.java   | 20 +++++++
 3 files changed, 75 insertions(+), 13 deletions(-)

diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 5c0c128..b621167 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -207,7 +207,6 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
     Map<String, String> paths = new HashMap<>();
     for (Partition tablePartition : tablePartitions) {
       List<String> hivePartitionValues = tablePartition.getValues();
-      Collections.sort(hivePartitionValues);
       String fullTablePartitionPath =
           Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath();
       paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
@@ -219,7 +218,6 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
       String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
       // Check whether the partition values or the hdfs path match
       List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
-      Collections.sort(storagePartitionValues);
       if (!storagePartitionValues.isEmpty()) {
         String storageValue = String.join(", ", storagePartitionValues);
         if (!paths.containsKey(storageValue)) {
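For context on the fix above: sorting the partition values before joining them into a map key made distinct multi-level partitions collide on the same key. A minimal standalone sketch of that failure mode (the class name and main method are illustrative only, not part of the commit):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class SortedPartitionKeyCollision {
  public static void main(String[] args) {
    // Two distinct day-level partitions, as in the new test below.
    List<String> a = new ArrayList<>(Arrays.asList("2010", "01", "02")); // 2010/01/02
    List<String> b = new ArrayList<>(Arrays.asList("2010", "02", "01")); // 2010/02/01

    // Old behavior: sort the values before joining them into a map key.
    Collections.sort(a);
    Collections.sort(b);

    // Both lists now read [01, 02, 2010], so the joined keys are equal
    // and the second partition silently overwrites the first in the map.
    System.out.println(String.join(", ", a).equals(String.join(", ", b))); // true
  }
}

With the sort removed, the keys stay "2010, 01, 02" and "2010, 02, 01", and the two partitions are tracked separately.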
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 1d8cbd8..8a1ea4f 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -21,10 +21,10 @@ package org.apache.hudi.hive;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
 import org.apache.hudi.hive.testutils.HiveTestUtil;
 import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
 
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.parquet.schema.MessageType;
@@ -56,7 +56,7 @@ public class TestHiveSyncTool {
   }
 
   private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
-    return Arrays.asList(new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } });
+    return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}});
   }
 
   @BeforeEach
@@ -347,7 +347,7 @@ public class TestHiveSyncTool {
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
           SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
               + HoodieRecord.HOODIE_META_COLUMNS.size(),
-                   "Hive Schema should match the table schema + partition field");
+          "Hive Schema should match the table schema + partition field");
     } else {
       // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -377,7 +377,7 @@ public class TestHiveSyncTool {
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
           SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
               + HoodieRecord.HOODIE_META_COLUMNS.size(),
-                   "Hive Schema should match the evolved table schema + partition field");
+          "Hive Schema should match the evolved table schema + partition field");
     } else {
       // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
       assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -418,7 +418,7 @@ public class TestHiveSyncTool {
       assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
           SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
               + HoodieRecord.HOODIE_META_COLUMNS.size(),
-                   "Hive Schema should match the table schema + partition field");
+          "Hive Schema should match the table schema + partition field");
     } else {
       // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
       assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
@@ -489,6 +489,50 @@ public class TestHiveSyncTool {
         "Table partitions should match the number of partitions we wrote");
     assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
         "The last commit that was sycned should be updated in the TBLPROPERTIES");
+
+    // HoodieHiveClient had a bug where partition values were sorted
+    // before being stored as keys in a map. The following tests that case:
+    // create partition "2010/01/02" first, then "2010/02/01".
+    String commitTime2 = "101";
+    HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
+    assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after commit 100");
+    List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
+    List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+    assertEquals(1, partitionEvents.size(), "There should be only one partition event");
+    assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must be of type ADD");
+
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    // Sync should add the one partition
+    assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was sycned should be 101");
+
+    // Create partition "2010/02/01" and ensure sync works
+    String commitTime3 = "102";
+    HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3);
+    HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+
+    hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+    tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+    tool.syncHoodieTable();
+
+    assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
+        "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
+    assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
+        hiveClient.getDataSchema().getColumns().size() + 3,
+        "Hive Schema should match the table schema + partition fields");
+    assertEquals(7, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+        "Table partitions should match the number of partitions we wrote");
+    assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+        "The last commit that was sycned should be updated in the TBLPROPERTIES");
+    assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
   }
 
   @ParameterizedTest
@@ -507,17 +551,17 @@ public class TestHiveSyncTool {
 
     HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
     assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
-            "Table " + hiveSyncConfig.tableName + " should not exist initially");
+        "Table " + hiveSyncConfig.tableName + " should not exist initially");
     // Let's do the sync
     HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
     tool.syncHoodieTable();
     assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
-            "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
+        "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
     assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
-            hiveClient.getDataSchema().getColumns().size(),
-            "Hive Schema should match the table schema,ignoring the partition fields");
+        hiveClient.getDataSchema().getColumns().size(),
+        "Hive Schema should match the table schema,ignoring the partition fields");
     assertEquals(0, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
-            "Table should not have partitions because of the NonPartitionedExtractor");
+        "Table should not have partitions because of the NonPartitionedExtractor");
   }
 
   @ParameterizedTest
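The ADD assertion in the new test exercises the event classification in getPartitionEvents. A condensed, self-contained sketch of that decision under the now-unsorted keys, assuming a simplified stand-in model (the map contents and the EventType enum here are hypothetical, not the Hudi API):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class PartitionEventSketch {
  enum EventType { ADD, UPDATE, NONE }

  public static void main(String[] args) {
    // Hive-side partitions already synced, keyed by values joined
    // in path order (no sorting after this fix).
    Map<String, String> paths = new HashMap<>();
    paths.put("2010, 01, 02", "/base/2010/01/02");

    // Newly written storage partition 2010/02/01.
    String key = String.join(", ", Arrays.asList("2010", "02", "01"));
    String location = "/base/2010/02/01";

    // Missing key -> ADD; present key with a different location -> UPDATE.
    EventType event = !paths.containsKey(key) ? EventType.ADD
        : !paths.get(key).equals(location) ? EventType.UPDATE : EventType.NONE;
    System.out.println(event); // ADD, which is what the test asserts
  }
}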
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d0d1b66..0909053 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -210,6 +210,14 @@ public class HiveTestUtil {
     createCommitFile(commitMetadata, instantTime);
   }
 
+  public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple,
+      boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
+    HoodieCommitMetadata commitMetadata =
+        createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime);
+    createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+    createCommitFile(commitMetadata, instantTime);
+  }
+
   public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
       boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
       throws IOException, URISyntaxException, InterruptedException {
@@ -266,6 +274,18 @@ public class HiveTestUtil {
     return commitMetadata;
   }
 
+  private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple,
+      boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
+    fileSystem.makeQualified(partPath);
+    fileSystem.mkdirs(partPath);
+    List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime);
+    writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
+    addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata);
+    return commitMetadata;
+  }
+
   private static List<HoodieWriteStat> createTestData(Path partPath, boolean isParquetSchemaSimple, String instantTime)
       throws IOException, URISyntaxException {
     List<HoodieWriteStat> writeStats = new ArrayList<>();
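
For reference, a minimal usage sketch of the new addCOWPartition helper, mirroring the calls added to TestHiveSyncTool above; this assumes that test class's @BeforeEach setup and an initial sync at commit "100" (the method name is hypothetical, not additional commit content):

  @Test
  public void syncNewCowPartition() throws Exception {
    // Write a new partition under a later commit, then sync it to Hive.
    String commitTime2 = "101";
    HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);

    HiveSyncTool tool = new HiveSyncTool(
        HiveTestUtil.hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
    tool.syncHoodieTable();
  }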