You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/05/21 21:49:56 UTC

[incubator-hudi] branch master updated: Spark Stage retry handling

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 145034c  Spark Stage retry handling
145034c is described below

commit 145034c5faebefe51b6f1557f156bc19c787d671
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Fri Mar 8 15:05:33 2019 -0800

    Spark Stage retry handling
---
 .../com/uber/hoodie/CompactionAdminClient.java     |   7 +-
 .../java/com/uber/hoodie/HoodieWriteClient.java    |  57 ++++--
 .../com/uber/hoodie/config/HoodieWriteConfig.java  |  93 +++++----
 .../uber/hoodie/func/BulkInsertMapFunction.java    |  10 +-
 .../hoodie/func/CopyOnWriteLazyInsertIterable.java |  32 ++-
 .../hoodie/func/MergeOnReadLazyInsertIterable.java |   8 +-
 .../java/com/uber/hoodie/io/ConsistencyCheck.java  | 112 -----------
 .../com/uber/hoodie/io/HoodieAppendHandle.java     |  17 +-
 .../com/uber/hoodie/io/HoodieCreateHandle.java     |  26 +--
 .../java/com/uber/hoodie/io/HoodieIOHandle.java    |  85 +++++---
 .../java/com/uber/hoodie/io/HoodieMergeHandle.java |  41 ++--
 .../io/compact/HoodieRealtimeTableCompactor.java   |   2 +-
 .../hoodie/io/storage/HoodieParquetWriter.java     |   1 +
 .../uber/hoodie/table/HoodieCopyOnWriteTable.java  | 125 +++---------
 .../uber/hoodie/table/HoodieMergeOnReadTable.java  |  11 +-
 .../java/com/uber/hoodie/table/HoodieTable.java    | 148 ++++++++++++--
 .../src/test/java/com/uber/hoodie/TestCleaner.java |  34 ++--
 .../java/com/uber/hoodie/TestConsistencyGuard.java | 108 +++++++++++
 .../TestHoodieClientOnCopyOnWriteStorage.java      | 100 +++++-----
 .../java/com/uber/hoodie/TestHoodieReadClient.java |   3 +-
 .../uber/hoodie/common/HoodieClientTestUtils.java  |   4 +-
 .../uber/hoodie/func/TestUpdateMapFunction.java    | 115 +++++++----
 .../com/uber/hoodie/io/TestConsistencyCheck.java   |  91 ---------
 .../uber/hoodie/io/TestHoodieCommitArchiveLog.java |   9 +-
 .../uber/hoodie/table/TestCopyOnWriteTable.java    | 125 ++++++------
 .../io/storage/HoodieWrapperFileSystem.java        |  67 +++++--
 .../io/storage/SizeAwareFSDataOutputStream.java    |  21 +-
 .../com/uber/hoodie/common/model/FileSlice.java    |   6 +-
 .../uber/hoodie/common/model/HoodieLogFile.java    |  48 ++++-
 .../uber/hoodie/common/model/HoodieWriteStat.java  |   5 +-
 .../hoodie/common/table/HoodieTableMetaClient.java |  17 ++
 .../hoodie/common/table/log/HoodieLogFormat.java   |  50 ++++-
 .../common/table/log/HoodieLogFormatWriter.java    |  25 ++-
 .../uber/hoodie/common/util/ConsistencyGuard.java  |  89 +++++++++
 .../java/com/uber/hoodie/common/util/FSUtils.java  | 182 ++++++++++++-----
 .../common/util/FailSafeConsistencyGuard.java      | 200 +++++++++++++++++++
 .../hoodie/common/util/NoOpConsistencyGuard.java   |  46 +++++
 .../com/uber/hoodie/common/util/ParquetUtils.java  |   2 -
 .../uber/hoodie/common/model/HoodieTestUtils.java  |  55 ++++--
 .../hoodie/common/model/TestHoodieWriteStat.java   |  15 +-
 .../common/table/log/HoodieLogFormatTest.java      |  65 +++++++
 .../table/view/HoodieTableFileSystemViewTest.java  | 216 +++++++++++----------
 .../table/view/IncrementalFSViewSyncTest.java      |   6 +-
 .../hoodie/common/util/CompactionTestUtils.java    |   4 +-
 .../hoodie/common/util/TestCompactionUtils.java    |  18 +-
 .../com/uber/hoodie/common/util/TestFSUtils.java   |  69 +++++--
 .../hadoop/realtime/HoodieRealtimeFileSplit.java   |  10 +
 .../uber/hoodie/hadoop/InputFormatTestUtil.java    |  10 +-
 .../realtime/HoodieRealtimeRecordReaderTest.java   |   6 +-
 .../test/java/com/uber/hoodie/hive/TestUtil.java   |   3 +-
 hoodie-timeline-service/pom.xml                    |   2 +-
 .../utilities/schema/RowBasedSchemaProvider.java   |   2 +-
 .../hoodie/utilities/TestHoodieSnapshotCopier.java |  20 +-
 53 files changed, 1660 insertions(+), 963 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
index 547a0a4..0333328 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
@@ -29,6 +29,7 @@ import com.uber.hoodie.common.model.HoodieFileGroupId;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
+import com.uber.hoodie.common.table.log.HoodieLogFormat;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
 import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
@@ -245,7 +246,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
           "Expect new log version to be sane");
       HoodieLogFile newLogFile = new HoodieLogFile(new Path(lf.getPath().getParent(),
           FSUtils.makeLogFileName(lf.getFileId(), "." + FSUtils.getFileExtensionFromLog(lf.getPath()),
-              compactionInstant, lf.getLogVersion() - maxVersion)));
+              compactionInstant, lf.getLogVersion() - maxVersion, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
       return Pair.of(lf, newLogFile);
     }).collect(Collectors.toList());
   }
@@ -436,7 +437,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
             .filter(fs -> fs.getFileId().equals(operation.getFileId())).findFirst().get();
     List<HoodieLogFile> logFilesToRepair =
             merged.getLogFiles().filter(lf -> lf.getBaseCommitTime().equals(compactionInstant))
-                    .sorted(HoodieLogFile.getBaseInstantAndLogVersionComparator().reversed())
+                    .sorted(HoodieLogFile.getLogFileComparator())
                     .collect(Collectors.toList());
     FileSlice fileSliceForCompaction =
         fileSystemView.getLatestFileSlicesBeforeOrOn(operation.getPartitionPath(), operation.getBaseInstantTime())
@@ -451,7 +452,7 @@ public class CompactionAdminClient extends AbstractHoodieClient {
     for (HoodieLogFile toRepair : logFilesToRepair) {
       int version = maxUsedVersion + 1;
       HoodieLogFile newLf = new HoodieLogFile(new Path(parentPath, FSUtils.makeLogFileName(operation.getFileId(),
-          logExtn, operation.getBaseInstantTime(), version)));
+          logExtn, operation.getBaseInstantTime(), version, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
       result.add(Pair.of(toRepair, newLf));
       maxUsedVersion = version;
     }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 92bac7a..5a918e4 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -72,6 +72,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
@@ -333,9 +334,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       String commitTime, HoodieTable<T> table,
       Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     final JavaRDD<HoodieRecord<T>> repartitionedRecords;
+    final int parallelism = config.getBulkInsertShuffleParallelism();
     if (bulkInsertPartitioner.isDefined()) {
       repartitionedRecords = bulkInsertPartitioner.get()
-          .repartitionRecords(dedupedRecords, config.getBulkInsertShuffleParallelism());
+          .repartitionRecords(dedupedRecords, parallelism);
     } else {
       // Now, sort the records and line them up nicely for loading.
       repartitionedRecords = dedupedRecords.sortBy(record -> {
@@ -343,10 +345,16 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
         // the records split evenly across RDD partitions, such that small partitions fit
         // into 1 RDD partition, while big ones spread evenly across multiple RDD partitions
         return String.format("%s+%s", record.getPartitionPath(), record.getRecordKey());
-      }, true, config.getBulkInsertShuffleParallelism());
+      }, true, parallelism);
     }
+
+    //generate new file ID prefixes for each output partition
+    final List<String> fileIDPrefixes = IntStream.range(0, parallelism)
+        .mapToObj(i -> FSUtils.createNewFileIdPfx())
+        .collect(Collectors.toList());
+
     JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
-        .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table), true)
+        .mapPartitionsWithIndex(new BulkInsertMapFunction<T>(commitTime, config, table, fileIDPrefixes), true)
         .flatMap(writeStatuses -> writeStatuses.iterator());
 
     return updateIndexAndCommitIfNeeded(writeStatusRDD, table, commitTime);
@@ -498,20 +506,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     updateMetadataAndRollingStats(actionType, metadata, stats);
 
     // Finalize write
-    final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
-    try {
-      table.finalizeWrite(jsc, stats);
-      if (finalizeCtx != null) {
-        Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
-        durationInMs.ifPresent(duration -> {
-          logger.info("Finalize write elapsed time (milliseconds): " + duration);
-          metrics.updateFinalizeWriteMetrics(duration, stats.size());
-        });
-      }
-    } catch (HoodieIOException ioe) {
-      throw new HoodieCommitException(
-          "Failed to complete commit " + commitTime + " due to finalize errors.", ioe);
-    }
+    finalizeWrite(table, commitTime, stats);
 
     // add in extra metadata
     if (extraMetadata.isPresent()) {
@@ -1270,7 +1265,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       String compactionCommitTime, boolean autoCommit, Optional<Map<String, String>> extraMetadata) {
     if (autoCommit) {
       HoodieCommitMetadata metadata =
-          doCompactionCommit(compactedStatuses, table.getMetaClient(), compactionCommitTime, extraMetadata);
+          doCompactionCommit(table, compactedStatuses, compactionCommitTime, extraMetadata);
       if (compactionTimer != null) {
         long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
         try {
@@ -1288,6 +1283,23 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     }
   }
 
+  private void finalizeWrite(HoodieTable<T> table, String instantTime, List<HoodieWriteStat> stats) {
+    try {
+      final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+      table.finalizeWrite(jsc, instantTime, stats);
+      if (finalizeCtx != null) {
+        Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
+        durationInMs.ifPresent(duration -> {
+          logger.info("Finalize write elapsed time (milliseconds): " + duration);
+          metrics.updateFinalizeWriteMetrics(duration, stats.size());
+        });
+      }
+    } catch (HoodieIOException ioe) {
+      throw new HoodieCommitException(
+          "Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+    }
+  }
+
   /**
    * Rollback failed compactions. Inflight rollbacks for compactions revert the .inflight file to the .requested file
    *
@@ -1301,8 +1313,9 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     table.getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
   }
 
-  private HoodieCommitMetadata doCompactionCommit(JavaRDD<WriteStatus> writeStatuses,
-      HoodieTableMetaClient metaClient, String compactionCommitTime, Optional<Map<String, String>> extraMetadata) {
+  private HoodieCommitMetadata doCompactionCommit(HoodieTable<T> table, JavaRDD<WriteStatus> writeStatuses,
+      String compactionCommitTime, Optional<Map<String, String>> extraMetadata) {
+    HoodieTableMetaClient metaClient = table.getMetaClient();
     List<HoodieWriteStat> updateStatusMap = writeStatuses.map(WriteStatus::getStat)
         .collect();
 
@@ -1311,6 +1324,10 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       metadata.addWriteStat(stat.getPartitionPath(), stat);
     }
 
+    // Finalize write
+    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
+    finalizeWrite(table, compactionCommitTime, stats);
+
     // Copy extraMetadata
     extraMetadata.ifPresent(m -> {
       m.entrySet().stream().forEach(e -> {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index e05a435..3c69fa9 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -62,19 +62,26 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_ASSUME_DATE_PARTITIONING = "false";
   private static final String HOODIE_WRITE_STATUS_CLASS_PROP = "hoodie.writestatus.class";
   private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = WriteStatus.class.getName();
-  private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE =
-      "hoodie.copyonwrite.use" + ".temp.folder.for.create";
-  private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE = "false";
-  private static final String HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE =
-      "hoodie.copyonwrite.use" + ".temp.folder.for.merge";
-  private static final String DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE = "false";
   private static final String FINALIZE_WRITE_PARALLELISM = "hoodie.finalize.write.parallelism";
   private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = DEFAULT_PARALLELISM;
-  private static final String CONSISTENCY_CHECK_ENABLED = "hoodie.consistency.check.enabled";
+  private static final String CONSISTENCY_CHECK_ENABLED_PROP = "hoodie.consistency.check.enabled";
   private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
   private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = "hoodie.embed.timeline.server";
   private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "false";
 
+  // time between successive attempts to ensure written data's metadata is consistent on storage
+  private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
+      "hoodie.consistency.check.initial_interval_ms";
+  private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
+
+  // max interval time
+  private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = "hoodie.consistency.check.max_interval_ms";
+  private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
+
+  // Maximum number of consistency checks for written data. Will wait up to 256 secs
+  private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
+  private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+
   // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
   // We keep track of original config and rewritten config
   private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
@@ -148,29 +155,28 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
   }
 
-  public boolean shouldUseTempFolderForCopyOnWriteForCreate() {
-    return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE));
+  public int getFinalizeWriteParallelism() {
+    return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
   }
 
-  public boolean shouldUseTempFolderForCopyOnWriteForMerge() {
-    return Boolean.parseBoolean(props.getProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE));
+  public boolean isConsistencyCheckEnabled() {
+    return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED_PROP));
   }
 
-  public boolean shouldUseTempFolderForCopyOnWrite() {
-    return shouldUseTempFolderForCopyOnWriteForCreate()
-        || shouldUseTempFolderForCopyOnWriteForMerge();
+  public boolean isEmbeddedTimelineServerEnabled() {
+    return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
   }
 
-  public int getFinalizeWriteParallelism() {
-    return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
+  public int getMaxConsistencyChecks() {
+    return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECKS_PROP));
   }
 
-  public boolean isConsistencyCheckEnabled() {
-    return Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED));
+  public int getInitialConsistencyCheckIntervalMs() {
+    return Integer.parseInt(props.getProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
   }
 
-  public boolean isEmbeddedTimelineServerEnabled() {
-    return Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
+  public int getMaxConsistencyCheckIntervalMs() {
+    return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
   }
 
   /**
@@ -588,20 +594,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
-    public Builder withUseTempFolderCopyOnWriteForCreate(
-        boolean shouldUseTempFolderCopyOnWriteForCreate) {
-      props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE,
-          String.valueOf(shouldUseTempFolderCopyOnWriteForCreate));
-      return this;
-    }
-
-    public Builder withUseTempFolderCopyOnWriteForMerge(
-        boolean shouldUseTempFolderCopyOnWriteForMerge) {
-      props.setProperty(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE,
-          String.valueOf(shouldUseTempFolderCopyOnWriteForMerge));
-      return this;
-    }
-
     public Builder withFileSystemViewConfig(FileSystemViewStorageConfig viewStorageConfig) {
       props.putAll(viewStorageConfig.getProps());
       isViewConfigSet = true;
@@ -614,7 +606,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     }
 
     public Builder withConsistencyCheckEnabled(boolean enabled) {
-      props.setProperty(CONSISTENCY_CHECK_ENABLED, String.valueOf(enabled));
+      props.setProperty(CONSISTENCY_CHECK_ENABLED_PROP, String.valueOf(enabled));
       return this;
     }
 
@@ -623,6 +615,21 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withInitialConsistencyCheckIntervalMs(int initialIntevalMs) {
+      props.setProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(initialIntevalMs));
+      return this;
+    }
+
+    public Builder withMaxConsistencyCheckIntervalMs(int maxIntervalMs) {
+      props.setProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(maxIntervalMs));
+      return this;
+    }
+
+    public Builder withMaxConsistencyChecks(int maxConsistencyChecks) {
+      props.setProperty(MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(maxConsistencyChecks));
+      return this;
+    }
+
     public HoodieWriteConfig build() {
       // Check for mandatory properties
       setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM,
@@ -643,18 +650,18 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
       setDefaultOnCondition(props, !props.containsKey(HOODIE_WRITE_STATUS_CLASS_PROP),
           HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
-      setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE),
-          HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE,
-          DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_CREATE);
-      setDefaultOnCondition(props, !props.containsKey(HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE),
-          HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE,
-          DEFAULT_HOODIE_COPYONWRITE_USE_TEMP_FOLDER_MERGE);
       setDefaultOnCondition(props, !props.containsKey(FINALIZE_WRITE_PARALLELISM),
           FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
-      setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED),
-          CONSISTENCY_CHECK_ENABLED, DEFAULT_CONSISTENCY_CHECK_ENABLED);
+      setDefaultOnCondition(props, !props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP),
+          CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED);
       setDefaultOnCondition(props, !props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
           EMBEDDED_TIMELINE_SERVER_ENABLED, DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
+      setDefaultOnCondition(props, !props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
+          INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS));
+      setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
+          MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS));
+      setDefaultOnCondition(props, !props.containsKey(MAX_CONSISTENCY_CHECKS_PROP),
+          MAX_CONSISTENCY_CHECKS_PROP, String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS));
 
       // Make sure the props is propagated
       setDefaultOnCondition(props, !isIndexConfigSet,
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java b/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java
index e530e86..0f716f2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/func/BulkInsertMapFunction.java
@@ -35,17 +35,19 @@ public class BulkInsertMapFunction<T extends HoodieRecordPayload> implements
   private String commitTime;
   private HoodieWriteConfig config;
   private HoodieTable<T> hoodieTable;
+  private List<String> fileIDPrefixes;
 
   public BulkInsertMapFunction(String commitTime, HoodieWriteConfig config,
-      HoodieTable<T> hoodieTable) {
+      HoodieTable<T> hoodieTable, List<String> fileIDPrefixes) {
     this.commitTime = commitTime;
     this.config = config;
     this.hoodieTable = hoodieTable;
+    this.fileIDPrefixes = fileIDPrefixes;
   }
 
   @Override
-  public Iterator<List<WriteStatus>> call(Integer partition,
-      Iterator<HoodieRecord<T>> sortedRecordItr) throws Exception {
-    return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, commitTime, hoodieTable);
+  public Iterator<List<WriteStatus>> call(Integer partition, Iterator<HoodieRecord<T>> sortedRecordItr) {
+    return new CopyOnWriteLazyInsertIterable<>(sortedRecordItr, config, commitTime, hoodieTable,
+        fileIDPrefixes.get(partition));
   }
 }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
index 30b353b..72dd0bb 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
@@ -27,16 +27,12 @@ import com.uber.hoodie.io.HoodieCreateHandle;
 import com.uber.hoodie.io.HoodieIOHandle;
 import com.uber.hoodie.table.HoodieTable;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
 import java.util.function.Function;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.spark.TaskContext;
 
 /**
  * Lazy Iterable, that writes a stream of HoodieRecords sorted by the partitionPath, into new
@@ -48,15 +44,17 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
   protected final HoodieWriteConfig hoodieConfig;
   protected final String commitTime;
   protected final HoodieTable<T> hoodieTable;
-  protected Set<String> partitionsCleaned;
+  protected final String idPrefix;
+  protected int numFilesWritten;
 
   public CopyOnWriteLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
-      String commitTime, HoodieTable<T> hoodieTable) {
+      String commitTime, HoodieTable<T> hoodieTable, String idPrefix) {
     super(sortedRecordItr);
-    this.partitionsCleaned = new HashSet<>();
     this.hoodieConfig = config;
     this.commitTime = commitTime;
     this.hoodieTable = hoodieTable;
+    this.idPrefix = idPrefix;
+    this.numFilesWritten = 0;
   }
 
   // Used for caching HoodieRecord along with insertValue. We need this to offload computation work to buffering thread.
@@ -113,7 +111,10 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
 
   @Override
   protected void end() {
+  }
 
+  protected String getNextFileId(String idPfx) {
+    return String.format("%s-%d", idPfx, numFilesWritten++);
   }
 
   protected CopyOnWriteInsertHandler getInsertHandler() {
@@ -133,20 +134,11 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
     @Override
     protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) {
       final HoodieRecord insertPayload = payload.record;
-      // clean up any partial failures
-      if (!partitionsCleaned.contains(insertPayload.getPartitionPath())) {
-        // This insert task could fail multiple times, but Spark will faithfully retry with
-        // the same data again. Thus, before we open any files under a given partition, we
-        // first delete any files in the same partitionPath written by same Spark partition
-        HoodieIOHandle.cleanupTmpFilesFromCurrentCommit(hoodieConfig, commitTime, insertPayload.getPartitionPath(),
-            TaskContext.getPartitionId(), hoodieTable);
-        partitionsCleaned.add(insertPayload.getPartitionPath());
-      }
 
       // lazily initialize the handle, for the first time
       if (handle == null) {
-        handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), UUID
-            .randomUUID().toString());
+        handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(),
+            getNextFileId(idPrefix));
       }
 
       if (handle.canWrite(payload.record)) {
@@ -156,8 +148,8 @@ public class CopyOnWriteLazyInsertIterable<T extends HoodieRecordPayload> extend
         // handle is full.
         statuses.add(handle.close());
         // Need to handle the rejected payload & open new handle
-        handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(), UUID
-            .randomUUID().toString());
+        handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, insertPayload.getPartitionPath(),
+            getNextFileId(idPrefix));
         handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
       }
     }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java b/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java
index b4eea0f..1da4529 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/func/MergeOnReadLazyInsertIterable.java
@@ -34,8 +34,8 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
     CopyOnWriteLazyInsertIterable<T> {
 
   public MergeOnReadLazyInsertIterable(Iterator<HoodieRecord<T>> sortedRecordItr, HoodieWriteConfig config,
-      String commitTime, HoodieTable<T> hoodieTable) {
-    super(sortedRecordItr, config, commitTime, hoodieTable);
+      String commitTime, HoodieTable<T> hoodieTable, String idPfx) {
+    super(sortedRecordItr, config, commitTime, hoodieTable, idPfx);
   }
 
   @Override
@@ -51,7 +51,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
       List<WriteStatus> statuses = new ArrayList<>();
       // lazily initialize the handle, for the first time
       if (handle == null) {
-        handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable);
+        handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
       }
       if (handle.canWrite(insertPayload)) {
         // write the payload, if the handle has capacity
@@ -61,7 +61,7 @@ public class MergeOnReadLazyInsertIterable<T extends HoodieRecordPayload> extend
         handle.close();
         statuses.add(handle.getWriteStatus());
         // Need to handle the rejected payload & open new handle
-        handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable);
+        handle = new HoodieAppendHandle(hoodieConfig, commitTime, hoodieTable, getNextFileId(idPrefix));
         handle.write(insertPayload, payload.insertValue, payload.exception); // we should be able to write 1 payload.
       }
     }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java b/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java
deleted file mode 100644
index 0171e07..0000000
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/ConsistencyCheck.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.uber.hoodie.io;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.uber.hoodie.common.SerializableConfiguration;
-import com.uber.hoodie.common.util.FSUtils;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaSparkContext;
-
-/**
- * Checks if all the written paths have their metadata consistent on storage and thus be listable to
- * queries. This is important for cloud, stores like AWS S3 which are eventually consistent with
- * their metadata. Without such checks, we may proceed to commit the written data, without the
- * written data being made available to queries. In cases like incremental pull this can lead to
- * downstream readers failing to ever see some data.
- */
-public class ConsistencyCheck implements Serializable {
-
-  private static final transient Logger log = LogManager.getLogger(ConsistencyCheck.class);
-
-  private String basePath;
-
-  private List<String> relPaths;
-
-  private transient JavaSparkContext jsc;
-
-  private SerializableConfiguration hadoopConf;
-
-  private int parallelism;
-
-  public ConsistencyCheck(String basePath, List<String> relPaths, JavaSparkContext jsc,
-      int parallelism) {
-    this.basePath = basePath;
-    this.relPaths = relPaths;
-    this.jsc = jsc;
-    this.hadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration());
-    this.parallelism = parallelism;
-  }
-
-  @VisibleForTesting
-  void sleepSafe(long waitMs) {
-    try {
-      Thread.sleep(waitMs);
-    } catch (InterruptedException e) {
-      // ignore & continue next attempt
-    }
-  }
-
-  /**
-   * Repeatedly lists the filesystem on the paths, with exponential backoff and marks paths found as
-   * passing the check.
-   *
-   * @return list of (relative) paths failing the check
-   */
-  public List<String> check(int maxAttempts, long initalDelayMs) {
-    long waitMs = initalDelayMs;
-    int attempt = 0;
-
-    List<String> remainingPaths = new ArrayList<>(relPaths);
-    while (attempt++ < maxAttempts) {
-      remainingPaths = jsc.parallelize(remainingPaths, parallelism)
-          .groupBy(p -> new Path(basePath, p).getParent()) // list by partition
-          .map(pair -> {
-            FileSystem fs = FSUtils.getFs(basePath, hadoopConf.get());
-            // list the partition path and obtain all file paths present
-            Set<String> fileNames = Arrays.stream(fs.listStatus(pair._1()))
-                .map(s -> s.getPath().getName())
-                .collect(Collectors.toSet());
-
-            // only return paths that can't be found
-            return StreamSupport.stream(pair._2().spliterator(), false)
-                .filter(p -> !fileNames.contains(new Path(basePath, p).getName()))
-                .collect(Collectors.toList());
-          })
-          .flatMap(List::iterator).collect();
-      if (remainingPaths.size() == 0) {
-        break; // we are done.
-      }
-
-      log.info("Consistency check, waiting for " + waitMs + " ms , after attempt :" + attempt);
-      sleepSafe(waitMs);
-      waitMs = waitMs * 2; // double check interval every attempt
-    }
-
-    return remainingPaths;
-  }
-}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index 1f5e95f..c33e5ee 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -32,6 +32,7 @@ import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
 import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock;
 import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock;
 import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
+import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.common.util.Option;
 import com.uber.hoodie.config.HoodieWriteConfig;
@@ -40,12 +41,10 @@ import com.uber.hoodie.exception.HoodieUpsertException;
 import com.uber.hoodie.table.HoodieTable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
@@ -96,14 +95,14 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
 
   public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
       String fileId, Iterator<HoodieRecord<T>> recordItr) {
-    super(config, commitTime, hoodieTable);
+    super(config, commitTime, fileId, hoodieTable);
     writeStatus.setStat(new HoodieDeltaWriteStat());
     this.fileId = fileId;
     this.recordItr = recordItr;
   }
 
-  public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable) {
-    this(config, commitTime, hoodieTable, UUID.randomUUID().toString(), null);
+  public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable, String fileId) {
+    this(config, commitTime, hoodieTable, fileId, null);
   }
 
   private void init(HoodieRecord record) {
@@ -270,12 +269,16 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
 
   private Writer createLogWriter(Option<FileSlice> fileSlice, String baseCommitTime)
       throws IOException, InterruptedException {
+    Optional<HoodieLogFile> latestLogFile = fileSlice.get().getLatestLogFile();
+
     return HoodieLogFormat.newWriterBuilder()
         .onParentPath(new Path(hoodieTable.getMetaClient().getBasePath(), partitionPath))
         .withFileId(fileId).overBaseCommit(baseCommitTime).withLogVersion(
-            fileSlice.get().getLogFiles().map(logFile -> logFile.getLogVersion())
-                .max(Comparator.naturalOrder()).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
+            latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
         .withSizeThreshold(config.getLogFileMaxSize()).withFs(fs)
+        .withLogWriteToken(
+            latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
+        .withRolloverLogWriteToken(writeToken)
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
   }
 
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
index 734b7c0..f090291 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
@@ -45,7 +45,6 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
 
   private final HoodieStorageWriter<IndexedRecord> storageWriter;
   private final Path path;
-  private Path tempPath = null;
   private long recordsWritten = 0;
   private long insertRecordsWritten = 0;
   private long recordsDeleted = 0;
@@ -54,26 +53,22 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
 
   public HoodieCreateHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
       String partitionPath, String fileId) {
-    super(config, commitTime, hoodieTable);
+    super(config, commitTime, fileId, hoodieTable);
     writeStatus.setFileId(fileId);
     writeStatus.setPartitionPath(partitionPath);
 
-    final int sparkPartitionId = TaskContext.getPartitionId();
-    this.path = makeNewPath(partitionPath, sparkPartitionId, writeStatus.getFileId());
-    if (config.shouldUseTempFolderForCopyOnWriteForCreate()) {
-      this.tempPath = makeTempPath(partitionPath, sparkPartitionId, writeStatus.getFileId(),
-          TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
-    }
+    this.path = makeNewPath(partitionPath);
 
     try {
       HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, commitTime,
           new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
       partitionMetadata.trySave(TaskContext.getPartitionId());
+      createMarkerFile(partitionPath);
       this.storageWriter = HoodieStorageWriterFactory
-          .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema);
+          .getStorageWriter(commitTime, path, hoodieTable, config, writerSchema);
     } catch (IOException e) {
       throw new HoodieInsertException(
-          "Failed to initialize HoodieStorageWriter for path " + getStorageWriterPath(), e);
+          "Failed to initialize HoodieStorageWriter for path " + path, e);
     }
     logger.info("New InsertHandle for partition :" + partitionPath + " with fileId " + fileId);
   }
@@ -138,7 +133,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
       }
     } catch (IOException io) {
       throw new HoodieInsertException(
-          "Failed to insert records for path " + getStorageWriterPath(), io);
+          "Failed to insert records for path " + path, io);
     }
   }
 
@@ -165,8 +160,8 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
       stat.setNumInserts(insertRecordsWritten);
       stat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);
       stat.setFileId(writeStatus.getFileId());
-      stat.setPaths(new Path(config.getBasePath()), path, tempPath);
-      long fileSizeInBytes = FSUtils.getFileSize(fs, getStorageWriterPath());
+      stat.setPath(new Path(config.getBasePath()), path);
+      long fileSizeInBytes = FSUtils.getFileSize(fs, path);
       stat.setTotalWriteBytes(fileSizeInBytes);
       stat.setFileSizeInBytes(fileSizeInBytes);
       stat.setTotalWriteErrors(writeStatus.getTotalErrorRecords());
@@ -180,9 +175,4 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
       throw new HoodieInsertException("Failed to close the Insert Handle for path " + path, e);
     }
   }
-
-  private Path getStorageWriterPath() {
-    // Use tempPath for storage writer if possible
-    return (this.tempPath == null) ? this.path : this.tempPath;
-  }
 }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
index abcd247..1602d72 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
@@ -17,14 +17,17 @@
 package com.uber.hoodie.io;
 
 import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.common.util.HoodieTimer;
+import com.uber.hoodie.common.util.NoOpConsistencyGuard;
 import com.uber.hoodie.common.util.ReflectionUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.table.HoodieTable;
 import java.io.IOException;
@@ -32,16 +35,19 @@ import java.util.Optional;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.TaskContext;
+
 
 public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
 
   private static Logger logger = LogManager.getLogger(HoodieIOHandle.class);
   protected final String commitTime;
+  protected final String fileId;
+  protected final String writeToken;
   protected final HoodieWriteConfig config;
   protected final FileSystem fs;
   protected final HoodieTable<T> hoodieTable;
@@ -50,10 +56,13 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
   protected HoodieTimer timer;
   protected final WriteStatus writeStatus;
 
-  public HoodieIOHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable) {
+  public HoodieIOHandle(HoodieWriteConfig config, String commitTime, String fileId,
+      HoodieTable<T> hoodieTable) {
     this.commitTime = commitTime;
+    this.fileId = fileId;
+    this.writeToken = makeSparkWriteToken();
     this.config = config;
-    this.fs = hoodieTable.getMetaClient().getFs();
+    this.fs = getFileSystem(hoodieTable, config);
     this.hoodieTable = hoodieTable;
     this.originalSchema = new Schema.Parser().parse(config.getSchema());
     this.writerSchema = createHoodieWriteSchema(originalSchema);
@@ -63,33 +72,26 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
         config.getWriteStatusFailureFraction());
   }
 
+  private static FileSystem getFileSystem(HoodieTable hoodieTable, HoodieWriteConfig config) {
+    return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), config.isConsistencyCheckEnabled()
+        ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
+        config.getMaxConsistencyChecks(), config.getInitialConsistencyCheckIntervalMs(),
+        config.getMaxConsistencyCheckIntervalMs()) : new NoOpConsistencyGuard());
+  }
+
   /**
-   * Deletes any new tmp files written during the current commit, into the partition
+   * Generate a write token based on the currently running Spark task and its place in the Spark DAG.
    */
-  public static void cleanupTmpFilesFromCurrentCommit(HoodieWriteConfig config, String commitTime,
-      String partitionPath, int taskPartitionId, HoodieTable hoodieTable) {
-    FileSystem fs = hoodieTable.getMetaClient().getFs();
-    try {
-      FileStatus[] prevFailedFiles = fs.globStatus(new Path(String
-          .format("%s/%s/%s", config.getBasePath(), partitionPath,
-              FSUtils.maskWithoutFileId(commitTime, taskPartitionId))));
-      if (prevFailedFiles != null) {
-        logger.info(
-            "Deleting " + prevFailedFiles.length + " files generated by previous failed attempts.");
-        for (FileStatus status : prevFailedFiles) {
-          fs.delete(status.getPath(), false);
-        }
-      }
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to cleanup Temp files from commit " + commitTime, e);
-    }
+  private static String makeSparkWriteToken() {
+    return FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(),
+        TaskContext.get().taskAttemptId());
   }
 
   public static Schema createHoodieWriteSchema(Schema originalSchema) {
     return HoodieAvroUtils.addMetadataFields(originalSchema);
   }
 
-  public Path makeNewPath(String partitionPath, int taskPartitionId, String fileName) {
+  public Path makeNewPath(String partitionPath) {
     Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
     try {
       fs.mkdirs(path); // create a new partition as needed.
@@ -97,16 +99,37 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
 
-    return new Path(path.toString(),
-        FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName));
+    return new Path(path.toString(), FSUtils.makeDataFileName(commitTime, writeToken, fileId));
   }
 
-  public Path makeTempPath(String partitionPath, int taskPartitionId, String fileName, int stageId,
-      long taskAttemptId) {
-    Path path = new Path(config.getBasePath(), HoodieTableMetaClient.TEMPFOLDER_NAME);
-    return new Path(path.toString(),
-        FSUtils.makeTempDataFileName(partitionPath, commitTime, taskPartitionId, fileName, stageId,
-            taskAttemptId));
+  /**
+   * Creates an empty marker file corresponding to storage writer path
+   * @param partitionPath Partition path
+   */
+  protected void createMarkerFile(String partitionPath) {
+    Path markerPath = makeNewMarkerPath(partitionPath);
+    try {
+      logger.info("Creating Marker Path=" + markerPath);
+      fs.create(markerPath, false).close();
+    } catch (IOException e) {
+      throw new HoodieException("Failed to create marker file " + markerPath, e);
+    }
+  }
+
+  /**
+   * The marker path will be <base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename
+   * @param partitionPath Partition path
+   * @return Path of the marker file, named from the commit time, write token and file id
+   */
+  private Path makeNewMarkerPath(String partitionPath) {
+    Path markerRootPath = new Path(hoodieTable.getMetaClient().getMarkerFolderPath(commitTime));
+    Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
+    try {
+      fs.mkdirs(path); // create a new partition as needed.
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to make dir " + path, e);
+    }
+    return new Path(path.toString(), FSUtils.makeMarkerFile(commitTime, writeToken, fileId));
   }
 
   public Schema getWriterSchema() {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
index ae98eb5..e77c156 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
@@ -57,7 +57,6 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
   private HoodieStorageWriter<IndexedRecord> storageWriter;
   private Path newFilePath;
   private Path oldFilePath;
-  private Path tempPath = null;
   private long recordsWritten = 0;
   private long recordsDeleted = 0;
   private long updatedRecordsWritten = 0;
@@ -66,7 +65,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
 
   public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
       Iterator<HoodieRecord<T>> recordItr, String fileId) {
-    super(config, commitTime, hoodieTable);
+    super(config, commitTime, fileId, hoodieTable);
     String partitionPath = init(fileId, recordItr);
     init(fileId, partitionPath,
         hoodieTable.getROFileSystemView().getLatestDataFile(partitionPath, fileId).get());
@@ -77,7 +76,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
    */
   public HoodieMergeHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
       Map<String, HoodieRecord<T>> keyToNewRecords, String fileId, HoodieDataFile dataFileToBeMerged) {
-    super(config, commitTime, hoodieTable);
+    super(config, commitTime, fileId, hoodieTable);
     this.keyToNewRecords = keyToNewRecords;
     this.useWriterSchema = true;
     init(fileId, keyToNewRecords.get(keyToNewRecords.keySet().stream().findFirst().get())
@@ -101,30 +100,25 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
       oldFilePath = new Path(
           config.getBasePath() + "/" + partitionPath + "/" + latestValidFilePath);
       String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + FSUtils
-          .makeDataFileName(commitTime, TaskContext.getPartitionId(), fileId)).toString();
+          .makeDataFileName(commitTime, writeToken, fileId)).toString();
       newFilePath = new Path(config.getBasePath(), relativePath);
-      if (config.shouldUseTempFolderForCopyOnWriteForMerge()) {
-        this.tempPath = makeTempPath(partitionPath, TaskContext.getPartitionId(), fileId,
-            TaskContext.get().stageId(), TaskContext.get().taskAttemptId());
-      }
-
-      // handle cases of partial failures, for update task
-      if (fs.exists(newFilePath)) {
-        fs.delete(newFilePath, false);
-      }
 
       logger.info(String
           .format("Merging new data into oldPath %s, as newPath %s", oldFilePath.toString(),
-              getStorageWriterPath().toString()));
+              newFilePath.toString()));
       // file name is same for all records, in this bunch
       writeStatus.setFileId(fileId);
       writeStatus.setPartitionPath(partitionPath);
       writeStatus.getStat().setPartitionPath(partitionPath);
       writeStatus.getStat().setFileId(fileId);
-      writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath);
+      writeStatus.getStat().setPath(new Path(config.getBasePath()), newFilePath);
+
+      // Create Marker file
+      createMarkerFile(partitionPath);
+
       // Create the writer for writing the new version file
       storageWriter = HoodieStorageWriterFactory
-          .getStorageWriter(commitTime, getStorageWriterPath(), hoodieTable, config, writerSchema);
+          .getStorageWriter(commitTime, newFilePath, hoodieTable, config, writerSchema);
     } catch (IOException io) {
       logger.error("Error in update task at commit " + commitTime, io);
       writeStatus.setGlobalError(io);
@@ -231,17 +225,17 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
     if (copyOldRecord) {
       // this should work as it is, since this is an existing record
       String errMsg = "Failed to merge old record into new file for key " + key + " from old file "
-          + getOldFilePath() + " to new file " + getStorageWriterPath();
+          + getOldFilePath() + " to new file " + newFilePath;
       try {
         storageWriter.writeAvro(key, oldRecord);
       } catch (ClassCastException e) {
         logger.error("Schema mismatch when rewriting old record " + oldRecord + " from file "
-            + getOldFilePath() + " to file " + getStorageWriterPath() + " with writerSchema " + writerSchema
+            + getOldFilePath() + " to file " + newFilePath + " with writerSchema " + writerSchema
             .toString(true));
         throw new HoodieUpsertException(errMsg, e);
       } catch (IOException e) {
         logger.error("Failed to merge old record into new file for key " + key + " from old file "
-            + getOldFilePath() + " to new file " + getStorageWriterPath(), e);
+            + getOldFilePath() + " to new file " + newFilePath, e);
         throw new HoodieUpsertException(errMsg, e);
       }
       recordsWritten++;
@@ -270,7 +264,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
         storageWriter.close();
       }
 
-      long fileSizeInBytes = FSUtils.getFileSize(fs, getStorageWriterPath());
+      long fileSizeInBytes = FSUtils.getFileSize(fs, newFilePath);
       writeStatus.getStat().setTotalWriteBytes(fileSizeInBytes);
       writeStatus.getStat().setFileSizeInBytes(fileSizeInBytes);
       writeStatus.getStat().setNumWrites(recordsWritten);
@@ -291,13 +285,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
     return oldFilePath;
   }
 
-  private Path getStorageWriterPath() {
-    // Use tempPath for storage writer if possible
-    return (this.tempPath == null) ? this.newFilePath : this.tempPath;
-  }
-
   @Override
   public WriteStatus getWriteStatus() {
     return writeStatus;
   }
-}
\ No newline at end of file
+}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
index 86aff63..024bd3a 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java
@@ -201,7 +201,7 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
                 .map(
                     s -> {
                       List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile
-                          .getBaseInstantAndLogVersionComparator().reversed()).collect(Collectors.toList());
+                          .getLogFileComparator()).collect(Collectors.toList());
                       totalLogFiles.add((long) logFiles.size());
                       totalFileSlices.add(1L);
                       // Avro generated classes are not inheriting Serializable. Using CompactionOperation POJO
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java
index bc8b491..45327e6 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieParquetWriter.java
@@ -17,6 +17,7 @@
 package com.uber.hoodie.io.storage;
 
 import com.uber.hoodie.avro.HoodieAvroWriteSupport;
+import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.util.FSUtils;
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index e968e93..879e6e2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -28,8 +28,6 @@ import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
-import com.uber.hoodie.common.model.HoodieWriteStat;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -52,7 +50,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -234,14 +231,14 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileId, dataFileToBeMerged);
   }
 
-  public Iterator<List<WriteStatus>> handleInsert(String commitTime,
+  public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx,
       Iterator<HoodieRecord<T>> recordItr) throws Exception {
     // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
     if (!recordItr.hasNext()) {
       logger.info("Empty partition");
       return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
     }
-    return new CopyOnWriteLazyInsertIterable<>(recordItr, config, commitTime, this);
+    return new CopyOnWriteLazyInsertIterable<>(recordItr, config, commitTime, this, idPfx);
   }
 
   public Iterator<List<WriteStatus>> handleInsert(String commitTime, String partitionPath, String fileId,
@@ -261,9 +258,9 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
     BucketType btype = binfo.bucketType;
     try {
       if (btype.equals(BucketType.INSERT)) {
-        return handleInsert(commitTime, recordItr);
+        return handleInsert(commitTime, binfo.fileIdPrefix, recordItr);
       } else if (btype.equals(BucketType.UPDATE)) {
-        return handleUpdate(commitTime, binfo.fileLoc, recordItr);
+        return handleUpdate(commitTime, binfo.fileIdPrefix, recordItr);
       } else {
         throw new HoodieUpsertException(
             "Unknown bucketType " + btype + " for partition :" + partition);
@@ -376,9 +373,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
               .withDeletedFileResults(filesToDeletedStatus).build();
         }).collect();
 
-    // clean temporary data files
-    cleanTemporaryDataFiles(jsc);
-
     // Delete Inflight instant if enabled
     deleteInflightInstant(deleteInstants, activeTimeline,
         new HoodieInstant(true, actionType, commit));
@@ -391,96 +385,25 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
    * @param activeTimeline  Hoodie active timeline
    * @param instantToBeDeleted Instant to be deleted
    */
-  protected static void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
+  protected void deleteInflightInstant(boolean deleteInstant, HoodieActiveTimeline activeTimeline,
       HoodieInstant instantToBeDeleted) {
     // Remove the rolled back inflight commits
     if (deleteInstant) {
-      activeTimeline.deleteInflight(instantToBeDeleted);
-      logger.info("Deleted inflight commit " + instantToBeDeleted);
-    } else {
-      logger.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
-    }
-  }
-
-  /**
-   * Finalize the written data files
-   *
-   * @param stats List of HoodieWriteStats
-   * @return number of files finalized
-   */
-  @Override
-  @SuppressWarnings("unchecked")
-  public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
-      throws HoodieIOException {
-
-    super.finalizeWrite(jsc, stats);
-
-    if (config.shouldUseTempFolderForCopyOnWrite()) {
-      // This is to rename each data file from temporary path to its final location
-      jsc.parallelize(stats, config.getFinalizeWriteParallelism())
-          .foreach(writeStat -> {
-            final FileSystem fs = getMetaClient().getFs();
-            final Path finalPath = new Path(config.getBasePath(), writeStat.getPath());
-
-            if (writeStat.getTempPath() != null) {
-              final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath());
-              boolean success;
-              try {
-                logger.info("Renaming temporary file: " + tempPath + " to " + finalPath);
-                success = fs.rename(tempPath, finalPath);
-              } catch (IOException e) {
-                throw new HoodieIOException(
-                    "Failed to rename file: " + tempPath + " to " + finalPath);
-              }
-
-              if (!success) {
-                throw new HoodieIOException(
-                    "Failed to rename file: " + tempPath + " to " + finalPath);
-              }
-            }
-          });
-
-      // clean temporary data files
-      cleanTemporaryDataFiles(jsc);
-    }
-  }
-
-  /**
-   * Clean temporary data files that are produced from previous failed commit or retried spark
-   * stages.
-   */
-  private void cleanTemporaryDataFiles(JavaSparkContext jsc) {
-    if (!config.shouldUseTempFolderForCopyOnWrite()) {
-      return;
-    }
-
-    final FileSystem fs = getMetaClient().getFs();
-    final Path temporaryFolder = new Path(config.getBasePath(),
-        HoodieTableMetaClient.TEMPFOLDER_NAME);
-    try {
-      if (!fs.exists(temporaryFolder)) {
-        logger.info("Temporary folder does not exist: " + temporaryFolder);
-        return;
-      }
-      List<FileStatus> fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder));
-      List<Tuple2<String, Boolean>> results = jsc
-          .parallelize(fileStatusesList, config.getFinalizeWriteParallelism()).map(fileStatus -> {
-            FileSystem fs1 = getMetaClient().getFs();
-            boolean success = fs1.delete(fileStatus.getPath(), false);
-            logger
-                .info("Deleting file in temporary folder" + fileStatus.getPath() + "\t" + success);
-            return new Tuple2<>(fileStatus.getPath().toString(), success);
-          }).collect();
-
-      for (Tuple2<String, Boolean> result : results) {
-        if (!result._2()) {
-          logger.info("Failed to delete file: " + result._1());
-          throw new HoodieIOException("Failed to delete file in temporary folder: " + result._1());
+      try {
+        //TODO: Cleanup Hoodie 1.0 rollback to simply call super.cleanFailedWrites with consistency check disabled
+        // and empty WriteStat list.
+        Path markerDir = new Path(metaClient.getMarkerFolderPath(instantToBeDeleted.getTimestamp()));
+        logger.info("Removing marker directory=" + markerDir);
+        if (metaClient.getFs().exists(markerDir)) {
+          metaClient.getFs().delete(markerDir, true);
         }
+        activeTimeline.deleteInflight(instantToBeDeleted);
+        logger.info("Deleted inflight commit " + instantToBeDeleted);
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
       }
-    } catch (IOException e) {
-      throw new HoodieIOException(
-          "Failed to clean data files in temporary folder: " + temporaryFolder);
+    } else {
+      logger.warn("Rollback finished without deleting inflight instant file. Instant=" + instantToBeDeleted);
     }
   }
 
@@ -624,13 +547,13 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
   class BucketInfo implements Serializable {
 
     BucketType bucketType;
-    String fileLoc;
+    String fileIdPrefix;
 
     @Override
     public String toString() {
       final StringBuilder sb = new StringBuilder("BucketInfo {");
       sb.append("bucketType=").append(bucketType).append(", ");
-      sb.append("fileLoc=").append(fileLoc);
+      sb.append("fileIdPrefix=").append(fileIdPrefix);
       sb.append('}');
       return sb.toString();
     }
@@ -697,12 +620,12 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       }
     }
 
-    private int addUpdateBucket(String fileLoc) {
+    private int addUpdateBucket(String fileIdHint) {
       int bucket = totalBuckets;
-      updateLocationToBucket.put(fileLoc, bucket);
+      updateLocationToBucket.put(fileIdHint, bucket);
       BucketInfo bucketInfo = new BucketInfo();
       bucketInfo.bucketType = BucketType.UPDATE;
-      bucketInfo.fileLoc = fileLoc;
+      bucketInfo.fileIdPrefix = fileIdHint;
       bucketInfoMap.put(totalBuckets, bucketInfo);
       totalBuckets++;
       return bucket;
@@ -764,6 +687,7 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
               recordsPerBucket.add(totalUnassignedInserts / insertBuckets);
               BucketInfo bucketInfo = new BucketInfo();
               bucketInfo.bucketType = BucketType.INSERT;
+              bucketInfo.fileIdPrefix = FSUtils.createNewFileIdPfx();
               bucketInfoMap.put(totalBuckets, bucketInfo);
               totalBuckets++;
             }
@@ -784,7 +708,6 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
       }
     }
 
-
     /**
      * Returns a list  of small files in the given partition path
      */
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index 402fc8c..98248b9 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -121,13 +121,13 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  public Iterator<List<WriteStatus>> handleInsert(String commitTime,
+  public Iterator<List<WriteStatus>> handleInsert(String commitTime, String idPfx,
       Iterator<HoodieRecord<T>> recordItr) throws Exception {
     // If canIndexLogFiles, write inserts to log files else write inserts to parquet files
     if (index.canIndexLogFiles()) {
-      return new MergeOnReadLazyInsertIterable<>(recordItr, config, commitTime, this);
+      return new MergeOnReadLazyInsertIterable<>(recordItr, config, commitTime, this, idPfx);
     } else {
-      return super.handleInsert(commitTime, recordItr);
+      return super.handleInsert(commitTime, idPfx, recordItr);
     }
   }
 
@@ -325,10 +325,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
+  public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
       throws HoodieIOException {
     // delegate to base class for MOR tables
-    super.finalizeWrite(jsc, stats);
+    super.finalizeWrite(jsc, instantTs, stats);
   }
 
   @Override
@@ -362,6 +362,7 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
       super(profile);
     }
 
+    @Override
     protected List<SmallFile> getSmallFiles(String partitionPath) {
 
       // smallFiles only for partitionPath
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
index 6f7dd30..bff61c1 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
@@ -34,19 +34,30 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.table.view.FileSystemViewManager;
 import com.uber.hoodie.common.table.view.HoodieTableFileSystemView;
 import com.uber.hoodie.common.util.AvroUtils;
+import com.uber.hoodie.common.util.ConsistencyGuard;
+import com.uber.hoodie.common.util.ConsistencyGuard.FileVisibility;
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
+import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.exception.HoodieSavepointException;
 import com.uber.hoodie.index.HoodieIndex;
-import com.uber.hoodie.io.ConsistencyCheck;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -56,10 +67,7 @@ import org.apache.spark.api.java.JavaSparkContext;
  */
 public abstract class HoodieTable<T extends HoodieRecordPayload> implements Serializable {
 
-  // time between successive attempts to ensure written data's metadata is consistent on storage
-  private static long INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
-  // maximum number of checks, for consistency of written data. Will wait upto 256 Secs
-  private static int MAX_CONSISTENCY_CHECKS = 7;
+  private static Logger logger = LogManager.getLogger(HoodieTable.class);
 
   protected final HoodieWriteConfig config;
   protected final HoodieTableMetaClient metaClient;
@@ -279,20 +287,126 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
    * @param stats List of HoodieWriteStats
    * @throws HoodieIOException if some paths can't be finalized on storage
    */
-  public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
+  public void finalizeWrite(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats)
       throws HoodieIOException {
-    if (config.isConsistencyCheckEnabled()) {
-      List<String> pathsToCheck = stats.stream()
-          .map(stat -> stat.getTempPath() != null
-              ? stat.getTempPath() : stat.getPath())
-          .collect(Collectors.toList());
-
-      List<String> failingPaths = new ConsistencyCheck(config.getBasePath(), pathsToCheck, jsc,
-          config.getFinalizeWriteParallelism())
-          .check(MAX_CONSISTENCY_CHECKS, INITIAL_CONSISTENCY_CHECK_INTERVAL_MS);
-      if (failingPaths.size() > 0) {
-        throw new HoodieIOException("Could not verify consistency of paths : " + failingPaths);
+    cleanFailedWrites(jsc, instantTs, stats, config.isConsistencyCheckEnabled());
+  }
+
+  /**
+   * Reconciles WriteStats and marker files to detect and safely delete duplicate data files created because of Spark
+   * retries.
+   *
+   * @param jsc       Spark Context
+   * @param instantTs Instant Timestamp
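+   * @param stats     List of HoodieWriteStats whose data file paths are treated as valid and retained
+   * @param consistencyCheckEnabled  Whether to wait for invalid files to appear before, and disappear after, deletion
+   * @throws HoodieIOException if a filesystem operation fails or the consistency check does not pass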
+   */
+  protected void cleanFailedWrites(JavaSparkContext jsc, String instantTs, List<HoodieWriteStat> stats,
+      boolean consistencyCheckEnabled) throws HoodieIOException {
+    try {
+      // Reconcile marker and data files with WriteStats so that partially written data-files due to failed
+      // (but succeeded on retry) tasks are removed.
+      String basePath = getMetaClient().getBasePath();
+      FileSystem fs = getMetaClient().getFs();
+      Path markerDir = new Path(metaClient.getMarkerFolderPath(instantTs));
+
+      if (!fs.exists(markerDir)) {
+        // Happens when all writes are appends
+        return;
+      }
+
+      List<String> invalidDataPaths = FSUtils.getAllDataFilesForMarkers(fs, basePath, instantTs, markerDir.toString());
+      List<String> validDataPaths = stats.stream().map(w -> String.format("%s/%s", basePath, w.getPath()))
+          .filter(p -> p.endsWith(".parquet")).collect(Collectors.toList());
+      // Contains list of partially created files. These need to be cleaned up.
+      invalidDataPaths.removeAll(validDataPaths);
+      logger.warn("InValid data paths=" + invalidDataPaths);
+
+      Map<String, List<Pair<String, String>>> groupByPartition = invalidDataPaths.stream()
+          .map(dp -> Pair.of(new Path(dp).getParent().toString(), dp))
+          .collect(Collectors.groupingBy(Pair::getKey));
+
+      if (!groupByPartition.isEmpty()) {
+        // Ensure all files in the delete list are actually present. This is mandatory for an eventually consistent
+        // FS; otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit.
+        if (consistencyCheckEnabled) {
+          // This will either ensure all files to be deleted are present or throw a HoodieIOException.
+          waitForAllFiles(jsc, groupByPartition, FileVisibility.APPEAR);
+        }
+
+        // Now delete partially written files
+        jsc.parallelize(new ArrayList<>(groupByPartition.values()), config.getFinalizeWriteParallelism())
+            .map(partitionWithFileList -> {
+              final FileSystem fileSystem = metaClient.getFs();
+              logger.info("Deleting invalid data files=" + partitionWithFileList);
+              if (partitionWithFileList.isEmpty()) {
+                return true;
+              }
+              // Delete
+              partitionWithFileList.stream().map(Pair::getValue).forEach(file -> {
+                try {
+                  fileSystem.delete(new Path(file), false);
+                } catch (IOException e) {
+                  throw new HoodieIOException(e.getMessage(), e);
+                }
+              });
+
+              return true;
+            }).collect();
+
+        // Now ensure the deleted files disappear
+        if (consistencyCheckEnabled) {
+          // This will either ensure all files to be deleted are absent or throw a HoodieIOException.
+          waitForAllFiles(jsc, groupByPartition, FileVisibility.DISAPPEAR);
+        }
       }
+      // Now delete the marker directory
+      if (fs.exists(markerDir)) {
+        // For append only case, we do not write to marker dir. Hence, the above check
+        logger.info("Removing marker directory=" + markerDir);
+        fs.delete(markerDir, true);
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
+  /**
+   * Ensures all files passed either appear or disappear
+   * @param jsc   JavaSparkContext
+   * @param groupByPartition Files grouped by partition
+   * @param visibility Appear/Disappear
+   */
+  private void waitForAllFiles(JavaSparkContext jsc, Map<String, List<Pair<String, String>>> groupByPartition,
+      FileVisibility visibility) {
+    // Checks partitions in parallel; passes only if the files of every partition reach the requested visibility.
+    boolean checkPassed =
+        jsc.parallelize(new ArrayList<>(groupByPartition.entrySet()), config.getFinalizeWriteParallelism())
+            .map(partitionWithFileList -> waitForCondition(partitionWithFileList.getKey(),
+                partitionWithFileList.getValue().stream(), visibility))
+            .collect().stream().allMatch(x -> x);
+    if (!checkPassed) {
+      throw new HoodieIOException("Consistency check failed to ensure all files " + visibility);
+    }
+  }
+
+  private boolean waitForCondition(String partitionPath, Stream<Pair<String, String>> partitionFilePaths,
+      FileVisibility visibility) {
+    final FileSystem fileSystem = metaClient.getFs();
+    List<String> fileList = partitionFilePaths.map(Pair::getValue).collect(Collectors.toList());
+    try {
+      getFailSafeConsistencyGuard(fileSystem).waitTill(partitionPath, fileList, visibility);
+    } catch (IOException | TimeoutException ioe) {
+      logger.error("Got exception while waiting for files to show up", ioe);
+      return false;
     }
+    return true;
+  }
+
+  private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) {
+    return new FailSafeConsistencyGuard(fileSystem, config.getMaxConsistencyChecks(),
+        config.getInitialConsistencyCheckIntervalMs(),
+        config.getMaxConsistencyCheckIntervalMs());
   }
 }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
index 987a1d2..3d60c3e 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
@@ -66,6 +66,7 @@ import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -620,18 +621,17 @@ public class TestCleaner extends TestHoodieClientBase {
    * Test Cleaning functionality of table.rollback() API.
    */
   @Test
-  public void testCleanTemporaryDataFilesOnRollback() throws IOException {
+  public void testCleanMarkerDataFilesOnRollback() throws IOException {
     HoodieTestUtils.createCommitFiles(basePath, "000");
-    List<String> tempFiles = createTempFiles("000", 10);
-    assertEquals("Some temp files are created.", 10, tempFiles.size());
-    assertEquals("Some temp files are created.", tempFiles.size(), getTotalTempFiles());
-
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath)
-        .withUseTempFolderCopyOnWriteForCreate(true)
-        .withUseTempFolderCopyOnWriteForMerge(false).build();
-    HoodieTable table = HoodieTable.getHoodieTable(new HoodieTableMetaClient(jsc.hadoopConfiguration(), config
-            .getBasePath(), true),
-        config, jsc);
+    List<String> markerFiles = createMarkerFiles("000", 10);
+    assertEquals("Some marker files are created.", 10, markerFiles.size());
+    assertEquals("Some marker files are created.", markerFiles.size(), getTotalTempFiles());
+
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath).build();
+    HoodieTable table = HoodieTable.getHoodieTable(
+        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config,
+        jsc);
+
     table.rollback(jsc, "000", true);
     assertEquals("All temp files are deleted.", 0, getTotalTempFiles());
   }
@@ -901,10 +901,10 @@ public class TestCleaner extends TestHoodieClientBase {
    * @return generated files
    * @throws IOException in case of error
    */
-  private List<String> createTempFiles(String commitTime, int numFiles) throws IOException {
+  private List<String> createMarkerFiles(String commitTime, int numFiles) throws IOException {
     List<String> files = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      files.add(HoodieTestUtils.createNewDataFile(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, commitTime));
+      files.add(HoodieTestUtils.createNewMarkerFile(basePath, "2019/03/29", commitTime));
     }
     return files;
   }
@@ -915,7 +915,13 @@ public class TestCleaner extends TestHoodieClientBase {
    * @throws IOException in case of error
    */
   private int getTotalTempFiles() throws IOException {
-    return fs.listStatus(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME)).length;
+    RemoteIterator itr = fs.listFiles(new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME), true);
+    int count = 0;
+    while (itr.hasNext()) {
+      count++;
+      itr.next();
+    }
+    return count;
   }
 
   private Stream<Pair<String, String>> convertPathToFileIdWithCommitTime(
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java b/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java
new file mode 100644
index 0000000..3330575
--- /dev/null
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java
@@ -0,0 +1,108 @@
+/*
+ * Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie;
+
+import com.uber.hoodie.common.HoodieClientTestUtils;
+import com.uber.hoodie.common.util.ConsistencyGuard;
+import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestConsistencyGuard {
+  private String basePath;
+  protected transient FileSystem fs;
+
+  @Before
+  public void setup() throws IOException {
+    TemporaryFolder testFolder = new TemporaryFolder();
+    testFolder.create();
+    basePath = testFolder.getRoot().getAbsolutePath();
+    fs = FSUtils.getFs(basePath, new Configuration());
+    if (fs instanceof LocalFileSystem) {
+      LocalFileSystem lfs = (LocalFileSystem) fs;
+      // With LocalFileSystem, with checksum disabled, fs.open() returns an inputStream which is FSInputStream
+      // This causes ClassCastExceptions in LogRecordScanner (and potentially other places) calling fs.open
+      // So, for the tests, we enforce checksum verification to circumvent the problem
+      lfs.setVerifyChecksum(true);
+    }
+  }
+
+  @Test
+  public void testCheckPassingAppearAndDisAppear() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3");
+
+    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 1, 1000, 1000);
+    passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
+    passing.waitTillFileAppears(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"));
+    passing.waitTillAllFilesAppear(basePath + "/partition/path",
+        Arrays.asList(basePath + "/partition/path/f1_1-0-1_000.parquet",
+            basePath + "/partition/path/f2_1-0-1_000.parquet"));
+
+    fs.delete(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"), false);
+    fs.delete(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"), false);
+    passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
+    passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f2_1-0-1_000.parquet"));
+    passing.waitTillAllFilesDisappear(basePath + "/partition/path",
+        Arrays.asList(basePath + "/partition/path/f1_1-0-1_000.parquet",
+            basePath + "/partition/path/f2_1-0-1_000.parquet"));
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void testCheckFailingAppear() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    passing.waitTillAllFilesAppear(basePath + "/partition/path",
+        Arrays.asList(basePath + "/partition/path/f1_1-0-2_000.parquet",
+            basePath + "/partition/path/f2_1-0-2_000.parquet"));
+  }
+
+
+  @Test(expected = TimeoutException.class)
+  public void testCheckFailingAppears() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    passing.waitTillFileAppears(new Path(basePath + "/partition/path/f1_1-0-2_000.parquet"));
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void testCheckFailingDisappear() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    passing.waitTillAllFilesDisappear(basePath + "/partition/path",
+        Arrays.asList(basePath + "/partition/path/f1_1-0-1_000.parquet",
+            basePath + "/partition/path/f2_1-0-2_000.parquet"));
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void testCheckFailingDisappears() throws Exception {
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
+    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    passing.waitTillFileDisappears(new Path(basePath + "/partition/path/f1_1-0-1_000.parquet"));
+  }
+}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
index 9ce1070..73ce6f0 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
@@ -16,6 +16,7 @@
 
 package com.uber.hoodie;
 
+import static com.uber.hoodie.common.table.HoodieTableMetaClient.MARKER_EXTN;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -38,6 +39,7 @@ import com.uber.hoodie.common.table.TableFileSystemView;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieStorageConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
@@ -245,19 +247,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
   }
 
   /**
-   * Test Upsert API using temporary folders.
-   */
-  @Test
-  public void testUpsertsWithFinalizeWrite() throws Exception {
-    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
-        .withUseTempFolderCopyOnWriteForCreate(true)
-        .withUseTempFolderCopyOnWriteForMerge(true)
-        .build();
-    testUpsertsInternal(hoodieWriteConfig,
-        HoodieWriteClient::upsert, false);
-  }
-
-  /**
    * Test UpsertPrepped API
    */
   @Test
@@ -267,19 +256,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
   }
 
   /**
-   * Test UpsertPrepped API using temporary folders.
-   */
-  @Test
-  public void testUpsertsPreppedWithFinalizeWrite() throws Exception {
-    HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
-        .withUseTempFolderCopyOnWriteForCreate(true)
-        .withUseTempFolderCopyOnWriteForMerge(true)
-        .build();
-    testUpsertsInternal(hoodieWriteConfig,
-        HoodieWriteClient::upsertPreppedRecords, true);
-  }
-
-  /**
    * Test one of HoodieWriteClient upsert(Prepped) APIs
    *
    * @param hoodieWriteConfig Write Config
@@ -385,7 +361,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     assertEquals("Just 1 file needs to be added.", 1, statuses.size());
     String file1 = statuses.get(0).getFileId();
     assertEquals("file should contain 100 records", ParquetUtils.readRowKeysFromParquet(jsc.hadoopConfiguration(),
-        new Path(basePath, testPartitionPath + "/" + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(), 100);
+        new Path(basePath, statuses.get(0).getStat().getPath())).size(), 100);
 
     // Update + Inserts such that they just expand file1
     String commitTime2 = "002";
@@ -403,7 +379,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     assertEquals("Just 1 file needs to be updated.", 1, statuses.size());
     assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId());
     assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit());
-    Path newFile = new Path(basePath, testPartitionPath + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1));
+    Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
     assertEquals("file should contain 140 records",
         ParquetUtils.readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(), 140);
 
@@ -499,7 +475,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     assertEquals("Just 1 file needs to be added.", 1, statuses.size());
     String file1 = statuses.get(0).getFileId();
     assertEquals("file should contain 100 records", ParquetUtils.readRowKeysFromParquet(jsc.hadoopConfiguration(),
-        new Path(basePath, testPartitionPath + "/" + FSUtils.makeDataFileName(commitTime1, 0, file1))).size(), 100);
+        new Path(basePath, statuses.get(0).getStat().getPath())).size(), 100);
 
     // Second, set of Inserts should just expand file1
     String commitTime2 = "002";
@@ -513,7 +489,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     assertEquals("Just 1 file needs to be updated.", 1, statuses.size());
     assertEquals("Existing file should be expanded", file1, statuses.get(0).getFileId());
     assertEquals("Existing file should be expanded", commitTime1, statuses.get(0).getStat().getPrevCommit());
-    Path newFile = new Path(basePath, testPartitionPath + "/" + FSUtils.makeDataFileName(commitTime2, 0, file1));
+    Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
     assertEquals("file should contain 140 records",
         ParquetUtils.readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(), 140);
 
@@ -678,22 +654,59 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
    */
   @Test
   public void testConsistencyCheckDuringFinalize() throws Exception {
-    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
-    HoodieWriteClient client = getHoodieWriteClient(cfg);
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
         basePath);
+    String commitTime = "000";
+    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
+    HoodieWriteClient client = getHoodieWriteClient(cfg);
+    Pair<Path, JavaRDD<WriteStatus>> result = testConsistencyCheck(metaClient, commitTime);
 
+    // Delete orphan marker and commit should succeed
+    metaClient.getFs().delete(result.getKey(), false);
+    assertTrue("Commit should succeed", client.commit(commitTime, result.getRight()));
+    assertTrue("After explicit commit, commit file should be created",
+        HoodieTestUtils.doesCommitExist(basePath, commitTime));
+    // Marker directory must be removed
+    assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(commitTime))));
+  }
+
+  @Test
+  public void testRollbackAfterConsistencyCheckFailure() throws Exception {
     String commitTime = "000";
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
+    HoodieWriteClient client = getHoodieWriteClient(cfg);
+    testConsistencyCheck(metaClient, commitTime);
+
+    // Rollback of this commit should succeed
+    client.rollback(commitTime);
+    assertFalse("After explicit rollback, commit file should not be present",
+        HoodieTestUtils.doesCommitExist(basePath, commitTime));
+    // Marker directory must be removed after rollback
+    assertFalse(metaClient.getFs().exists(new Path(metaClient.getMarkerFolderPath(commitTime))));
+  }
+
+  private Pair<Path, JavaRDD<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String commitTime)
+      throws Exception {
+    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).withMaxConsistencyCheckIntervalMs(1)
+        .withInitialConsistencyCheckIntervalMs(1).build();
+    HoodieWriteClient client = getHoodieWriteClient(cfg);
+
     client.startCommitWithTime(commitTime);
-    JavaRDD<HoodieRecord> writeRecords = jsc
-        .parallelize(dataGen.generateInserts(commitTime, 200), 1);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(commitTime, 200), 1);
     JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, commitTime);
-
-    // move one of the files & commit should fail
-    WriteStatus status = result.take(1).get(0);
-    Path origPath = new Path(basePath + "/" + status.getStat().getPath());
-    Path hidePath = new Path(basePath + "/" + status.getStat().getPath() + "_hide");
-    metaClient.getFs().rename(origPath, hidePath);
+    result.collect();
+
+    // Create a dummy marker file to simulate the case that a marker file was created without data file.
+    // This should fail the commit
+    String partitionPath = Arrays.stream(fs.globStatus(new Path(String.format("%s/*/*/*/*",
+        metaClient.getMarkerFolderPath(commitTime))),
+        path -> path.toString().endsWith(MARKER_EXTN))).limit(1)
+        .map(status -> status.getPath().getParent().toString()).collect(Collectors.toList()).get(0);
+    Path markerFilePath = new Path(String.format("%s/%s", partitionPath,
+        FSUtils.makeMarkerFile(commitTime, "1-0-1", UUID.randomUUID().toString())));
+    metaClient.getFs().create(markerFilePath);
+    logger.info("Created a dummy marker path=" + markerFilePath);
 
     try {
       client.commit(commitTime, result);
@@ -701,12 +714,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     } catch (HoodieCommitException cme) {
       assertTrue(cme.getCause() instanceof HoodieIOException);
     }
-
-    // Re-introduce & commit should succeed
-    metaClient.getFs().rename(hidePath, origPath);
-    assertTrue("Commit should succeed", client.commit(commitTime, result));
-    assertTrue("After explicit commit, commit file should be created",
-        HoodieTestUtils.doesCommitExist(basePath, commitTime));
+    return Pair.of(markerFilePath, result);
   }
 
   /**
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java
index 8070be7..43dce29 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieReadClient.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.spark.api.java.JavaRDD;
+import org.junit.Assert;
 import org.junit.Test;
 import scala.Option;
 
@@ -107,7 +108,7 @@ public class TestHoodieReadClient extends TestHoodieClientBase {
     filteredRDD = readClient.filterExists(recordsRDD);
     List<HoodieRecord> result = filteredRDD.collect();
     // Check results
-    assertTrue(result.size() == 25);
+    Assert.assertEquals(25, result.size());
   }
 
   /**
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
index 2507dca..e77653e 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieClientTestUtils.java
@@ -105,7 +105,7 @@ public class HoodieClientTestUtils {
       throws Exception {
     String parentPath = String.format("%s/%s", basePath, partitionPath);
     new File(parentPath).mkdirs();
-    String path = String.format("%s/%s", parentPath, FSUtils.makeDataFileName(commitTime, 0, fileId));
+    String path = String.format("%s/%s", parentPath, FSUtils.makeDataFileName(commitTime, "1-0-1", fileId));
     new File(path).createNewFile();
     new RandomAccessFile(path, "rw").setLength(length);
   }
@@ -236,7 +236,7 @@ public class HoodieClientTestUtils {
     Thread.sleep(1000);
     String commitTime = HoodieTestUtils.makeNewCommitTime();
     String fileId = UUID.randomUUID().toString();
-    String filename = FSUtils.makeDataFileName(commitTime, 1, fileId);
+    String filename = FSUtils.makeDataFileName(commitTime, "1-0-1", fileId);
     HoodieTestUtils.createCommitFiles(basePath, commitTime);
     return HoodieClientTestUtils
         .writeParquetFile(basePath, partitionPath, filename, records, schema, filter, createCommitTime);
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java b/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java
index 20a14ea..d3c78a8 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/func/TestUpdateMapFunction.java
@@ -25,24 +25,32 @@ import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieTestUtils;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.ParquetUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.io.HoodieCreateHandle;
+import com.uber.hoodie.io.HoodieMergeHandle;
 import com.uber.hoodie.table.HoodieCopyOnWriteTable;
 import java.io.File;
+import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Iterator;
+import java.util.Arrays;
 import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestUpdateMapFunction {
+public class TestUpdateMapFunction implements Serializable {
 
   private String basePath = null;
   private transient JavaSparkContext jsc = null;
@@ -71,51 +79,73 @@ public class TestUpdateMapFunction {
   @Test
   public void testSchemaEvolutionOnUpdate() throws Exception {
     // Create a bunch of records with a old version of schema
-    HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt");
-    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(HoodieTestUtils.getDefaultHadoopConf(), basePath);
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+    final HoodieWriteConfig config = makeHoodieClientConfig("/exampleSchema.txt");
+    // The create/merge handles below are built inside Spark tasks (jsc.parallelize(...).map), not on the driver.
+    final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
 
-    String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
-        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
-    String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
-        + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
-    String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
-        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
-    List<HoodieRecord> records = new ArrayList<>();
-    TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
-    records.add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
-    TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2);
-    records.add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
-    TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3);
-    records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
-    Iterator<List<WriteStatus>> insertResult = table.handleInsert("100", records.iterator());
-    Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100"));
+    final List<WriteStatus> statuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
+      String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12}";
+      String recordStr2 = "{\"_row_key\":\"8eb5b87b-1feu-4edd-87b4-6ec96dc405a0\","
+          + "\"time\":\"2016-01-31T03:20:41.415Z\",\"number\":100}";
+      String recordStr3 = "{\"_row_key\":\"8eb5b87c-1fej-4edd-87b4-6ec96dc405a0\","
+          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":15}";
+      List<HoodieRecord> insertRecords = new ArrayList<>();
+      TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
+      insertRecords
+          .add(new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()), rowChange1));
+      TestRawTripPayload rowChange2 = new TestRawTripPayload(recordStr2);
+      insertRecords
+          .add(new HoodieRecord(new HoodieKey(rowChange2.getRowKey(), rowChange2.getPartitionPath()), rowChange2));
+      TestRawTripPayload rowChange3 = new TestRawTripPayload(recordStr3);
+      insertRecords
+          .add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
+
+      HoodieCreateHandle createHandle = new HoodieCreateHandle(config, "100", table, rowChange1.getPartitionPath(),
+          "f1-0", insertRecords.iterator());
+      createHandle.write();
+      WriteStatus insertResult = createHandle.close();
+      return insertResult;
+    }).collect();
+
+    final Path commitFile = new Path(config.getBasePath() + "/.hoodie/" + HoodieTimeline.makeCommitFileName("100"));
     FSUtils.getFs(basePath, HoodieTestUtils.getDefaultHadoopConf()).create(commitFile);
 
     // Now try an update with an evolved schema
     // Evolved schema does not have guarantee on preserving the original field ordering
-    config = makeHoodieClientConfig("/exampleEvolvedSchema.txt");
-    metaClient = new HoodieTableMetaClient(HoodieTestUtils.getDefaultHadoopConf(), basePath);
-    String fileId = insertResult.next().get(0).getFileId();
-    System.out.println(fileId);
+    final HoodieWriteConfig config2 = makeHoodieClientConfig("/exampleEvolvedSchema.txt");
+    final Schema schema = Schema.parse(config2.getSchema());
+    final WriteStatus insertResult = statuses.get(0);
+    String fileId = insertResult.getFileId();
 
-    table = new HoodieCopyOnWriteTable(config, jsc);
-    // New content with values for the newly added field
-    recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
-        + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}";
-    records = new ArrayList<>();
-    rowChange1 = new TestRawTripPayload(recordStr1);
-    HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()),
-        rowChange1);
-    record1.setCurrentLocation(new HoodieRecordLocation("100", fileId));
-    records.add(record1);
+    final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config2, jsc);
+    Assert.assertEquals(1, jsc.parallelize(Arrays.asList(1)).map(x -> {
+      // New content with values for the newly added field
+      String recordStr1 = "{\"_row_key\":\"8eb5b87a-1feh-4edd-87b4-6ec96dc405a0\","
+          + "\"time\":\"2016-01-31T03:16:41.415Z\",\"number\":12,\"added_field\":1}";
+      List<HoodieRecord> updateRecords = new ArrayList<>();
+      TestRawTripPayload rowChange1 = new TestRawTripPayload(recordStr1);
+      HoodieRecord record1 = new HoodieRecord(new HoodieKey(rowChange1.getRowKey(), rowChange1.getPartitionPath()),
+          rowChange1);
+      record1.setCurrentLocation(new HoodieRecordLocation("100", fileId));
+      updateRecords.add(record1);
 
-    try {
-      table.handleUpdate("101", fileId, records.iterator());
-    } catch (ClassCastException e) {
-      fail("UpdateFunction could not read records written with exampleSchema.txt using the "
-          + "exampleEvolvedSchema.txt");
-    }
+      try {
+        HoodieMergeHandle mergeHandle = new HoodieMergeHandle(config2, "101", table2, updateRecords.iterator(), fileId);
+        Configuration conf = new Configuration();
+        AvroReadSupport.setAvroReadSchema(conf, mergeHandle.getWriterSchema());
+        List<GenericRecord> oldRecords = ParquetUtils.readAvroRecords(conf,
+            new Path(config2.getBasePath() + "/" + insertResult.getStat().getPath()));
+        for (GenericRecord rec : oldRecords) {
+          mergeHandle.write(rec);
+        }
+        mergeHandle.close();
+      } catch (ClassCastException e) {
+        fail("UpdateFunction could not read records written with exampleSchema.txt using the "
+            + "exampleEvolvedSchema.txt");
+      }
+      return 1;
+    }).collect().size());
   }
 
   private HoodieWriteConfig makeHoodieClientConfig(String schema) throws Exception {
@@ -123,5 +153,4 @@ public class TestUpdateMapFunction {
     String schemaStr = IOUtils.toString(getClass().getResourceAsStream(schema), "UTF-8");
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr).build();
   }
-
-}
+}
\ No newline at end of file
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java
deleted file mode 100644
index 9a1c4d5..0000000
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestConsistencyCheck.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *          http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.uber.hoodie.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.anyInt;
-import static org.mockito.Mockito.anyList;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import com.uber.hoodie.common.HoodieClientTestUtils;
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public class TestConsistencyCheck {
-
-  private String basePath;
-  private JavaSparkContext jsc;
-
-  @Before
-  public void setup() throws IOException {
-    jsc = new JavaSparkContext(HoodieClientTestUtils.getSparkConfForTest("ConsistencyCheckTest"));
-    TemporaryFolder testFolder = new TemporaryFolder();
-    testFolder.create();
-    basePath = testFolder.getRoot().getAbsolutePath();
-  }
-
-  @After
-  public void teardown() {
-    if (jsc != null) {
-      jsc.stop();
-    }
-    File testFolderPath = new File(basePath);
-    if (testFolderPath.exists()) {
-      testFolderPath.delete();
-    }
-  }
-
-  @Test
-  public void testExponentialBackoff() throws Exception {
-    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
-    JavaSparkContext jscSpy = spy(jsc);
-
-    ConsistencyCheck failing = new ConsistencyCheck(basePath,
-        Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f2_0_000.parquet"),
-        jscSpy, 2);
-    long startMs = System.currentTimeMillis();
-    assertEquals(1, failing.check(5, 10).size());
-    assertTrue((System.currentTimeMillis() - startMs) > (10 + 20 + 40 + 80));
-    verify(jscSpy, times(5)).parallelize(anyList(), anyInt());
-  }
-
-  @Test
-  public void testCheckPassingAndFailing() throws Exception {
-    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f1");
-    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f2");
-    HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", "f3");
-
-    ConsistencyCheck passing = new ConsistencyCheck(basePath,
-        Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f2_0_000.parquet"),
-        jsc, 2);
-    assertEquals(0, passing.check(1, 1000).size());
-
-    ConsistencyCheck failing = new ConsistencyCheck(basePath,
-        Arrays.asList("partition/path/f1_0_000.parquet", "partition/path/f4_0_000.parquet"),
-        jsc, 2);
-    assertEquals(1, failing.check(1, 1000).size());
-  }
-}
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
index 2d6f957..45c200b 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/io/TestHoodieCommitArchiveLog.java
@@ -74,13 +74,14 @@ public class TestHoodieCommitArchiveLog {
 
   @AfterClass
   public static void cleanUp() throws Exception {
+    // Close every cached FileSystem first to clear FileSystem.Cache; this is required because
+    // DFS and LocalFS are both used in the same JVM. The test HDFS service is stopped afterwards.
+    FileSystem.closeAll();
+
     if (hdfsTestService != null) {
       hdfsTestService.stop();
       dfsCluster.shutdown();
     }
-    // Need to closeAll to clear FileSystem.Cache, required because DFS and LocalFS used in the
-    // same JVM
-    FileSystem.closeAll();
   }
 
   @BeforeClass
@@ -245,7 +246,7 @@ public class TestHoodieCommitArchiveLog {
 
     //read the file
     HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(dfs,
-        new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1")),
+        new HoodieLogFile(new Path(basePath + "/.hoodie/.commits_.archive.1_1-0-1")),
         HoodieArchivedMetaEntry.getClassSchema());
 
     int archivedRecordsCount = 0;
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
index 714f0e5..822ec93 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestCopyOnWriteTable.java
@@ -17,7 +17,6 @@
 package com.uber.hoodie.table;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -36,6 +35,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieStorageConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
@@ -44,18 +44,18 @@ import com.uber.hoodie.table.HoodieCopyOnWriteTable.UpsertPartitioner;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.spark.TaskContext;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -85,20 +85,24 @@ public class TestCopyOnWriteTable {
   public void testMakeNewPath() throws Exception {
     String fileName = UUID.randomUUID().toString();
     String partitionPath = "2016/05/04";
-    int unitNumber = (int) (Math.random() * 10);
-    HoodieRecord record = mock(HoodieRecord.class);
-    when(record.getPartitionPath()).thenReturn(partitionPath);
 
     String commitTime = HoodieTestUtils.makeNewCommitTime();
     HoodieWriteConfig config = makeHoodieClientConfig();
     HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
 
-    HoodieCreateHandle io = new HoodieCreateHandle(config, commitTime, table, partitionPath,
-        UUID.randomUUID().toString());
-    Path newPath = io.makeNewPath(record.getPartitionPath(), unitNumber, fileName);
-    assertTrue(newPath.toString().equals(
-        this.basePath + "/" + partitionPath + "/" + FSUtils.makeDataFileName(commitTime, unitNumber, fileName)));
+    Pair<Path, String> newPathWithWriteToken = jsc.parallelize(Arrays.asList(1)).map(x -> {
+      HoodieRecord record = mock(HoodieRecord.class);
+      when(record.getPartitionPath()).thenReturn(partitionPath);
+      String writeToken = FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(),
+          TaskContext.get().taskAttemptId());
+      HoodieCreateHandle io = new HoodieCreateHandle(config, commitTime, table, partitionPath, fileName);
+      return Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken);
+    }).collect().get(0); // one-element RDD, so exactly one (path, write token) pair comes back
+
+    Assert.assertEquals(this.basePath + "/" + partitionPath + "/"
+            + FSUtils.makeDataFileName(commitTime, newPathWithWriteToken.getRight(), fileName),
+        newPathWithWriteToken.getKey().toString());
   }
 
   private HoodieWriteConfig makeHoodieClientConfig() throws Exception {
@@ -141,7 +145,11 @@ public class TestCopyOnWriteTable {
     records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
 
     // Insert new records
-    HoodieClientTestUtils.collectStatuses(table.handleInsert(firstCommitTime, records.iterator()));
+    final HoodieCopyOnWriteTable cowTable = table;
+    jsc.parallelize(Arrays.asList(1)).map(x -> {
+      return cowTable.handleInsert(firstCommitTime, FSUtils.createNewFileIdPfx(), records.iterator());
+    }).map(x -> HoodieClientTestUtils.collectStatuses(x)).collect();
+
     // We should have a parquet file generated (TODO: better control # files after we revise
     // AvroParquetIO)
     File parquetFile = null;
@@ -190,10 +198,12 @@ public class TestCopyOnWriteTable {
     Thread.sleep(1000);
     String newCommitTime = HoodieTestUtils.makeNewCommitTime();
     metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    table = new HoodieCopyOnWriteTable(config, jsc);
-    Iterator<List<WriteStatus>> iter = table
-        .handleUpdate(newCommitTime, updatedRecord1.getCurrentLocation().getFileId(),
-            updatedRecords.iterator());
+    final HoodieCopyOnWriteTable newTable = new HoodieCopyOnWriteTable(config, jsc);
+    List<WriteStatus> statuses =
+        jsc.parallelize(Arrays.asList(1)).map(x -> {
+          return newTable.handleUpdate(newCommitTime, updatedRecord1.getCurrentLocation().getFileId(),
+                  updatedRecords.iterator());
+        }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
 
     // Check the updated file
     File updatedParquetFile = null;
@@ -231,7 +241,6 @@ public class TestCopyOnWriteTable {
     }
     updatedReader.close();
     // Also check the numRecordsWritten
-    List<WriteStatus> statuses = HoodieClientTestUtils.collectStatuses(iter);
     WriteStatus writeStatus = statuses.get(0);
     assertTrue("Should be only one file generated", statuses.size() == 1);
     assertEquals(4, writeStatus.getStat().getNumWrites());//3 rewritten records + 1 new record
@@ -277,8 +286,10 @@ public class TestCopyOnWriteTable {
     records.add(new HoodieRecord(new HoodieKey(rowChange3.getRowKey(), rowChange3.getPartitionPath()), rowChange3));
 
     // Insert new records
-    List<WriteStatus> writeStatuses = HoodieClientTestUtils
-        .collectStatuses(table.handleInsert(firstCommitTime, records.iterator()));
+    List<WriteStatus> writeStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
+      return table.handleInsert(firstCommitTime, FSUtils.createNewFileIdPfx(), records.iterator());
+    }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
+
     Map<String, String> allWriteStatusMergedMetadataMap = MetadataMergeWriteStatus
         .mergeMetadataForWriteStatuses(writeStatuses);
     assertTrue(allWriteStatusMergedMetadataMap.containsKey("InputRecordCount_1506582000"));
@@ -288,41 +299,6 @@ public class TestCopyOnWriteTable {
   }
 
   @Test
-  public void testInsertWithPartialFailures() throws Exception {
-    HoodieWriteConfig config = makeHoodieClientConfig();
-    String commitTime = HoodieTestUtils.makeNewCommitTime();
-    FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
-
-    // Write a few records, and get atleast one file
-    // 10 records for partition 1, 1 record for partition 2.
-    List<HoodieRecord> records = newHoodieRecords(10, "2016-01-31T03:16:41.415Z");
-    records.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z"));
-
-    // Simulate crash after first file
-    List<WriteStatus> statuses = HoodieClientTestUtils
-        .collectStatuses(table.handleInsert(commitTime, records.iterator()));
-    WriteStatus status = statuses.get(0);
-    Path partialFile = new Path(String.format("%s/%s/%s", basePath, status.getPartitionPath(),
-        FSUtils.makeDataFileName(commitTime, 0, status.getFileId())));
-    assertTrue(fs.exists(partialFile));
-
-    // When we retry
-    records = newHoodieRecords(10, "2016-01-31T03:16:41.415Z");
-    records.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z"));
-
-    statuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(commitTime, records.iterator()));
-    status = statuses.get(0);
-
-    Path retriedFIle = new Path(String.format("%s/%s/%s", basePath, status.getPartitionPath(),
-        FSUtils.makeDataFileName(commitTime, 0, status.getFileId())));
-    assertTrue(fs.exists(retriedFIle));
-    assertFalse(fs.exists(partialFile));
-  }
-
-
-  @Test
   public void testInsertRecords() throws Exception {
     HoodieWriteConfig config = makeHoodieClientConfig();
     String commitTime = HoodieTestUtils.makeNewCommitTime();
@@ -335,8 +311,10 @@ public class TestCopyOnWriteTable {
     records.addAll(newHoodieRecords(1, "2016-02-01T03:16:41.415Z"));
 
     // Insert new records
-    List<WriteStatus> returnedStatuses = HoodieClientTestUtils
-        .collectStatuses(table.handleInsert(commitTime, records.iterator()));
+    final List<HoodieRecord> recs2 = records;
+    List<WriteStatus> returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
+      return table.handleInsert(commitTime, FSUtils.createNewFileIdPfx(), recs2.iterator());
+    }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
 
     // TODO: check the actual files and make sure 11 records, total were written.
     assertEquals(2, returnedStatuses.size());
@@ -354,7 +332,11 @@ public class TestCopyOnWriteTable {
     records.addAll(newHoodieRecords(1, "2016-02-02T03:16:41.415Z"));
 
     // Insert new records
-    returnedStatuses = HoodieClientTestUtils.collectStatuses(table.handleInsert(commitTime, records.iterator()));
+    final List<HoodieRecord> recs3 = records;
+
+    returnedStatuses = jsc.parallelize(Arrays.asList(1)).map(x -> {
+      return table.handleInsert(commitTime, FSUtils.createNewFileIdPfx(), recs3.iterator());
+    }).flatMap(x -> HoodieClientTestUtils.collectStatuses(x).iterator()).collect();
 
     assertEquals(3, returnedStatuses.size());
     assertEquals("2016/01/31", returnedStatuses.get(0).getPartitionPath());
@@ -388,7 +370,9 @@ public class TestCopyOnWriteTable {
     }
 
     // Insert new records
-    HoodieClientTestUtils.collectStatuses(table.handleInsert(commitTime, records.iterator()));
+    jsc.parallelize(Arrays.asList(1))
+        .map(i -> table.handleInsert(commitTime, FSUtils.createNewFileIdPfx(), records.iterator()))
+        .map(x -> HoodieClientTestUtils.collectStatuses(x)).collect();
 
     // Check the updated file
     int counts = 0;
@@ -487,19 +471,26 @@ public class TestCopyOnWriteTable {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder().withStorageConfig(
         HoodieStorageConfig.newBuilder().limitFileSize(1000 * 1024).build()).build();
     HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
-    HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
+    final HoodieCopyOnWriteTable table = new HoodieCopyOnWriteTable(config, jsc);
     String commitTime = "000";
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     // Perform inserts of 100 records to test CreateHandle and BufferedExecutor
-    List<HoodieRecord> inserts = dataGenerator.generateInsertsWithHoodieAvroPayload(commitTime, 100);
-    Iterator<List<WriteStatus>> ws = table.handleInsert(commitTime, inserts.iterator());
-    WriteStatus writeStatus = ws.next().get(0);
+    final List<HoodieRecord> inserts = dataGenerator.generateInsertsWithHoodieAvroPayload(commitTime, 100);
+    final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> {
+      return table.handleInsert(commitTime, UUID.randomUUID().toString(), inserts.iterator());
+    }).map(x -> (List<WriteStatus>)HoodieClientTestUtils.collectStatuses(x)).collect();
+
+    WriteStatus writeStatus = ws.get(0).get(0);
     String fileId = writeStatus.getFileId();
     metadata.getFs().create(new Path(basePath + "/.hoodie/000.commit")).close();
-    table = new HoodieCopyOnWriteTable(config, jsc);
-    // Perform update of 100 records to test MergeHandle and BufferedExecutor
-    table.handleUpdate("001", fileId,
-        dataGenerator.generateUpdatesWithHoodieAvroPayload(commitTime, writeStatus.getWrittenRecords()).iterator());
+    final HoodieCopyOnWriteTable table2 = new HoodieCopyOnWriteTable(config, jsc);
+
+    final List<HoodieRecord> updates =
+        dataGenerator.generateUpdatesWithHoodieAvroPayload(commitTime, writeStatus.getWrittenRecords());
+
+    jsc.parallelize(Arrays.asList(1)).map(x -> {
+      return table2.handleUpdate("001", fileId, updates.iterator());
+    }).map(x -> (List<WriteStatus>)HoodieClientTestUtils.collectStatuses(x)).collect();
   }
 
   @After
@@ -511,4 +502,4 @@ public class TestCopyOnWriteTable {
       jsc.stop();
     }
   }
-}
+}
\ No newline at end of file
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
similarity index 92%
rename from hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java
rename to hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
index f18901d..389cf6d 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/HoodieWrapperFileSystem.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
@@ -14,10 +14,12 @@
  * limitations under the License.
  */
 
-package com.uber.hoodie.io.storage;
+package com.uber.hoodie.common.io.storage;
 
 import com.uber.hoodie.common.storage.StorageSchemes;
+import com.uber.hoodie.common.util.ConsistencyGuard;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.NoOpConsistencyGuard;
 import com.uber.hoodie.exception.HoodieIOException;
 import java.io.IOException;
 import java.net.URI;
@@ -27,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.ContentSummary;
@@ -64,6 +67,16 @@ public class HoodieWrapperFileSystem extends FileSystem {
       ConcurrentHashMap<>();
   private FileSystem fileSystem;
   private URI uri;
+  private ConsistencyGuard consistencyGuard = new NoOpConsistencyGuard();
+
+  public HoodieWrapperFileSystem() { // sets nothing: fileSystem/uri stay unset, guard stays the NoOp default
+  }
+
+  public HoodieWrapperFileSystem(FileSystem fileSystem, ConsistencyGuard consistencyGuard) {
+    this.fileSystem = fileSystem; // delegate that the create(...) overrides forward to
+    this.uri = fileSystem.getUri(); // URI comes from the delegate; a null fileSystem fails here with an NPE
+    this.consistencyGuard = consistencyGuard; // replaces the NoOp default; passed to SizeAwareFSDataOutputStream
+  }
 
   public static Path convertToHoodiePath(Path file, Configuration conf) {
     try {
@@ -139,8 +152,8 @@ public class HoodieWrapperFileSystem extends FileSystem {
       return fsDataOutputStream;
     }
 
-    SizeAwareFSDataOutputStream os = new SizeAwareFSDataOutputStream(
-            fsDataOutputStream, () -> openStreams.remove(path.getName()));
+    SizeAwareFSDataOutputStream os = new SizeAwareFSDataOutputStream(path,
+            fsDataOutputStream, consistencyGuard, () -> openStreams.remove(path.getName()));
     openStreams.put(path.getName(), os);
     return os;
   }
@@ -157,66 +170,66 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FSDataOutputStream create(Path f, Progressable progress) throws IOException {
-    return fileSystem.create(convertToDefaultPath(f), progress);
+    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), progress));
   }
 
   @Override
   public FSDataOutputStream create(Path f, short replication) throws IOException {
-    return fileSystem.create(convertToDefaultPath(f), replication);
+    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication));
   }
 
   @Override
   public FSDataOutputStream create(Path f, short replication, Progressable progress)
       throws IOException {
-    return fileSystem.create(convertToDefaultPath(f), replication, progress);
+    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), replication, progress));
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
-    return fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize);
+    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize));
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, Progressable progress)
       throws IOException {
-    return fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, progress);
+    return wrapOutputStream(f, fileSystem.create(convertToDefaultPath(f), overwrite, bufferSize, progress));
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
       long blockSize, Progressable progress) throws IOException {
-    return fileSystem
-        .create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize, progress);
+    return wrapOutputStream(f, fileSystem
+        .create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize, progress));
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags,
       int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
-    return fileSystem
+    return wrapOutputStream(f, fileSystem
         .create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize,
-            progress);
+            progress));
   }
 
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission, EnumSet<CreateFlag> flags,
       int bufferSize, short replication, long blockSize, Progressable progress,
       Options.ChecksumOpt checksumOpt) throws IOException {
-    return fileSystem
+    return wrapOutputStream(f, fileSystem
         .create(convertToDefaultPath(f), permission, flags, bufferSize, replication, blockSize,
-            progress, checksumOpt);
+            progress, checksumOpt));
   }
 
   @Override
   public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication,
       long blockSize) throws IOException {
-    return fileSystem
-        .create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize);
+    return wrapOutputStream(f, fileSystem
+        .create(convertToDefaultPath(f), overwrite, bufferSize, replication, blockSize));
   }
 
   @Override
   public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
       throws IOException {
-    return fileSystem.append(convertToDefaultPath(f), bufferSize, progress);
+    return wrapOutputStream(f, fileSystem.append(convertToDefaultPath(f), bufferSize, progress));
   }
 
   @Override
@@ -226,7 +239,16 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
-    return fileSystem.delete(convertToDefaultPath(f), recursive);
+    boolean success = fileSystem.delete(convertToDefaultPath(f), recursive);
+
+    if (success) {
+      try {
+        consistencyGuard.waitTillFileDisappears(f);
+      } catch (TimeoutException e) {
+        return false;
+      }
+    }
+    return success;
   }
 
   @Override
@@ -251,6 +273,11 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
+    try {
+      consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+    } catch (TimeoutException e) {
+      // Timed out waiting for visibility; fall through and let the underlying getFileStatus decide
+    }
     return fileSystem.getFileStatus(convertToDefaultPath(f));
   }
 
@@ -353,12 +380,12 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public FSDataOutputStream append(Path f) throws IOException {
-    return fileSystem.append(convertToDefaultPath(f));
+    return wrapOutputStream(f, fileSystem.append(convertToDefaultPath(f)));
   }
 
   @Override
   public FSDataOutputStream append(Path f, int bufferSize) throws IOException {
-    return fileSystem.append(convertToDefaultPath(f), bufferSize);
+    return wrapOutputStream(f, fileSystem.append(convertToDefaultPath(f), bufferSize));
   }
 
   @Override
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/SizeAwareFSDataOutputStream.java b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/SizeAwareFSDataOutputStream.java
similarity index 71%
rename from hoodie-client/src/main/java/com/uber/hoodie/io/storage/SizeAwareFSDataOutputStream.java
rename to hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/SizeAwareFSDataOutputStream.java
index 3f966d6..ac31933 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/storage/SizeAwareFSDataOutputStream.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/SizeAwareFSDataOutputStream.java
@@ -14,11 +14,15 @@
  * limitations under the License.
  */
 
-package com.uber.hoodie.io.storage;
+package com.uber.hoodie.common.io.storage;
 
+import com.uber.hoodie.common.util.ConsistencyGuard;
+import com.uber.hoodie.exception.HoodieException;
 import java.io.IOException;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Wrapper over <code>FSDataOutputStream</code> to keep track of the size of the written bytes. This
@@ -30,11 +34,17 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
   private final Runnable closeCallback;
   // Keep track of the bytes written
   private final AtomicLong bytesWritten = new AtomicLong(0L);
+  // Path of the file being written; its visibility is awaited in close()
+  private final Path path;
+  // Consistency guard used in close() to wait until the written file appears
+  private final ConsistencyGuard consistencyGuard;
 
-  public SizeAwareFSDataOutputStream(FSDataOutputStream out, Runnable closeCallback)
-      throws IOException {
+  public SizeAwareFSDataOutputStream(Path path, FSDataOutputStream out,
+      ConsistencyGuard consistencyGuard, Runnable closeCallback) throws IOException {
     super(out);
+    this.path = path;
     this.closeCallback = closeCallback;
+    this.consistencyGuard = consistencyGuard;
   }
 
   @Override
@@ -52,6 +62,11 @@ public class SizeAwareFSDataOutputStream extends FSDataOutputStream {
   @Override
   public void close() throws IOException {
     super.close();
+    try {
+      consistencyGuard.waitTillFileAppears(path);
+    } catch (TimeoutException e) {
+      throw new HoodieException(e);
+    }
     closeCallback.run();
   }
 
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java
index 9393c83..06de961 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/FileSlice.java
@@ -59,7 +59,7 @@ public class FileSlice implements Serializable {
     this.fileGroupId = fileGroupId;
     this.baseInstantTime = baseInstantTime;
     this.dataFile = null;
-    this.logFiles = new TreeSet<>(HoodieLogFile.getBaseInstantAndLogVersionComparator());
+    this.logFiles = new TreeSet<>(HoodieLogFile.getReverseLogFileComparator());
   }
 
   public void setDataFile(HoodieDataFile dataFile) {
@@ -94,6 +94,10 @@ public class FileSlice implements Serializable {
     return Optional.ofNullable(dataFile);
   }
 
+  public Optional<HoodieLogFile> getLatestLogFile() {
+    return logFiles.stream().findFirst();
+  }
+
   /**
    * Returns true if there is no data file and no log files. Happens as part of pending compaction
    * @return
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
index 0820b57..fa11252 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieLogFile.java
@@ -72,6 +72,10 @@ public class HoodieLogFile implements Serializable {
     return FSUtils.getFileVersionFromLog(getPath());
   }
 
+  public String getLogWriteToken() {
+    return FSUtils.getWriteTokenFromLogPath(getPath());
+  }
+
   public String getFileExtension() {
     return FSUtils.getFileExtensionFromLog(getPath());
   }
@@ -96,7 +100,11 @@ public class HoodieLogFile implements Serializable {
     return fileStatus;
   }
 
-  public HoodieLogFile rollOver(FileSystem fs) throws IOException {
+  public void setFileStatus(FileStatus fileStatus) {
+    this.fileStatus = fileStatus;
+  }
+
+  public HoodieLogFile rollOver(FileSystem fs, String logWriteToken) throws IOException {
     String fileId = getFileId();
     String baseCommitTime = getBaseCommitTime();
     Path path = getPath();
@@ -105,28 +113,50 @@ public class HoodieLogFile implements Serializable {
         .computeNextLogVersion(fs, path.getParent(), fileId,
             extension, baseCommitTime);
     return new HoodieLogFile(new Path(path.getParent(),
-        FSUtils.makeLogFileName(fileId, extension, baseCommitTime, newVersion)));
+        FSUtils.makeLogFileName(fileId, extension, baseCommitTime, newVersion, logWriteToken)));
+  }
+
+  public static Comparator<HoodieLogFile> getLogFileComparator() {
+    return new LogFileComparator();
   }
 
-  public static Comparator<HoodieLogFile> getBaseInstantAndLogVersionComparator() {
-    return new BaseInstantAndLogVersionComparator();
+  public static Comparator<HoodieLogFile> getReverseLogFileComparator() {
+    return new LogFileComparator().reversed();
   }
 
   /**
    * Comparator to order log-files
    */
-  private static class BaseInstantAndLogVersionComparator implements Comparator<HoodieLogFile>, Serializable {
+  public static class LogFileComparator implements Comparator<HoodieLogFile>, Serializable {
+
+    private transient Comparator<HoodieLogFile> writeTokenComparator;
+
+    private Comparator<HoodieLogFile> getWriteTokenComparator() {
+      if (null == writeTokenComparator) {
+        // writeTokenComparator is not serializable. Hence, lazy loading
+        writeTokenComparator = Comparator.nullsFirst(Comparator.comparing(HoodieLogFile::getLogWriteToken));
+      }
+      return writeTokenComparator;
+    }
 
     @Override
     public int compare(HoodieLogFile o1, HoodieLogFile o2) {
       String baseInstantTime1 = o1.getBaseCommitTime();
       String baseInstantTime2 = o2.getBaseCommitTime();
+
       if (baseInstantTime1.equals(baseInstantTime2)) {
-        // reverse the order by log-version when base-commit is same
-        return new Integer(o2.getLogVersion()).compareTo(o1.getLogVersion());
+
+        if (o1.getLogVersion() == o2.getLogVersion()) {
+          // Compare by write token when base-commit and log-version are the same
+          return getWriteTokenComparator().compare(o1, o2);
+        }
+
+        // compare by log-version when base-commit is the same
+        return Integer.compare(o1.getLogVersion(), o2.getLogVersion());
       }
-      // reverse the order by base-commits
-      return baseInstantTime2.compareTo(baseInstantTime1);
+
+      // compare by base-commits
+      return baseInstantTime1.compareTo(baseInstantTime2);
     }
   }
 
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java
index e74606c..1a59016 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java
@@ -310,11 +310,8 @@ public class HoodieWriteStat implements Serializable {
   /**
    * Set path and tempPath relative to the given basePath.
    */
-  public void setPaths(Path basePath, Path path, Path tempPath) {
+  public void setPath(Path basePath, Path path) {
     this.path = path.toString().replace(basePath + "/", "");
-    if (tempPath != null) {
-      this.tempPath = tempPath.toString().replace(basePath + "/", "");
-    }
   }
 
   @Override
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
index 7570786..e76f775 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
@@ -61,6 +61,7 @@ public class HoodieTableMetaClient implements Serializable {
   public static String METAFOLDER_NAME = ".hoodie";
   public static String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp";
   public static String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux";
+  public static final String MARKER_EXTN = ".marker";
 
   private String basePath;
   private transient FileSystem fs;
@@ -143,6 +144,22 @@ public class HoodieTableMetaClient implements Serializable {
   }
 
   /**
+   * @return Temp Folder path
+   */
+  public String getTempFolderPath() {
+    return basePath + File.separator + TEMPFOLDER_NAME;
+  }
+
+  /**
+   * Returns the marker folder path for the given instant
+   * @param instantTs Instant Timestamp
+   * @return the temp folder path joined with instantTs
+   */
+  public String getMarkerFolderPath(String instantTs) {
+    return String.format("%s%s%s", getTempFolderPath(), File.separator, instantTs);
+  }
+
+  /**
    * @return Auxiliary Meta path
    */
   public String getMetaAuxiliaryPath() {
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
index 93e5ac7..abc2c61 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormat.java
@@ -19,9 +19,11 @@ package com.uber.hoodie.common.table.log;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.table.log.block.HoodieLogBlock;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.collection.Pair;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Optional;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,6 +50,8 @@ public interface HoodieLogFormat {
    */
   int currentVersion = 1;
 
+  String UNKNOWN_WRITE_TOKEN = "1-0-1";
+
   /**
    * Writer interface to allow appending block to this file format
    */
@@ -106,6 +110,10 @@ public interface HoodieLogFormat {
     private Integer logVersion;
     // Location of the directory containing the log
     private Path parentPath;
+    // Log File Write Token
+    private String logWriteToken;
+    // Rollover Log file write token
+    private String rolloverLogWriteToken;
 
     public WriterBuilder withBufferSize(int bufferSize) {
       this.bufferSize = bufferSize;
@@ -117,6 +125,16 @@ public interface HoodieLogFormat {
       return this;
     }
 
+    public WriterBuilder withLogWriteToken(String writeToken) {
+      this.logWriteToken = writeToken;
+      return this;
+    }
+
+    public WriterBuilder withRolloverLogWriteToken(String rolloverLogWriteToken) {
+      this.rolloverLogWriteToken = rolloverLogWriteToken;
+      return this;
+    }
+
     public WriterBuilder withFs(FileSystem fs) {
       this.fs = fs;
       return this;
@@ -169,17 +187,37 @@ public interface HoodieLogFormat {
       if (parentPath == null) {
         throw new IllegalArgumentException("Log file parent location is not specified");
       }
+
+      if (rolloverLogWriteToken == null) {
+        rolloverLogWriteToken = UNKNOWN_WRITE_TOKEN;
+      }
+
       if (logVersion == null) {
         log.info("Computing the next log version for " + logFileId + " in " + parentPath);
-        logVersion =
-            FSUtils.getCurrentLogVersion(fs, parentPath, logFileId, fileExtension, commitTime);
+        Optional<Pair<Integer, String>> versionAndWriteToken =
+            FSUtils.getLatestLogVersion(fs, parentPath, logFileId, fileExtension, commitTime);
+        if (versionAndWriteToken.isPresent()) {
+          logVersion = versionAndWriteToken.get().getKey();
+          logWriteToken = versionAndWriteToken.get().getValue();
+        } else {
+          logVersion = HoodieLogFile.LOGFILE_BASE_VERSION;
+          // this is the case where there is no existing log-file.
+          // Use rollover write token as write token to create new log file with tokens
+          logWriteToken = rolloverLogWriteToken;
+        }
         log.info(
             "Computed the next log version for " + logFileId + " in " + parentPath + " as "
-                + logVersion);
+                + logVersion + " with write-token " + logWriteToken);
+      }
+
+      if (logWriteToken == null) {
+        // This is the case where we have an existing log-file with the old format. Roll over to avoid any conflicts
+        logVersion += 1;
+        logWriteToken = rolloverLogWriteToken;
       }
 
       Path logPath = new Path(parentPath,
-          FSUtils.makeLogFileName(logFileId, fileExtension, commitTime, logVersion));
+          FSUtils.makeLogFileName(logFileId, fileExtension, commitTime, logVersion, logWriteToken));
       log.info("HoodieLogFile on path " + logPath);
       HoodieLogFile logFile = new HoodieLogFile(logPath);
 
@@ -192,9 +230,9 @@ public interface HoodieLogFormat {
       if (sizeThreshold == null) {
         sizeThreshold = DEFAULT_SIZE_THRESHOLD;
       }
-      return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold);
+      return new HoodieLogFormatWriter(fs, logFile, bufferSize, replication, sizeThreshold, logWriteToken,
+          rolloverLogWriteToken);
     }
-
   }
 
   static WriterBuilder newWriterBuilder() {
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
index 28b2501..614f622 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieLogFormatWriter.java
@@ -48,6 +48,8 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
   private final long sizeThreshold;
   private final Integer bufferSize;
   private final Short replication;
+  private final String logWriteToken;
+  private final String rolloverLogWriteToken;
   private FSDataOutputStream output;
   private static final String APPEND_UNAVAILABLE_EXCEPTION_MESSAGE = "not sufficiently replicated yet";
 
@@ -59,14 +61,15 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
    * @param sizeThreshold
    */
   HoodieLogFormatWriter(FileSystem fs, HoodieLogFile logFile, Integer bufferSize,
-      Short replication, Long sizeThreshold)
+      Short replication, Long sizeThreshold, String logWriteToken, String rolloverLogWriteToken)
       throws IOException, InterruptedException {
     this.fs = fs;
     this.logFile = logFile;
     this.sizeThreshold = sizeThreshold;
     this.bufferSize = bufferSize;
     this.replication = replication;
-
+    this.logWriteToken = logWriteToken;
+    this.rolloverLogWriteToken = rolloverLogWriteToken;
     Path path = logFile.getPath();
     if (fs.exists(path)) {
       boolean isAppendSupported = StorageSchemes.isAppendSupported(fs.getScheme());
@@ -87,7 +90,7 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
         }
       }
       if (!isAppendSupported) {
-        this.logFile = logFile.rollOver(fs);
+        this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
         log.info("Append not supported.. Rolling over to " + logFile);
         createNewFile();
       }
@@ -180,10 +183,11 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       // file).
       log.info("CurrentSize " + getCurrentSize() + " has reached threshold " + sizeThreshold
           + ". Rolling over to the next version");
-      HoodieLogFile newLogFile = logFile.rollOver(fs);
+      HoodieLogFile newLogFile = logFile.rollOver(fs, rolloverLogWriteToken);
       // close this writer and return the new writer
       close();
-      return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold);
+      return new HoodieLogFormatWriter(fs, newLogFile, bufferSize, replication, sizeThreshold, logWriteToken,
+          rolloverLogWriteToken);
     }
     return this;
   }
@@ -231,10 +235,15 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
       // last block. Find more information here : https://issues.apache.org/jira/browse/HDFS-6325
       log.warn("Failed to open an append stream to the log file. Opening a new log file..", e);
       // Rollover the current log file (since cannot get a stream handle) and create new one
-      this.logFile = logFile.rollOver(fs);
+      this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
+      createNewFile();
+    } else if (e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
+      log.warn("Another task executor writing to the same log file(" + logFile + ". Rolling over");
+      // Rollover the current log file (since cannot get a stream handle) and create new one
+      this.logFile = logFile.rollOver(fs, rolloverLogWriteToken);
       createNewFile();
-    } else if ((e.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName()) || e.getClassName()
-        .contentEquals(RecoveryInProgressException.class.getName())) && (fs instanceof DistributedFileSystem)) {
+    } else if (e.getClassName().contentEquals(RecoveryInProgressException.class.getName())
+        && (fs instanceof DistributedFileSystem)) {
       // this happens when either another task executor writing to this file died or
       // data node is going down. Note that we can only try to recover lease for a DistributedFileSystem.
       // ViewFileSystem unfortunately does not support this operation
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuard.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuard.java
new file mode 100644
index 0000000..825c597
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuard.java
@@ -0,0 +1,89 @@
+/*
+ *  Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.common.util;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Ensures file create/delete operation is visible
+ */
+public interface ConsistencyGuard {
+
+  /**
+   * File Visibility
+   */
+  enum FileVisibility {
+    APPEAR,
+    DISAPPEAR,
+  }
+
+  /**
+   * Wait for file to be listable based on configurable timeout
+   * @param filePath
+   * @throws IOException when having trouble listing the path
+   * @throws TimeoutException when retries exhausted
+   */
+  void waitTillFileAppears(Path filePath) throws IOException, TimeoutException;
+
+  /**
+   * Wait for file to disappear from the listing based on configurable timeout
+   * @param filePath
+   * @throws IOException when having trouble listing the path
+   * @throws TimeoutException when retries exhausted
+   */
+  void waitTillFileDisappears(Path filePath) throws IOException, TimeoutException;
+
+  /**
+   * Wait till all passed files belonging to a directory show up in the listing
+   */
+  void waitTillAllFilesAppear(String dirPath, List<String> files) throws IOException, TimeoutException;
+
+  /**
+   * Wait till all passed files belonging to a directory disappear from the listing
+   */
+  void waitTillAllFilesDisappear(String dirPath, List<String> files) throws IOException, TimeoutException;
+
+
+  /**
+   * Wait Till target visibility is reached
+   * @param dirPath  Directory Path
+   * @param files    Files
+   * @param targetVisibility Target Visibility
+   * @throws IOException
+   * @throws TimeoutException
+   */
+  default void waitTill(String dirPath, List<String> files, FileVisibility targetVisibility)
+      throws IOException, TimeoutException {
+    switch (targetVisibility) {
+      case APPEAR: {
+        waitTillAllFilesAppear(dirPath, files);
+        break;
+      }
+      case DISAPPEAR: {
+        waitTillAllFilesDisappear(dirPath, files);
+        break;
+      }
+      default:
+        throw new IllegalStateException("Unknown File Visibility");
+    }
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
index b014710..a97cb51 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
@@ -16,21 +16,28 @@
 
 package com.uber.hoodie.common.util;
 
+import static com.uber.hoodie.common.table.HoodieTableMetaClient.MARKER_EXTN;
+
 import com.google.common.base.Preconditions;
+import com.uber.hoodie.common.model.HoodieFileFormat;
 import com.uber.hoodie.common.model.HoodieLogFile;
 import com.uber.hoodie.common.model.HoodiePartitionMetadata;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.exception.InvalidHoodiePathException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Optional;
+import java.util.UUID;
+import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
@@ -52,13 +59,15 @@ public class FSUtils {
 
   private static final Logger LOG = LogManager.getLogger(FSUtils.class);
   // Log files are of this pattern - .b5068208-e1a4-11e6-bf01-fe55135034f3_20170101134598.log.1
-  private static final Pattern LOG_FILE_PATTERN = Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)");
+  private static final Pattern LOG_FILE_PATTERN =
+      Pattern.compile("\\.(.*)_(.*)\\.(.*)\\.([0-9]*)(_(([0-9]*)-([0-9]*)-([0-9]*)))?");
   private static final String LOG_FILE_PREFIX = ".";
   private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
   private static final long MIN_CLEAN_TO_KEEP = 10;
   private static final long MIN_ROLLBACK_TO_KEEP = 10;
   private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";
 
+
   public static Configuration prepareHadoopConf(Configuration conf) {
     conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
     conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
@@ -76,7 +85,6 @@ public class FSUtils {
     return conf;
   }
 
-
   public static FileSystem getFs(String path, Configuration conf) {
     FileSystem fs;
     conf = prepareHadoopConf(conf);
@@ -92,26 +100,36 @@ public class FSUtils {
     return fs;
   }
 
-  public static String makeDataFileName(String commitTime, int taskPartitionId, String fileId) {
-    return String.format("%s_%d_%s.parquet", fileId, taskPartitionId, commitTime);
+  /**
+   * A write token uniquely identifies an attempt at one of the IOHandle operations (Merge/Create/Append)
+   */
+  public static String makeWriteToken(int taskPartitionId, int stageId, long taskAttemptId) {
+    return String.format("%d-%d-%d", taskPartitionId, stageId, taskAttemptId);
   }
 
-  public static String makeTempDataFileName(String partitionPath, String commitTime,
-      int taskPartitionId, String fileId, int stageId, long taskAttemptId) {
-    return String.format("%s_%s_%d_%s_%d_%d.parquet", partitionPath.replace("/", "-"), fileId,
-        taskPartitionId, commitTime, stageId, taskAttemptId);
+
+  public static String makeDataFileName(String commitTime, String writeToken, String fileId) {
+    return String.format("%s_%s_%s.parquet", fileId, writeToken, commitTime);
   }
 
-  public static String maskWithoutFileId(String commitTime, int taskPartitionId) {
-    return String.format("*_%s_%s.parquet", taskPartitionId, commitTime);
+  public static String makeMarkerFile(String commitTime, String writeToken, String fileId) {
+    return String.format("%s_%s_%s%s", fileId, writeToken, commitTime, MARKER_EXTN);
   }
 
-  public static String maskWithoutTaskPartitionId(String commitTime, String fileId) {
-    return String.format("%s_*_%s.parquet", fileId, commitTime);
+  public static String translateMarkerToDataPath(String basePath, String markerPath, String instantTs) {
+    Preconditions.checkArgument(markerPath.endsWith(MARKER_EXTN));
+    String markerRootPath = Path.getPathWithoutSchemeAndAuthority(new Path(
+        String.format("%s/%s/%s", basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTs))).toString();
+    int begin = markerPath.indexOf(markerRootPath);
+    Preconditions.checkArgument(begin >= 0, "Not in marker dir. Marker Path=" + markerPath
+        + ", Expected Marker Root=" + markerRootPath);
+    String rPath = markerPath.substring(begin + markerRootPath.length() + 1);
+    return String.format("%s/%s%s", basePath, rPath.replace(MARKER_EXTN, ""),
+        HoodieFileFormat.PARQUET.getFileExtension());
   }
 
-  public static String maskWithOnlyCommitTime(String commitTime) {
-    return String.format("*_*_%s.parquet", commitTime);
+  public static String maskWithoutFileId(String commitTime, int taskPartitionId) {
+    return String.format("*_%s_%s%s", taskPartitionId, commitTime, HoodieFileFormat.PARQUET.getFileExtension());
   }
 
   public static String getCommitFromCommitFile(String commitFileName) {
@@ -175,18 +193,43 @@ public class FSUtils {
    */
   public static List<String> getAllFoldersWithPartitionMetaFile(FileSystem fs, String basePathStr)
       throws IOException {
-    List<String> partitions = new ArrayList<>();
-    Path basePath = new Path(basePathStr);
-    RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(new Path(basePathStr), true);
-    while (allFiles.hasNext()) {
-      Path filePath = allFiles.next().getPath();
+    final Path basePath = new Path(basePathStr);
+    final List<String> partitions = new ArrayList<>();
+    processFiles(fs, basePathStr, (locatedFileStatus) -> {
+      Path filePath = locatedFileStatus.getPath();
       if (filePath.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE)) {
         partitions.add(getRelativePartitionPath(basePath, filePath.getParent()));
       }
-    }
+      return true;
+    });
     return partitions;
   }
 
+  public static final List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
+      String markerDir) throws IOException {
+    List<String> dataFiles = new LinkedList<>();
+    FSUtils.processFiles(fs, markerDir, (status) -> {
+      String pathStr = status.getPath().toString();
+      if (pathStr.endsWith(MARKER_EXTN)) {
+        dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs));
+      }
+      return true;
+    });
+    return dataFiles;
+  }
+
+  private static final void processFiles(FileSystem fs, String basePathStr,
+      Function<LocatedFileStatus, Boolean> consumer) throws IOException {
+    RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(new Path(basePathStr), true);
+    while (allFiles.hasNext()) {
+      LocatedFileStatus status = allFiles.next();
+      boolean success = consumer.apply(status);
+      if (!success) {
+        throw new HoodieException("Failed to process file-status=" + status);
+      }
+    }
+  }
+
   public static List<String> getAllPartitionPaths(FileSystem fs, String basePathStr,
       boolean assumeDatePartitioning)
       throws IOException {
@@ -208,6 +251,12 @@ public class FSUtils {
     return name.replace(getFileExtension(name), "");
   }
 
+  /**
+   * Returns a new unique prefix for creating a file group.
+   */
+  public static String createNewFileIdPfx() {
+    return UUID.randomUUID().toString();
+  }
 
   /**
    * Get the file extension from the log file
@@ -255,6 +304,53 @@ public class FSUtils {
   }
 
   /**
+   * Get TaskId used in log-path
+   */
+  public static Integer getTaskPartitionIdFromLogPath(Path path) {
+    Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
+    if (!matcher.find()) {
+      throw new InvalidHoodiePathException(path, "LogFile");
+    }
+    String val = matcher.group(7);
+    return val == null ? null : Integer.parseInt(val);
+  }
+
+  /**
+   * Get Write-Token used in log-path
+   */
+  public static String getWriteTokenFromLogPath(Path path) {
+    Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
+    if (!matcher.find()) {
+      throw new InvalidHoodiePathException(path, "LogFile");
+    }
+    return matcher.group(6);
+  }
+
+  /**
+   * Get StageId used in log-path
+   */
+  public static Integer getStageIdFromLogPath(Path path) {
+    Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
+    if (!matcher.find()) {
+      throw new InvalidHoodiePathException(path, "LogFile");
+    }
+    String val = matcher.group(8);
+    return val == null ? null : Integer.parseInt(val);
+  }
+
+  /**
+   * Get Task Attempt Id used in log-path
+   */
+  public static Integer getTaskAttemptIdFromLogPath(Path path) {
+    Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
+    if (!matcher.find()) {
+      throw new InvalidHoodiePathException(path, "LogFile");
+    }
+    String val = matcher.group(9);
+    return val == null ? null : Integer.parseInt(val);
+  }
+
+  /**
    * Get the last part of the file name in the log file and convert to int.
    */
   public static int getFileVersionFromLog(Path logPath) {
@@ -266,14 +362,10 @@ public class FSUtils {
   }
 
   public static String makeLogFileName(String fileId, String logFileExtension,
-      String baseCommitTime, int version) {
-    return LOG_FILE_PREFIX + String
-        .format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
-  }
-
-  public static String maskWithoutLogVersion(String commitTime, String fileId,
-      String logFileExtension) {
-    return LOG_FILE_PREFIX + String.format("%s_%s%s*", fileId, commitTime, logFileExtension);
+      String baseCommitTime, int version, String writeToken) {
+    String suffix = (writeToken == null) ? String.format("%s_%s%s.%d",fileId, baseCommitTime, logFileExtension, version)
+        : String.format("%s_%s%s.%d_%s", fileId, baseCommitTime, logFileExtension, version, writeToken);
+    return LOG_FILE_PREFIX + suffix;
   }
 
   public static boolean isLogFile(Path logPath) {
@@ -288,9 +380,7 @@ public class FSUtils {
    * Get the latest log file written from the list of log files passed in
    */
   public static Optional<HoodieLogFile> getLatestLogFile(Stream<HoodieLogFile> logFiles) {
-    return logFiles.sorted(Comparator
-        .comparing(s -> s.getLogVersion(),
-            Comparator.reverseOrder())).findFirst();
+    return logFiles.sorted(HoodieLogFile.getReverseLogFileComparator()).findFirst();
   }
 
   /**
@@ -308,36 +398,28 @@ public class FSUtils {
   /**
    * Get the latest log version for the fileId in the partition path
    */
-  public static Optional<Integer> getLatestLogVersion(FileSystem fs, Path partitionPath,
+  public static Optional<Pair<Integer, String>> getLatestLogVersion(FileSystem fs, Path partitionPath,
       final String fileId, final String logFileExtension, final String baseCommitTime)
       throws IOException {
     Optional<HoodieLogFile> latestLogFile =
         getLatestLogFile(
             getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime));
     if (latestLogFile.isPresent()) {
-      return Optional.of(latestLogFile.get().getLogVersion());
+      return Optional.of(Pair.of(latestLogFile.get().getLogVersion(),
+          getWriteTokenFromLogPath(latestLogFile.get().getPath())));
     }
     return Optional.empty();
   }
 
-  public static int getCurrentLogVersion(FileSystem fs, Path partitionPath,
-      final String fileId, final String logFileExtension, final String baseCommitTime)
-      throws IOException {
-    Optional<Integer> currentVersion =
-        getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
-    // handle potential overflow
-    return (currentVersion.isPresent()) ? currentVersion.get() : HoodieLogFile.LOGFILE_BASE_VERSION;
-  }
-
   /**
    * computes the next log version for the specified fileId in the partition path
    */
   public static int computeNextLogVersion(FileSystem fs, Path partitionPath, final String fileId,
       final String logFileExtension, final String baseCommitTime) throws IOException {
-    Optional<Integer> currentVersion =
+    Optional<Pair<Integer, String>>  currentVersionWithWriteToken =
         getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
     // handle potential overflow
-    return (currentVersion.isPresent()) ? currentVersion.get() + 1
+    return (currentVersionWithWriteToken.isPresent()) ? currentVersionWithWriteToken.get().getKey() + 1
         : HoodieLogFile.LOGFILE_BASE_VERSION;
   }
 
@@ -349,10 +431,6 @@ public class FSUtils {
     return fs.getDefaultReplication(path);
   }
 
-  public static Long getDefaultBlockSize(FileSystem fs, Path path) {
-    return fs.getDefaultBlockSize(path);
-  }
-
   /**
    * When a file was opened and the task died without closing the stream, another task executor
    * cannot open because the existing lease will be active. We will try to recover the lease, from
@@ -431,8 +509,12 @@ public class FSUtils {
   }
 
   public static Path getPartitionPath(String basePath, String partitionPath) {
+    return getPartitionPath(new Path(basePath), partitionPath);
+  }
+
+  public static Path getPartitionPath(Path basePath, String partitionPath) {
     // FOr non-partitioned table, return only base-path
-    return ((partitionPath == null) || (partitionPath.isEmpty())) ? new Path(basePath) :
+    return ((partitionPath == null) || (partitionPath.isEmpty())) ? basePath :
         new Path(basePath, partitionPath);
   }
 }
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
new file mode 100644
index 0000000..12b4762
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
@@ -0,0 +1,200 @@
+/*
+ *  Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.common.util;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * A consistency checker that fails if it is unable to meet the required condition within a specified timeout
+ */
+public class FailSafeConsistencyGuard implements ConsistencyGuard {
+
+  private static final transient Logger log = LogManager.getLogger(FailSafeConsistencyGuard.class);
+
+  private final FileSystem fs;
+  private final int maxAttempts;
+  private final long initialDelayMs;
+  private final long maxDelayMs;
+
+  public FailSafeConsistencyGuard(FileSystem fs, int maxAttempts, long initalDelayMs, long maxDelayMs) {
+    this.fs = fs;
+    this.maxAttempts = maxAttempts;
+    this.initialDelayMs = initalDelayMs;
+    this.maxDelayMs = maxDelayMs;
+  }
+
+  @Override
+  public void waitTillFileAppears(Path filePath) throws TimeoutException {
+    waitForFileVisibility(filePath,  FileVisibility.APPEAR);
+  }
+
+  @Override
+  public void waitTillFileDisappears(Path filePath)
+      throws TimeoutException {
+    waitForFileVisibility(filePath, FileVisibility.DISAPPEAR);
+  }
+
+  @Override
+  public void waitTillAllFilesAppear(String dirPath, List<String> files) throws TimeoutException {
+    waitForFilesVisibility(dirPath, files, FileVisibility.APPEAR);
+  }
+
+  @Override
+  public void waitTillAllFilesDisappear(String dirPath, List<String> files) throws TimeoutException {
+    waitForFilesVisibility(dirPath, files, FileVisibility.DISAPPEAR);
+  }
+
+  /**
+   * Helper function to wait for all files belonging to a single directory to appear or disappear
+   * @param dirPath Dir Path
+   * @param files Files to appear/disappear
+   * @param event Appear/Disappear
+   * @throws TimeoutException
+   */
+  public void waitForFilesVisibility(String dirPath, List<String> files, FileVisibility event)
+      throws TimeoutException {
+    Path dir = new Path(dirPath);
+    List<String> filesWithoutSchemeAndAuthority =
+        files.stream().map(f -> Path.getPathWithoutSchemeAndAuthority(new Path(f))).map(p -> p.toString())
+            .collect(Collectors.toList());
+
+    retryTillSuccess((retryNum) -> {
+      try {
+        log.info("Trying " + retryNum);
+        FileStatus[] entries = fs.listStatus(dir);
+        List<String> gotFiles = Arrays.stream(entries).map(e -> Path.getPathWithoutSchemeAndAuthority(e.getPath()))
+            .map(p -> p.toString()).collect(Collectors.toList());
+        List<String> candidateFiles = new ArrayList<>(filesWithoutSchemeAndAuthority);
+        boolean altered = candidateFiles.removeAll(gotFiles);
+
+        switch (event) {
+          case DISAPPEAR:
+            log.info("Following files are visible" + candidateFiles);
+            // If no candidate file gets removed, it means all of them have disappeared
+            return !altered;
+          case APPEAR:
+          default:
+            // if all files appear, the list is empty
+            return candidateFiles.isEmpty();
+        }
+      } catch (IOException ioe) {
+        log.warn("Got IOException waiting for file event. Have tried " + retryNum + " time(s)", ioe);
+      }
+      return false;
+    }, "Timed out waiting for filles to become visible");
+  }
+
+  /**
+   * Helper to check file visibility
+   * @param filePath File Path
+   * @param visibility Visibility
+   * @return
+   * @throws IOException
+   */
+  private boolean checkFileVisibility(Path filePath, FileVisibility visibility) throws IOException {
+    try {
+      FileStatus[] status = fs.listStatus(filePath);
+      switch (visibility) {
+        case APPEAR:
+          return status.length != 0;
+        case DISAPPEAR:
+        default:
+          return status.length == 0;
+      }
+    } catch (FileNotFoundException nfe) {
+      switch (visibility) {
+        case APPEAR:
+          return false;
+        case DISAPPEAR:
+        default:
+          return true;
+      }
+    }
+  }
+
+  /**
+   * Helper function to wait till file either appears/disappears
+   * @param filePath File Path
+   * @param visibility
+   * @throws TimeoutException
+   */
+  private void waitForFileVisibility(Path filePath, FileVisibility visibility) throws TimeoutException {
+    long waitMs = initialDelayMs;
+    int attempt = 0;
+    while (attempt < maxAttempts) {
+      try {
+        if (checkFileVisibility(filePath, visibility)) {
+          return;
+        }
+      } catch (IOException ioe) {
+        log.warn("Got IOException waiting for file visibility. Retrying", ioe);
+      }
+
+      sleepSafe(waitMs);
+      waitMs = waitMs * 2; // double check interval every attempt
+      waitMs = waitMs > maxDelayMs ? maxDelayMs : waitMs;
+      attempt++;
+    }
+    throw new TimeoutException("Timed-out waiting for the file to " + visibility.name());
+  }
+
+  /**
+   * Retries the predicate for a configurable number of times until the predicate returns success
+   * @param predicate Predicate Function
+   * @param timedOutMessage Timed-Out message for logging
+   * @throws TimeoutException when retries are exhausted
+   */
+  private void retryTillSuccess(Function<Integer, Boolean> predicate, String timedOutMessage) throws TimeoutException {
+    long waitMs = initialDelayMs;
+    int attempt = 0;
+    log.warn("Max Attempts=" + maxAttempts);
+    while (attempt < maxAttempts) {
+      boolean success = predicate.apply(attempt);
+      if (success) {
+        return;
+      }
+      sleepSafe(waitMs);
+      waitMs = waitMs * 2; // double check interval every attempt
+      waitMs = waitMs > maxDelayMs ? maxDelayMs : waitMs;
+      attempt++;
+    }
+    throw new TimeoutException(timedOutMessage);
+
+  }
+
+  void sleepSafe(long waitMs) {
+    try {
+      Thread.sleep(waitMs);
+    } catch (InterruptedException e) {
+      // ignore & continue next attempt
+    }
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/NoOpConsistencyGuard.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/NoOpConsistencyGuard.java
new file mode 100644
index 0000000..acc20b7
--- /dev/null
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/NoOpConsistencyGuard.java
@@ -0,0 +1,46 @@
+/*
+ *  Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.common.util;
+
+import java.util.List;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Default Consistency guard that does nothing. Used for HDFS deployments
+ */
+public class NoOpConsistencyGuard implements ConsistencyGuard {
+
+  @Override
+  public void waitTillFileAppears(Path filePath) {
+  }
+
+  @Override
+  public void waitTillFileDisappears(Path filePath) {
+  }
+
+  @Override
+  public void waitTillAllFilesAppear(String dirPath, List<String> files) {
+
+  }
+
+  @Override
+  public void waitTillAllFilesDisappear(String dirPath, List<String> files) {
+
+  }
+}
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
index a112a6d..24c4868 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ParquetUtils.java
@@ -113,7 +113,6 @@ public class ParquetUtils {
     return footer;
   }
 
-
   /**
    * Get the schema of the given parquet file.
    */
@@ -121,7 +120,6 @@ public class ParquetUtils {
     return readMetadata(configuration, parquetFilePath).getFileMetaData().getSchema();
   }
 
-
   private static List<String> readParquetFooter(Configuration configuration, Path parquetFilePath,
       String... footerNames) {
     List<String> footerVals = new ArrayList<>();
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
index e8b287a..4627fe3 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
@@ -79,7 +79,8 @@ public class HoodieTestUtils {
 
   public static final String TEST_EXTENSION = ".test";
   public static final String RAW_TRIPS_TEST_NAME = "raw_trips";
-  public static final int DEFAULT_TASK_PARTITIONID = 1;
+  public static final String DEFAULT_WRITE_TOKEN = "1-0-1";
+  public static final int DEFAULT_LOG_VERSION = 1;
   public static final String[] DEFAULT_PARTITION_PATHS = {"2016/03/15", "2015/03/16", "2015/03/17"};
   private static Random rand = new Random(46474747);
 
@@ -92,8 +93,7 @@ public class HoodieTestUtils {
     return init(basePath, HoodieTableType.COPY_ON_WRITE);
   }
 
-  public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType)
-      throws IOException {
+  public static HoodieTableMetaClient init(String basePath, HoodieTableType tableType) throws IOException {
     return initTableType(getDefaultHadoopConf(), basePath, tableType);
   }
 
@@ -163,14 +163,30 @@ public class HoodieTestUtils {
     return createDataFile(basePath, partitionPath, commitTime, fileID);
   }
 
+  public static final String createNewMarkerFile(String basePath, String partitionPath, String commitTime)
+      throws IOException {
+    String fileID = UUID.randomUUID().toString();
+    return createMarkerFile(basePath, partitionPath, commitTime, fileID);
+  }
+
   public static final String createDataFile(String basePath, String partitionPath, String commitTime, String fileID)
       throws IOException {
     String folderPath = basePath + "/" + partitionPath + "/";
     new File(folderPath).mkdirs();
-    new File(folderPath + FSUtils.makeDataFileName(commitTime, DEFAULT_TASK_PARTITIONID, fileID)).createNewFile();
+    new File(folderPath + FSUtils.makeDataFileName(commitTime, DEFAULT_WRITE_TOKEN, fileID)).createNewFile();
     return fileID;
   }
 
+  public static final String createMarkerFile(String basePath, String partitionPath, String commitTime, String fileID)
+      throws IOException {
+    String folderPath = basePath + "/" + HoodieTableMetaClient.TEMPFOLDER_NAME + "/" + commitTime + "/"
+        + partitionPath + "/";
+    new File(folderPath).mkdirs();
+    File f = new File(folderPath + FSUtils.makeMarkerFile(commitTime, DEFAULT_WRITE_TOKEN, fileID));
+    f.createNewFile();
+    return f.getAbsolutePath();
+  }
+
   public static final String createNewLogFile(FileSystem fs, String basePath, String partitionPath, String commitTime,
       String fileID, Optional<Integer> version) throws IOException {
     String folderPath = basePath + "/" + partitionPath + "/";
@@ -179,7 +195,9 @@ public class HoodieTestUtils {
       throw new IOException("cannot create directory for path " + folderPath);
     }
     boolean createFile = fs.createNewFile(new Path(
-        folderPath + FSUtils.makeLogFileName(fileID, ".log", commitTime, version.orElse(DEFAULT_TASK_PARTITIONID))));
+        folderPath + FSUtils
+            .makeLogFileName(fileID, ".log", commitTime, version.orElse(DEFAULT_LOG_VERSION),
+                HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
     if (!createFile) {
       throw new IOException(
           StringUtils.format("cannot create data file for commit %s and fileId %s", commitTime, fileID));
@@ -208,39 +226,38 @@ public class HoodieTestUtils {
         AvroUtils.serializeCompactionPlan(plan));
   }
 
-  public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID)
-      throws IOException {
+  public static final String getDataFilePath(String basePath, String partitionPath, String commitTime, String fileID) {
     return basePath + "/" + partitionPath + "/" + FSUtils
-        .makeDataFileName(commitTime, DEFAULT_TASK_PARTITIONID, fileID);
+        .makeDataFileName(commitTime, DEFAULT_WRITE_TOKEN, fileID);
   }
 
   public static final String getLogFilePath(String basePath, String partitionPath, String commitTime, String fileID,
-      Optional<Integer> version) throws IOException {
+      Optional<Integer> version) {
     return basePath + "/" + partitionPath + "/" + FSUtils.makeLogFileName(fileID, ".log", commitTime,
-        version.orElse(DEFAULT_TASK_PARTITIONID));
+        version.orElse(DEFAULT_LOG_VERSION), HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
   }
 
-  public static final String getCommitFilePath(String basePath, String commitTime) throws IOException {
+  public static final String getCommitFilePath(String basePath, String commitTime) {
     return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime + HoodieTimeline.COMMIT_EXTENSION;
   }
 
-  public static final String getInflightCommitFilePath(String basePath, String commitTime) throws IOException {
+  public static final String getInflightCommitFilePath(String basePath, String commitTime) {
     return basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitTime
         + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
   }
 
-  public static final String getRequestedCompactionFilePath(String basePath, String commitTime) throws IOException {
+  public static final String getRequestedCompactionFilePath(String basePath, String commitTime) {
     return basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + commitTime
         + HoodieTimeline.INFLIGHT_COMMIT_EXTENSION;
   }
 
-  public static final boolean doesDataFileExist(String basePath, String partitionPath, String commitTime, String fileID)
-      throws IOException {
+  public static final boolean doesDataFileExist(String basePath, String partitionPath, String commitTime,
+      String fileID) {
     return new File(getDataFilePath(basePath, partitionPath, commitTime, fileID)).exists();
   }
 
   public static final boolean doesLogFileExist(String basePath, String partitionPath, String commitTime, String fileID,
-      Optional<Integer> version) throws IOException {
+      Optional<Integer> version) {
     return new File(getLogFilePath(basePath, partitionPath, commitTime, fileID, version)).exists();
   }
 
@@ -256,10 +273,6 @@ public class HoodieTestUtils {
         .exists();
   }
 
-  public static String makeInflightTestFileName(String instant) {
-    return instant + TEST_EXTENSION + HoodieTimeline.INFLIGHT_EXTENSION;
-  }
-
   public static void createCleanFiles(String basePath, String commitTime, Configuration configuration)
       throws IOException {
     Path commitFile = new Path(
@@ -395,4 +408,4 @@ public class HoodieTestUtils {
     }
     return writeStatList;
   }
-}
+}
\ No newline at end of file
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieWriteStat.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieWriteStat.java
index a462a6d..7d35895 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieWriteStat.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieWriteStat.java
@@ -35,27 +35,20 @@ public class TestHoodieWriteStat {
     String basePathString = "/data/tables/some-hoodie-table";
     String partitionPathString = "2017/12/31";
     String fileName = UUID.randomUUID().toString();
-    int taskPartitionId = Integer.MAX_VALUE;
-    int stageId = Integer.MAX_VALUE;
-    long taskAttemptId = Long.MAX_VALUE;
+    String writeToken = "1-0-1";
 
     Path basePath = new Path(basePathString);
     Path partitionPath = new Path(basePath, partitionPathString);
     Path tempPath = new Path(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME);
 
-    Path finalizeFilePath = new Path(partitionPath, FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName));
-    Path tempFilePath = new Path(tempPath, FSUtils
-        .makeTempDataFileName(partitionPathString, commitTime, taskPartitionId,
-            fileName, stageId, taskAttemptId));
-
+    Path finalizeFilePath = new Path(partitionPath, FSUtils.makeDataFileName(commitTime, writeToken, fileName));
     HoodieWriteStat writeStat = new HoodieWriteStat();
-    writeStat.setPaths(basePath, finalizeFilePath, tempFilePath);
+    writeStat.setPath(basePath, finalizeFilePath);
     assertEquals(finalizeFilePath, new Path(basePath, writeStat.getPath()));
-    assertEquals(tempFilePath, new Path(basePath, writeStat.getTempPath()));
 
     // test for null tempFilePath
     writeStat = new HoodieWriteStat();
-    writeStat.setPaths(basePath, finalizeFilePath, null);
+    writeStat.setPath(basePath, finalizeFilePath);
     assertEquals(finalizeFilePath, new Path(basePath, writeStat.getPath()));
     assertNull(writeStat.getTempPath());
   }
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
index 602840c..ab5a21b 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/log/HoodieLogFormatTest.java
@@ -19,6 +19,7 @@ package com.uber.hoodie.common.table.log;
 import static com.uber.hoodie.common.util.SchemaTestUtil.getSimpleSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -176,6 +177,63 @@ public class HoodieLogFormatTest {
   }
 
   @Test
+  public void testConcurrentAppendOnExistingLogFileWithoutWriteToken() throws Exception {
+    testConcurrentAppend(true, false);
+  }
+
+  @Test
+  public void testConcurrentAppendOnExistingLogFileWithWriteToken() throws Exception {
+    testConcurrentAppend(true, true);
+  }
+
+  @Test
+  public void testConcurrentAppendOnFirstLogFileVersion() throws Exception {
+    testConcurrentAppend(false, true);
+  }
+
+  private void testConcurrentAppend(boolean logFileExists, boolean newLogFileFormat) throws Exception {
+    HoodieLogFormat.WriterBuilder builder1 = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
+        .overBaseCommit("100").withFs(fs);
+    HoodieLogFormat.WriterBuilder builder2 = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
+        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
+        .overBaseCommit("100").withFs(fs);
+
+    if (newLogFileFormat && logFileExists) {
+      // Assume there is an existing log-file with write token
+      builder1 = builder1.withLogVersion(1).withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN)
+          .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
+      builder2 = builder2.withLogVersion(1).withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN)
+          .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
+    } else if (newLogFileFormat) {
+      // First log file of the file-slice
+      builder1 = builder1.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
+          .withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN)
+          .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
+      builder2 = builder2.withLogVersion(HoodieLogFile.LOGFILE_BASE_VERSION)
+          .withLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN)
+          .withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
+    } else {
+      builder1 = builder1.withLogVersion(1).withRolloverLogWriteToken(HoodieLogFormat.UNKNOWN_WRITE_TOKEN);
+    }
+    Writer writer = builder1.build();
+    Writer writer2 = builder2.build();
+    HoodieLogFile logFile1 = writer.getLogFile();
+    HoodieLogFile logFile2 = writer2.getLogFile();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, getSimpleSchema().toString());
+    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+    writer = writer.appendBlock(dataBlock);
+    writer2 = writer2.appendBlock(dataBlock);
+    writer.close();
+    writer2.close();
+    assertNotNull(logFile1.getLogWriteToken());
+    assertEquals("Log Files must have different versions", logFile1.getLogVersion(), logFile2.getLogVersion() - 1);
+  }
+
+  @Test
   public void testMultipleAppend() throws IOException, URISyntaxException, InterruptedException {
     Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
         .withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId("test-fileid1")
@@ -225,6 +283,12 @@ public class HoodieLogFormatTest {
     }
   }
 
+  /**
+   * This is actually a test of concurrent append and not of lease recovery.
+   * Commenting this out.
+   * https://issues.apache.org/jira/browse/HUDI-117
+   */
+  /**
   @Test
   public void testLeaseRecovery() throws IOException, URISyntaxException, InterruptedException {
     Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath)
@@ -253,6 +317,7 @@ public class HoodieLogFormatTest {
         fs.getFileStatus(writer.getLogFile().getPath()).getLen());
     writer.close();
   }
+  **/
 
   @Test
   public void testAppendNotSupported() throws IOException, URISyntaxException, InterruptedException {
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
index f11acac..cd26a2b 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/HoodieTableFileSystemViewTest.java
@@ -65,6 +65,8 @@ import org.junit.rules.TemporaryFolder;
 @SuppressWarnings("ResultOfMethodCallIgnored")
 public class HoodieTableFileSystemViewTest {
 
+  private static String TEST_WRITE_TOKEN = "1-0-1";
+
   protected HoodieTableMetaClient metaClient;
   protected String basePath;
   protected SyncableFileSystemView fsView;
@@ -119,8 +121,10 @@ public class HoodieTableFileSystemViewTest {
     String instantTime1 = "1";
     String deltaInstantTime1 = "2";
     String deltaInstantTime2 = "3";
-    String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0);
-    String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 1);
+    String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+        instantTime1, 0, TEST_WRITE_TOKEN);
+    String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+        instantTime1, 1, TEST_WRITE_TOKEN);
     new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
     new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
     HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
@@ -248,11 +252,13 @@ public class HoodieTableFileSystemViewTest {
 
     String dataFileName = null;
     if (!skipCreatingDataFile) {
-      dataFileName = FSUtils.makeDataFileName(instantTime1, 1, fileId);
+      dataFileName = FSUtils.makeDataFileName(instantTime1, TEST_WRITE_TOKEN, fileId);
       new File(basePath + "/" + partitionPath + "/" + dataFileName).createNewFile();
     }
-    String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0);
-    String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 1);
+    String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+        instantTime1, 0, TEST_WRITE_TOKEN);
+    String fileName2 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+        instantTime1, 1, TEST_WRITE_TOKEN);
     new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
     new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
     HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();
@@ -267,7 +273,7 @@ public class HoodieTableFileSystemViewTest {
     refreshFsView();
     List<FileSlice> fileSlices = rtView.getLatestFileSlices(partitionPath).collect(Collectors.toList());
     String compactionRequestedTime = "4";
-    String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, 1, fileId);
+    String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, TEST_WRITE_TOKEN, fileId);
     List<Pair<String, FileSlice>> partitionFileSlicesPairs = new ArrayList<>();
     partitionFileSlicesPairs.add(Pair.of(partitionPath, fileSlices.get(0)));
     HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(partitionFileSlicesPairs,
@@ -299,8 +305,10 @@ public class HoodieTableFileSystemViewTest {
     String deltaInstantTime5 = "6";
     List<String> allInstantTimes = Arrays.asList(instantTime1, deltaInstantTime1, deltaInstantTime2,
         compactionRequestedTime, deltaInstantTime4, deltaInstantTime5);
-    String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 0);
-    String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 1);
+    String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+        compactionRequestedTime, 0, TEST_WRITE_TOKEN);
+    String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+        compactionRequestedTime, 1, TEST_WRITE_TOKEN);
     new File(basePath + "/" + partitionPath + "/" + fileName3).createNewFile();
     new File(basePath + "/" + partitionPath + "/" + fileName4).createNewFile();
     HoodieInstant deltaInstant4 = new HoodieInstant(true, HoodieTimeline.DELTA_COMMIT_ACTION, deltaInstantTime4);
@@ -400,15 +408,15 @@ public class HoodieTableFileSystemViewTest {
     final String orphanFileId2 = UUID.randomUUID().toString();
     final String invalidInstantId = "INVALIDTIME";
     String inflightDeltaInstantTime = "7";
-    String orphanDataFileName = FSUtils.makeDataFileName(invalidInstantId, 1, orphanFileId1);
+    String orphanDataFileName = FSUtils.makeDataFileName(invalidInstantId, TEST_WRITE_TOKEN, orphanFileId1);
     new File(basePath + "/" + partitionPath + "/" + orphanDataFileName).createNewFile();
     String orphanLogFileName =
-        FSUtils.makeLogFileName(orphanFileId2, HoodieLogFile.DELTA_EXTENSION, invalidInstantId, 0);
+        FSUtils.makeLogFileName(orphanFileId2, HoodieLogFile.DELTA_EXTENSION, invalidInstantId, 0, TEST_WRITE_TOKEN);
     new File(basePath + "/" + partitionPath + "/" + orphanLogFileName).createNewFile();
-    String inflightDataFileName = FSUtils.makeDataFileName(inflightDeltaInstantTime, 1, inflightFileId1);
+    String inflightDataFileName = FSUtils.makeDataFileName(inflightDeltaInstantTime, TEST_WRITE_TOKEN, inflightFileId1);
     new File(basePath + "/" + partitionPath + "/" + inflightDataFileName).createNewFile();
-    String inflightLogFileName =
-        FSUtils.makeLogFileName(inflightFileId2, HoodieLogFile.DELTA_EXTENSION, inflightDeltaInstantTime, 0);
+    String inflightLogFileName = FSUtils.makeLogFileName(inflightFileId2, HoodieLogFile.DELTA_EXTENSION,
+        inflightDeltaInstantTime, 0, TEST_WRITE_TOKEN);
     new File(basePath + "/" + partitionPath + "/" + inflightLogFileName).createNewFile();
     // Mark instant as inflight
     commitTimeline.saveToInflight(new HoodieInstant(State.INFLIGHT, HoodieTimeline.DELTA_COMMIT_ACTION,
@@ -558,7 +566,7 @@ public class HoodieTableFileSystemViewTest {
 
     // Only one commit, but is not safe
     String commitTime1 = "1";
-    String fileName1 = FSUtils.makeDataFileName(commitTime1, 1, fileId);
+    String fileName1 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId);
     new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
     refreshFsView();
     assertFalse("No commit, should not find any data file",
@@ -576,7 +584,7 @@ public class HoodieTableFileSystemViewTest {
 
     // Do another commit, but not safe
     String commitTime2 = "2";
-    String fileName2 = FSUtils.makeDataFileName(commitTime2, 1, fileId);
+    String fileName2 = FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId);
     new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();
     refreshFsView();
     assertEquals("", fileName1,
@@ -610,21 +618,21 @@ public class HoodieTableFileSystemViewTest {
     String fileId3 = UUID.randomUUID().toString();
     String fileId4 = UUID.randomUUID().toString();
 
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0))
-        .createNewFile();
-    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 1))
-        .createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0))
-        .createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0))
-        .createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+        commitTime4, 0, TEST_WRITE_TOKEN)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+        commitTime4, 1, TEST_WRITE_TOKEN)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2,
+        HoodieLogFile.DELTA_EXTENSION, commitTime3, 0, TEST_WRITE_TOKEN)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION,
+        commitTime4, 0, TEST_WRITE_TOKEN)).createNewFile();
 
     new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
     new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
@@ -653,9 +661,9 @@ public class HoodieTableFileSystemViewTest {
     for (HoodieDataFile status : dataFileList) {
       filenames.add(status.getFileName());
     }
-    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId1)));
-    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
-    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId3)));
+    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)));
+    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)));
+    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)));
 
     filenames = Sets.newHashSet();
     List<HoodieLogFile> logFilesList = rtView.getLatestFileSlicesBeforeOrOn("2016/05/01", commitTime4)
@@ -665,10 +673,14 @@ public class HoodieTableFileSystemViewTest {
     for (HoodieLogFile logFile : logFilesList) {
       filenames.add(logFile.getFileName());
     }
-    assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)));
-    assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 1)));
-    assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0)));
-    assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)));
+    assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+        commitTime4, 0, TEST_WRITE_TOKEN)));
+    assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+        commitTime4, 1, TEST_WRITE_TOKEN)));
+    assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION,
+        commitTime3, 0, TEST_WRITE_TOKEN)));
+    assertTrue(filenames.contains(FSUtils.makeLogFileName(fileId4, HoodieLogFile.DELTA_EXTENSION,
+        commitTime4, 0, TEST_WRITE_TOKEN)));
 
     // Reset the max commit time
     List<HoodieDataFile> dataFiles = roView.getLatestDataFilesBeforeOrOn("2016/05/01", commitTime3)
@@ -679,12 +691,12 @@ public class HoodieTableFileSystemViewTest {
     }
     if (!isLatestFileSliceOnly) {
       assertEquals(3, dataFiles.size());
-      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, 1, fileId1)));
-      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
-      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId3)));
+      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)));
+      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)));
+      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)));
     } else {
       assertEquals(1, dataFiles.size());
-      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
+      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)));
     }
 
     logFilesList =
@@ -692,7 +704,7 @@ public class HoodieTableFileSystemViewTest {
             .flatMap(logFileList -> logFileList).collect(Collectors.toList());
     assertEquals(logFilesList.size(), 1);
     assertTrue(logFilesList.get(0).getFileName()
-        .equals(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0)));
+        .equals(FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0, TEST_WRITE_TOKEN)));
 
   }
 
@@ -713,13 +725,13 @@ public class HoodieTableFileSystemViewTest {
     String fileId2 = UUID.randomUUID().toString();
     String fileId3 = UUID.randomUUID().toString();
 
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
 
     new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
     new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
@@ -744,22 +756,22 @@ public class HoodieTableFileSystemViewTest {
       Set<String> expFileNames = new HashSet<>();
       if (fileId.equals(fileId1)) {
         if (!isLatestFileSliceOnly) {
-          expFileNames.add(FSUtils.makeDataFileName(commitTime1, 1, fileId1));
+          expFileNames.add(FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1));
         }
-        expFileNames.add(FSUtils.makeDataFileName(commitTime4, 1, fileId1));
+        expFileNames.add(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1));
         assertEquals(expFileNames, filenames);
       } else if (fileId.equals(fileId2)) {
         if (!isLatestFileSliceOnly) {
-          expFileNames.add(FSUtils.makeDataFileName(commitTime1, 1, fileId2));
-          expFileNames.add(FSUtils.makeDataFileName(commitTime2, 1, fileId2));
+          expFileNames.add(FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2));
+          expFileNames.add(FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2));
         }
-        expFileNames.add(FSUtils.makeDataFileName(commitTime3, 1, fileId2));
+        expFileNames.add(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2));
         assertEquals(expFileNames, filenames);
       } else {
         if (!isLatestFileSliceOnly) {
-          expFileNames.add(FSUtils.makeDataFileName(commitTime3, 1, fileId3));
+          expFileNames.add(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3));
         }
-        expFileNames.add(FSUtils.makeDataFileName(commitTime4, 1, fileId3));
+        expFileNames.add(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3));
         assertEquals(expFileNames, filenames);
       }
     }
@@ -782,19 +794,19 @@ public class HoodieTableFileSystemViewTest {
     String fileId2 = UUID.randomUUID().toString();
     String fileId3 = UUID.randomUUID().toString();
 
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0))
-        .createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+        commitTime1, 0, TEST_WRITE_TOKEN)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId1)).createNewFile();
 
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime3, 0))
-        .createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION,
+        commitTime3, 0, TEST_WRITE_TOKEN)).createNewFile();
 
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
 
     new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
     new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
@@ -817,10 +829,10 @@ public class HoodieTableFileSystemViewTest {
       filenames.add(status.getFileName());
     }
 
-    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId1)));
-    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
+    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId1)));
+    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)));
     if (!isLatestFileSliceOnly) {
-      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId3)));
+      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)));
     }
 
     List<FileSlice> slices = rtView.getLatestFileSliceInRange(Lists.newArrayList(commitTime3, commitTime4))
@@ -861,13 +873,13 @@ public class HoodieTableFileSystemViewTest {
     String fileId2 = UUID.randomUUID().toString();
     String fileId3 = UUID.randomUUID().toString();
 
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId1)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId1)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId2)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, 1, fileId3)).createNewFile();
-    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, 1, fileId3)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
+    new File(fullPartitionPath + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
 
     new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
     new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
@@ -887,8 +899,8 @@ public class HoodieTableFileSystemViewTest {
       for (HoodieDataFile status : dataFiles) {
         filenames.add(status.getFileName());
       }
-      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, 1, fileId1)));
-      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime2, 1, fileId2)));
+      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1)));
+      assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)));
     } else {
       assertEquals(0, dataFiles.size());
     }
@@ -912,28 +924,31 @@ public class HoodieTableFileSystemViewTest {
     String fileId2 = UUID.randomUUID().toString();
     String fileId3 = UUID.randomUUID().toString();
 
-    new File(fullPartitionPath + "/" + FSUtils.makeDataFileName(commitTime1, 1, fileId1)).createNewFile();
+    new File(fullPartitionPath + "/" + FSUtils.makeDataFileName(commitTime1,
+        TEST_WRITE_TOKEN, fileId1)).createNewFile();
     new File(fullPartitionPath + "/"
-        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime1, 0)).createNewFile();
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+        commitTime1, 0, TEST_WRITE_TOKEN)).createNewFile();
     new File(fullPartitionPath + "/"
-        + FSUtils.makeDataFileName(commitTime4, 1, fileId1)).createNewFile();
+        + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)).createNewFile();
     new File(fullPartitionPath + "/"
-        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION, commitTime4, 0)).createNewFile();
+        + FSUtils.makeLogFileName(fileId1, HoodieLogFile.DELTA_EXTENSION,
+        commitTime4, 0, TEST_WRITE_TOKEN)).createNewFile();
 
     new File(fullPartitionPath + "/"
-        + FSUtils.makeDataFileName(commitTime1, 1, fileId2)).createNewFile();
+        + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2)).createNewFile();
     new File(fullPartitionPath + "/"
-        + FSUtils.makeDataFileName(commitTime2, 1, fileId2)).createNewFile();
+        + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId2)).createNewFile();
     new File(fullPartitionPath + "/"
-        + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0))
+        + FSUtils.makeLogFileName(fileId2, HoodieLogFile.DELTA_EXTENSION, commitTime2, 0, TEST_WRITE_TOKEN))
         .createNewFile();
     new File(fullPartitionPath + "/"
-        + FSUtils.makeDataFileName(commitTime3, 1, fileId2)).createNewFile();
+        + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)).createNewFile();
 
     new File(fullPartitionPath + "/"
-        + FSUtils.makeDataFileName(commitTime3, 1, fileId3)).createNewFile();
+        + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId3)).createNewFile();
     new File(fullPartitionPath + "/"
-        + FSUtils.makeDataFileName(commitTime4, 1, fileId3)).createNewFile();
+        + FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)).createNewFile();
 
     new File(basePath + "/.hoodie/" + commitTime1 + ".commit").createNewFile();
     new File(basePath + "/.hoodie/" + commitTime2 + ".commit").createNewFile();
@@ -979,9 +994,9 @@ public class HoodieTableFileSystemViewTest {
     for (HoodieDataFile status : statuses1) {
       filenames.add(status.getFileName());
     }
-    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId1)));
-    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, 1, fileId2)));
-    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, 1, fileId3)));
+    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId1)));
+    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId2)));
+    assertTrue(filenames.contains(FSUtils.makeDataFileName(commitTime4, TEST_WRITE_TOKEN, fileId3)));
   }
 
   @Test
@@ -1002,16 +1017,17 @@ public class HoodieTableFileSystemViewTest {
     String deltaInstantTime2 = "3";
     String fileId = UUID.randomUUID().toString();
 
-    String dataFileName = FSUtils.makeDataFileName(instantTime1, 1, fileId);
+    String dataFileName = FSUtils.makeDataFileName(instantTime1, TEST_WRITE_TOKEN, fileId);
     new File(fullPartitionPath1 + dataFileName).createNewFile();
 
-    String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, instantTime1, 0);
+    String fileName1 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+        instantTime1, 0, TEST_WRITE_TOKEN);
     new File(fullPartitionPath1 + fileName1)
         .createNewFile();
-    new File(fullPartitionPath2 + FSUtils.makeDataFileName(instantTime1, 1, fileId)).createNewFile();
+    new File(fullPartitionPath2 + FSUtils.makeDataFileName(instantTime1, TEST_WRITE_TOKEN, fileId)).createNewFile();
     new File(fullPartitionPath2 + fileName1)
         .createNewFile();
-    new File(fullPartitionPath3 + FSUtils.makeDataFileName(instantTime1, 1, fileId)).createNewFile();
+    new File(fullPartitionPath3 + FSUtils.makeDataFileName(instantTime1, TEST_WRITE_TOKEN, fileId)).createNewFile();
     new File(fullPartitionPath3 + fileName1)
         .createNewFile();
 
@@ -1052,7 +1068,7 @@ public class HoodieTableFileSystemViewTest {
     partitionFileSlicesPairs.add(Pair.of(partitionPath3, fileSlices.get(0)));
 
     String compactionRequestedTime = "2";
-    String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, 1, fileId);
+    String compactDataFileName = FSUtils.makeDataFileName(compactionRequestedTime, TEST_WRITE_TOKEN, fileId);
     HoodieCompactionPlan compactionPlan = CompactionUtils.buildFromFileSlices(partitionFileSlicesPairs,
         Optional.empty(), Optional.empty());
 
@@ -1072,8 +1088,10 @@ public class HoodieTableFileSystemViewTest {
     String deltaInstantTime5 = "6";
     List<String> allInstantTimes = Arrays.asList(instantTime1, deltaInstantTime1, deltaInstantTime2,
         compactionRequestedTime, deltaInstantTime4, deltaInstantTime5);
-    String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 0);
-    String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION, compactionRequestedTime, 1);
+    String fileName3 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+        compactionRequestedTime, 0, TEST_WRITE_TOKEN);
+    String fileName4 = FSUtils.makeLogFileName(fileId, HoodieLogFile.DELTA_EXTENSION,
+        compactionRequestedTime, 1, TEST_WRITE_TOKEN);
     new File(basePath + "/" + partitionPath1 + "/" + fileName3).createNewFile();
     new File(basePath + "/" + partitionPath1 + "/" + fileName4).createNewFile();
     new File(basePath + "/" + partitionPath2 + "/" + fileName3).createNewFile();
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/IncrementalFSViewSyncTest.java b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/IncrementalFSViewSyncTest.java
index dc31c1a..a57ebad 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/IncrementalFSViewSyncTest.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/table/view/IncrementalFSViewSyncTest.java
@@ -69,6 +69,8 @@ import org.junit.rules.TemporaryFolder;
 
 public class IncrementalFSViewSyncTest {
 
+  private static String TEST_WRITE_TOKEN = "1-0-1";
+
   protected HoodieTableMetaClient metaClient;
   protected String basePath;
 
@@ -756,8 +758,8 @@ public class IncrementalFSViewSyncTest {
       return fileIdsPerPartition.stream().map(f -> {
         try {
           File file = new File(basePath + "/" + p + "/"
-              + (deltaCommit ? FSUtils.makeLogFileName(f, ".log", baseInstant, Integer.parseInt(instant)) :
-              FSUtils.makeDataFileName(instant, 0, f)));
+              + (deltaCommit ? FSUtils.makeLogFileName(f, ".log", baseInstant,
+              Integer.parseInt(instant), TEST_WRITE_TOKEN) : FSUtils.makeDataFileName(instant, TEST_WRITE_TOKEN, f)));
           file.createNewFile();
           HoodieWriteStat w = new HoodieWriteStat();
           w.setFileId(f);
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java
index 5f46f8b..527322c 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/CompactionTestUtils.java
@@ -49,6 +49,8 @@ import org.junit.Assert;
 
 public class CompactionTestUtils {
 
+  private static String TEST_WRITE_TOKEN = "1-0-1";
+
   public static Map<HoodieFileGroupId, Pair<String, HoodieCompactionOperation>> setupAndValidateCompactionOperations(
       HoodieTableMetaClient metaClient, boolean inflight,
       int numEntriesInPlan1, int numEntriesInPlan2,
@@ -151,7 +153,7 @@ public class CompactionTestUtils {
         FileSlice slice = new FileSlice(DEFAULT_PARTITION_PATHS[0], instantId, fileId);
         if (createDataFile) {
           slice.setDataFile(new TestHoodieDataFile(metaClient.getBasePath() + "/" + DEFAULT_PARTITION_PATHS[0]
-              + "/" + FSUtils.makeDataFileName(instantId, 1, fileId)));
+              + "/" + FSUtils.makeDataFileName(instantId, TEST_WRITE_TOKEN, fileId)));
         }
         String logFilePath1 = HoodieTestUtils
             .getLogFilePath(metaClient.getBasePath(), DEFAULT_PARTITION_PATHS[0], instantId, fileId,
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
index c364884..f689039 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestCompactionUtils.java
@@ -50,6 +50,8 @@ import org.junit.rules.TemporaryFolder;
 
 public class TestCompactionUtils {
 
+  private static final String TEST_WRITE_TOKEN = "1-0-1"; // write token embedded in generated log file names
+
   private static final Map<String, Double> metrics =
       new ImmutableMap.Builder<String, Double>()
           .put("key1", 1.0)
@@ -85,9 +87,9 @@ public class TestCompactionUtils {
     //File Slice with no data-file but log files present
     FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
     noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
-        FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+        FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))));
     noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
-        FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+        FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))));
     op = CompactionUtils.buildFromFileSlice(
         DEFAULT_PARTITION_PATHS[0], noDataFileSlice, Optional.of(metricsCaptureFn));
     testFileSliceCompactionOpEquality(noDataFileSlice, op, DEFAULT_PARTITION_PATHS[0]);
@@ -96,9 +98,9 @@ public class TestCompactionUtils {
     FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
     fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet"));
     fileSlice.addLogFile(new HoodieLogFile(new Path(
-        FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+        FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))));
     fileSlice.addLogFile(new HoodieLogFile(new Path(
-        FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+        FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))));
     op = CompactionUtils.buildFromFileSlice(
         DEFAULT_PARTITION_PATHS[0], fileSlice, Optional.of(metricsCaptureFn));
     testFileSliceCompactionOpEquality(fileSlice, op, DEFAULT_PARTITION_PATHS[0]);
@@ -112,16 +114,16 @@ public class TestCompactionUtils {
     FileSlice fileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
     fileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet"));
     fileSlice.addLogFile(new HoodieLogFile(new Path(
-        FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+        FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))));
     fileSlice.addLogFile(new HoodieLogFile(new Path(
-        FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+        FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))));
     FileSlice noLogFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noLog1");
     noLogFileSlice.setDataFile(new TestHoodieDataFile("/tmp/noLog_1_000.parquet"));
     FileSlice noDataFileSlice = new FileSlice(DEFAULT_PARTITION_PATHS[0],"000", "noData1");
     noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
-        FSUtils.makeLogFileName("noData1", ".log", "000", 1))));
+        FSUtils.makeLogFileName("noData1", ".log", "000", 1, TEST_WRITE_TOKEN))));
     noDataFileSlice.addLogFile(new HoodieLogFile(new Path(
-        FSUtils.makeLogFileName("noData1", ".log", "000", 2))));
+        FSUtils.makeLogFileName("noData1", ".log", "000", 2, TEST_WRITE_TOKEN))));
     List<FileSlice> fileSliceList = Arrays.asList(emptyFileSlice, noDataFileSlice, fileSlice, noLogFileSlice);
     List<Pair<String, FileSlice>> input = fileSliceList.stream().map(f -> Pair.of(DEFAULT_PARTITION_PATHS[0], f))
         .collect(Collectors.toList());
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
index 8afcb90..5c3ba02 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
@@ -23,14 +23,18 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.UUID;
+import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.contrib.java.lang.system.EnvironmentVariables;
 
 public class TestFSUtils {
 
+  private static final String TEST_WRITE_TOKEN = "1-0-1"; // write token embedded in generated file names
+
   @Rule
   public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
 
@@ -39,22 +43,8 @@ public class TestFSUtils {
     String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
     int taskPartitionId = 2;
     String fileName = UUID.randomUUID().toString();
-    assertTrue(FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName)
-        .equals(fileName + "_" + taskPartitionId + "_" + commitTime + ".parquet"));
-  }
-
-  @Test
-  public void testMakeTempDataFileName() {
-    String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
-    String partitionPath = "2017/12/31";
-    int taskPartitionId = Integer.MAX_VALUE;
-    int stageId = Integer.MAX_VALUE;
-    long taskAttemptId = Long.MAX_VALUE;
-    String fileName = UUID.randomUUID().toString();
-    assertTrue(
-        FSUtils.makeTempDataFileName(partitionPath, commitTime, taskPartitionId, fileName, stageId, taskAttemptId)
-            .equals(partitionPath.replace("/", "-") + "_" + fileName + "_" + taskPartitionId + "_" + commitTime + "_"
-                + stageId + "_" + taskAttemptId + ".parquet"));
+    assertTrue(FSUtils.makeDataFileName(commitTime, TEST_WRITE_TOKEN, fileName)
+        .equals(fileName + "_" + TEST_WRITE_TOKEN + "_" + commitTime + ".parquet"));
   }
 
   @Test
@@ -70,7 +60,7 @@ public class TestFSUtils {
     String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
     int taskPartitionId = 2;
     String fileName = UUID.randomUUID().toString();
-    String fullFileName = FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName);
+    String fullFileName = FSUtils.makeDataFileName(commitTime, TEST_WRITE_TOKEN, fileName);
     assertTrue(FSUtils.getCommitTime(fullFileName).equals(commitTime));
   }
 
@@ -79,7 +69,7 @@ public class TestFSUtils {
     String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
     int taskPartitionId = 2;
     String fileName = UUID.randomUUID().toString();
-    String fullFileName = FSUtils.makeDataFileName(commitTime, taskPartitionId, fileName);
+    String fullFileName = FSUtils.makeDataFileName(commitTime, TEST_WRITE_TOKEN, fileName);
     assertTrue(FSUtils.getFileId(fullFileName).equals(fileName));
   }
 
@@ -121,4 +111,47 @@ public class TestFSUtils {
     Path partitionPath = new Path("/test/apache/apache/hudi");
     assertEquals("apache/hudi", FSUtils.getRelativePartitionPath(basePath, partitionPath));
   }
+
+  @Test
+  public void testOldLogFileName() {
+    // Old-format log file names (no write token) must still be parseable by the FSUtils methods
+    final String fileId = UUID.randomUUID().toString();
+    final String oldFormatName = makeOldLogFileName(fileId, ".log", "100", 1);
+    final Path oldFormatPath = new Path(new Path("2019/01/01/"), oldFormatName);
+    Assert.assertTrue(FSUtils.isLogFile(oldFormatPath));
+    Assert.assertEquals(fileId, FSUtils.getFileIdFromLogPath(oldFormatPath));
+    Assert.assertEquals("100", FSUtils.getBaseCommitTimeFromLogPath(oldFormatPath));
+    Assert.assertEquals(1, FSUtils.getFileVersionFromLog(oldFormatPath));
+    // The write token and each of its components are expected to be absent for old-format names
+    Assert.assertNull(FSUtils.getTaskPartitionIdFromLogPath(oldFormatPath));
+    Assert.assertNull(FSUtils.getStageIdFromLogPath(oldFormatPath));
+    Assert.assertNull(FSUtils.getTaskAttemptIdFromLogPath(oldFormatPath));
+    Assert.assertNull(FSUtils.getWriteTokenFromLogPath(oldFormatPath));
+  }
+
+  @Test
+  public void tesLogFileName() {
+    // Check that log file names carrying a write token are parseable by the FSUtils methods
+    String partitionPath = "2019/01/01/";
+    String fileName = UUID.randomUUID().toString();
+    String logFile = FSUtils.makeLogFileName(fileName, ".log", "100", 2, TEST_WRITE_TOKEN);
+    Path rlPath = new Path(new Path(partitionPath), logFile);
+    Assert.assertTrue(FSUtils.isLogFile(rlPath));
+    Assert.assertEquals(fileName, FSUtils.getFileIdFromLogPath(rlPath));
+    Assert.assertEquals("100", FSUtils.getBaseCommitTimeFromLogPath(rlPath));
+    Assert.assertEquals(2, FSUtils.getFileVersionFromLog(rlPath));
+    // TEST_WRITE_TOKEN "1-0-1" is expected to parse as taskPartitionId=1, stageId=0, taskAttemptId=1
+    Assert.assertEquals(Integer.valueOf(1), FSUtils.getTaskPartitionIdFromLogPath(rlPath));
+    Assert.assertEquals(Integer.valueOf(0), FSUtils.getStageIdFromLogPath(rlPath));
+    Assert.assertEquals(Integer.valueOf(1), FSUtils.getTaskAttemptIdFromLogPath(rlPath));
+    Assert.assertEquals(TEST_WRITE_TOKEN, FSUtils.getWriteTokenFromLogPath(rlPath));
+  }
+
+  /** Old-format log file name (no write token): .<fileId>_<baseCommitTime><logFileExtension>.<version> */
+  public static String makeOldLogFileName(String fileId, String logFileExtension,
+      String baseCommitTime, int version) {
+    // e.g. makeOldLogFileName("f1", ".log", "100", 1) yields ".f1_100.log.1"
+    return "." + String
+        .format("%s_%s%s.%d", fileId, baseCommitTime, logFileExtension, version);
+  }
 }
diff --git a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java
index 36f6a54..9029c97 100644
--- a/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java
+++ b/hoodie-hadoop-mr/src/main/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeFileSplit.java
@@ -97,4 +97,14 @@ public class HoodieRealtimeFileSplit extends FileSplit {
       deltaFilePaths.add(readString(in));
     }
   }
+
+  @Override
+  public String toString() {
+    // Renders the split's data path, delta file paths, max commit time and base path
+    StringBuilder sb = new StringBuilder("HoodieRealtimeFileSplit{").append("DataPath=").append(getPath());
+    sb.append(", deltaFilePaths=").append(deltaFilePaths);
+    sb.append(", maxCommitTime='").append(maxCommitTime).append('\'');
+    sb.append(", basePath='").append(basePath).append('\'');
+    return sb.append('}').toString();
+  }
 }
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
index 351a306..b774458 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/InputFormatTestUtil.java
@@ -36,6 +36,8 @@ import org.junit.rules.TemporaryFolder;
 
 public class InputFormatTestUtil {
 
+  private static final String TEST_WRITE_TOKEN = "1-0-1"; // write token embedded in generated data file names
+
   public static File prepareDataset(TemporaryFolder basePath, int numberOfFiles,
       String commitNumber) throws IOException {
     basePath.create();
@@ -43,7 +45,7 @@ public class InputFormatTestUtil {
     File partitionPath = basePath.newFolder("2016", "05", "01");
     for (int i = 0; i < numberOfFiles; i++) {
       File dataFile = new File(partitionPath,
-          FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i));
+          FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i));
       dataFile.createNewFile();
     }
     return partitionPath;
@@ -65,7 +67,7 @@ public class InputFormatTestUtil {
         .subList(0, Math.min(numberOfFilesUpdated, dataFiles.size()));
     for (File file : toUpdateList) {
       String fileId = FSUtils.getFileId(file.getName());
-      File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, 1, fileId));
+      File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId));
       dataFile.createNewFile();
     }
   }
@@ -117,7 +119,7 @@ public class InputFormatTestUtil {
       throws IOException {
     AvroParquetWriter parquetWriter;
     for (int i = 0; i < numberOfFiles; i++) {
-      String fileId = FSUtils.makeDataFileName(commitNumber, 1, "fileid" + i);
+      String fileId = FSUtils.makeDataFileName(commitNumber, TEST_WRITE_TOKEN, "fileid" + i);
       File dataFile = new File(partitionPath, fileId);
       parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()), schema);
       try {
@@ -149,7 +151,7 @@ public class InputFormatTestUtil {
       }
     })[0];
     String fileId = FSUtils.getFileId(fileToUpdate.getName());
-    File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, 1, fileId));
+    File dataFile = new File(directory, FSUtils.makeDataFileName(newCommit, TEST_WRITE_TOKEN, fileId));
     AvroParquetWriter parquetWriter = new AvroParquetWriter(new Path(dataFile.getAbsolutePath()),
         schema);
     try {
diff --git a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
index fbd635e..d1ed214 100644
--- a/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
+++ b/hoodie-hadoop-mr/src/test/java/com/uber/hoodie/hadoop/realtime/HoodieRealtimeRecordReaderTest.java
@@ -141,7 +141,7 @@ public class HoodieRealtimeRecordReaderTest {
     //create a split with baseFile (parquet file written earlier) and new log file(s)
     String logFilePath = writer.getLogFile().getPath().toString();
     HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
-        new FileSplit(new Path(partitionDir + "/fileid0_1_" + commitTime + ".parquet"), 0, 1,
+        new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1,
             jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
 
     //create a RecordReader to be used by HoodieRealtimeRecordReader
@@ -203,7 +203,7 @@ public class HoodieRealtimeRecordReaderTest {
     //create a split with baseFile (parquet file written earlier) and new log file(s)
     String logFilePath = writer.getLogFile().getPath().toString();
     HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
-        new FileSplit(new Path(partitionDir + "/fileid0_1_" + commitTime + ".parquet"), 0, 1,
+        new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1,
             jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
 
     //create a RecordReader to be used by HoodieRealtimeRecordReader
@@ -286,7 +286,7 @@ public class HoodieRealtimeRecordReaderTest {
     //create a split with baseFile (parquet file written earlier) and new log file(s)
     String logFilePath = writer.getLogFile().getPath().toString();
     HoodieRealtimeFileSplit split = new HoodieRealtimeFileSplit(
-        new FileSplit(new Path(partitionDir + "/fileid0_1_" + commitTime + ".parquet"), 0, 1,
+        new FileSplit(new Path(partitionDir + "/fileid0_1-0-1_" + commitTime + ".parquet"), 0, 1,
             jobConf), basePath.getRoot().getPath(), Arrays.asList(logFilePath), newCommitTime);
 
     //create a RecordReader to be used by HoodieRealtimeRecordReader
diff --git a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
index 7093bcf..d9d3461 100644
--- a/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
+++ b/hoodie-hive/src/test/java/com/uber/hoodie/hive/TestUtil.java
@@ -16,7 +16,6 @@
 
 package com.uber.hoodie.hive;
 
-import static com.uber.hoodie.common.model.HoodieTestUtils.DEFAULT_TASK_PARTITIONID;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.Lists;
@@ -260,7 +259,7 @@ public class TestUtil {
       // Create 5 files
       String fileId = UUID.randomUUID().toString();
       Path filePath = new Path(partPath.toString() + "/" + FSUtils.makeDataFileName(commitTime,
-          DEFAULT_TASK_PARTITIONID, fileId));
+          "1-0-1", fileId));
       generateParquetData(filePath, isParquetSchemaSimple);
       HoodieWriteStat writeStat = new HoodieWriteStat();
       writeStat.setFileId(fileId);
diff --git a/hoodie-timeline-service/pom.xml b/hoodie-timeline-service/pom.xml
index 9f26109..0202d85 100644
--- a/hoodie-timeline-service/pom.xml
+++ b/hoodie-timeline-service/pom.xml
@@ -119,7 +119,7 @@
     <dependency>
       <groupId>io.javalin</groupId>
       <artifactId>javalin</artifactId>
-      <version>2.4.0</version>
+      <version>2.8.0</version>
     </dependency>
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/RowBasedSchemaProvider.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/RowBasedSchemaProvider.java
index 0a9d7c6..6592eb6 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/RowBasedSchemaProvider.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/schema/RowBasedSchemaProvider.java
@@ -20,6 +20,6 @@ public class RowBasedSchemaProvider extends SchemaProvider {
   @Override
   public Schema getSourceSchema() {
     return AvroConversionUtils.convertStructTypeToAvroSchema(rowStruct, HOODIE_RECORD_STRUCT_NAME,
-        HOODIE_RECORD_NAMESPACE);
+            HOODIE_RECORD_NAMESPACE);
   }
 }
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java
index 45344da..86c2d38 100644
--- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieSnapshotCopier.java
@@ -37,6 +37,8 @@ import org.junit.rules.TemporaryFolder;
 
 public class TestHoodieSnapshotCopier {
 
+  private static final String TEST_WRITE_TOKEN = "1-0-1"; // write token embedded in generated data file names
+
   private String rootPath = null;
   private String basePath = null;
   private String outputPath = null;
@@ -102,35 +104,35 @@ public class TestHoodieSnapshotCopier {
             basePath);
     // Make commit1
     File file11 = new File(
-        basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, 1, "id11"));
+        basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id11"));
     file11.createNewFile();
     File file12 = new File(
-        basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, 1, "id12"));
+        basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id12"));
     file12.createNewFile();
     File file13 = new File(
-        basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, 1, "id13"));
+        basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, "id13"));
     file13.createNewFile();
 
     // Make commit2
     File file21 = new File(
-        basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, 1, "id21"));
+        basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id21"));
     file21.createNewFile();
     File file22 = new File(
-        basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, 1, "id22"));
+        basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id22"));
     file22.createNewFile();
     File file23 = new File(
-        basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, 1, "id23"));
+        basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, "id23"));
     file23.createNewFile();
 
     // Make commit3
     File file31 = new File(
-        basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, 1, "id31"));
+        basePath + "/2016/05/01/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id31"));
     file31.createNewFile();
     File file32 = new File(
-        basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, 1, "id32"));
+        basePath + "/2016/05/02/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id32"));
     file32.createNewFile();
     File file33 = new File(
-        basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, 1, "id33"));
+        basePath + "/2016/05/06/" + FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, "id33"));
     file33.createNewFile();
 
     // Do a snapshot copy