You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vb...@apache.org on 2020/07/01 03:07:25 UTC

[hudi] branch master updated: [HUDI-760]Remove Rolling Stat management from Hudi Writer (#1739)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2be924f  [HUDI-760]Remove Rolling Stat management from Hudi Writer (#1739)
2be924f is described below

commit 2be924fd3a04e2106f1ad3f9ce91890ea58c8a88
Author: baobaoyeye <ba...@gmail.com>
AuthorDate: Wed Jul 1 11:07:09 2020 +0800

    [HUDI-760]Remove Rolling Stat management from Hudi Writer (#1739)
---
 .../hudi/client/AbstractHoodieWriteClient.java     |  47 +-------
 .../action/commit/BaseCommitActionExecutor.java    |  15 +--
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  31 ++----
 .../hudi/table/TestHoodieMergeOnReadTable.java     | 120 ++++++++-------------
 4 files changed, 58 insertions(+), 155 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index af13171..096bc2e 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -22,8 +22,6 @@ import com.codahale.metrics.Timer;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieRollingStat;
-import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -104,8 +102,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
     List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
-
-    updateMetadataAndRollingStats(actionType, metadata, stats);
+    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
 
     // Finalize write
     finalizeWrite(table, instantTime, stats);
@@ -175,48 +172,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
     }
   }
 
-  private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata,
-      List<HoodieWriteStat> writeStats) {
-    // TODO : make sure we cannot rollback / archive last commit file
-    try {
-      // Create a Hoodie table which encapsulated the commits and files visible
-      HoodieTable table = HoodieTable.create(config, hadoopConf);
-      // 0. All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW other wise
-      // there may be race conditions
-      HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType);
-      // 1. Look up the previous compaction/commit and get the HoodieCommitMetadata from there.
-      // 2. Now, first read the existing rolling stats and merge with the result of current metadata.
-
-      // Need to do this on every commit (delta or commit) to support COW and MOR.
-
-      for (HoodieWriteStat stat : writeStats) {
-        String partitionPath = stat.getPartitionPath();
-        // TODO: why is stat.getPartitionPath() null at times here.
-        metadata.addWriteStat(partitionPath, stat);
-        HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat.getFileId(),
-            stat.getNumWrites() - (stat.getNumUpdateWrites() - stat.getNumDeletes()), stat.getNumUpdateWrites(),
-            stat.getNumDeletes(), stat.getTotalWriteBytes());
-        rollingStatMetadata.addRollingStat(partitionPath, hoodieRollingStat);
-      }
-      // The last rolling stat should be present in the completed timeline
-      Option<HoodieInstant> lastInstant =
-          table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
-      if (lastInstant.isPresent()) {
-        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
-            table.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
-        Option<String> lastRollingStat = Option
-            .ofNullable(commitMetadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY));
-        if (lastRollingStat.isPresent()) {
-          rollingStatMetadata = rollingStatMetadata
-              .merge(HoodieCommitMetadata.fromBytes(lastRollingStat.get().getBytes(), HoodieRollingStatMetadata.class));
-        }
-      }
-      metadata.addMetadata(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, rollingStatMetadata.toJsonString());
-    } catch (IOException io) {
-      throw new HoodieCommitException("Unable to save rolling stats");
-    }
-  }
-
   public HoodieMetrics getMetrics() {
     return metrics;
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 0717fd2..40185c6 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -190,10 +190,9 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
 
     result.setCommitted(true);
     List<HoodieWriteStat> stats = result.getWriteStatuses().map(WriteStatus::getStat).collect();
+    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
     result.setWriteStats(stats);
 
-    updateMetadataAndRollingStats(metadata, stats);
-
     // Finalize write
     finalizeWrite(instantTime, stats, result);
 
@@ -230,18 +229,6 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
     }
   }
 
-  private void updateMetadataAndRollingStats(HoodieCommitMetadata metadata, List<HoodieWriteStat> writeStats) {
-    // 1. Look up the previous compaction/commit and get the HoodieCommitMetadata from there.
-    // 2. Now, first read the existing rolling stats and merge with the result of current metadata.
-
-    // Need to do this on every commit (delta or commit) to support COW and MOR.
-    for (HoodieWriteStat stat : writeStats) {
-      String partitionPath = stat.getPartitionPath();
-      // TODO: why is stat.getPartitionPath() null at times here.
-      metadata.addWriteStat(partitionPath, stat);
-    }
-  }
-
   protected boolean isWorkloadProfileNeeded() {
     return true;
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 8107cdf..fc1d6ba 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -24,8 +24,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRollingStat;
-import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -849,10 +848,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   /**
-   * Test to ensure commit metadata points to valid files.
+   * Test to ensure commit metadata points to valid files.
    */
   @Test
-  public void testRollingStatsInMetadata() throws Exception {
+  public void testMetadataStatsOnCommit() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
     HoodieWriteClient client = getHoodieWriteClient(cfg);
@@ -876,14 +875,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     String everything = FileIOUtils.readAsUTFString(inputStream);
     HoodieCommitMetadata metadata =
         HoodieCommitMetadata.fromJsonString(everything.toString(), HoodieCommitMetadata.class);
-    HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromJsonString(
-        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY),
-        HoodieRollingStatMetadata.class);
     int inserts = 0;
-    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-        .entrySet()) {
-      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-        inserts += stat.getValue().getInserts();
+    for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+      for (HoodieWriteStat stat : pstat.getValue()) {
+        inserts += stat.getNumInserts();
       }
     }
     assertEquals(200, inserts);
@@ -905,19 +900,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     inputStream = new FileInputStream(filename);
     everything = FileIOUtils.readAsUTFString(inputStream);
     metadata = HoodieCommitMetadata.fromJsonString(everything.toString(), HoodieCommitMetadata.class);
-    rollingStatMetadata = HoodieCommitMetadata.fromJsonString(
-        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY),
-        HoodieRollingStatMetadata.class);
     inserts = 0;
     int upserts = 0;
-    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-        .entrySet()) {
-      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-        inserts += stat.getValue().getInserts();
-        upserts += stat.getValue().getUpserts();
+    for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+      for (HoodieWriteStat stat : pstat.getValue()) {
+        inserts += stat.getNumInserts();
+        upserts += stat.getNumUpdateWrites();
       }
     }
-    assertEquals(200, inserts);
+    assertEquals(0, inserts);
     assertEquals(200, upserts);
 
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 0e94025..8e0afbc 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -28,9 +28,8 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRollingStat;
-import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -1088,11 +1087,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   }
 
   /**
-   * Test to ensure rolling stats are correctly written to metadata file.
+   * Test to ensure metadata stats are correctly written to metadata file.
    */
   @ParameterizedTest
   @MethodSource("argumentsProvider")
-  public void testRollingStatsInMetadata(HoodieFileFormat baseFileFormat) throws Exception {
+  public void testMetadataStatsOnCommit(HoodieFileFormat baseFileFormat) throws Exception {
     init(baseFileFormat);
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
@@ -1100,7 +1099,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       metaClient = getHoodieMetaClient(hadoopConf, basePath);
       HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
 
-      // Create a commit without rolling stats in metadata to test backwards compatibility
+      // Create a commit without stats in its metadata to test backwards compatibility
       HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
       String commitActionType = table.getMetaClient().getCommitActionType();
       HoodieInstant instant = new HoodieInstant(State.REQUESTED, commitActionType, "000");
@@ -1123,14 +1122,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
       HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
           table.getActiveTimeline().getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
           HoodieCommitMetadata.class);
-      HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-          metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-          HoodieRollingStatMetadata.class);
       int inserts = 0;
-      for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-          .entrySet()) {
-        for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-          inserts += stat.getValue().getInserts();
+      for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+        for (HoodieWriteStat stat : pstat.getValue()) {
+          inserts += stat.getNumInserts();
         }
       }
       assertEquals(200, inserts);
@@ -1148,20 +1143,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
           HoodieCommitMetadata.class);
-      rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-          metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-          HoodieRollingStatMetadata.class);
+      
       inserts = 0;
       int upserts = 0;
-      for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-          .entrySet()) {
-        for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-          inserts += stat.getValue().getInserts();
-          upserts += stat.getValue().getUpserts();
+      for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+        for (HoodieWriteStat stat : pstat.getValue()) {
+          inserts += stat.getNumInserts();
+          upserts += stat.getNumUpdateWrites();
         }
       }
 
-      assertEquals(200, inserts);
+      assertEquals(0, inserts);
       assertEquals(200, upserts);
 
       client.rollback(instantTime);
@@ -1172,16 +1164,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
           HoodieCommitMetadata.class);
-      rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-          metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-          HoodieRollingStatMetadata.class);
       inserts = 0;
       upserts = 0;
-      for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-          .entrySet()) {
-        for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-          inserts += stat.getValue().getInserts();
-          upserts += stat.getValue().getUpserts();
+      for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+        for (HoodieWriteStat stat : pstat.getValue()) {
+          inserts += stat.getNumInserts();
+          upserts += stat.getNumUpdateWrites();
         }
       }
       assertEquals(200, inserts);
@@ -1190,11 +1178,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   }
 
   /**
-   * Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them.
+   * Test to ensure metadata stats are correctly written to the metadata file, identifies small files and corrects them.
    */
   @ParameterizedTest
   @MethodSource("argumentsProvider")
-  public void testRollingStatsWithSmallFileHandling(HoodieFileFormat baseFileFormat) throws Exception {
+  public void testMetadataStatsWithSmallFileHandling(HoodieFileFormat baseFileFormat) throws Exception {
     init(baseFileFormat);
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
@@ -1217,16 +1205,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
           HoodieCommitMetadata.class);
-      HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-          metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-          HoodieRollingStatMetadata.class);
       int inserts = 0;
-      for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-          .entrySet()) {
-        for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-          inserts += stat.getValue().getInserts();
-          fileIdToInsertsMap.put(stat.getKey(), stat.getValue().getInserts());
-          fileIdToUpsertsMap.put(stat.getKey(), stat.getValue().getUpserts());
+      for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+        for (HoodieWriteStat stat : pstat.getValue()) {
+          inserts += stat.getNumInserts();
+          fileIdToInsertsMap.put(stat.getFileId(), stat.getNumInserts());
+          fileIdToUpsertsMap.put(stat.getFileId(), stat.getNumUpdateWrites());
         }
       }
       assertEquals(200, inserts);
@@ -1246,23 +1230,18 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
           HoodieCommitMetadata.class);
-      rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-          metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-          HoodieRollingStatMetadata.class);
       inserts = 0;
       int upserts = 0;
-      for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-          .entrySet()) {
-        for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-          // No new file id should be created, all the data should be written to small files already there
-          assertTrue(fileIdToInsertsMap.containsKey(stat.getKey()));
-          assertTrue(fileIdToUpsertsMap.containsKey(stat.getKey()));
-          inserts += stat.getValue().getInserts();
-          upserts += stat.getValue().getUpserts();
+      for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+        for (HoodieWriteStat stat : pstat.getValue()) {
+          assertTrue(fileIdToInsertsMap.containsKey(stat.getFileId()));
+          assertTrue(fileIdToUpsertsMap.containsKey(stat.getFileId()));
+          inserts += stat.getNumInserts();
+          upserts += stat.getNumUpdateWrites();
         }
       }
 
-      assertEquals(400, inserts);
+      assertEquals(200, inserts);
       assertEquals(200, upserts);
 
       // Test small file handling after compaction
@@ -1273,20 +1252,16 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
 
       // Read from commit file
       table = HoodieTable.create(cfg, hadoopConf);
-      metadata = HoodieCommitMetadata.fromBytes(
+      HoodieCommitMetadata metadata1 = HoodieCommitMetadata.fromBytes(
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(),
           HoodieCommitMetadata.class);
-      HoodieRollingStatMetadata rollingStatMetadata1 = HoodieCommitMetadata.fromBytes(
-          metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-          HoodieRollingStatMetadata.class);
-
-      // Ensure that the rolling stats from the extra metadata of delta commits is copied over to the compaction commit
-      for (Map.Entry<String, Map<String, HoodieRollingStat>> entry : rollingStatMetadata.getPartitionToRollingStats()
-          .entrySet()) {
-        assertTrue(rollingStatMetadata1.getPartitionToRollingStats().containsKey(entry.getKey()));
-        assertEquals(rollingStatMetadata1.getPartitionToRollingStats().get(entry.getKey()).size(),
-            entry.getValue().size());
+
+      // Ensure that the write stats of the delta commit are carried over to the compaction commit:
+      // every partition must be present, with the same number of write stats
+      for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+        assertTrue(metadata1.getPartitionToWriteStats().containsKey(pstat.getKey()));
+        assertEquals(metadata1.getPartitionToWriteStats().get(pstat.getKey()).size(),
+            pstat.getValue().size());
       }
 
       // Write inserts + updates
@@ -1305,23 +1280,18 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
           table.getActiveTimeline()
               .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
           HoodieCommitMetadata.class);
-      rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-          metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-          HoodieRollingStatMetadata.class);
       inserts = 0;
       upserts = 0;
-      for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-          .entrySet()) {
-        for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-          // No new file id should be created, all the data should be written to small files already there
-          assertTrue(fileIdToInsertsMap.containsKey(stat.getKey()));
-          inserts += stat.getValue().getInserts();
-          upserts += stat.getValue().getUpserts();
+      for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+        for (HoodieWriteStat stat : pstat.getValue()) {
+          assertTrue(fileIdToInsertsMap.containsKey(stat.getFileId()));
+          inserts += stat.getNumInserts();
+          upserts += stat.getNumUpdateWrites();
         }
       }
 
-      assertEquals(600, inserts);
-      assertEquals(600, upserts);
+      assertEquals(200, inserts);
+      assertEquals(400, upserts);
     }
   }