Posted to commits@hudi.apache.org by vb...@apache.org on 2019/03/27 02:44:31 UTC

[incubator-hudi] branch master updated: Fixing source schema and writer schema distinction in payloads

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d9041e  Fixing source schema and writer schema distinction in payloads
3d9041e is described below

commit 3d9041e2165bb0de0aaf71ad04cee16daa7d74b4
Author: Nishith Agarwal <na...@uber.com>
AuthorDate: Fri Mar 22 16:27:51 2019 -0700

    Fixing source schema and writer schema distinction in payloads
---
 .../hoodie/func/CopyOnWriteLazyInsertIterable.java |  2 +-
 .../com/uber/hoodie/io/HoodieAppendHandle.java     | 13 ++++----
 .../com/uber/hoodie/io/HoodieCreateHandle.java     | 17 +++++++---
 .../java/com/uber/hoodie/io/HoodieIOHandle.java    | 26 +++++++++++-----
 .../java/com/uber/hoodie/io/HoodieMergeHandle.java | 22 ++++++++-----
 .../uber/hoodie/table/HoodieCopyOnWriteTable.java  |  2 +-
 .../hoodie/common/HoodieTestDataGenerator.java     | 36 ++++++++++++++++++++++
 .../uber/hoodie/table/TestCopyOnWriteTable.java    | 20 ++++++++++++
 .../hoodie/common/model/HoodieAvroPayload.java     |  3 +-
 .../hoodie/OverwriteWithLatestAvroPayload.java     |  3 +-
 10 files changed, 114 insertions(+), 30 deletions(-)
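
Taken together, the point of the patch: HoodieIOHandle previously kept a single
"schema" field that already had the Hoodie metadata fields mixed in, which
forced payload implementations to rewrite records themselves. Each handle now
keeps two schemas: originalSchema, the source schema exactly as configured,
and writerSchema, derived from it by adding the metadata fields. Payloads are
deserialized with the source schema, and the rewrite into the writer schema
happens once, at write time, inside the handle. A minimal sketch of the
relationship, condensed from the HoodieIOHandle hunks below (package names in
the imports are assumptions; the diff uses these classes unqualified):

    import org.apache.avro.Schema;
    import com.uber.hoodie.avro.HoodieAvroUtils;      // package assumed
    import com.uber.hoodie.config.HoodieWriteConfig;  // package assumed

    final class SchemaPairSketch {
      // Source schema: exactly what the user configured; used to deserialize payloads.
      static Schema originalSchema(HoodieWriteConfig config) {
        return new Schema.Parser().parse(config.getSchema());
      }

      // Writer schema: source schema plus the Hoodie metadata fields
      // (_hoodie_commit_time, _hoodie_record_key, ...); used for all file and log writes.
      static Schema writerSchema(Schema originalSchema) {
        return HoodieAvroUtils.addMetadataFields(originalSchema);
      }
    }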

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
index 657c786..30b353b 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
@@ -95,7 +95,7 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
     BoundedInMemoryExecutor<HoodieRecord<T>,
         HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor = null;
     try {
-      final Schema schema = HoodieIOHandle.createHoodieWriteSchema(hoodieConfig);
+      final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
       bufferedIteratorExecutor =
           new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr,
               getInsertHandler(), getTransformFunction(schema));
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index 69e36ee..7cd2267 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -149,9 +149,10 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
   private Optional<IndexedRecord> getIndexedRecord(HoodieRecord<T> hoodieRecord) {
     Optional recordMetadata = hoodieRecord.getData().getMetadata();
     try {
-      Optional<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(schema);
-
+      Optional<IndexedRecord> avroRecord = hoodieRecord.getData().getInsertValue(originalSchema);
       if (avroRecord.isPresent()) {
+        // Rewrite the record against the writer schema so it carries the Hoodie commit metadata fields
+        avroRecord = Optional.of(rewriteRecord((GenericRecord) avroRecord.get()));
         String seqId = HoodieRecord.generateSequenceId(commitTime, TaskContext.getPartitionId(),
             recordIndex.getAndIncrement());
         HoodieAvroUtils
@@ -183,8 +184,8 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
     return Optional.empty();
   }
 
-  // TODO (NA) - Perform a schema check of current input record with the last schema on log file
-  // to make sure we don't append records with older (shorter) schema than already appended
+  // TODO (NA) - Check the writerSchema of the current input record against the last writerSchema on the
+  // log file, to make sure we don't append records with an older (shorter) writerSchema than already appended
   public void doAppend() {
     while (recordItr.hasNext()) {
       HoodieRecord record = recordItr.next();
@@ -199,7 +200,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
   private void doAppend(Map<HoodieLogBlock.HeaderMetadataType, String> header) {
     try {
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
-      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
       if (recordList.size() > 0) {
         writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, header));
         recordList.clear();
@@ -304,4 +305,4 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
     }
   }
 
-}
+}
\ No newline at end of file
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
index c56d30c..c677ad3 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
@@ -32,6 +32,7 @@ import com.uber.hoodie.table.HoodieTable;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Optional;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
@@ -49,6 +50,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
   private long insertRecordsWritten = 0;
   private long recordsDeleted = 0;
   private Iterator<HoodieRecord<T>> recordIterator;
+  private boolean useWriterSchema;
 
   public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
       String partitionPath, String fileId) {
@@ -68,7 +70,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
           new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
       partitionMetadata.trySave(TaskContext.getPartitionId());
       this.storageWriter = HoodieStorageWriterFactory
-          .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema);
+          .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema);
     } catch (IOException e) {
       throw new HoodieInsertException(
           "Failed to initialize HoodieStorageWriter for path " + getStorageWriterPath(), e);
@@ -80,6 +82,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
       String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordIterator) {
     this(config, commitTime, hoodieTable, partitionPath, fileId);
     this.recordIterator = recordIterator;
+    this.useWriterSchema = true;
   }
 
   @Override
@@ -94,7 +97,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
     Optional recordMetadata = record.getData().getMetadata();
     try {
       if (avroRecord.isPresent()) {
-        storageWriter.writeAvroWithMetadata(avroRecord.get(), record);
+        // Rewrite the record against the writer schema so it carries the Hoodie commit metadata fields
+        IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) avroRecord.get());
+        storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, record);
         // update the new location of record, so we know where to find it next
         record.setNewLocation(new HoodieRecordLocation(commitTime, writeStatus.getFileId()));
         recordsWritten++;
@@ -122,7 +127,11 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
     try {
       while (recordIterator.hasNext()) {
         HoodieRecord<T> record = recordIterator.next();
-        write(record, record.getData().getInsertValue(schema));
+        if (useWriterSchema) {
+          write(record, record.getData().getInsertValue(writerSchema));
+        } else {
+          write(record, record.getData().getInsertValue(originalSchema));
+        }
       }
     } catch (IOException io) {
       throw new HoodieInsertException(
@@ -170,4 +179,4 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
     // Use tempPath for storage writer if possible
     return (this.tempPath == null) ? this.path : this.tempPath;
   }
-}
+}
\ No newline at end of file
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
index af8574f..b4a03d1 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
@@ -31,6 +31,7 @@ import com.uber.hoodie.table.HoodieTable;
 import java.io.IOException;
 import java.util.Optional;
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,7 +46,8 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
   protected final HoodieWriteConfig config;
   protected final FileSystem fs;
   protected final HoodieTable<T> hoodieTable;
-  protected final Schema schema;
+  protected final Schema originalSchema;
+  protected final Schema writerSchema;
   protected HoodieTimeline hoodieTimeline;
   protected HoodieTimer timer;
   protected final WriteStatus writeStatus;
@@ -56,7 +58,8 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
     this.fs = hoodieTable.getMetaClient().getFs();
     this.hoodieTable = hoodieTable;
     this.hoodieTimeline = hoodieTable.getCompletedCommitsTimeline();
-    this.schema = createHoodieWriteSchema(config);
+    this.originalSchema = new Schema.Parser().parse(config.getSchema());
+    this.writerSchema = createHoodieWriteSchema(originalSchema);
     this.timer = new HoodieTimer().startTimer();
     this.writeStatus = ReflectionUtils.loadClass(config.getWriteStatusClassName());
   }
@@ -83,8 +86,8 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
     }
   }
 
-  public static Schema createHoodieWriteSchema(HoodieWriteConfig config) {
-    return HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
+  public static Schema createHoodieWriteSchema(Schema originalSchema) {
+    return HoodieAvroUtils.addMetadataFields(originalSchema);
   }
 
   public Path makeNewPath(String partitionPath, int taskPartitionId, String fileName) {
@@ -107,8 +110,8 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
             taskAttemptId));
   }
 
-  public Schema getSchema() {
-    return schema;
+  public Schema getWriterSchema() {
+    return writerSchema;
   }
 
   /**
@@ -142,7 +145,16 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
     }
   }
 
+  /**
+   * Rewrites the GenericRecord against the writer schema, which contains the Hoodie metadata fields.
+   * @param record the record to rewrite
+   * @return the record expanded to the writer schema
+   */
+  protected GenericRecord rewriteRecord(GenericRecord record) {
+    return HoodieAvroUtils.rewriteRecord(record, writerSchema);
+  }
+
   public abstract WriteStatus close();
 
   public abstract WriteStatus getWriteStatus();
-}
+}
\ No newline at end of file
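
With rewriteRecord in place, the create, append, and merge paths all follow
the same pattern: deserialize the payload with the source schema, expand the
resulting GenericRecord into the writer schema, then hand it to the writer,
which fills in the metadata fields. A condensed sketch of that pattern as it
appears in the handles (writeOne is an illustrative name, not from the patch,
and this assumes it sits inside a HoodieIOHandle subclass with the fields used
below):

    import java.io.IOException;
    import java.util.Optional;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.generic.IndexedRecord;

    // Inside a HoodieIOHandle subclass; rewriteRecord is the helper added above.
    private void writeOne(HoodieRecord<T> record) throws IOException {
      // 1. Deserialize the payload with the source schema (no metadata fields yet).
      Optional<IndexedRecord> avroRecord = record.getData().getInsertValue(originalSchema);
      if (avroRecord.isPresent()) {
        // 2. Expand into the writer schema so the metadata fields exist.
        IndexedRecord withMetadata = rewriteRecord((GenericRecord) avroRecord.get());
        // 3. The storage writer populates the metadata fields and writes.
        storageWriter.writeAvroWithMetadata(withMetadata, record);
      }
    }

The append path does the same rewrite before buffering records into its log
block, and stamps the block header with writerSchema so log readers see the
full schema.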
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
index 1b12931..1ad8b0f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
@@ -64,6 +64,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
   private long recordsDeleted = 0;
   private long updatedRecordsWritten = 0;
   private long insertRecordsWritten = 0;
+  private boolean useWriterSchema;
 
   public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
       Iterator<HoodieRecord<T>> recordItr, String fileId) {
@@ -80,6 +81,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
     super(config, commitTime, hoodieTable);
     this.fileSystemView = hoodieTable.getROFileSystemView();
     this.keyToNewRecords = keyToNewRecords;
+    this.useWriterSchema = true;
     init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get())
         .getPartitionPath(), dataFileToBeMerged);
   }
@@ -125,7 +127,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
       writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath);
       // Create the writer for writing the new version file
       storageWriter = HoodieStorageWriterFactory
-          .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, schema);
+          .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema);
     } catch (IOException io) {
       logger.error("Error in update task at commit " + commitTime, io);
       writeStatus.setGlobalError(io);
@@ -143,7 +145,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
       // Load the new records in a map
       logger.info("MaxMemoryPerPartitionMerge => " + config.getMaxMemoryPerPartitionMerge());
       this.keyToNewRecords = new ExternalSpillableMap<>(config.getMaxMemoryPerPartitionMerge(),
-          config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(schema));
+          config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(originalSchema));
     } catch (IOException io) {
       throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
     }
@@ -177,7 +179,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
     Optional recordMetadata = hoodieRecord.getData().getMetadata();
     try {
       if (indexedRecord.isPresent()) {
-        storageWriter.writeAvroWithMetadata(indexedRecord.get(), hoodieRecord);
+        // Rewrite the record against the writer schema so it carries the Hoodie commit metadata fields
+        IndexedRecord recordWithMetadataInSchema = rewriteRecord((GenericRecord) indexedRecord.get());
+        storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, hoodieRecord);
         recordsWritten++;
       } else {
         recordsDeleted++;
@@ -209,7 +213,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
       HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
       try {
         Optional<IndexedRecord> combinedAvroRecord = hoodieRecord.getData()
-            .combineAndGetUpdateValue(oldRecord, schema);
+            .combineAndGetUpdateValue(oldRecord, useWriterSchema ? writerSchema : originalSchema);
         if (writeUpdateRecord(hoodieRecord, combinedAvroRecord)) {
           /* ONLY WHEN
            * 1) we have an update for this key AND
@@ -235,7 +239,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
         storageWriter.writeAvro(key, oldRecord);
       } catch (ClassCastException e) {
         logger.error("Schema mismatch when rewriting old record " + oldRecord + " from file "
-            + getOldFilePath() + " to file " + getStorageWriterPath() + " with schema " + schema
+            + getOldFilePath() + " to file " + getStorageWriterPath() + " with writer schema " + writerSchema
             .toString(true));
         throw new HoodieUpsertException(errMsg, e);
       } catch (IOException e) {
@@ -254,7 +258,11 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
       for (String key : keyToNewRecords.keySet()) {
         if (!writtenRecordKeys.contains(key)) {
           HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
-          writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(schema));
+          if (useWriterSchema) {
+            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(writerSchema));
+          } else {
+            writeRecord(hoodieRecord, hoodieRecord.getData().getInsertValue(originalSchema));
+          }
           insertRecordsWritten++;
         }
       }
@@ -293,4 +301,4 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
   public WriteStatus getWriteStatus() {
     return writeStatus;
   }
-}
+}
\ No newline at end of file
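
The useWriterSchema flag, added to both HoodieCreateHandle and
HoodieMergeHandle, marks the constructors that are fed pre-built records (the
Iterator and Map based ones), whose payloads presumably already carry the
metadata fields; those paths pass the writer schema to the payload, while the
regular upsert path passes the source schema. The selection both handles now
share, condensed (schemaForPayload is an illustrative name):

    import org.apache.avro.Schema;

    // True only for the constructors fed pre-built records; such payloads are
    // presumably already expressed in the writer schema.
    static Schema schemaForPayload(boolean useWriterSchema, Schema writerSchema,
        Schema originalSchema) {
      return useWriterSchema ? writerSchema : originalSchema;
    }

In the merge path the same choice also drives combineAndGetUpdateValue, as in
the hunk above.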
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index ea7eceb..a1d9731 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -199,7 +199,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + commitTime + " for fileId: " + fileId);
     } else {
-      AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema());
+      AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema());
       BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
       try (ParquetReader<IndexedRecord> reader = AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath())
           .withConf(getHadoopConf()).build()) {
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
index d1713c5..ca776fb 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
@@ -17,6 +17,7 @@
 package com.uber.hoodie.common;
 
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.common.model.HoodieAvroPayload;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodiePartitionMetadata;
@@ -105,6 +106,14 @@ public class HoodieTestDataGenerator {
     return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
   }
 
+  /**
+   * Generates a new HoodieAvroPayload of the above schema format for the given key.
+   */
+  public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String commitTime) throws IOException {
+    GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0);
+    return new HoodieAvroPayload(Optional.of(rec));
+  }
+
   public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
       double timestamp) {
     GenericRecord rec = new GenericData.Record(avroSchema);
@@ -207,6 +216,33 @@ public class HoodieTestDataGenerator {
     return copy;
   }
 
+  public List<HoodieRecord> generateInsertsWithHoodieAvroPayload(String commitTime, int limit) throws
+      IOException {
+    List<HoodieRecord> inserts = new ArrayList<>();
+    for (int i = 0; i < limit; i++) {
+      String partitionPath = partitionPaths[rand.nextInt(partitionPaths.length)];
+      HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
+      HoodieRecord record = new HoodieRecord(key, generateAvroPayload(key, commitTime));
+      inserts.add(record);
+
+      KeyPartition kp = new KeyPartition();
+      kp.key = key;
+      kp.partitionPath = partitionPath;
+      existingKeysList.add(kp);
+    }
+    return inserts;
+  }
+
+  public List<HoodieRecord> generateUpdatesWithHoodieAvroPayload(String commitTime, List<HoodieRecord> baseRecords)
+      throws IOException {
+    List<HoodieRecord> updates = new ArrayList<>();
+    for (HoodieRecord baseRecord : baseRecords) {
+      HoodieRecord record = new HoodieRecord(baseRecord.getKey(), generateAvroPayload(baseRecord.getKey(), commitTime));
+      updates.add(record);
+    }
+    return updates;
+  }
+
   public List<HoodieRecord> generateDeletes(String commitTime, Integer n) throws IOException {
     List<HoodieRecord> inserts = generateInserts(commitTime, n);
     return generateDeletesFromExistingRecords(inserts);
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
index 2536c47..3db10a4 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
@@ -482,6 +482,26 @@ public class TestCopyOnWriteTable {
     assertEquals("First insert bucket should have weight 0.5", 200.0 / 2400, insertBuckets.get(0).weight, 0.01);
   }
 
+  @Test
+  public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
+    HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(
+        HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
+    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+    String commitTime = "000";
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    // Perform inserts of 100 records to test CreateHandle and BufferedExecutor
+    List<HoodieRecord> inserts = dataGenerator.generateInsertsWithHoodieAvroPayload(commitTime, 100);
+    Iterator<List<WriteStatus>> ws = table.handleInsert(commitTime, inserts.iterator());
+    WriteStatus writeStatus = ws.next().get(0);
+    String fileId = writeStatus.getFileId();
+    metadata.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
+    table = new HoodieCopyOnWriteTable(config, jsc);
+    // Perform updates of the same 100 records to test MergeHandle and BufferedExecutor
+    table.handleUpdate("001", fileId,
+        dataGenerator.generateUpdatesWithHoodieAvroPayload(commitTime, writeStatus.getWrittenRecords()).iterator());
+  }
+
   @After
   public void cleanup() {
     if (basePath != null) {
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java
index 6920790..0eb39a3 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieAvroPayload.java
@@ -62,7 +62,6 @@ public class HoodieAvroPayload implements HoodieRecordPayload<HoodieAvroPayload>
     if (recordBytes.length == 0) {
       return Optional.empty();
     }
-    Optional<GenericRecord> record = Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
-    return record.map(r -> HoodieAvroUtils.rewriteRecord(r, schema));
+    return Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
   }
 }
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java b/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java
index 0b454f1..8dac77b 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/OverwriteWithLatestAvroPayload.java
@@ -66,7 +66,6 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload implements
 
   @Override
   public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
-    return Optional.of(HoodieAvroUtils.rewriteRecord(HoodieAvroUtils.bytesToAvro(recordBytes, Schema.parse(schemaStr)),
-        schema));
+    return Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
   }
 }
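
Net effect of these last two hunks: payload classes no longer rewrite records
themselves. getInsertValue simply deserializes the stored bytes against
whatever schema the caller passes, and the IO handle owns the rewrite into the
writer schema. Restating the contract (this is essentially the patched
HoodieAvroPayload body above):

    import java.io.IOException;
    import java.util.Optional;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.IndexedRecord;

    @Override
    public Optional<IndexedRecord> getInsertValue(Schema schema) throws IOException {
      if (recordBytes.length == 0) {
        return Optional.empty();  // nothing stored for this key
      }
      // Deserialize against the caller's schema; no rewriting here anymore.
      return Optional.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
    }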