Posted to commits@hudi.apache.org by vb...@apache.org on 2020/05/07 23:34:34 UTC
[incubator-hudi] branch master updated: [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync (#1559)
This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d54b4b8 [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync (#1559)
d54b4b8 is described below
commit d54b4b8a525868ea6d15e2e2cc6ffccc62d5c43c
Author: Udit Mehrotra <um...@illinois.edu>
AuthorDate: Thu May 7 16:33:09 2020 -0700
[HUDI-838] Support schema from HoodieCommitMetadata for HiveSync (#1559)
Co-authored-by: Mehrotra <ud...@amazon.com>
---
.../java/org/apache/hudi/table/HoodieTable.java | 2 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 10 ++
.../hudi/common/table/TableSchemaResolver.java | 64 ++++++++++--
.../org/apache/hudi/hive/HoodieHiveClient.java | 7 +-
.../org/apache/hudi/hive/TestHiveSyncTool.java | 110 +++++++++++++++------
.../test/java/org/apache/hudi/hive/TestUtil.java | 63 +++++++++---
6 files changed, 199 insertions(+), 57 deletions(-)
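
In short, the patch teaches TableSchemaResolver to read the table schema from the HoodieCommitMetadata of the last completed commit (stored under HoodieCommitMetadata.SCHEMA_KEY) and to fall back to a data file written by the latest commit only when that entry is absent or empty. A rough sketch of how a caller might use the new resolver API follows; the HoodieTableMetaClient construction and paths are illustrative assumptions, not part of this patch:

// Sketch only: resolving the latest table schema via the new API.
// The meta client construction below is an assumption for illustration.
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;

public class SchemaLookupSketch {
  public static void main(String[] args) throws Exception {
    HoodieTableMetaClient metaClient =
        new HoodieTableMetaClient(new Configuration(), "/tmp/hudi_table");
    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);

    // Full schema (user fields plus the _hoodie_* metadata columns); prefers the
    // schema string recorded in the last commit's metadata, else falls back to
    // reading a data file from the latest commit.
    Schema fullSchema = resolver.getTableAvroSchema();

    // User-facing schema only, with the Hudi metadata columns stripped.
    Schema userSchema = resolver.getTableAvroSchemaWithoutMetadataFields();

    System.out.println(fullSchema.toString(true));
    System.out.println(userSchema.toString(true));
  }
}
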
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 9904411..62509e4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -535,7 +535,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
try {
TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient());
writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
- tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
+ tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields());
isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
} catch (Exception e) {
throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
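
The HoodieTable change above keeps the pre-write compatibility check but now compares the writer schema against the user-level table schema, with the metadata fields re-added consistently on both sides. A minimal sketch of that check in isolation, assuming a resolver already built for the table; the class and method names here are illustrative:

// Sketch only: the writer-vs-table schema compatibility check, in isolation.
import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.table.TableSchemaResolver;

public class SchemaCompatibilitySketch {
  // writerSchemaStr would typically come from the write config (config.getSchema()).
  public static boolean isWriterSchemaCompatible(TableSchemaResolver resolver,
      String writerSchemaStr) throws Exception {
    // Both sides get the _hoodie_* metadata columns added, so the comparison is
    // done on equal footing.
    Schema writerSchema = HoodieAvroUtils.createHoodieWriteSchema(writerSchemaStr);
    Schema tableSchema = HoodieAvroUtils
        .createHoodieWriteSchema(resolver.getTableAvroSchemaWithoutMetadataFields());
    return TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
  }
}
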
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index c3a9d96..d56b7d9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -171,6 +171,16 @@ public class HoodieAvroUtils {
return mergedSchema;
}
+ public static Schema removeMetadataFields(Schema schema) {
+ List<Schema.Field> filteredFields = schema.getFields()
+ .stream()
+ .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS.contains(field.name()))
+ .collect(Collectors.toList());
+ Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
+ filteredSchema.setFields(filteredFields);
+ return filteredSchema;
+ }
+
public static String addMetadataColumnTypes(String hiveColumnTypes) {
return "string,string,string,string,string," + hiveColumnTypes;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 0bab862..129f85f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -25,6 +25,7 @@ import org.apache.avro.Schema.Field;
import org.apache.avro.SchemaCompatibility;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -36,6 +37,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.InvalidTableException;
@@ -66,7 +68,7 @@ public class TableSchemaResolver {
* @return Parquet schema for this table
* @throws Exception
*/
- public MessageType getDataSchema() throws Exception {
+ private MessageType getTableParquetSchemaFromDataFile() throws Exception {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
try {
@@ -139,29 +141,66 @@ public class TableSchemaResolver {
}
}
+ private Schema getTableAvroSchemaFromDataFile() throws Exception {
+ return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
+ }
+
/**
- * Gets the schema for a hoodie table in Avro format.
+ * Gets full schema (user + metadata) for a hoodie table in Avro format.
*
* @return Avro schema for this table
* @throws Exception
*/
- public Schema getTableSchema() throws Exception {
- return convertParquetSchemaToAvro(getDataSchema());
+ public Schema getTableAvroSchema() throws Exception {
+ Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true);
+ return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : getTableAvroSchemaFromDataFile();
+ }
+
+ /**
+ * Gets full schema (user + metadata) for a hoodie table in Parquet format.
+ *
+ * @return Parquet schema for the table
+ * @throws Exception
+ */
+ public MessageType getTableParquetSchema() throws Exception {
+ Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true);
+ return schemaFromCommitMetadata.isPresent() ? convertAvroSchemaToParquet(schemaFromCommitMetadata.get()) :
+ getTableParquetSchemaFromDataFile();
+ }
+
+ /**
+ * Gets the user's data schema for a hoodie table in Avro format.
+ *
+ * @return Avro user data schema
+ * @throws Exception
+ */
+ public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
+ Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(false);
+ return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
+ HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
}
/**
* Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit.
*
* @return Avro schema for this table
- * @throws Exception
*/
- public Schema getTableSchemaFromCommitMetadata() throws Exception {
+ private Option<Schema> getTableSchemaFromCommitMetadata(boolean includeMetadataFields) {
try {
HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get();
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
- return new Schema.Parser().parse(existingSchemaStr);
+
+ if (StringUtils.isNullOrEmpty(existingSchemaStr)) {
+ return Option.empty();
+ }
+
+ Schema schema = new Schema.Parser().parse(existingSchemaStr);
+ if (includeMetadataFields) {
+ schema = HoodieAvroUtils.addMetadataFields(schema);
+ }
+ return Option.of(schema);
} catch (Exception e) {
throw new HoodieException("Failed to read schema from commit metadata", e);
}
@@ -179,6 +218,17 @@ public class TableSchemaResolver {
}
/**
+ * Convert an Avro schema to the Parquet format.
+ *
+ * @param schema The avro schema to convert
+ * @return The converted parquet schema
+ */
+ public MessageType convertAvroSchemaToParquet(Schema schema) {
+ AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf());
+ return avroSchemaConverter.convert(schema);
+ }
+
+ /**
* HUDI specific validation of schema evolution. Ensures that a newer schema can be used for the dataset by
* checking if the data written using the old schema can be read using the new schema.
*
diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 9f1a040..f1034e3 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -328,14 +328,15 @@ public class HoodieHiveClient {
}
/**
- * Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest
- * commit. We will assume that the schema has not changed within a single atomic write.
+ * Gets the schema for a hoodie table. Depending on the type of table, tries to read the schema from the commit
+ * metadata if present, else falls back to reading it from any file written in the latest commit. We assume that the
+ * schema has not changed within a single atomic write.
*
* @return Parquet schema for this table
*/
public MessageType getDataSchema() {
try {
- return new TableSchemaResolver(metaClient).getDataSchema();
+ return new TableSchemaResolver(metaClient).getTableParquetSchema();
} catch (Exception e) {
throw new HoodieHiveSyncException("Failed to read data schema", e);
}
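
For context, HoodieHiveClient.getDataSchema() is invoked from HiveSyncTool when it creates or alters the Hive table, so a schema recorded in commit metadata now flows straight into Hive. A rough sketch of driving the sync programmatically; all configuration values below are placeholders, not part of this patch:

// Sketch only: running Hive sync against a Hudi table; values are placeholders.
import java.util.Collections;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HiveSyncTool;

public class HiveSyncSketch {
  public static void runSync(HiveConf hiveConf, FileSystem fs) {
    HiveSyncConfig config = new HiveSyncConfig();
    config.basePath = "/tmp/hudi_table";
    config.databaseName = "default";
    config.tableName = "hudi_trips";
    config.partitionFields = Collections.singletonList("datestr");
    config.useJdbc = true;
    config.jdbcUrl = "jdbc:hive2://localhost:10000";
    config.hiveUser = "hive";
    config.hivePass = "hive";

    // Creates or updates the Hive table(s); the table schema now comes from
    // commit metadata when present, per the HoodieHiveClient change above.
    new HiveSyncTool(config, hiveConf, fs).syncHoodieTable();
  }
}
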
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 14dfada..a883757 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -18,6 +18,7 @@
package org.apache.hudi.hive;
+import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SchemaTestUtil;
import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
@@ -52,6 +53,10 @@ public class TestHiveSyncTool {
return Stream.of(false, true);
}
+ private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
+ return Arrays.asList(new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } });
+ }
+
@BeforeEach
public void setUp() throws IOException, InterruptedException {
TestUtil.setUp();
@@ -146,11 +151,11 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
- @MethodSource("useJdbc")
- public void testBasicSync(boolean useJdbc) throws Exception {
+ @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
+ public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String instantTime = "100";
- TestUtil.createCOWTable(instantTime, 5);
+ TestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
HoodieHiveClient hiveClient =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
assertFalse(hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName),
@@ -214,7 +219,7 @@ public class TestHiveSyncTool {
public void testSyncIncremental(boolean useJdbc) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String commitTime1 = "100";
- TestUtil.createCOWTable(commitTime1, 5);
+ TestUtil.createCOWTable(commitTime1, 5, true);
HoodieHiveClient hiveClient =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
// Lets do the sync
@@ -228,7 +233,7 @@ public class TestHiveSyncTool {
// Now let's create more partitions and these are the only ones which need to be synced
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "101";
- TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
+ TestUtil.addCOWPartitions(1, true, true, dateTime, commitTime2);
// Lets do the sync
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -253,7 +258,7 @@ public class TestHiveSyncTool {
public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String commitTime1 = "100";
- TestUtil.createCOWTable(commitTime1, 5);
+ TestUtil.createCOWTable(commitTime1, 5, true);
HoodieHiveClient hiveClient =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
// Lets do the sync
@@ -265,7 +270,7 @@ public class TestHiveSyncTool {
// Now let's create more partitions and these are the only ones which need to be synced
DateTime dateTime = DateTime.now().plusDays(6);
String commitTime2 = "101";
- TestUtil.addCOWPartitions(1, false, dateTime, commitTime2);
+ TestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
// Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -286,12 +291,13 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
- @MethodSource("useJdbc")
- public void testSyncMergeOnRead(boolean useJdbc) throws Exception {
+ @MethodSource("useJdbcAndSchemaFromCommitMetadata")
+ public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String instantTime = "100";
String deltaCommitTime = "101";
- TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
+ TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
+ useSchemaFromCommitMetadata);
String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -301,8 +307,19 @@ public class TestHiveSyncTool {
tool.syncHoodieTable();
assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName + " should exist after sync completes");
- assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
- "Hive Schema should match the table schema + partition field");
+
+ if (useSchemaFromCommitMetadata) {
+ assertEquals(hiveClient.getTableSchema(roTableName).size(),
+ SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ + HoodieRecord.HOODIE_META_COLUMNS.size(),
+ "Hive Schema should match the table schema + partition field");
+ } else {
+ // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+ assertEquals(hiveClient.getTableSchema(roTableName).size(),
+ SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+ "Hive Schema should match the table schema + partition field");
+ }
+
assertEquals(5, hiveClient.scanTablePartitions(roTableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(deltaCommitTime, hiveClient.getLastCommitTimeSynced(roTableName).get(),
@@ -313,15 +330,25 @@ public class TestHiveSyncTool {
String commitTime2 = "102";
String deltaCommitTime2 = "103";
- TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
- TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+ TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
+ TestUtil.addMORPartitions(1, true, false,
+ useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
- assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
- "Hive Schema should match the evolved table schema + partition field");
+ if (useSchemaFromCommitMetadata) {
+ assertEquals(hiveClient.getTableSchema(roTableName).size(),
+ SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ + HoodieRecord.HOODIE_META_COLUMNS.size(),
+ "Hive Schema should match the evolved table schema + partition field");
+ } else {
+ // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+ assertEquals(hiveClient.getTableSchema(roTableName).size(),
+ SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+ "Hive Schema should match the evolved table schema + partition field");
+ }
// Sync should add the one partition
assertEquals(6, hiveClient.scanTablePartitions(roTableName).size(),
"The 2 partitions we wrote should be added to hive");
@@ -330,13 +357,13 @@ public class TestHiveSyncTool {
}
@ParameterizedTest
- @MethodSource("useJdbc")
- public void testSyncMergeOnReadRT(boolean useJdbc) throws Exception {
+ @MethodSource("useJdbcAndSchemaFromCommitMetadata")
+ public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String instantTime = "100";
String deltaCommitTime = "101";
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
- TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
+ TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, useSchemaFromCommitMetadata);
HoodieHiveClient hiveClientRT =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -352,8 +379,18 @@ public class TestHiveSyncTool {
"Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should exist after sync completes");
- assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
- "Hive Schema should match the table schema + partition field");
+ if (useSchemaFromCommitMetadata) {
+ assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+ SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ + HoodieRecord.HOODIE_META_COLUMNS.size(),
+ "Hive Schema should match the table schema + partition field");
+ } else {
+ // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+ assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+ SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+ "Hive Schema should match the table schema + partition field");
+ }
+
assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
"Table partitions should match the number of partitions we wrote");
assertEquals(deltaCommitTime, hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get(),
@@ -364,15 +401,24 @@ public class TestHiveSyncTool {
String commitTime2 = "102";
String deltaCommitTime2 = "103";
- TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
- TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+ TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
+ TestUtil.addMORPartitions(1, true, false, useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
- assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
- "Hive Schema should match the evolved table schema + partition field");
+ if (useSchemaFromCommitMetadata) {
+ assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+ SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ + HoodieRecord.HOODIE_META_COLUMNS.size(),
+ "Hive Schema should match the evolved table schema + partition field");
+ } else {
+ // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+ assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+ SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+ "Hive Schema should match the evolved table schema + partition field");
+ }
// Sync should add the one partition
assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
"The 2 partitions we wrote should be added to hive");
@@ -385,7 +431,7 @@ public class TestHiveSyncTool {
public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String instantTime = "100";
- TestUtil.createCOWTable(instantTime, 5);
+ TestUtil.createCOWTable(instantTime, 5, true);
HiveSyncConfig hiveSyncConfig = HiveSyncConfig.copy(TestUtil.hiveSyncConfig);
hiveSyncConfig.partitionValueExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName();
@@ -416,7 +462,7 @@ public class TestHiveSyncTool {
TestUtil.hiveSyncConfig.useJdbc = useJdbc;
String commitTime = "100";
String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
- TestUtil.createMORTable(commitTime, "", 5, false);
+ TestUtil.createMORTable(commitTime, "", 5, false, true);
HoodieHiveClient hiveClientRT =
new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -431,7 +477,9 @@ public class TestHiveSyncTool {
+ " should exist after sync completes");
// Schema being read from compacted base files
- assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
+ assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+ SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ + HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the table schema + partition field");
assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "Table partitions should match the number of partitions we wrote");
@@ -440,14 +488,16 @@ public class TestHiveSyncTool {
String commitTime2 = "102";
String deltaCommitTime2 = "103";
- TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+ TestUtil.addMORPartitions(1, true, false, true, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync
tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
tool.syncHoodieTable();
hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
// Schema being read from the log files
- assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
+ assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+ SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+ + HoodieRecord.HOODIE_META_COLUMNS.size(),
"Hive Schema should match the evolved table schema + partition field");
// Sync should add the one partition
assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "The 1 partition we wrote should be added to hive");
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
index 86c78fb..960a010 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
@@ -155,7 +155,7 @@ public class TestUtil {
}
}
- static void createCOWTable(String instantTime, int numberOfPartitions)
+ static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata)
throws IOException, URISyntaxException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
@@ -164,13 +164,15 @@ public class TestUtil {
boolean result = fileSystem.mkdirs(path);
checkResult(result);
DateTime dateTime = DateTime.now();
- HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, instantTime);
+ HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
+ useSchemaFromCommitMetadata, dateTime, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime);
}
static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
- boolean createDeltaCommit) throws IOException, URISyntaxException, InterruptedException {
+ boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
+ throws IOException, URISyntaxException, InterruptedException {
Path path = new Path(hiveSyncConfig.basePath);
FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ,
@@ -179,46 +181,54 @@ public class TestUtil {
boolean result = fileSystem.mkdirs(path);
checkResult(result);
DateTime dateTime = DateTime.now();
- HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime);
+ HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
+ useSchemaFromCommitMetadata, dateTime, commitTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createdTablesSet
.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
+ addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
+ useSchemaFromCommitMetadata);
createCompactionCommitFile(compactionMetadata, commitTime);
if (createDeltaCommit) {
// Write a delta commit
- HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
+ HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true,
+ useSchemaFromCommitMetadata);
createDeltaCommitFile(deltaMetadata, deltaCommitTime);
}
}
- static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom,
- String instantTime) throws IOException, URISyntaxException {
+ static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
+ boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata =
- createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, instantTime);
+ createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
createCommitFile(commitMetadata, instantTime);
}
static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
- DateTime startFrom, String instantTime, String deltaCommitTime)
+ boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
throws IOException, URISyntaxException, InterruptedException {
- HoodieCommitMetadata commitMetadata =
- createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, instantTime);
+ HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple,
+ useSchemaFromCommitMetadata, startFrom, instantTime);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
commitMetadata.getPartitionToWriteStats()
.forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
+ addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
+ useSchemaFromCommitMetadata);
createCompactionCommitFile(compactionMetadata, instantTime);
- HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple);
+ HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple,
+ useSchemaFromCommitMetadata);
createDeltaCommitFile(deltaMetadata, deltaCommitTime);
}
private static HoodieCommitMetadata createLogFiles(Map<String, List<HoodieWriteStat>> partitionWriteStats,
- boolean isLogSchemaSimple) throws InterruptedException, IOException, URISyntaxException {
+ boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata)
+ throws InterruptedException, IOException, URISyntaxException {
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
for (Entry<String, List<HoodieWriteStat>> wEntry : partitionWriteStats.entrySet()) {
String partitionPath = wEntry.getKey();
@@ -232,11 +242,12 @@ public class TestUtil {
commitMetadata.addWriteStat(partitionPath, writeStat);
}
}
+ addSchemaToCommitMetadata(commitMetadata, isLogSchemaSimple, useSchemaFromCommitMetadata);
return commitMetadata;
}
private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
- DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
+ boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
startFrom = startFrom.withTimeAtStartOfDay();
HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
@@ -249,6 +260,7 @@ public class TestUtil {
startFrom = startFrom.minusDays(1);
writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
}
+ addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata);
return commitMetadata;
}
@@ -271,7 +283,7 @@ public class TestUtil {
@SuppressWarnings({"unchecked", "deprecation"})
private static void generateParquetData(Path filePath, boolean isParquetSchemaSimple)
throws IOException, URISyntaxException {
- Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
+ Schema schema = getTestDataSchema(isParquetSchemaSimple);
org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema);
BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
BloomFilterTypeCode.SIMPLE.name());
@@ -294,7 +306,7 @@ public class TestUtil {
private static HoodieLogFile generateLogData(Path parquetFilePath, boolean isLogSchemaSimple)
throws IOException, InterruptedException, URISyntaxException {
- Schema schema = (isLogSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
+ Schema schema = getTestDataSchema(isLogSchemaSimple);
HoodieBaseFile dataFile = new HoodieBaseFile(fileSystem.getFileStatus(parquetFilePath));
// Write a log file for this parquet file
Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent())
@@ -311,6 +323,25 @@ public class TestUtil {
return logWriter.getLogFile();
}
+ private static Schema getTestDataSchema(boolean isSimpleSchema) throws IOException {
+ return isSimpleSchema ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema();
+ }
+
+ private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, boolean isSimpleSchema,
+ boolean useSchemaFromCommitMetadata) throws IOException {
+ if (useSchemaFromCommitMetadata) {
+ Schema dataSchema = getTestDataSchema(isSimpleSchema);
+ commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, dataSchema.toString());
+ }
+ }
+
+ private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, String schema,
+ boolean useSchemaFromCommitMetadata) {
+ if (useSchemaFromCommitMetadata) {
+ commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema);
+ }
+ }
+
private static void checkResult(boolean result) {
if (!result) {
throw new JUnitException("Could not initialize");