Posted to commits@hudi.apache.org by vi...@apache.org on 2020/01/04 19:07:37 UTC

[incubator-hudi] branch master updated: [HUDI-377] Adding Delete() support to DeltaStreamer (#1073)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7031445  [HUDI-377] Adding Delete() support to DeltaStreamer (#1073)
7031445 is described below

commit 7031445eb3cae5a4557786c7eb080944320609aa
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Sat Jan 4 11:07:31 2020 -0800

    [HUDI-377] Adding Delete() support to DeltaStreamer (#1073)
    
    - Provides the ability to perform hard deletes by writing delete-marker records into the source data
    - If a record contains the special field _hoodie_is_deleted set to true, a delete is performed for that key (see the sketch below)
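    
    A minimal sketch (illustrative only, not part of this commit) of the new behavior at the
    payload level: a record carrying _hoodie_is_deleted=true makes
    OverwriteWithLatestAvroPayload.combineAndGetUpdateValue return Option.empty(), which is how
    the existing record for that key gets hard-deleted on merge. The sketch borrows the test
    schema string from HoodieTestDataGenerator purely for brevity; in a real pipeline the field
    is added to the source schema (see source.avsc/target.avsc below) and set on the records
    pushed to the DeltaStreamer source.
    
        import org.apache.avro.Schema;
        import org.apache.avro.generic.GenericData;
        import org.apache.avro.generic.GenericRecord;
        import org.apache.avro.generic.IndexedRecord;
        import org.apache.hudi.OverwriteWithLatestAvroPayload;
        import org.apache.hudi.common.HoodieTestDataGenerator;
        import org.apache.hudi.common.util.Option;
    
        public class DeleteMarkerSketch {
          public static void main(String[] args) throws Exception {
            // Parse the trip schema used by the tests in this commit (now includes _hoodie_is_deleted).
            Schema schema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
    
            GenericRecord deleteRec = new GenericData.Record(schema);
            deleteRec.put("timestamp", 0.0);
            deleteRec.put("_row_key", "key-1");
            deleteRec.put("rider", "rider-001");
            deleteRec.put("driver", "driver-001");
            deleteRec.put("begin_lat", 0.0);
            deleteRec.put("begin_lon", 0.0);
            deleteRec.put("end_lat", 0.0);
            deleteRec.put("end_lon", 0.0);
            deleteRec.put("fare", 0.0);
            deleteRec.put("_hoodie_is_deleted", true); // the delete marker honored by this change
    
            OverwriteWithLatestAvroPayload payload = new OverwriteWithLatestAvroPayload(deleteRec, 0.0);
            // The combining strategy ignores the current value on disk; the marker alone decides.
            Option<IndexedRecord> merged = payload.combineAndGetUpdateValue(deleteRec, schema);
            System.out.println("merged present? " + merged.isPresent()); // false -> key is deleted
          }
        }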
---
 .../hudi/common/HoodieTestDataGenerator.java       | 83 ++++++++++++++++++----
 .../main/java/org/apache/hudi/BaseAvroPayload.java |  7 +-
 .../main/java/org/apache/hudi/DataSourceUtils.java | 16 ++---
 .../hudi/OverwriteWithLatestAvroPayload.java       | 12 +++-
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 14 ++--
 .../deltastreamer/SourceFormatAdapter.java         | 11 +--
 .../hudi/utilities/TestHoodieDeltaStreamer.java    | 31 ++++----
 .../utilities/sources/AbstractBaseTestSource.java  | 25 +++++--
 .../resources/delta-streamer-config/source.avsc    |  8 ++-
 .../sql-transformer.properties                     |  2 +-
 .../resources/delta-streamer-config/target.avsc    | 12 +++-
 11 files changed, 150 insertions(+), 71 deletions(-)

diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index efc2444..ddeec6a 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -57,6 +57,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+
 /**
  * Class to be used in tests to keep generating test inserts and updates against a corpus.
  * <p>
@@ -73,14 +74,15 @@ public class HoodieTestDataGenerator {
   public static final String[] DEFAULT_PARTITION_PATHS =
       {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};
   public static final int DEFAULT_PARTITION_DEPTH = 3;
-  public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
-      + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
-      + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
-      + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
-      + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
-      + "{\"name\":\"fare\",\"type\": \"double\"}]}";
+  public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\"," + "\"name\": \"triprec\"," + "\"fields\": [ "
+      + "{\"name\": \"timestamp\",\"type\": \"double\"}," + "{\"name\": \"_row_key\", \"type\": \"string\"},"
+      + "{\"name\": \"rider\", \"type\": \"string\"}," + "{\"name\": \"driver\", \"type\": \"string\"},"
+      + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
+      + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
+      + "{\"name\":\"fare\",\"type\": \"double\"},"
+      + "{\"name\": \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false} ]}";
   public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
-  public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double";
+  public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double,boolean";
   public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
   public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema);
 
@@ -118,6 +120,15 @@ public class HoodieTestDataGenerator {
   }
 
   /**
+   * Generates a new avro record of the above schema format for a delete.
+   */
+  public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, String commitTime) throws IOException {
+    GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0.0,
+        true);
+    return new TestRawTripPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA);
+  }
+
+  /**
    * Generates a new avro record of the above schema format, retaining the key if optionally provided.
    */
   public static HoodieAvroPayload generateAvroPayload(HoodieKey key, String commitTime) {
@@ -126,7 +137,12 @@ public class HoodieTestDataGenerator {
   }
 
   public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
-      double timestamp) {
+                                                    double timestamp) {
+    return generateGenericRecord(rowKey, riderName, driverName, timestamp, false);
+  }
+
+  public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
+                                                    double timestamp, boolean isDeleteRecord) {
     GenericRecord rec = new GenericData.Record(avroSchema);
     rec.put("_row_key", rowKey);
     rec.put("timestamp", timestamp);
@@ -137,12 +153,18 @@ public class HoodieTestDataGenerator {
     rec.put("end_lat", rand.nextDouble());
     rec.put("end_lon", rand.nextDouble());
     rec.put("fare", rand.nextDouble() * 100);
+    if (isDeleteRecord) {
+      rec.put("_hoodie_is_deleted", true);
+    } else {
+      rec.put("_hoodie_is_deleted", false);
+    }
     return rec;
   }
 
   public static void createCommitFile(String basePath, String commitTime, Configuration configuration) {
     Arrays.asList(HoodieTimeline.makeCommitFileName(commitTime), HoodieTimeline.makeInflightCommitFileName(commitTime),
-        HoodieTimeline.makeRequestedCommitFileName(commitTime)).forEach(f -> {
+        HoodieTimeline.makeRequestedCommitFileName(commitTime))
+        .forEach(f -> {
           Path commitFile = new Path(
               basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + f);
           FSDataOutputStream os = null;
@@ -176,7 +198,7 @@ public class HoodieTestDataGenerator {
   }
 
   public static void createCompactionAuxiliaryMetadata(String basePath, HoodieInstant instant,
-      Configuration configuration) throws IOException {
+                                                       Configuration configuration) throws IOException {
     Path commitFile =
         new Path(basePath + "/" + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + "/" + instant.getFileName());
     FileSystem fs = FSUtils.getFs(basePath, configuration);
@@ -332,7 +354,7 @@ public class HoodieTestDataGenerator {
    * list
    *
    * @param commitTime Commit Timestamp
-   * @param n Number of updates (including dups)
+   * @param n          Number of updates (including dups)
    * @return list of hoodie record updates
    */
   public List<HoodieRecord> generateUpdates(String commitTime, Integer n) throws IOException {
@@ -349,7 +371,7 @@ public class HoodieTestDataGenerator {
    * Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
    *
    * @param commitTime Commit Timestamp
-   * @param n Number of unique records
+   * @param n          Number of unique records
    * @return list of hoodie record updates
    */
   public List<HoodieRecord> generateUniqueUpdates(String commitTime, Integer n) {
@@ -370,7 +392,7 @@ public class HoodieTestDataGenerator {
    * Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
    *
    * @param commitTime Commit Timestamp
-   * @param n Number of unique records
+   * @param n          Number of unique records
    * @return stream of hoodie record updates
    */
   public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, Integer n) {
@@ -418,11 +440,46 @@ public class HoodieTestDataGenerator {
         index = (index + 1) % numExistingKeys;
         kp = existingKeys.get(index);
       }
+      existingKeys.remove(kp);
+      numExistingKeys--;
       used.add(kp);
       return kp.key;
     });
   }
 
+  /**
+   * Generates deduped delete records previously inserted, randomly distributed across the keys above.
+   *
+   * @param commitTime Commit Timestamp
+   * @param n          Number of unique records
+   * @return stream of hoodie records for delete
+   */
+  public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String commitTime, Integer n) {
+    final Set<KeyPartition> used = new HashSet<>();
+
+    if (n > numExistingKeys) {
+      throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
+    }
+
+    return IntStream.range(0, n).boxed().map(i -> {
+      int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1);
+      KeyPartition kp = existingKeys.get(index);
+      // Find the available keyPartition starting from randomly chosen one.
+      while (used.contains(kp)) {
+        index = (index + 1) % numExistingKeys;
+        kp = existingKeys.get(index);
+      }
+      existingKeys.remove(kp);
+      numExistingKeys--;
+      used.add(kp);
+      try {
+        return new HoodieRecord(kp.key, generateRandomDeleteValue(kp.key, commitTime));
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    });
+  }
+
   public String[] getPartitionPaths() {
     return partitionPaths;
   }
diff --git a/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java
index b133560..3bf07d9 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/BaseAvroPayload.java
@@ -31,7 +31,6 @@ import java.io.Serializable;
  * Base class for all AVRO record based payloads, that can be ordered based on a field.
  */
 public abstract class BaseAvroPayload implements Serializable {
-
   /**
    * Avro data extracted from the source converted to bytes.
    */
@@ -43,8 +42,10 @@ public abstract class BaseAvroPayload implements Serializable {
   protected final Comparable orderingVal;
 
   /**
-   * @param record
-   * @param orderingVal
+   * Instantiate {@link BaseAvroPayload}.
+   *
+   * @param record      Generic record for the payload.
+   * @param orderingVal {@link Comparable} to be used in pre combine.
    */
   public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
     try {
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 1dbd944..b06a9ae 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -103,7 +103,7 @@ public class DataSourceUtils {
 
   /**
    * Create a key generator class via reflection, passing in any configs needed.
-   *
+   * <p>
    * If the class name of key generator is configured through the properties file, i.e., {@code props}, use the
    * corresponding key generator class; otherwise, use the default key generator class specified in {@code
    * DataSourceWriteOptions}.
@@ -125,7 +125,7 @@ public class DataSourceUtils {
       throws IOException {
     try {
       return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
-          new Class<?>[]{GenericRecord.class, Comparable.class}, record, orderingVal);
+          new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
     } catch (Throwable e) {
       throw new IOException("Could not create payload for class: " + payloadClass, e);
     }
@@ -140,7 +140,7 @@ public class DataSourceUtils {
   }
 
   public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
-      String tblName, Map<String, String> parameters) {
+                                                     String tblName, Map<String, String> parameters) {
 
     // inline compaction is on by default for MOR
     boolean inlineCompact = parameters.get(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY())
@@ -162,7 +162,7 @@ public class DataSourceUtils {
   }
 
   public static JavaRDD<WriteStatus> doWriteOperation(HoodieWriteClient client, JavaRDD<HoodieRecord> hoodieRecords,
-      String commitTime, String operation) {
+                                                      String commitTime, String operation) {
     if (operation.equals(DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL())) {
       return client.bulkInsert(hoodieRecords, commitTime);
     } else if (operation.equals(DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL())) {
@@ -174,19 +174,19 @@ public class DataSourceUtils {
   }
 
   public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
-      String commitTime) {
+                                                       String commitTime) {
     return client.delete(hoodieKeys, commitTime);
   }
 
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
-      String payloadClass) throws IOException {
+                                                String payloadClass) throws IOException {
     HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
     return new HoodieRecord<>(hKey, payload);
   }
 
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
-      HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
+                                                     HoodieWriteConfig writeConfig, Option<EmbeddedTimelineService> timelineService) {
     HoodieReadClient client = null;
     try {
       client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
@@ -205,7 +205,7 @@ public class DataSourceUtils {
 
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc, JavaRDD<HoodieRecord> incomingHoodieRecords,
-      Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) {
+                                                     Map<String, String> parameters, Option<EmbeddedTimelineService> timelineService) {
     HoodieWriteConfig writeConfig =
         HoodieWriteConfig.newBuilder().withPath(parameters.get("path")).withProps(parameters).build();
     return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, timelineService);
diff --git a/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java
index e860837..32d584e 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/OverwriteWithLatestAvroPayload.java
@@ -38,8 +38,7 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
     implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
 
   /**
-   * @param record
-   * @param orderingVal
+   *
    */
   public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable orderingVal) {
     super(record, orderingVal);
@@ -61,8 +60,15 @@ public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema) throws IOException {
+
+    GenericRecord genericRecord = (GenericRecord) getInsertValue(schema).get();
     // combining strategy here trivially ignores currentValue on disk and writes this record
-    return getInsertValue(schema);
+    Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+    if (deleteMarker instanceof Boolean && (boolean) deleteMarker) {
+      return Option.empty();
+    } else {
+      return Option.of(genericRecord);
+    }
   }
 
   @Override
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 7dfb015..a001323 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -158,8 +158,8 @@ public class DeltaSync implements Serializable {
   private final HoodieTableType tableType;
 
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
-      HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
-      Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
+                   HoodieTableType tableType, TypedProperties props, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
+                   Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
 
     this.cfg = cfg;
     this.jssc = jssc;
@@ -288,7 +288,7 @@ public class DeltaSync implements Serializable {
       // default to RowBasedSchemaProvider
       schemaProvider = this.schemaProvider == null || this.schemaProvider.getTargetSchema() == null
           ? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema())).orElse(
-              dataAndCheckpoint.getSchemaProvider())
+          dataAndCheckpoint.getSchemaProvider())
           : this.schemaProvider;
     } else {
       // Pull the data from the source & prepare the write
@@ -316,22 +316,22 @@ public class DeltaSync implements Serializable {
           (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField));
       return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
     });
+
     return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
   }
 
   /**
    * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed.
    *
-   * @param records Input Records
+   * @param records       Input Records
    * @param checkpointStr Checkpoint String
-   * @param metrics Metrics
+   * @param metrics       Metrics
    * @return Option Compaction instant if one is scheduled
    */
   private Option<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
-      HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
+                                     HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
 
     Option<String> scheduledCompactionInstant = Option.empty();
-
     // filter dupes if needed
     if (cfg.filterDupes) {
       // turn upserts to insert
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
index a21d263..dd266ed 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SourceFormatAdapter.java
@@ -50,10 +50,6 @@ public final class SourceFormatAdapter {
 
   /**
    * Fetch new data in avro format. If the source provides data in different format, they are translated to Avro format
-   * 
-   * @param lastCkptStr
-   * @param sourceLimit
-   * @return
    */
   public InputBatch<JavaRDD<GenericRecord>> fetchNewDataInAvroFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
@@ -78,10 +74,6 @@ public final class SourceFormatAdapter {
 
   /**
    * Fetch new data in row format. If the source provides data in different format, they are translated to Row format
-   * 
-   * @param lastCkptStr
-   * @param sourceLimit
-   * @return
    */
   public InputBatch<Dataset<Row>> fetchNewDataInRowFormat(Option<String> lastCkptStr, long sourceLimit) {
     switch (source.getSourceType()) {
@@ -95,7 +87,8 @@ public final class SourceFormatAdapter {
                 .ofNullable(
                     r.getBatch()
                         .map(rdd -> AvroConversionUtils.createDataFrame(JavaRDD.toRDD(rdd), sourceSchema.toString(),
-                            source.getSparkSession()))
+                            source.getSparkSession())
+                        )
                         .orElse(null)),
             r.getCheckpointForNextBatch(), r.getSchemaProvider());
       }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index c5f6c76..51e5b3c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -179,7 +179,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     }
 
     static HoodieDeltaStreamer.Config makeConfig(String basePath, Operation op, String transformerClassName,
-        String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) {
+                                                 String propsFilename, boolean enableHiveSync, boolean useSchemaProviderClass) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips";
@@ -198,7 +198,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     }
 
     static HoodieDeltaStreamer.Config makeConfigForHudiIncrSrc(String srcBasePath, String basePath, Operation op,
-        boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
+                                                               boolean addReadLatestOnMissingCkpt, String schemaProviderClassName) {
       HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
       cfg.targetBasePath = basePath;
       cfg.targetTableName = "hoodie_trips_copy";
@@ -352,11 +352,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     cfg.sourceLimit = 2000;
     cfg.operation = Operation.UPSERT;
     new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
-    TestHelpers.assertDistanceCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertRecordCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertDistanceCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
     TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
     List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
-    assertEquals(2000, counts.get(0).getLong(1));
+    assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
   }
 
   @Test
@@ -396,8 +396,8 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       } else {
         TestHelpers.assertAtleastNCompactionCommits(5, datasetBasePath, dfs);
       }
-      TestHelpers.assertRecordCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
-      TestHelpers.assertDistanceCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
+      TestHelpers.assertRecordCount(totalRecords + 200, datasetBasePath + "/*/*.parquet", sqlContext);
+      TestHelpers.assertDistanceCount(totalRecords + 200, datasetBasePath + "/*/*.parquet", sqlContext);
       return true;
     }, 180);
     ds.shutdownGracefully();
@@ -457,12 +457,12 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     cfg.sourceLimit = 2000;
     cfg.operation = Operation.UPSERT;
     new HoodieDeltaStreamer(cfg, jsc, dfs, hiveServer.getHiveConf()).sync();
-    TestHelpers.assertRecordCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
-    TestHelpers.assertDistanceCount(2000, datasetBasePath + "/*/*.parquet", sqlContext);
-    TestHelpers.assertDistanceCountWithExactValue(2000, datasetBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertRecordCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertDistanceCount(1950, datasetBasePath + "/*/*.parquet", sqlContext);
+    TestHelpers.assertDistanceCountWithExactValue(1950, datasetBasePath + "/*/*.parquet", sqlContext);
     lastInstantForUpstreamTable = TestHelpers.assertCommitMetadata("00001", datasetBasePath, dfs, 2);
     List<Row> counts = TestHelpers.countsPerCommit(datasetBasePath + "/*/*.parquet", sqlContext);
-    assertEquals(2000, counts.get(0).getLong(1));
+    assertEquals(1950, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
 
     // Incrementally pull changes in upstream hudi table and apply to downstream table
     downstreamCfg =
@@ -476,7 +476,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     String finalInstant =
         TestHelpers.assertCommitMetadata(lastInstantForUpstreamTable, downstreamDatasetBasePath, dfs, 2);
     counts = TestHelpers.countsPerCommit(downstreamDatasetBasePath + "/*/*.parquet", sqlContext);
-    assertEquals(2000, counts.get(0).getLong(1));
+    assertEquals(2000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
 
     // Test Hive integration
     HoodieHiveClient hiveClient = new HoodieHiveClient(hiveSyncConfig, hiveServer.getHiveConf(), dfs);
@@ -566,12 +566,11 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
     /**
      * Returns some random number as distance between the points.
-     * 
+     *
      * @param lat1 Latitiude of source
      * @param lat2 Latitude of destination
      * @param lon1 Longitude of source
      * @param lon2 Longitude of destination
-     * @return
      */
     @Override
     public Double call(Double lat1, Double lat2, Double lon1, Double lon2) {
@@ -586,7 +585,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
     @Override
     public Dataset<Row> apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
-        TypedProperties properties) {
+                              TypedProperties properties) {
       rowDataset.sqlContext().udf().register("distance_udf", new DistanceUDF(), DataTypes.DoubleType);
       return rowDataset.withColumn("haversine_distance", functions.callUDF("distance_udf", functions.col("begin_lat"),
           functions.col("end_lat"), functions.col("begin_lon"), functions.col("end_lat")));
@@ -607,7 +606,7 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
 
     @Override
     public Dataset apply(JavaSparkContext jsc, SparkSession sparkSession, Dataset<Row> rowDataset,
-        TypedProperties properties) {
+                         TypedProperties properties) {
       System.out.println("DropAllTransformer called !!");
       return sparkSession.createDataFrame(jsc.emptyRDD(), rowDataset.schema());
     }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
index 745b0f0..4eab369 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
@@ -76,12 +76,12 @@ public abstract class AbstractBaseTestSource extends AvroSource {
   }
 
   protected AbstractBaseTestSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
-      SchemaProvider schemaProvider) {
+                                   SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
   }
 
   protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime,
-      int partition) {
+                                                        int partition) {
     int maxUniqueKeys =
         props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
 
@@ -94,10 +94,12 @@ public abstract class AbstractBaseTestSource extends AvroSource {
     int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
     int numInserts = sourceLimit - numUpdates;
     LOG.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates);
+    boolean reachedMax = false;
 
     if (numInserts + numExistingKeys > maxUniqueKeys) {
       // Limit inserts so that maxUniqueRecords is maintained
       numInserts = Math.max(0, maxUniqueKeys - numExistingKeys);
+      reachedMax = true;
     }
 
     if ((numInserts + numUpdates) < sourceLimit) {
@@ -105,16 +107,25 @@ public abstract class AbstractBaseTestSource extends AvroSource {
       numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts);
     }
 
-    LOG.info("NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
+    Stream<GenericRecord> deleteStream = Stream.empty();
+    Stream<GenericRecord> updateStream;
     long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
     LOG.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
         + ", Free Memory=" + Runtime.getRuntime().freeMemory());
-
-    Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
-        .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+    if (!reachedMax && numUpdates >= 50) {
+      LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords="
+          + maxUniqueKeys);
+      // if we generate update followed by deletes -> some keys in update batch might be picked up for deletes. Hence generating delete batch followed by updates
+      deleteStream = dataGenerator.generateUniqueDeleteRecordStream(commitTime, 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+      updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates - 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+    } else {
+      LOG.info("After adjustments => NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
+      updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
+          .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+    }
     Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts)
         .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
-    return Stream.concat(updateStream, insertStream);
+    return Stream.concat(deleteStream, Stream.concat(updateStream, insertStream));
   }
 
   private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, HoodieTestDataGenerator dataGenerator) {
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
index 2796e08..95757a3 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
@@ -43,9 +43,15 @@
   }, {
     "name" : "end_lon",
     "type" : "double"
-  }, {
+  },
+  {
     "name" : "fare",
     "type" : "double"
+  },
+  {
+    "name" : "_hoodie_is_deleted",
+    "type" : "boolean",
+    "default" : false
   } ]
 }
 
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
index 16a09ed..e8b2857 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
@@ -16,4 +16,4 @@
 # limitations under the License.
 ###
 include=base.properties
-hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
+hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.fare, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
index 11e23a4..38e7255 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
@@ -46,9 +46,15 @@
   }, {
     "name" : "fare",
     "type" : "double"
-  }, {
-    "name" : "haversine_distance",
-    "type" : "double"
+  },
+  {
+     "name" : "_hoodie_is_deleted",
+     "type" : "boolean",
+     "default" : false
+  },
+  {
+     "name" : "haversine_distance",
+     "type" : "double"
   }]
 }