Posted to commits@hudi.apache.org by vb...@apache.org on 2020/05/07 23:34:34 UTC

[incubator-hudi] branch master updated: [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync (#1559)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d54b4b8  [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync (#1559)
d54b4b8 is described below

commit d54b4b8a525868ea6d15e2e2cc6ffccc62d5c43c
Author: Udit Mehrotra <um...@illinois.edu>
AuthorDate: Thu May 7 16:33:09 2020 -0700

    [HUDI-838] Support schema from HoodieCommitMetadata for HiveSync (#1559)
    
    Co-authored-by: Mehrotra <ud...@amazon.com>
---
 .../java/org/apache/hudi/table/HoodieTable.java    |   2 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  10 ++
 .../hudi/common/table/TableSchemaResolver.java     |  64 ++++++++++--
 .../org/apache/hudi/hive/HoodieHiveClient.java     |   7 +-
 .../org/apache/hudi/hive/TestHiveSyncTool.java     | 110 +++++++++++++++------
 .../test/java/org/apache/hudi/hive/TestUtil.java   |  63 +++++++++---
 6 files changed, 199 insertions(+), 57 deletions(-)
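
In short, the change makes schema resolution prefer the Avro schema recorded in the latest HoodieCommitMetadata and fall back to reading a data file only when no such entry exists. As a rough illustration of the reworked TableSchemaResolver surface introduced below, here is a minimal sketch, assuming an already-built HoodieTableMetaClient; the example class itself is not part of this commit:

    import org.apache.avro.Schema;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.TableSchemaResolver;
    import org.apache.parquet.schema.MessageType;

    public class SchemaLookupExample {
      // Resolves the table schema the same way HiveSync now does: commit metadata first,
      // falling back to a data file written in the latest commit.
      public static void printSchemas(HoodieTableMetaClient metaClient) throws Exception {
        TableSchemaResolver resolver = new TableSchemaResolver(metaClient);

        // Full schema including the Hudi metadata fields.
        Schema avroSchema = resolver.getTableAvroSchema();

        // User data schema only, with the metadata fields stripped.
        Schema userSchema = resolver.getTableAvroSchemaWithoutMetadataFields();

        // Parquet view of the full schema (what HoodieHiveClient#getDataSchema returns).
        MessageType parquetSchema = resolver.getTableParquetSchema();

        System.out.println(avroSchema.toString(true));
        System.out.println(userSchema.toString(true));
        System.out.println(parquetSchema);
      }
    }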

diff --git a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
index 9904411..62509e4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -535,7 +535,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
     try {
       TableSchemaResolver schemaUtil = new TableSchemaResolver(getMetaClient());
       writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
-      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableSchemaFromCommitMetadata());
+      tableSchema = HoodieAvroUtils.createHoodieWriteSchema(schemaUtil.getTableAvroSchemaWithoutMetadataFields());
       isValid = TableSchemaResolver.isSchemaCompatible(tableSchema, writerSchema);
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema/check compatibility for base path " + metaClient.getBasePath(), e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index c3a9d96..d56b7d9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -171,6 +171,16 @@ public class HoodieAvroUtils {
     return mergedSchema;
   }
 
+  public static Schema removeMetadataFields(Schema schema) {
+    List<Schema.Field> filteredFields = schema.getFields()
+                                              .stream()
+                                              .filter(field -> !HoodieRecord.HOODIE_META_COLUMNS.contains(field.name()))
+                                              .collect(Collectors.toList());
+    Schema filteredSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
+    filteredSchema.setFields(filteredFields);
+    return filteredSchema;
+  }
+
   public static String addMetadataColumnTypes(String hiveColumnTypes) {
     return "string,string,string,string,string," + hiveColumnTypes;
   }
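
The new removeMetadataFields helper strips the Hudi metadata columns from an Avro record schema, complementing the existing addMetadataFields used elsewhere in this class. A small round-trip sketch follows; the record name and fields are made up for illustration and are not part of this commit:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.hudi.avro.HoodieAvroUtils;

    public class RemoveMetadataFieldsExample {
      public static void main(String[] args) {
        // A user schema with no Hudi metadata columns (hypothetical fields).
        Schema userSchema = SchemaBuilder.record("trip").fields()
            .requiredString("rider")
            .requiredDouble("fare")
            .endRecord();

        // addMetadataFields prepends the Hudi metadata columns; removeMetadataFields strips
        // them again, so the result should list only the user fields.
        Schema withMeta = HoodieAvroUtils.addMetadataFields(userSchema);
        Schema withoutMeta = HoodieAvroUtils.removeMetadataFields(withMeta);

        System.out.println(withMeta.getFields().size());     // user fields + metadata columns
        System.out.println(withoutMeta.getFields().size());  // back to the 2 user fields
      }
    }
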
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 0bab862..129f85f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -25,6 +25,7 @@ import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaCompatibility;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -36,6 +37,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.InvalidTableException;
@@ -66,7 +68,7 @@ public class TableSchemaResolver {
    * @return Parquet schema for this table
    * @throws Exception
    */
-  public MessageType getDataSchema() throws Exception {
+  private MessageType getTableParquetSchemaFromDataFile() throws Exception {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
 
     try {
@@ -139,29 +141,66 @@ public class TableSchemaResolver {
     }
   }
 
+  private Schema getTableAvroSchemaFromDataFile() throws Exception {
+    return convertParquetSchemaToAvro(getTableParquetSchemaFromDataFile());
+  }
+
   /**
-   * Gets the schema for a hoodie table in Avro format.
+   * Gets the full schema (user + metadata) for a hoodie table in Avro format.
    *
    * @return Avro schema for this table
    * @throws Exception
    */
-  public Schema getTableSchema() throws Exception {
-    return convertParquetSchemaToAvro(getDataSchema());
+  public Schema getTableAvroSchema() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true);
+    return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() : getTableAvroSchemaFromDataFile();
+  }
+
+  /**
+   * Gets the full schema (user + metadata) for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   * @throws Exception
+   */
+  public MessageType getTableParquetSchema() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(true);
+    return schemaFromCommitMetadata.isPresent() ? convertAvroSchemaToParquet(schemaFromCommitMetadata.get()) :
+           getTableParquetSchemaFromDataFile();
+  }
+
+  /**
+   * Gets the user data schema for a hoodie table in Avro format.
+   *
+   * @return Avro user data schema
+   * @throws Exception
+   */
+  public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception {
+    Option<Schema> schemaFromCommitMetadata = getTableSchemaFromCommitMetadata(false);
+    return schemaFromCommitMetadata.isPresent() ? schemaFromCommitMetadata.get() :
+           HoodieAvroUtils.removeMetadataFields(getTableAvroSchemaFromDataFile());
   }
 
   /**
    * Gets the schema for a hoodie table in Avro format from the HoodieCommitMetadata of the last commit.
    *
    * @return Avro schema for this table
-   * @throws Exception
    */
-  public Schema getTableSchemaFromCommitMetadata() throws Exception {
+  private Option<Schema> getTableSchemaFromCommitMetadata(boolean includeMetadataFields) {
     try {
       HoodieTimeline timeline = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
       byte[] data = timeline.getInstantDetails(timeline.lastInstant().get()).get();
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class);
       String existingSchemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
-      return new Schema.Parser().parse(existingSchemaStr);
+
+      if (StringUtils.isNullOrEmpty(existingSchemaStr)) {
+        return Option.empty();
+      }
+
+      Schema schema = new Schema.Parser().parse(existingSchemaStr);
+      if (includeMetadataFields) {
+        schema = HoodieAvroUtils.addMetadataFields(schema);
+      }
+      return Option.of(schema);
     } catch (Exception e) {
       throw new HoodieException("Failed to read schema from commit metadata", e);
     }
@@ -179,6 +218,17 @@ public class TableSchemaResolver {
   }
 
   /**
+   * Converts an Avro schema to the Parquet format.
+   *
+   * @param schema The avro schema to convert
+   * @return The converted parquet schema
+   */
+  public MessageType convertAvroSchemaToParquet(Schema schema) {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getHadoopConf());
+    return avroSchemaConverter.convert(schema);
+  }
+
+  /**
    * HUDI specific validation of schema evolution. Ensures that a newer schema can be used for the dataset by
    * checking if the data written using the old schema can be read using the new schema.
    *
diff --git a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
index 9f1a040..f1034e3 100644
--- a/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
+++ b/hudi-hive-sync/src/main/java/org/apache/hudi/hive/HoodieHiveClient.java
@@ -328,14 +328,15 @@ public class HoodieHiveClient {
   }
 
   /**
-   * Gets the schema for a hoodie table. Depending on the type of table, read from any file written in the latest
-   * commit. We will assume that the schema has not changed within a single atomic write.
+   * Gets the schema for a hoodie table. Depending on the type of table, try to read the schema from commit metadata
+   * if present; otherwise fall back to reading it from any file written in the latest commit. We assume that the
+   * schema has not changed within a single atomic write.
    *
    * @return Parquet schema for this table
    */
   public MessageType getDataSchema() {
     try {
-      return new TableSchemaResolver(metaClient).getDataSchema();
+      return new TableSchemaResolver(metaClient).getTableParquetSchema();
     } catch (Exception e) {
       throw new HoodieHiveSyncException("Failed to read data schema", e);
     }
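
On the Hive sync side the selection is just a preference order: use the commit-metadata schema when it exists, otherwise keep the old behavior of reading a file from the latest commit. A simplified sketch of that fallback, mirroring getTableParquetSchema shown earlier; the helper below is illustrative, not part of this commit:

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.util.Option;
    import org.apache.parquet.avro.AvroSchemaConverter;
    import org.apache.parquet.schema.MessageType;

    public class ParquetSchemaFallbackExample {
      // Prefer the schema recorded in commit metadata; otherwise use the schema read from a data file.
      public static MessageType resolve(Option<Schema> schemaFromCommitMetadata,
                                        MessageType schemaFromDataFile,
                                        Configuration hadoopConf) {
        if (schemaFromCommitMetadata.isPresent()) {
          return new AvroSchemaConverter(hadoopConf).convert(schemaFromCommitMetadata.get());
        }
        return schemaFromDataFile;
      }
    }
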
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 14dfada..a883757 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hive;
 
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.SchemaTestUtil;
 import org.apache.hudi.hive.HoodieHiveClient.PartitionEvent;
@@ -52,6 +53,10 @@ public class TestHiveSyncTool {
     return Stream.of(false, true);
   }
 
+  private static Iterable<Object[]> useJdbcAndSchemaFromCommitMetadata() {
+    return Arrays.asList(new Object[][] { { true, true }, { true, false }, { false, true }, { false, false } });
+  }
+
   @BeforeEach
   public void setUp() throws IOException, InterruptedException {
     TestUtil.setUp();
@@ -146,11 +151,11 @@ public class TestHiveSyncTool {
   }
 
   @ParameterizedTest
-  @MethodSource("useJdbc")
-  public void testBasicSync(boolean useJdbc) throws Exception {
+  @MethodSource({"useJdbcAndSchemaFromCommitMetadata"})
+  public void testBasicSync(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
-    TestUtil.createCOWTable(instantTime, 5);
+    TestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
     HoodieHiveClient hiveClient =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     assertFalse(hiveClient.doesTableExist(TestUtil.hiveSyncConfig.tableName),
@@ -214,7 +219,7 @@ public class TestHiveSyncTool {
   public void testSyncIncremental(boolean useJdbc) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String commitTime1 = "100";
-    TestUtil.createCOWTable(commitTime1, 5);
+    TestUtil.createCOWTable(commitTime1, 5, true);
     HoodieHiveClient hiveClient =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     // Lets do the sync
@@ -228,7 +233,7 @@ public class TestHiveSyncTool {
     // Now let's create more partitions and these are the only ones which need to be synced
     DateTime dateTime = DateTime.now().plusDays(6);
     String commitTime2 = "101";
-    TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
+    TestUtil.addCOWPartitions(1, true, true, dateTime, commitTime2);
 
     // Lets do the sync
     hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -253,7 +258,7 @@ public class TestHiveSyncTool {
   public void testSyncIncrementalWithSchemaEvolution(boolean useJdbc) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String commitTime1 = "100";
-    TestUtil.createCOWTable(commitTime1, 5);
+    TestUtil.createCOWTable(commitTime1, 5, true);
     HoodieHiveClient hiveClient =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     // Lets do the sync
@@ -265,7 +270,7 @@ public class TestHiveSyncTool {
     // Now let's create more partitions and these are the only ones which need to be synced
     DateTime dateTime = DateTime.now().plusDays(6);
     String commitTime2 = "101";
-    TestUtil.addCOWPartitions(1, false, dateTime, commitTime2);
+    TestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
 
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -286,12 +291,13 @@ public class TestHiveSyncTool {
   }
 
   @ParameterizedTest
-  @MethodSource("useJdbc")
-  public void testSyncMergeOnRead(boolean useJdbc) throws Exception {
+  @MethodSource("useJdbcAndSchemaFromCommitMetadata")
+  public void testSyncMergeOnRead(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
     String deltaCommitTime = "101";
-    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
+    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
+                            useSchemaFromCommitMetadata);
 
     String roTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
     HoodieHiveClient hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
@@ -301,8 +307,19 @@ public class TestHiveSyncTool {
     tool.syncHoodieTable();
 
     assertTrue(hiveClient.doesTableExist(roTableName), "Table " + roTableName + " should exist after sync completes");
-    assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
-        "Hive Schema should match the table schema + partition field");
+
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+                   SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+                     + HoodieRecord.HOODIE_META_COLUMNS.size(),
+                   "Hive Schema should match the table schema + partition field");
+    } else {
+      // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+                   SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+                   "Hive Schema should match the table schema + partition field");
+    }
+
     assertEquals(5, hiveClient.scanTablePartitions(roTableName).size(),
         "Table partitions should match the number of partitions we wrote");
     assertEquals(deltaCommitTime, hiveClient.getLastCommitTimeSynced(roTableName).get(),
@@ -313,15 +330,25 @@ public class TestHiveSyncTool {
     String commitTime2 = "102";
     String deltaCommitTime2 = "103";
 
-    TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
-    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+    TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
+    TestUtil.addMORPartitions(1, true, false,
+        useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     tool.syncHoodieTable();
     hiveClient = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
 
-    assertEquals(hiveClient.getTableSchema(roTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
-        "Hive Schema should match the evolved table schema + partition field");
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+                   SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+                     + HoodieRecord.HOODIE_META_COLUMNS.size(),
+                   "Hive Schema should match the evolved table schema + partition field");
+    } else {
+      // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+      assertEquals(hiveClient.getTableSchema(roTableName).size(),
+                   SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+                   "Hive Schema should match the evolved table schema + partition field");
+    }
     // Sync should add the one partition
     assertEquals(6, hiveClient.scanTablePartitions(roTableName).size(),
         "The 2 partitions we wrote should be added to hive");
@@ -330,13 +357,13 @@ public class TestHiveSyncTool {
   }
 
   @ParameterizedTest
-  @MethodSource("useJdbc")
-  public void testSyncMergeOnReadRT(boolean useJdbc) throws Exception {
+  @MethodSource("useJdbcAndSchemaFromCommitMetadata")
+  public void testSyncMergeOnReadRT(boolean useJdbc, boolean useSchemaFromCommitMetadata) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
     String deltaCommitTime = "101";
     String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
-    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true);
+    TestUtil.createMORTable(instantTime, deltaCommitTime, 5, true, useSchemaFromCommitMetadata);
     HoodieHiveClient hiveClientRT =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
 
@@ -352,8 +379,18 @@ public class TestHiveSyncTool {
         "Table " + TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
             + " should exist after sync completes");
 
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
-        "Hive Schema should match the table schema + partition field");
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                   SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+                     + HoodieRecord.HOODIE_META_COLUMNS.size(),
+                   "Hive Schema should match the table schema + partition field");
+    } else {
+      // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                   SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+                   "Hive Schema should match the table schema + partition field");
+    }
+
     assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
         "Table partitions should match the number of partitions we wrote");
     assertEquals(deltaCommitTime, hiveClientRT.getLastCommitTimeSynced(snapshotTableName).get(),
@@ -364,15 +401,24 @@ public class TestHiveSyncTool {
     String commitTime2 = "102";
     String deltaCommitTime2 = "103";
 
-    TestUtil.addCOWPartitions(1, true, dateTime, commitTime2);
-    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+    TestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata, dateTime, commitTime2);
+    TestUtil.addMORPartitions(1, true, false, useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     tool.syncHoodieTable();
     hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
 
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
-        "Hive Schema should match the evolved table schema + partition field");
+    if (useSchemaFromCommitMetadata) {
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                   SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+                     + HoodieRecord.HOODIE_META_COLUMNS.size(),
+                   "Hive Schema should match the evolved table schema + partition field");
+    } else {
+      // The data generated and schema in the data file do not have metadata columns, so we need a separate check.
+      assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                   SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size(),
+                   "Hive Schema should match the evolved table schema + partition field");
+    }
     // Sync should add the one partition
     assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(),
         "The 2 partitions we wrote should be added to hive");
@@ -385,7 +431,7 @@ public class TestHiveSyncTool {
   public void testMultiPartitionKeySync(boolean useJdbc) throws Exception {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String instantTime = "100";
-    TestUtil.createCOWTable(instantTime, 5);
+    TestUtil.createCOWTable(instantTime, 5, true);
 
     HiveSyncConfig hiveSyncConfig = HiveSyncConfig.copy(TestUtil.hiveSyncConfig);
     hiveSyncConfig.partitionValueExtractorClass = MultiPartKeysValueExtractor.class.getCanonicalName();
@@ -416,7 +462,7 @@ public class TestHiveSyncTool {
     TestUtil.hiveSyncConfig.useJdbc = useJdbc;
     String commitTime = "100";
     String snapshotTableName = TestUtil.hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
-    TestUtil.createMORTable(commitTime, "", 5, false);
+    TestUtil.createMORTable(commitTime, "", 5, false, true);
     HoodieHiveClient hiveClientRT =
         new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
 
@@ -431,7 +477,9 @@ public class TestHiveSyncTool {
         + " should exist after sync completes");
 
     // Schema being read from compacted base files
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getSimpleSchema().getFields().size() + 1,
+    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                 SchemaTestUtil.getSimpleSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+                   + HoodieRecord.HOODIE_META_COLUMNS.size(),
         "Hive Schema should match the table schema + partition field");
     assertEquals(5, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "Table partitions should match the number of partitions we wrote");
 
@@ -440,14 +488,16 @@ public class TestHiveSyncTool {
     String commitTime2 = "102";
     String deltaCommitTime2 = "103";
 
-    TestUtil.addMORPartitions(1, true, false, dateTime, commitTime2, deltaCommitTime2);
+    TestUtil.addMORPartitions(1, true, false, true, dateTime, commitTime2, deltaCommitTime2);
     // Lets do the sync
     tool = new HiveSyncTool(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
     tool.syncHoodieTable();
     hiveClientRT = new HoodieHiveClient(TestUtil.hiveSyncConfig, TestUtil.getHiveConf(), TestUtil.fileSystem);
 
     // Schema being read from the log files
-    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(), SchemaTestUtil.getEvolvedSchema().getFields().size() + 1,
+    assertEquals(hiveClientRT.getTableSchema(snapshotTableName).size(),
+                 SchemaTestUtil.getEvolvedSchema().getFields().size() + TestUtil.hiveSyncConfig.partitionFields.size()
+                   + HoodieRecord.HOODIE_META_COLUMNS.size(),
         "Hive Schema should match the evolved table schema + partition field");
     // Sync should add the one partition
     assertEquals(6, hiveClientRT.scanTablePartitions(snapshotTableName).size(), "The 1 partition we wrote should be added to hive");
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
index 86c78fb..960a010 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
@@ -155,7 +155,7 @@ public class TestUtil {
     }
   }
 
-  static void createCOWTable(String instantTime, int numberOfPartitions)
+  static void createCOWTable(String instantTime, int numberOfPartitions, boolean useSchemaFromCommitMetadata)
       throws IOException, URISyntaxException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
@@ -164,13 +164,15 @@ public class TestUtil {
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
     DateTime dateTime = DateTime.now();
-    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, instantTime);
+    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
+        useSchemaFromCommitMetadata, dateTime, instantTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
     createCommitFile(commitMetadata, instantTime);
   }
 
   static void createMORTable(String commitTime, String deltaCommitTime, int numberOfPartitions,
-      boolean createDeltaCommit) throws IOException, URISyntaxException, InterruptedException {
+      boolean createDeltaCommit, boolean useSchemaFromCommitMetadata)
+      throws IOException, URISyntaxException, InterruptedException {
     Path path = new Path(hiveSyncConfig.basePath);
     FileIOUtils.deleteDirectory(new File(hiveSyncConfig.basePath));
     HoodieTableMetaClient.initTableType(configuration, hiveSyncConfig.basePath, HoodieTableType.MERGE_ON_READ,
@@ -179,46 +181,54 @@ public class TestUtil {
     boolean result = fileSystem.mkdirs(path);
     checkResult(result);
     DateTime dateTime = DateTime.now();
-    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true, dateTime, commitTime);
+    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, true,
+        useSchemaFromCommitMetadata, dateTime, commitTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
     createdTablesSet
         .add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
     HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
     commitMetadata.getPartitionToWriteStats()
         .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
+    addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
+                             useSchemaFromCommitMetadata);
     createCompactionCommitFile(compactionMetadata, commitTime);
     if (createDeltaCommit) {
       // Write a delta commit
-      HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true);
+      HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), true,
+                                                          useSchemaFromCommitMetadata);
       createDeltaCommitFile(deltaMetadata, deltaCommitTime);
     }
   }
 
-  static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, DateTime startFrom,
-                               String instantTime) throws IOException, URISyntaxException {
+  static void addCOWPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
+      boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
     HoodieCommitMetadata commitMetadata =
-        createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, instantTime);
+        createPartitions(numberOfPartitions, isParquetSchemaSimple, useSchemaFromCommitMetadata, startFrom, instantTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName);
     createCommitFile(commitMetadata, instantTime);
   }
 
   static void addMORPartitions(int numberOfPartitions, boolean isParquetSchemaSimple, boolean isLogSchemaSimple,
-                               DateTime startFrom, String instantTime, String deltaCommitTime)
+      boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime, String deltaCommitTime)
       throws IOException, URISyntaxException, InterruptedException {
-    HoodieCommitMetadata commitMetadata =
-        createPartitions(numberOfPartitions, isParquetSchemaSimple, startFrom, instantTime);
+    HoodieCommitMetadata commitMetadata = createPartitions(numberOfPartitions, isParquetSchemaSimple,
+        useSchemaFromCommitMetadata, startFrom, instantTime);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE);
     createdTablesSet.add(hiveSyncConfig.databaseName + "." + hiveSyncConfig.tableName + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE);
     HoodieCommitMetadata compactionMetadata = new HoodieCommitMetadata();
     commitMetadata.getPartitionToWriteStats()
         .forEach((key, value) -> value.forEach(l -> compactionMetadata.addWriteStat(key, l)));
+    addSchemaToCommitMetadata(compactionMetadata, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY),
+                             useSchemaFromCommitMetadata);
     createCompactionCommitFile(compactionMetadata, instantTime);
-    HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple);
+    HoodieCommitMetadata deltaMetadata = createLogFiles(commitMetadata.getPartitionToWriteStats(), isLogSchemaSimple,
+        useSchemaFromCommitMetadata);
     createDeltaCommitFile(deltaMetadata, deltaCommitTime);
   }
 
   private static HoodieCommitMetadata createLogFiles(Map<String, List<HoodieWriteStat>> partitionWriteStats,
-                                                     boolean isLogSchemaSimple) throws InterruptedException, IOException, URISyntaxException {
+      boolean isLogSchemaSimple, boolean useSchemaFromCommitMetadata)
+      throws InterruptedException, IOException, URISyntaxException {
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
     for (Entry<String, List<HoodieWriteStat>> wEntry : partitionWriteStats.entrySet()) {
       String partitionPath = wEntry.getKey();
@@ -232,11 +242,12 @@ public class TestUtil {
         commitMetadata.addWriteStat(partitionPath, writeStat);
       }
     }
+    addSchemaToCommitMetadata(commitMetadata, isLogSchemaSimple, useSchemaFromCommitMetadata);
     return commitMetadata;
   }
 
   private static HoodieCommitMetadata createPartitions(int numberOfPartitions, boolean isParquetSchemaSimple,
-                                                       DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
+      boolean useSchemaFromCommitMetadata, DateTime startFrom, String instantTime) throws IOException, URISyntaxException {
     startFrom = startFrom.withTimeAtStartOfDay();
 
     HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
@@ -249,6 +260,7 @@ public class TestUtil {
       startFrom = startFrom.minusDays(1);
       writeStats.forEach(s -> commitMetadata.addWriteStat(partitionPath, s));
     }
+    addSchemaToCommitMetadata(commitMetadata, isParquetSchemaSimple, useSchemaFromCommitMetadata);
     return commitMetadata;
   }
 
@@ -271,7 +283,7 @@ public class TestUtil {
   @SuppressWarnings({"unchecked", "deprecation"})
   private static void generateParquetData(Path filePath, boolean isParquetSchemaSimple)
       throws IOException, URISyntaxException {
-    Schema schema = (isParquetSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
+    Schema schema = getTestDataSchema(isParquetSchemaSimple);
     org.apache.parquet.schema.MessageType parquetSchema = new AvroSchemaConverter().convert(schema);
     BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.0001, -1,
         BloomFilterTypeCode.SIMPLE.name());
@@ -294,7 +306,7 @@ public class TestUtil {
 
   private static HoodieLogFile generateLogData(Path parquetFilePath, boolean isLogSchemaSimple)
       throws IOException, InterruptedException, URISyntaxException {
-    Schema schema = (isLogSchemaSimple ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema());
+    Schema schema = getTestDataSchema(isLogSchemaSimple);
     HoodieBaseFile dataFile = new HoodieBaseFile(fileSystem.getFileStatus(parquetFilePath));
     // Write a log file for this parquet file
     Writer logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(parquetFilePath.getParent())
@@ -311,6 +323,25 @@ public class TestUtil {
     return logWriter.getLogFile();
   }
 
+  private static Schema getTestDataSchema(boolean isSimpleSchema) throws IOException {
+    return isSimpleSchema ? SchemaTestUtil.getSimpleSchema() : SchemaTestUtil.getEvolvedSchema();
+  }
+
+  private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, boolean isSimpleSchema,
+      boolean useSchemaFromCommitMetadata) throws IOException {
+    if (useSchemaFromCommitMetadata) {
+      Schema dataSchema = getTestDataSchema(isSimpleSchema);
+      commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, dataSchema.toString());
+    }
+  }
+
+  private static void addSchemaToCommitMetadata(HoodieCommitMetadata commitMetadata, String schema,
+      boolean useSchemaFromCommitMetadata) {
+    if (useSchemaFromCommitMetadata) {
+      commitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schema);
+    }
+  }
+
   private static void checkResult(boolean result) {
     if (!result) {
       throw new JUnitException("Could not initialize");