You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by si...@apache.org on 2021/01/06 12:49:57 UTC
[hudi] branch master updated: [HUDI-1383] Fixing sorting of partition vals for hive sync computation (#2402)
This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new da2919a [HUDI-1383] Fixing sorting of partition vals for hive sync computation (#2402)
da2919a is described below
commit da2919a75f564be6c3d731a2c503959e416ebe71
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Wed Jan 6 07:49:44 2021 -0500
[HUDI-1383] Fixing sorting of partition vals for hive sync computation (#2402)
---
.../org/apache/hudi/hive/HoodieHiveClient.java | 2 -
.../org/apache/hudi/hive/TestHiveSyncTool.java | 66 ++++++++++++++++++----
.../apache/hudi/hive/testutils/HiveTestUtil.java | 20 +++++++
3 files changed, 75 insertions(+), 13 deletions(-)
diff --git a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 5c0c128..b621167 100644
--- a/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-sync/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -207,7 +207,6 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
Map<String, String> paths = new HashMap<>();
for (Partition tablePartition : tablePartitions) {
List<String> hivePartitionValues = tablePartition.getValues();
- Collections.sort(hivePartitionValues);
String fullTablePartitionPath =
Path.getPathWithoutSchemeAndAuthority(new Path(tablePartition.getSd().getLocation())).toUri().getPath();
paths.put(String.join(", ", hivePartitionValues), fullTablePartitionPath);
@@ -219,7 +218,6 @@ public class HoodieHiveClient extends AbstractSyncHoodieClient {
String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath();
// Check if the partition values or if hdfs path is the same
List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition);
- Collections.sort(storagePartitionValues);
if (!storagePartitionValues.isEmpty()) {
String storageValue = String.join(", ", storagePartitionValues);
if (!paths.containsKey(storageValue)) {
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 1d8cbd8..8a1ea4f 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -21,10 +21,10 @@ package org.apache.hudi.hive;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
-import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hudi.hive.testutils.HiveTestUtil;
import org.apache.hudi.hive.util.HiveSchemaUtil;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent;
+import org.apache.hudi.sync.common.AbstractSyncHoodieClient.PartitionEvent.PartitionEventType;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.parquet.schema.MessageType;
@@ -56,7 +56,7 @@ public class TestHiveSyncTool {
}
private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
- return Arrays.asList(new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } });
+ return Arrays.asList(new Object[][] {{true, true}, {true, false}, {false, true}, {false, false}});
}
@BeforeEach
@@ -347,7 +347,7 @@ public class TestHiveSyncTool {
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
- "Hive Schema should match the table schema + partition field");
+ "Hive Schema should match the table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -377,7 +377,7 @@ public class TestHiveSyncTool {
assertEquals(hiveClient.getTableSchema(roTableName).size(),
SchemaTestUtil.getEvolvedSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
- "Hive Schema should match the evolved table schema + partition field");
+ "Hive Schema should match the evolved table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClient.getTableSchema(roTableName).size(),
@@ -418,7 +418,7 @@ public class TestHiveSyncTool {
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
SchemaTestUtil.getSimpleSchema().getFields().size() + HiveTestUtil.hiveSyncConfig.partitionFields.size()
+ HoodieRecord.HOODIE_META_COLUMNS.size(),
- "Hive Schema should match the table schema + partition field");
+ "Hive Schema should match the table schema + partition field");
} else {
// The data generated and schema in the data file do not have metadata columns, so we need a separate check.
assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
@@ -489,6 +489,50 @@ public class TestHiveSyncTool {
"Table partitions should match the number of partitions we wrote");
assertEquals(instantTime, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
"The last commit that was sycned should be updated in the TBLPROPERTIES");
+
+ // HoodieHiveClient had a bug where partition vals were sorted
+ // and stored as keys in a map. The following tests this particular case.
+ // Now lets create partition "2010/01/02" and followed by "2010/02/01".
+ String commitTime2 = "101";
+ HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
+
+ hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ List<String> writtenPartitionsSince = hiveClient.getPartitionsWrittenToSince(Option.of(instantTime));
+ assertEquals(1, writtenPartitionsSince.size(), "We should have one partition written after 100 commit");
+ List<Partition> hivePartitions = hiveClient.scanTablePartitions(hiveSyncConfig.tableName);
+ List<PartitionEvent> partitionEvents = hiveClient.getPartitionEvents(hivePartitions, writtenPartitionsSince);
+ assertEquals(1, partitionEvents.size(), "There should be only one paritition event");
+ assertEquals(PartitionEventType.ADD, partitionEvents.iterator().next().eventType, "The one partition event must of type ADD");
+
+ tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ tool.syncHoodieTable();
+
+ // Sync should add the one partition
+ assertEquals(6, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+ "Table partitions should match the number of partitions we wrote");
+ assertEquals(commitTime2, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+ "The last commit that was sycned should be 101");
+
+ // create partition "2010/02/01" and ensure sync works
+ String commitTime3 = "102";
+ HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3);
+ HiveTestUtil.getCreatedTablesSet().add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+
+ hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+
+ tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
+ tool.syncHoodieTable();
+
+ assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
+ "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
+ assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
+ hiveClient.getDataSchema().getColumns().size() + 3,
+ "Hive Schema should match the table schema + partition fields");
+ assertEquals(7, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
+ "Table partitions should match the number of partitions we wrote");
+ assertEquals(commitTime3, hiveClient.getLastCommitTimeSynced(hiveSyncConfig.tableName).get(),
+ "The last commit that was sycned should be updated in the TBLPROPERTIES");
+ assertEquals(1, hiveClient.getPartitionsWrittenToSince(Option.of(commitTime2)).size());
}
@ParameterizedTest
@@ -507,17 +551,17 @@ public class TestHiveSyncTool {
HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(hiveSyncConfig.tableName),
- "Table " + hiveSyncConfig.tableName + " should not exist initially");
+ "Table " + hiveSyncConfig.tableName + " should not exist initially");
// Lets do the sync
HiveSyncTool tool = new HiveSyncTool(hiveSyncConfig, HiveTestUtil.getHiveConf(), HiveTestUtil.fileSystem);
tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(hiveSyncConfig.tableName),
- "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
+ "Table " + hiveSyncConfig.tableName + " should exist after sync completes");
assertEquals(hiveClient.getTableSchema(hiveSyncConfig.tableName).size(),
- hiveClient.getDataSchema().getColumns().size(),
- "Hive Schema should match the table schema,ignoring the partition fields");
+ hiveClient.getDataSchema().getColumns().size(),
+ "Hive Schema should match the table schema,ignoring the partition fields");
assertEquals(0, hiveClient.scanTablePartitions(hiveSyncConfig.tableName).size(),
- "Table should not have partitions because of the NonPartitionedExtractor");
+ "Table should not have partitions because of the NonPartitionedExtractor");
}
@ParameterizedTest
diff --git a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d0d1b66..0909053 100644
--- a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -210,6 +210,14 @@ public class HiveTestUtil {
createCommitFile(commitMetadata, instantTime);
}
+ public static void addCOWPartition(String partitionPath, boolean isParquetSchemaSimple,
+ boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
+ HoodieCommitMetadata commitMetadata =
+ createPartition(partitionPath, isParquetSchemaSimple, useSchemaFromCommitMetadata, instantTime);
+ createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
+ createCommitFile(commitMetadata, instantTime);
+ }
+
public static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException {
@@ -266,6 +274,18 @@ public class HiveTestUtil {
return commitMetadata;
}
+ private static HoodieCommitMetadata createPartition(String partitionPath, boolean isParquetSchemaSimple,
+ boolean useSchemaFromCommitMetadata, String instantTime) throws IOException, URISyntaxException {
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ Path partPath = new Path(hiveSyncConfig.basePath + "/" + partitionPath);
+ fileSystem.makeQualified(partPath);
+ fileSystem.mkdirs(partPath);
+ List<HoodieWriteStat> writeStats = createTestData(partPath, isParquetSchemaSimple, instantTime);
+ writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
+ addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata);
+ return commitMetadata;
+ }
+
private static List<HoodieWriteStat> createTestData(Path partPath, boolean isParquetSchemaSimple, String instantTime)
throws IOException, URISyntaxException {
List<HoodieWriteStat> writeStats = new ArrayList<>();