Posted to commits@hudi.apache.org by vb...@apache.org on 2019/11/22 23:05:33 UTC

[incubator-hudi] branch master updated: [HUDI-328] Adding delete api to HoodieWriteClient (#1004)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c335510  [HUDI-328] Adding delete api to HoodieWriteClient (#1004)
c335510 is described below

commit c3355109b1fa8f3055a2ba57d6e2b49679581db5
Author: Sivabalan Narayanan <si...@uber.com>
AuthorDate: Fri Nov 22 15:05:25 2019 -0800

    [HUDI-328] Adding delete api to HoodieWriteClient (#1004)
    
    [HUDI-328]  Adding delete api to HoodieWriteClient and Spark DataSource
---
 .../java/org/apache/hudi/HoodieWriteClient.java    | 137 ++++++++--
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  18 ++
 .../java/org/apache/hudi/index/HoodieIndex.java    |   2 +-
 hudi-client/src/test/java/HoodieClientExample.java |  19 +-
 .../java/org/apache/hudi/TestHoodieClientBase.java | 146 +++++++++-
 .../hudi/TestHoodieClientOnCopyOnWriteStorage.java | 185 ++++++++++++-
 .../apache/hudi/common/HoodieClientTestUtils.java  |  24 ++
 .../hudi/common/HoodieTestDataGenerator.java       |  43 ++-
 .../common/model}/EmptyHoodieRecordPayload.java    |   9 +-
 .../hudi/common/model/HoodieCommitMetadata.java    |   1 +
 .../org/apache/hudi/integ/ITTestHoodieSanity.java  |  10 +-
 .../main/java/org/apache/hudi/DataSourceUtils.java |  13 +-
 .../org/apache/hudi/AvroConversionUtils.scala      |   5 +
 .../scala/org/apache/hudi/DataSourceOptions.scala  |  31 +--
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 295 +++++++++++++--------
 hudi-spark/src/test/java/DataSourceTestUtils.java  |   7 +
 hudi-spark/src/test/java/HoodieJavaApp.java        |  42 ++-
 .../src/test/scala/TestDataSourceDefaults.scala    |   3 +-
 18 files changed, 818 insertions(+), 172 deletions(-)

diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
index 326bdac..600c7d2 100644
--- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -39,6 +40,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.HoodieRollbackStat;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieDataFile;
 import org.apache.hudi.common.model.HoodieKey;
@@ -94,6 +96,8 @@ import scala.Tuple2;
 public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient {
 
   private static Logger logger = LogManager.getLogger(HoodieWriteClient.class);
+  private static final String UPDATE_STR = "update";
+  private static final String LOOKUP_STR = "lookup";
   private final boolean rollbackInFlight;
   private final transient HoodieMetrics metrics;
   private final transient HoodieIndex<T> index;
@@ -103,18 +107,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   private transient Timer.Context indexTimer = null;
 
   /**
-   * @param jsc
-   * @param clientConfig
-   * @throws Exception
+   *
    */
   public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) throws Exception {
     this(jsc, clientConfig, false);
   }
 
   /**
-   * @param jsc
-   * @param clientConfig
-   * @param rollbackInFlight
+   *
    */
   public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight) {
     this(jsc, clientConfig, rollbackInFlight, HoodieIndex.createIndex(clientConfig, jsc));
@@ -150,7 +150,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     HoodieTable<T> table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
     indexTimer = metrics.getIndexCtx();
     JavaRDD<HoodieRecord<T>> recordsWithLocation = index.tagLocation(hoodieRecords, jsc, table);
-    metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
+    metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
     indexTimer = null;
     return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
   }
@@ -159,7 +159,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * Upserts a bunch of new records into the Hoodie table, at the supplied commitTime
    */
   public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
-    HoodieTable<T> table = getTableAndInitCtx(records);
+    HoodieTable<T> table = getTableAndInitCtx(OperationType.UPSERT);
     try {
       // De-dupe/merge if needed
       JavaRDD<HoodieRecord<T>> dedupedRecords =
@@ -168,7 +168,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
       indexTimer = metrics.getIndexCtx();
       // perform index loop up to get existing location of records
       JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
-      metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
+      metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
       indexTimer = null;
       return upsertRecordsInternal(taggedRecords, commitTime, table, true);
     } catch (Throwable e) {
@@ -189,7 +189,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
    */
   public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime) {
-    HoodieTable<T> table = getTableAndInitCtx(preppedRecords);
+    HoodieTable<T> table = getTableAndInitCtx(OperationType.UPSERT_PREPPED);
     try {
       return upsertRecordsInternal(preppedRecords, commitTime, table, true);
     } catch (Throwable e) {
@@ -211,7 +211,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
    */
   public JavaRDD<WriteStatus> insert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
-    HoodieTable<T> table = getTableAndInitCtx(records);
+    HoodieTable<T> table = getTableAndInitCtx(OperationType.INSERT);
     try {
       // De-dupe/merge if needed
       JavaRDD<HoodieRecord<T>> dedupedRecords =
@@ -238,7 +238,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
    */
   public JavaRDD<WriteStatus> insertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime) {
-    HoodieTable<T> table = getTableAndInitCtx(preppedRecords);
+    HoodieTable<T> table = getTableAndInitCtx(OperationType.INSERT_PREPPED);
     try {
       return upsertRecordsInternal(preppedRecords, commitTime, table, false);
     } catch (Throwable e) {
@@ -281,7 +281,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> bulkInsert(JavaRDD<HoodieRecord<T>> records, final String commitTime,
       Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
-    HoodieTable<T> table = getTableAndInitCtx(records);
+    HoodieTable<T> table = getTableAndInitCtx(OperationType.BULK_INSERT);
     try {
       // De-dupe/merge if needed
       JavaRDD<HoodieRecord<T>> dedupedRecords =
@@ -314,7 +314,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    */
   public JavaRDD<WriteStatus> bulkInsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, final String commitTime,
       Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
-    HoodieTable<T> table = getTableAndInitCtx(preppedRecords);
+    HoodieTable<T> table = getTableAndInitCtx(OperationType.BULK_INSERT_PREPPED);
     try {
       return bulkInsertInternal(preppedRecords, commitTime, table, bulkInsertPartitioner);
     } catch (Throwable e) {
@@ -325,6 +325,46 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     }
   }
 
+  /**
+   * Deletes a list of {@link HoodieKey}s from the Hoodie table, at the supplied commitTime. {@link HoodieKey}s will
+   * be deduped and non-existent keys will be filtered out before deleting.
+   *
+   * @param keys {@link List} of {@link HoodieKey}s to be deleted
+   * @param commitTime Commit time handle
+   * @return JavaRDD[WriteStatus] - RDD of WriteStatus to inspect errors and counts
+   */
+  public JavaRDD<WriteStatus> delete(JavaRDD<HoodieKey> keys, final String commitTime) {
+    HoodieTable<T> table = getTableAndInitCtx(OperationType.DELETE);
+    try {
+      // De-dupe/merge if needed
+      JavaRDD<HoodieKey> dedupedKeys =
+          config.shouldCombineBeforeDelete() ? deduplicateKeys(keys, config.getDeleteShuffleParallelism()) : keys;
+
+      JavaRDD<HoodieRecord<T>> dedupedRecords =
+          dedupedKeys.map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+      indexTimer = metrics.getIndexCtx();
+      // perform index lookup to get existing location of records
+      JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
+      // filter out non-existent keys/records
+      JavaRDD<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(record -> record.isCurrentLocationKnown());
+      if (!taggedValidRecords.isEmpty()) {
+        metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
+        indexTimer = null;
+        return upsertRecordsInternal(taggedValidRecords, commitTime, table, true);
+      } else {
+        // if the entire set of keys is non-existent
+        JavaRDD<WriteStatus> writeStatusRDD = jsc.parallelize(Collections.EMPTY_LIST, 1);
+        commitOnAutoCommit(commitTime, writeStatusRDD, table.getMetaClient().getCommitActionType());
+        return writeStatusRDD;
+      }
+    } catch (Throwable e) {
+      if (e instanceof HoodieUpsertException) {
+        throw (HoodieUpsertException) e;
+      }
+      throw new HoodieUpsertException("Failed to delete for commit time " + commitTime, e);
+    }
+  }
+
   private JavaRDD<WriteStatus> bulkInsertInternal(JavaRDD<HoodieRecord<T>> dedupedRecords, String commitTime,
       HoodieTable<T> table, Option<UserDefinedBulkInsertPartitioner> bulkInsertPartitioner) {
     final JavaRDD<HoodieRecord<T>> repartitionedRecords;
@@ -366,10 +406,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
 
   private JavaRDD<HoodieRecord<T>> combineOnCondition(boolean condition, JavaRDD<HoodieRecord<T>> records,
       int parallelism) {
-    if (condition) {
-      return deduplicateRecords(records, parallelism);
-    }
-    return records;
+    return condition ? deduplicateRecords(records, parallelism) : records;
   }
 
   /**
@@ -451,7 +488,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     indexTimer = metrics.getIndexCtx();
     // Update the index back
     JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);
-    metrics.updateIndexMetrics("update", metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
+    metrics.updateIndexMetrics(UPDATE_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
     indexTimer = null;
     // Trigger the insert and collect statuses
     commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType());
@@ -501,6 +538,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     if (extraMetadata.isPresent()) {
       extraMetadata.get().forEach(metadata::addMetadata);
     }
+    metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, config.getSchema());
 
     try {
       activeTimeline.saveAsComplete(new HoodieInstant(true, actionType, commitTime),
@@ -929,8 +967,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
    * configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
    * cleaned)
-   * 
-   * @throws HoodieIOException
    */
   public void clean() throws HoodieIOException {
     cleanClient.clean();
@@ -942,7 +978,6 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
    * cleaned)
    *
    * @param startCleanTime Cleaner Instant Timestamp
-   * @return
    * @throws HoodieIOException in case of any IOException
    */
   protected HoodieCleanMetadata clean(String startCleanTime) throws HoodieIOException {
@@ -1089,6 +1124,20 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   }
 
   /**
+   * Deduplicate the given Hoodie keys.
+   */
+  JavaRDD<HoodieKey> deduplicateKeys(JavaRDD<HoodieKey> keys, int parallelism) {
+    boolean isIndexingGlobal = index.isGlobal();
+    if (isIndexingGlobal) {
+      return keys.keyBy(HoodieKey::getRecordKey)
+          .reduceByKey((key1, key2) -> key1)
+          .values();
+    } else {
+      return keys.distinct();
+    }
+  }
+
+  /**
    * Cleanup all inflight commits
    */
   private void rollbackInflightCommits() {
@@ -1101,9 +1150,13 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     }
   }
 
-  private HoodieTable getTableAndInitCtx(JavaRDD<HoodieRecord<T>> records) {
+  private HoodieTable getTableAndInitCtx(OperationType operationType) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    if (operationType == OperationType.DELETE) {
+      setWriteSchemaFromLastInstant(metaClient);
+    }
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), config, jsc);
+    HoodieTable table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     if (table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION)) {
       writeContext = metrics.getCommitCtx();
     } else {
@@ -1113,6 +1166,30 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
   }
 
   /**
+   * Sets the write schema from the last completed instant, since deletes may not have a schema set in the config.
+   */
+  private void setWriteSchemaFromLastInstant(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+      Option<HoodieInstant> lastInstant =
+          activeTimeline.filterCompletedInstants().filter(s -> s.getAction().equals(metaClient.getCommitActionType()))
+              .lastInstant();
+      if (lastInstant.isPresent()) {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+            activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
+        if (commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY)) {
+          config.setSchema(commitMetadata.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY));
+        } else {
+          throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
+        }
+      } else {
+        throw new HoodieIOException("Deletes issued without any prior commits");
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("IOException thrown while reading last commit metadata", e);
+    }
+  }
+  /**
    * Compaction specific private methods
    */
 
@@ -1323,4 +1400,16 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
     }
   }
 
+  /**
+   * Types of write operations supported by the client.
+   */
+  enum OperationType {
+    INSERT,
+    INSERT_PREPPED,
+    UPSERT,
+    UPSERT_PREPPED,
+    DELETE,
+    BULK_INSERT,
+    BULK_INSERT_PREPPED
+  }
 }
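
A minimal sketch of how the new delete API above can be invoked, mirroring the usage this patch adds to HoodieClientExample further down; the base path, table name, schema and keys are placeholders rather than part of the commit, and an existing Hoodie table with prior commits is assumed.

import java.util.Arrays;
import java.util.List;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DeleteApiSketch {
  // Sketch only: deletes a couple of placeholder keys from an existing Hoodie table at basePath.
  public static void deleteKeys(JavaSparkContext jsc, String basePath, String schemaStr) throws Exception {
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(basePath)              // table must already exist and have at least one commit
        .withSchema(schemaStr)           // for deletes the schema can also be recovered from the last commit
        .forTable("hoodie_sample_table") // placeholder table name
        .build();
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);

    String commitTime = client.startCommit();
    List<HoodieKey> keysToDelete = Arrays.asList(
        new HoodieKey("record-key-1", "2016/09/26"),   // placeholder record key / partition path
        new HoodieKey("record-key-2", "2016/09/26"));
    JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(keysToDelete, 1);

    // Keys are de-duped (hoodie.combine.before.delete, default true) and non-existent keys are filtered out.
    JavaRDD<WriteStatus> statuses = client.delete(deleteKeys, commitTime);
    statuses.collect().forEach(status -> System.out.println("updated file: " + status.getFileId()));
  }
}
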
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 913baa1..a391bca 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -51,6 +51,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String INSERT_PARALLELISM = "hoodie.insert.shuffle.parallelism";
   private static final String BULKINSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism";
   private static final String UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism";
+  private static final String DELETE_PARALLELISM = "hoodie.delete.shuffle.parallelism";
   private static final String DEFAULT_ROLLBACK_PARALLELISM = "100";
   private static final String ROLLBACK_PARALLELISM = "hoodie.rollback.parallelism";
   private static final String WRITE_BUFFER_LIMIT_BYTES = "hoodie.write.buffer.limit.bytes";
@@ -59,6 +60,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_COMBINE_BEFORE_INSERT = "false";
   private static final String COMBINE_BEFORE_UPSERT_PROP = "hoodie.combine.before.upsert";
   private static final String DEFAULT_COMBINE_BEFORE_UPSERT = "true";
+  private static final String COMBINE_BEFORE_DELETE_PROP = "hoodie.combine.before.delete";
+  private static final String DEFAULT_COMBINE_BEFORE_DELETE = "true";
   private static final String WRITE_STATUS_STORAGE_LEVEL = "hoodie.write.status.storage.level";
   private static final String DEFAULT_WRITE_STATUS_STORAGE_LEVEL = "MEMORY_AND_DISK_SER";
   private static final String HOODIE_AUTO_COMMIT_PROP = "hoodie.auto.commit";
@@ -119,6 +122,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return props.getProperty(AVRO_SCHEMA);
   }
 
+  public void setSchema(String schemaStr) {
+    props.setProperty(AVRO_SCHEMA, schemaStr);
+  }
+
   public String getTableName() {
     return props.getProperty(TABLE_NAME);
   }
@@ -143,6 +150,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Integer.parseInt(props.getProperty(UPSERT_PARALLELISM));
   }
 
+  public int getDeleteShuffleParallelism() {
+    return Integer.parseInt(props.getProperty(DELETE_PARALLELISM));
+  }
+
   public int getRollbackParallelism() {
     return Integer.parseInt(props.getProperty(ROLLBACK_PARALLELISM));
   }
@@ -159,6 +170,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_UPSERT_PROP));
   }
 
+  public boolean shouldCombineBeforeDelete() {
+    return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_DELETE_PROP));
+  }
+
   public StorageLevel getWriteStatusStorageLevel() {
     return StorageLevel.fromString(props.getProperty(WRITE_STATUS_STORAGE_LEVEL));
   }
@@ -666,11 +681,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(BULKINSERT_PARALLELISM), BULKINSERT_PARALLELISM,
           DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(UPSERT_PARALLELISM), UPSERT_PARALLELISM, DEFAULT_PARALLELISM);
+      setDefaultOnCondition(props, !props.containsKey(DELETE_PARALLELISM), DELETE_PARALLELISM, DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(ROLLBACK_PARALLELISM), ROLLBACK_PARALLELISM, DEFAULT_PARALLELISM);
       setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_INSERT_PROP), COMBINE_BEFORE_INSERT_PROP,
           DEFAULT_COMBINE_BEFORE_INSERT);
       setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_UPSERT_PROP), COMBINE_BEFORE_UPSERT_PROP,
           DEFAULT_COMBINE_BEFORE_UPSERT);
+      setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_DELETE_PROP), COMBINE_BEFORE_DELETE_PROP,
+          DEFAULT_COMBINE_BEFORE_DELETE);
       setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), WRITE_STATUS_STORAGE_LEVEL,
           DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
       setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP,
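
For reference, the two delete-related write configs introduced above can be supplied through the Properties that back HoodieWriteConfig; a small sketch with illustrative values follows (how the Properties reach the config builder is not shown in this hunk).

import java.util.Properties;

public class DeleteConfigSketch {
  // Sketch only: the values are illustrative; hoodie.combine.before.delete defaults to true,
  // and hoodie.delete.shuffle.parallelism defaults to the generic parallelism default.
  public static Properties deleteConfigs() {
    Properties props = new Properties();
    props.setProperty("hoodie.delete.shuffle.parallelism", "200"); // parallelism for the delete shuffle stage
    props.setProperty("hoodie.combine.before.delete", "true");     // de-dupe HoodieKeys before deleting
    return props;
  }
}
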
diff --git a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java
index 78a7510..68612d7 100644
--- a/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java
+++ b/hudi-client/src/main/java/org/apache/hudi/index/HoodieIndex.java
@@ -72,7 +72,7 @@ public abstract class HoodieIndex<T extends HoodieRecordPayload> implements Seri
 
   /**
    * Looks up the index and tags each incoming record with a location of a file that contains the row (if it is actually
-   * present)
+   * present).
    */
   public abstract JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
       HoodieTable<T> hoodieTable) throws HoodieIndexException;
diff --git a/hudi-client/src/test/java/HoodieClientExample.java b/hudi-client/src/test/java/HoodieClientExample.java
index a697402..9e42db1 100644
--- a/hudi-client/src/test/java/HoodieClientExample.java
+++ b/hudi-client/src/test/java/HoodieClientExample.java
@@ -18,13 +18,16 @@
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.HoodieWriteClient;
 import org.apache.hudi.WriteStatus;
+import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -93,6 +96,7 @@ public class HoodieClientExample {
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(2, 3).build()).build();
     HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);
 
+    List<HoodieRecord> recordsSoFar = new ArrayList<>();
     /**
      * Write 1 (only inserts)
      */
@@ -100,6 +104,7 @@ public class HoodieClientExample {
     logger.info("Starting commit " + newCommitTime);
 
     List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
+    recordsSoFar.addAll(records);
     JavaRDD<HoodieRecord> writeRecords = jsc.<HoodieRecord>parallelize(records, 1);
     client.upsert(writeRecords, newCommitTime);
 
@@ -108,11 +113,23 @@ public class HoodieClientExample {
      */
     newCommitTime = client.startCommit();
     logger.info("Starting commit " + newCommitTime);
-    records.addAll(dataGen.generateUpdates(newCommitTime, 100));
+    List<HoodieRecord> toBeUpdated = dataGen.generateUpdates(newCommitTime, 100);
+    records.addAll(toBeUpdated);
+    recordsSoFar.addAll(toBeUpdated);
     writeRecords = jsc.<HoodieRecord>parallelize(records, 1);
     client.upsert(writeRecords, newCommitTime);
 
     /**
+     * Delete 1
+     */
+    newCommitTime = client.startCommit();
+    logger.info("Starting commit " + newCommitTime);
+    List<HoodieKey> toBeDeleted = HoodieClientTestUtils
+        .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(recordsSoFar), 10);
+    JavaRDD<HoodieKey> deleteRecords = jsc.<HoodieKey>parallelize(toBeDeleted, 1);
+    client.delete(deleteRecords, newCommitTime);
+
+    /**
      * Schedule a compaction and also perform compaction on a MOR dataset
      */
     if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java
index 183ea35..5200a05 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientBase.java
@@ -35,6 +35,8 @@ import org.apache.hudi.common.HoodieCleanStat;
 import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
 import org.apache.hudi.common.TestRawTripPayload.MetadataMergeWriteStatus;
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTestUtils;
@@ -108,13 +110,23 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
     return getConfigBuilder().build();
   }
 
+
   /**
    * Get Config builder with default configs set
    *
    * @return Config Builder
    */
   HoodieWriteConfig.Builder getConfigBuilder() {
-    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+    return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+  }
+
+  /**
+   * Get Config builder with default configs set
+   *
+   * @return Config Builder
+   */
+  HoodieWriteConfig.Builder getConfigBuilder(String schemaStr) {
+    return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(schemaStr)
         .withParallelism(2, 2).withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
         .withWriteStatusClass(MetadataMergeWriteStatus.class)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
@@ -215,6 +227,29 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
   }
 
   /**
+   * Helper to wrap a delete-key generation function for testing the prepped version of the API. Prepped APIs expect
+   * the keys to be already de-duped and have their location set. This wrapper takes care of setting the record
+   * location; uniqueness is guaranteed by the key-generation function itself.
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param keyGenFunction Keys Generation function
+   * @return Wrapped function
+   */
+  private Function2<List<HoodieKey>, String, Integer> wrapDeleteKeysGenFunctionForPreppedCalls(
+      final HoodieWriteConfig writeConfig, final Function2<List<HoodieKey>, String, Integer> keyGenFunction) {
+    return (commit, numRecords) -> {
+      final HoodieIndex index = HoodieIndex.createIndex(writeConfig, jsc);
+      List<HoodieKey> records = keyGenFunction.apply(commit, numRecords);
+      final HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
+      HoodieTable table = HoodieTable.getHoodieTable(metaClient, writeConfig, jsc);
+      JavaRDD<HoodieRecord> recordsToDelete = jsc.parallelize(records, 1)
+          .map(key -> new HoodieRecord(key, new EmptyHoodieRecordPayload()));
+      JavaRDD<HoodieRecord> taggedRecords = index.tagLocation(recordsToDelete, jsc, table);
+      return taggedRecords.map(record -> record.getKey()).collect();
+    };
+  }
+
+  /**
    * Generate wrapper for record generation function for testing Prepped APIs
    *
    * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
@@ -232,6 +267,23 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
   }
 
   /**
+   * Generate wrapper for delete key generation function for testing Prepped APIs
+   *
+   * @param isPreppedAPI Flag to indicate if this is for testing prepped-version of APIs
+   * @param writeConfig Hoodie Write Config
+   * @param wrapped Actual key-generation function
+   * @return Wrapped Function
+   */
+  Function2<List<HoodieKey>, String, Integer> generateWrapDeleteKeysFn(boolean isPreppedAPI,
+      HoodieWriteConfig writeConfig, Function2<List<HoodieKey>, String, Integer> wrapped) {
+    if (isPreppedAPI) {
+      return wrapDeleteKeysGenFunctionForPreppedCalls(writeConfig, wrapped);
+    } else {
+      return wrapped;
+    }
+  }
+
+  /**
    * Helper to insert first batch of records and do regular assertions on the state after successful completion
    *
    * @param writeConfig Hoodie Write Config
@@ -290,6 +342,36 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
   }
 
   /**
+   * Helper to delete batch of keys and do regular assertions on the state after successful completion
+   *
+   * @param writeConfig Hoodie Write Config
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param numRecordsInThisCommit Number of records to be added in the new commit
+   * @param deleteFn Delete Function to be used for deletes
+   * @param isPreppedAPI Boolean flag to indicate whether deleteFn expects prepped keys
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @return RDD of write-status
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> deleteBatch(HoodieWriteConfig writeConfig, HoodieWriteClient client, String newCommitTime,
+      String prevCommitTime, String initCommitTime,
+      int numRecordsInThisCommit,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn, boolean isPreppedAPI,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+    final Function2<List<HoodieKey>, String, Integer> keyGenFunction =
+        generateWrapDeleteKeysFn(isPreppedAPI, writeConfig, dataGen::generateUniqueDeletes);
+
+    return deleteBatch(client, newCommitTime, prevCommitTime, initCommitTime, numRecordsInThisCommit,
+        keyGenFunction,
+        deleteFn, assertForCommit, expRecordsInThisCommit, expTotalRecords);
+  }
+
+  /**
    * Helper to insert/upsert batch of records and do regular assertions on the state after successful completion
    *
    * @param client Hoodie Write Client
@@ -361,6 +443,68 @@ public class TestHoodieClientBase extends HoodieClientTestHarness {
   }
 
   /**
+   * Helper to delete batch of hoodie keys and do regular assertions on the state after successful completion
+   *
+   * @param client Hoodie Write Client
+   * @param newCommitTime New Commit Timestamp to be used
+   * @param prevCommitTime Commit Timestamp used in previous commit
+   * @param initCommitTime Begin Timestamp (usually "000")
+   * @param keyGenFunction Key Generation function
+   * @param deleteFn Write Function to be used for delete
+   * @param assertForCommit Enable Assertion of Writes
+   * @param expRecordsInThisCommit Expected number of records in this commit
+   * @param expTotalRecords Expected number of records when scanned
+   * @throws Exception in case of error
+   */
+  JavaRDD<WriteStatus> deleteBatch(HoodieWriteClient client, String newCommitTime, String prevCommitTime,
+      String initCommitTime, int numRecordsInThisCommit,
+      Function2<List<HoodieKey>, String, Integer> keyGenFunction,
+      Function3<JavaRDD<WriteStatus>, HoodieWriteClient, JavaRDD<HoodieKey>, String> deleteFn,
+      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords) throws Exception {
+
+    // Delete 1 (only deletes)
+    client.startCommitWithTime(newCommitTime);
+
+    List<HoodieKey> keysToDelete = keyGenFunction.apply(newCommitTime, numRecordsInThisCommit);
+    JavaRDD<HoodieKey> deleteRecords = jsc.parallelize(keysToDelete, 1);
+
+    JavaRDD<WriteStatus> result = deleteFn.apply(client, deleteRecords, newCommitTime);
+    List<WriteStatus> statuses = result.collect();
+    assertNoWriteErrors(statuses);
+
+    // check the partition metadata is written out
+    assertPartitionMetadata(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS, fs);
+
+    // verify that there is a commit
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTimeline timeline = new HoodieActiveTimeline(metaClient).getCommitTimeline();
+
+    if (assertForCommit) {
+      assertEquals("Expecting 3 commits.", 3,
+          timeline.findInstantsAfter(initCommitTime, Integer.MAX_VALUE).countInstants());
+      Assert.assertEquals("Latest commit should be " + newCommitTime, newCommitTime,
+          timeline.lastInstant().get().getTimestamp());
+      assertEquals("Must contain " + expRecordsInThisCommit + " records", expRecordsInThisCommit,
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count());
+
+      // Check the entire dataset has all records still
+      String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+      for (int i = 0; i < fullPartitionPaths.length; i++) {
+        fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+      }
+      assertEquals("Must contain " + expTotalRecords + " records", expTotalRecords,
+          HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+      // Check that the incremental consumption from prevCommitTime
+      assertEquals("Incremental consumption from " + prevCommitTime + " should give no records in latest commit,"
+              + " since it is a delete operation",
+          HoodieClientTestUtils.readCommit(basePath, sqlContext, timeline, newCommitTime).count(),
+          HoodieClientTestUtils.readSince(basePath, sqlContext, timeline, prevCommitTime).count());
+    }
+    return result;
+  }
+
+  /**
    * Get Cleaner state corresponding to a partition path
    *
    * @param hoodieCleanStatsTwo List of Clean Stats
diff --git a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java
index c2dedff..724c3fa 100644
--- a/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/TestHoodieClientOnCopyOnWriteStorage.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi;
 
+import static org.apache.hudi.common.HoodieTestDataGenerator.NULL_SCHEMA;
+import static org.apache.hudi.common.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.apache.hudi.common.util.ParquetUtils.readRowKeysFromParquet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -27,6 +29,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.FileInputStream;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -274,6 +277,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     updateBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime,
         Option.of(Arrays.asList(commitTimeBetweenPrevAndNew)), initCommitTime, numRecords, writeFn, isPrepped, true,
         numRecords, 200, 2);
+
+    // Delete 1
+    prevCommitTime = newCommitTime;
+    newCommitTime = "005";
+    numRecords = 50;
+
+    deleteBatch(hoodieWriteConfig, client, newCommitTime, prevCommitTime,
+        initCommitTime, numRecords, HoodieWriteClient::delete, isPrepped, true,
+        0, 150);
   }
 
   /**
@@ -330,7 +342,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     final int insertSplitLimit = 100;
     // setup the small file handling params
     HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
-    dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
 
     HoodieWriteClient client = getHoodieWriteClient(config, false);
 
@@ -443,7 +455,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     final int insertSplitLimit = 100;
     // setup the small file handling params
     HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
-    dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
+    dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
     HoodieWriteClient client = getHoodieWriteClient(config, false);
 
     // Inserts => will write file1
@@ -455,7 +467,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     List<WriteStatus> statuses = client.insert(insertRecordsRDD1, commitTime1).collect();
 
     assertNoWriteErrors(statuses);
-    assertPartitionMetadata(new String[] {testPartitionPath}, fs);
+    assertPartitionMetadata(new String[]{testPartitionPath}, fs);
 
     assertEquals("Just 1 file needs to be added.", 1, statuses.size());
     String file1 = statuses.get(0).getFileId();
@@ -516,6 +528,164 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
   }
 
   /**
+   * Test deletes issued through the delete API.
+   */
+  @Test
+  public void testDeletesWithDeleteApi() throws Exception {
+    final String testPartitionPath = "2016/09/26";
+    final int insertSplitLimit = 100;
+    List<String> keysSoFar = new ArrayList<>();
+    // setup the small file handling params
+    HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit); // hold upto 200 records max
+    dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
+
+    HoodieWriteClient client = getHoodieWriteClient(config, false);
+
+    // Inserts => will write file1
+    String commitTime1 = "001";
+    client.startCommitWithTime(commitTime1);
+    List<HoodieRecord> inserts1 = dataGen.generateInserts(commitTime1, insertSplitLimit); // this writes ~500kb
+    Set<String> keys1 = HoodieClientTestUtils.getRecordKeys(inserts1);
+    keysSoFar.addAll(keys1);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts1, 1);
+    List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
+
+    assertNoWriteErrors(statuses);
+
+    assertEquals("Just 1 file needs to be added.", 1, statuses.size());
+    String file1 = statuses.get(0).getFileId();
+    Assert.assertEquals("file should contain 100 records",
+        readRowKeysFromParquet(jsc.hadoopConfiguration(), new Path(basePath, statuses.get(0).getStat().getPath()))
+            .size(),
+        100);
+
+    // Delete 20 among 100 inserted
+    testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar);
+
+    // Insert and update 40 records
+    Pair<Set<String>, List<HoodieRecord>> updateBatch2 = testUpdates("003", client, 40, 120);
+    keysSoFar.addAll(updateBatch2.getLeft());
+
+    // Delete 10 records among 40 updated
+    testDeletes(client, updateBatch2.getRight(), 10, file1, "004", 110, keysSoFar);
+
+    // do another batch of updates
+    Pair<Set<String>, List<HoodieRecord>> updateBatch3 = testUpdates("005", client, 40, 150);
+    keysSoFar.addAll(updateBatch3.getLeft());
+
+    // delete non existent keys
+    String commitTime6 = "006";
+    client.startCommitWithTime(commitTime6);
+
+    List<HoodieRecord> dummyInserts3 = dataGen.generateInserts(commitTime6, 20);
+    List<HoodieKey> hoodieKeysToDelete3 = HoodieClientTestUtils
+        .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(dummyInserts3), 20);
+    JavaRDD<HoodieKey> deleteKeys3 = jsc.parallelize(hoodieKeysToDelete3, 1);
+    statuses = client.delete(deleteKeys3, commitTime6).collect();
+    assertNoWriteErrors(statuses);
+    assertEquals("Just 0 write status for delete.", 0, statuses.size());
+
+    // Check the entire dataset has all records still
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+    assertEquals("Must contain " + 150 + " records", 150,
+        HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+    // delete another batch. The previous delete commit should have persisted the schema; if not,
+    // this would throw an exception
+    testDeletes(client, updateBatch3.getRight(), 10, file1, "007", 140, keysSoFar);
+  }
+
+  private Pair<Set<String>, List<HoodieRecord>> testUpdates(String commitTime, HoodieWriteClient client,
+      int sizeToInsertAndUpdate, int expectedTotalRecords)
+      throws IOException {
+    client.startCommitWithTime(commitTime);
+    List<HoodieRecord> inserts = dataGen.generateInserts(commitTime, sizeToInsertAndUpdate);
+    Set<String> keys = HoodieClientTestUtils.getRecordKeys(inserts);
+    List<HoodieRecord> insertsAndUpdates = new ArrayList<>();
+    insertsAndUpdates.addAll(inserts);
+    insertsAndUpdates.addAll(dataGen.generateUpdates(commitTime, inserts));
+
+    JavaRDD<HoodieRecord> insertAndUpdatesRDD = jsc.parallelize(insertsAndUpdates, 1);
+    List<WriteStatus> statuses = client.upsert(insertAndUpdatesRDD, commitTime).collect();
+    assertNoWriteErrors(statuses);
+
+    // Check the entire dataset has all records still
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+    assertEquals("Must contain " + expectedTotalRecords + " records", expectedTotalRecords,
+        HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+    return Pair.of(keys, inserts);
+  }
+
+  private void testDeletes(HoodieWriteClient client, List<HoodieRecord> previousRecords, int sizeToDelete,
+      String existingFile, String commitTime, int expectedRecords, List<String> keys) {
+    client.startCommitWithTime(commitTime);
+
+    List<HoodieKey> hoodieKeysToDelete = HoodieClientTestUtils
+        .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(previousRecords), sizeToDelete);
+    JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1);
+    List<WriteStatus> statuses = client.delete(deleteKeys, commitTime).collect();
+
+    assertNoWriteErrors(statuses);
+
+    assertEquals("Just 1 file needs to be added.", 1, statuses.size());
+    assertEquals("Existing file should be expanded", existingFile, statuses.get(0).getFileId());
+
+    // Check the entire dataset has all records still
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+    assertEquals("Must contain " + exepctedRecords + " records", exepctedRecords,
+        HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count());
+
+    Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
+    assertEquals("file should contain 110 records", readRowKeysFromParquet(jsc.hadoopConfiguration(), newFile).size(),
+        exepctedRecords);
+
+    List<GenericRecord> records = ParquetUtils.readAvroRecords(jsc.hadoopConfiguration(), newFile);
+    for (GenericRecord record : records) {
+      String recordKey = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+      assertTrue("key expected to be part of " + commitTime, keys.contains(recordKey));
+      assertFalse("Key deleted", hoodieKeysToDelete.contains(recordKey));
+    }
+  }
+
+  /**
+   * Test that deletes issued without any prior commits fail with an exception.
+   */
+  @Test
+  public void testDeletesWithoutInserts() throws Exception {
+    final String testPartitionPath = "2016/09/26";
+    final int insertSplitLimit = 100;
+    // setup the small file handling params
+    HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, true); // hold upto 200 records max
+    dataGen = new HoodieTestDataGenerator(new String[]{testPartitionPath});
+
+    HoodieWriteClient client = getHoodieWriteClient(config, false);
+
+    // delete non existent keys
+    String commitTime1 = "001";
+    client.startCommitWithTime(commitTime1);
+
+    List<HoodieRecord> dummyInserts = dataGen.generateInserts(commitTime1, 20);
+    List<HoodieKey> hoodieKeysToDelete = HoodieClientTestUtils
+        .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(dummyInserts), 20);
+    JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(hoodieKeysToDelete, 1);
+    try {
+      client.delete(deleteKeys, commitTime1).collect();
+      fail("Should have thrown Exception");
+    } catch (HoodieIOException e) {
+      // ignore
+    }
+  }
+
+  /**
    * Test to ensure commit metadata points to valid files
    */
   @Test
@@ -710,7 +880,14 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
    * Build Hoodie Write Config for small data file sizes
    */
   private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize) {
-    HoodieWriteConfig.Builder builder = getConfigBuilder();
+    return getSmallInsertWriteConfig(insertSplitSize, false);
+  }
+
+  /**
+   * Build Hoodie Write Config for small data file sizes
+   */
+  private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean useNullSchema) {
+    HoodieWriteConfig.Builder builder = getConfigBuilder(useNullSchema ? NULL_SCHEMA : TRIP_EXAMPLE_SCHEMA);
     return builder
         .withCompactionConfig(
             HoodieCompactionConfig.newBuilder().compactionSmallFileSize(HoodieTestDataGenerator.SIZE_PER_RECORD * 15)
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
index 7890f3a..32aa594 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieClientTestUtils.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
@@ -39,6 +40,7 @@ import org.apache.hudi.WriteStatus;
 import org.apache.hudi.avro.HoodieAvroWriteSupport;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieDataFile;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTestUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -69,6 +71,7 @@ import org.apache.spark.sql.SQLContext;
 public class HoodieClientTestUtils {
 
   private static final transient Logger log = LogManager.getLogger(HoodieClientTestUtils.class);
+  private static final Random RANDOM = new Random();
 
   public static List<WriteStatus> collectStatuses(Iterator<List<WriteStatus>> statusListItr) {
     List<WriteStatus> statuses = new ArrayList<>();
@@ -86,6 +89,27 @@ public class HoodieClientTestUtils {
     return keys;
   }
 
+  public static List<HoodieKey> getHoodieKeys(List<HoodieRecord> hoodieRecords) {
+    List<HoodieKey> keys = new ArrayList<>();
+    for (HoodieRecord rec : hoodieRecords) {
+      keys.add(rec.getKey());
+    }
+    return keys;
+  }
+
+  public static List<HoodieKey> getKeysToDelete(List<HoodieKey> keys, int size) {
+    List<HoodieKey> toReturn = new ArrayList<>();
+    int counter = 0;
+    while (counter < size) {
+      int index = RANDOM.nextInt(keys.size());
+      if (!toReturn.contains(keys.get(index))) {
+        toReturn.add(keys.get(index));
+        counter++;
+      }
+    }
+    return toReturn;
+  }
+
   private static void fakeMetaFile(String basePath, String commitTime, String suffix) throws IOException {
     String parentPath = basePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME;
     new File(parentPath).mkdirs();
diff --git a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index 68be7d3..52aae1b 100644
--- a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -78,6 +78,7 @@ public class HoodieTestDataGenerator {
       + "{\"name\": \"begin_lat\", \"type\": \"double\"}," + "{\"name\": \"begin_lon\", \"type\": \"double\"},"
       + "{\"name\": \"end_lat\", \"type\": \"double\"}," + "{\"name\": \"end_lon\", \"type\": \"double\"},"
       + "{\"name\":\"fare\",\"type\": \"double\"}]}";
+  public static String NULL_SCHEMA = Schema.create(Schema.Type.NULL).toString();
   public static String TRIP_HIVE_COLUMN_TYPES = "double,string,string,string,double,double,double,double,double";
   public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
   public static Schema avroSchemaWithMetadataFields = HoodieAvroUtils.addMetadataFields(avroSchema);
@@ -302,7 +303,8 @@ public class HoodieTestDataGenerator {
   }
 
   /**
-   * Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned list
+   * Generates new updates, randomly distributed across the keys above. There can be duplicates within the returned
+   * list
    *
    * @param commitTime Commit Timestamp
    * @param n Number of updates (including dups)
@@ -330,6 +332,17 @@ public class HoodieTestDataGenerator {
   }
 
   /**
+   * Generates deduped deletes of keys previously inserted, randomly distributed across the keys above.
+   *
+   * @param commitTime Commit Timestamp
+   * @param n Number of unique keys
+   * @return list of hoodie keys to delete
+   */
+  public List<HoodieKey> generateUniqueDeletes(String commitTime, Integer n) {
+    return generateUniqueDeleteStream(commitTime, n).collect(Collectors.toList());
+  }
+
+  /**
    * Generates deduped updates of keys previously inserted, randomly distributed across the keys above.
    *
    * @param commitTime Commit Timestamp
@@ -360,6 +373,33 @@ public class HoodieTestDataGenerator {
     });
   }
 
+  /**
+   * Generates a deduped stream of deletes for keys previously inserted, randomly distributed across the keys above.
+   *
+   * @param commitTime Commit Timestamp
+   * @param n Number of unique keys
+   * @return stream of hoodie keys to delete
+   */
+  public Stream<HoodieKey> generateUniqueDeleteStream(String commitTime, Integer n) {
+    final Set<KeyPartition> used = new HashSet<>();
+
+    if (n > numExistingKeys) {
+      throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
+    }
+
+    return IntStream.range(0, n).boxed().map(i -> {
+      int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1);
+      KeyPartition kp = existingKeys.get(index);
+      // Find the available keyPartition starting from randomly chosen one.
+      while (used.contains(kp)) {
+        index = (index + 1) % numExistingKeys;
+        kp = existingKeys.get(index);
+      }
+      used.add(kp);
+      return kp.key;
+    });
+  }
+
   public String[] getPartitionPaths() {
     return partitionPaths;
   }
@@ -369,6 +409,7 @@ public class HoodieTestDataGenerator {
   }
 
   public static class KeyPartition implements Serializable {
+
     HoodieKey key;
     String partitionPath;
   }
diff --git a/hudi-spark/src/main/java/org/apache/hudi/EmptyHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java
similarity index 93%
rename from hudi-spark/src/main/java/org/apache/hudi/EmptyHoodieRecordPayload.java
rename to hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java
index ddcbeb7..b9e122a 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/EmptyHoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/EmptyHoodieRecordPayload.java
@@ -16,12 +16,11 @@
  * limitations under the License.
  */
 
-package org.apache.hudi;
+package org.apache.hudi.common.model;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 
 /**
@@ -29,7 +28,11 @@ import org.apache.hudi.common.util.Option;
  */
 public class EmptyHoodieRecordPayload implements HoodieRecordPayload<EmptyHoodieRecordPayload> {
 
-  public EmptyHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {}
+  public EmptyHoodieRecordPayload() {
+  }
+
+  public EmptyHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {
+  }
 
   @Override
   public EmptyHoodieRecordPayload preCombine(EmptyHoodieRecordPayload another) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 5eb8ce4..fb70b37 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -40,6 +40,7 @@ import org.apache.log4j.Logger;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class HoodieCommitMetadata implements Serializable {
 
+  public static final String SCHEMA_KEY = "schema";
   private static volatile Logger log = LogManager.getLogger(HoodieCommitMetadata.class);
   protected Map<String, List<HoodieWriteStat>> partitionToWriteStats;
   protected Boolean compacted;
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index 55b64db..7e73460 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -147,15 +147,15 @@ public class ITTestHoodieSanity extends ITTestBase {
     stdOutErr = executeHiveCommand("show tables like '" + hiveTableName + "'");
     Assert.assertEquals("Table exists", hiveTableName, stdOutErr.getLeft());
 
-    // Ensure row count is 100 (without duplicates)
+    // Ensure row count is 80 (without duplicates) (100 - 20 deleted)
     stdOutErr = executeHiveCommand("select count(1) from " + hiveTableName);
-    Assert.assertEquals("Expecting 100 rows to be present in the new table", 100,
+    Assert.assertEquals("Expecting 100 rows to be present in the new table", 80,
         Integer.parseInt(stdOutErr.getLeft().trim()));
 
-    // If is MOR table, ensure realtime table row count is 100 (without duplicates)
+    // If is MOR table, ensure realtime table row count is 100 - 20 = 80 (without duplicates)
     if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
       stdOutErr = executeHiveCommand("select count(1) from " + hiveTableName + "_rt");
-      Assert.assertEquals("Expecting 100 rows to be present in the realtime table,", 100,
+      Assert.assertEquals("Expecting 100 rows to be present in the realtime table,", 80,
           Integer.parseInt(stdOutErr.getLeft().trim()));
     }
 
@@ -167,7 +167,7 @@ public class ITTestHoodieSanity extends ITTestBase {
 
     // Run the count query again. Without Hoodie, all versions are included. So we get a wrong count
     stdOutErr = executeHiveCommand("select count(1) from " + hiveTableName);
-    Assert.assertEquals("Expecting 200 rows to be present in the new table", 200,
+    Assert.assertEquals("Expecting 280 rows to be present in the new table", 280,
         Integer.parseInt(stdOutErr.getLeft().trim()));
   }
 
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 9ce79e9..90128b8 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -92,9 +92,9 @@ public class DataSourceUtils {
   /**
    * Create a key generator class via reflection, passing in any configs needed.
    *
-   * If the class name of key generator is configured through the properties file, i.e., {@code
-   * props}, use the corresponding key generator class; otherwise, use the default key generator class specified in
-   * {@code DataSourceWriteOptions}.
+   * If the class name of key generator is configured through the properties file, i.e., {@code props}, use the
+   * corresponding key generator class; otherwise, use the default key generator class specified in {@code
+   * DataSourceWriteOptions}.
    */
   public static KeyGenerator createKeyGenerator(TypedProperties props) throws IOException {
     String keyGeneratorClass = props.getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
@@ -124,7 +124,7 @@ public class DataSourceUtils {
       throws IOException {
     try {
       return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
-          new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
+          new Class<?>[]{GenericRecord.class, Comparable.class}, record, orderingVal);
     } catch (Throwable e) {
       throw new IOException("Could not create payload for class: " + payloadClass, e);
     }
@@ -172,6 +172,11 @@ public class DataSourceUtils {
     }
   }
 
+  public static JavaRDD<WriteStatus> doDeleteOperation(HoodieWriteClient client, JavaRDD<HoodieKey> hoodieKeys,
+      String commitTime) {
+    return client.delete(hoodieKeys, commitTime);
+  }
+
   public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable orderingVal, HoodieKey hKey,
       String payloadClass) throws IOException {
     HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass, gr, orderingVal);
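
For orientation, here is a minimal sketch (not part of the patch itself) of driving the new delete API on HoodieWriteClient directly; the JavaSparkContext, base path, table name and sample keys are placeholders, and the NULL schema mirrors what the datasource write path passes for deletes.

import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.hudi.HoodieWriteClient;
import org.apache.hudi.WriteStatus;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class DeleteApiSketch {
  public static void deleteSomeKeys(JavaSparkContext jsc, String basePath) {
    // Deletes carry no payload data, so the NULL avro schema is enough for the write config.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withSchema(Schema.create(Schema.Type.NULL).toString())
        .forTable("hoodie_delete_sketch")
        .build();
    HoodieWriteClient<EmptyHoodieRecordPayload> client = new HoodieWriteClient<>(jsc, cfg);

    // Records to delete are identified purely by record key + partition path (placeholder values).
    JavaRDD<HoodieKey> keysToDelete = jsc.parallelize(Arrays.asList(
        new HoodieKey("record-key-1", "2019/11/22"),
        new HoodieKey("record-key-2", "2019/11/22")));

    // Same commit protocol as upserts: start a commit, issue the deletes, then commit the statuses.
    String commitTime = client.startCommit();
    JavaRDD<WriteStatus> statuses = client.delete(keysToDelete, commitTime);
    client.commit(commitTime, statuses);
    client.close();
  }
}
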
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index a3ce3c3..372b44a 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
 import com.databricks.spark.avro.SchemaConverters
 import org.apache.avro.generic.GenericRecord
 import org.apache.avro.{Schema, SchemaBuilder}
+import org.apache.hudi.common.model.HoodieKey
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.types._
@@ -41,6 +42,10 @@ object AvroConversionUtils {
       }
   }
 
+  def createRddForDeletes(df: DataFrame, rowField: String, partitionField: String): RDD[HoodieKey] = {
+    df.rdd.map(row => new HoodieKey(row.getAs[String](rowField), row.getAs[String](partitionField)))
+  }
+
   def createDataFrame(rdd: RDD[GenericRecord], schemaStr: String, ss: SparkSession): Dataset[Row] = {
     if (rdd.isEmpty()) {
       ss.emptyDataFrame
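
Roughly the same transformation expressed from the Java side, as a sketch (assuming the usual org.apache.hudi.common.model.HoodieKey and Spark Java API imports); here df is assumed to be a Dataset<Row> whose key and partition columns are named "_row_key" and "partition", the names used by the test data further down.

// Map each incoming row to a HoodieKey, mirroring what createRddForDeletes does above.
JavaRDD<HoodieKey> hoodieKeys = df.toJavaRDD()
    .map(row -> new HoodieKey(row.<String>getAs("_row_key"), row.<String>getAs("partition")));
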
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index bbcc98c..81d13ad 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -66,8 +66,8 @@ object DataSourceReadOptions {
 
   /**
     * For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies opaque map functions,
-    *  filters appearing late in the sequence of transformations cannot be automatically pushed down.
-    *  This option allows setting filters directly on Hoodie Source
+    * filters appearing late in the sequence of transformations cannot be automatically pushed down.
+    * This option allows setting filters directly on Hoodie Source
     */
   val PUSH_DOWN_INCR_FILTERS_OPT_KEY = "hoodie.datasource.read.incr.filters"
 }
@@ -85,6 +85,7 @@ object DataSourceWriteOptions {
   val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
   val INSERT_OPERATION_OPT_VAL = "insert"
   val UPSERT_OPERATION_OPT_VAL = "upsert"
+  val DELETE_OPERATION_OPT_VAL = "delete"
   val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL
 
   /**
@@ -152,31 +153,31 @@ object DataSourceWriteOptions {
   val DEFAULT_COMMIT_METADATA_KEYPREFIX_OPT_VAL = "_"
 
   /**
-   *  Flag to indicate whether to drop duplicates upon insert.
-   *  By default insert will accept duplicates, to gain extra performance.
-   */
+    * Flag to indicate whether to drop duplicates upon insert.
+    * By default insert will accept duplicates, to gain extra performance.
+    */
   val INSERT_DROP_DUPS_OPT_KEY = "hoodie.datasource.write.insert.drop.duplicates"
   val DEFAULT_INSERT_DROP_DUPS_OPT_VAL = "false"
 
   /**
-   *  Flag to indicate how many times streaming job should retry for a failed microbatch
-   *  By default 3
-   */
+    * Flag to indicate how many times a streaming job should retry a failed microbatch.
+    * By default 3.
+    */
   val STREAMING_RETRY_CNT_OPT_KEY = "hoodie.datasource.write.streaming.retry.count"
   val DEFAULT_STREAMING_RETRY_CNT_OPT_VAL = "3"
 
   /**
-   *  Flag to indicate how long (by millisecond) before a retry should issued for failed microbatch
-   *  By default 2000 and it will be doubled by every retry
-   */
+    * Flag to indicate how long (in milliseconds) to wait before retrying a failed microbatch.
+    * By default 2000, and it is doubled on every retry.
+    */
   val STREAMING_RETRY_INTERVAL_MS_OPT_KEY = "hoodie.datasource.write.streaming.retry.interval.ms"
   val DEFAULT_STREAMING_RETRY_INTERVAL_MS_OPT_VAL = "2000"
 
   /**
-   *  Flag to indicate whether to ignore any non exception error (e.g. writestatus error)
-   *  within a streaming microbatch
-   *  By default true (in favor of streaming progressing over data integrity)
-   */
+    * Flag to indicate whether to ignore any non-exception error (e.g. writestatus error)
+    * within a streaming microbatch.
+    * By default true (in favor of streaming progressing over data integrity).
+    */
   val STREAMING_IGNORE_FAILED_BATCH_OPT_KEY = "hoodie.datasource.write.streaming.ignore.failed.batch"
   val DEFAULT_STREAMING_IGNORE_FAILED_BATCH_OPT_VAL = "true"
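
As a quick sketch of the new operation value from the datasource side (placeholder names: keysDF, tableName, tablePath): with the simple key generator, only the record key and partition path columns of the incoming frame are consulted for deletes, and Append is the required save mode. The HoodieJavaApp change further down exercises the same flow end to end.

keysDF.write().format("org.apache.hudi")
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL())
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .mode(SaveMode.Append)
    .save(tablePath);
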
 
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f8bd2df..eeb40ca 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -19,6 +19,7 @@ package org.apache.hudi
 
 import java.util
 
+import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
@@ -29,7 +30,7 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.{HiveSyncConfig, HiveSyncTool}
 import org.apache.log4j.LogManager
-import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
@@ -72,131 +73,215 @@ private[hudi] object HoodieSparkSqlWriter {
         parameters(OPERATION_OPT_KEY)
       }
 
-    // register classes & schemas
-    val structName = s"${tblName.get}_record"
-    val nameSpace = s"hoodie.${tblName.get}"
-    sparkContext.getConf.registerKryoClasses(
-      Array(classOf[org.apache.avro.generic.GenericData],
-        classOf[org.apache.avro.Schema]))
-    val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-    sparkContext.getConf.registerAvroSchemas(schema)
-    log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-    // Convert to RDD[HoodieRecord]
-    val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-    val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-    val hoodieAllIncomingRecords = genericRecords.map(gr => {
-      val orderingVal = DataSourceUtils.getNestedFieldValAsString(
-        gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
-      DataSourceUtils.createHoodieRecord(gr,
-        orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
-    }).toJavaRDD()
+    var writeSuccessful: Boolean = false
+    var commitTime: String = null
+    var writeStatuses: JavaRDD[WriteStatus] = null
 
     val jsc = new JavaSparkContext(sparkContext)
-
     val basePath = new Path(parameters("path"))
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
 
-    // Handle various save modes
-    if (mode == SaveMode.ErrorIfExists && exists) {
-      throw new HoodieException(s"hoodie dataset at $basePath already exists.")
-    }
-    if (mode == SaveMode.Ignore && exists) {
-      log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not performing actual writes.")
-      return (true, common.util.Option.empty())
-    }
-    if (mode == SaveMode.Overwrite && exists) {
-      log.warn(s"hoodie dataset at $basePath already exists. Deleting existing data & overwriting with new data.")
-      fs.delete(basePath, true)
-      exists = false
-    }
+    // Due to generic type conversion issues between Java and Scala, the write and delete paths could not share a
+    // common code path. Specifically, instantiating a client of type HoodieWriteClient<T extends HoodieRecordPayload>
+    // proved problematic, so some code blocks are duplicated across the if and else branches.
+    if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+      // register classes & schemas
+      val structName = s"${tblName.get}_record"
+      val nameSpace = s"hoodie.${tblName.get}"
+      sparkContext.getConf.registerKryoClasses(
+        Array(classOf[org.apache.avro.generic.GenericData],
+          classOf[org.apache.avro.Schema]))
+      val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+      sparkContext.getConf.registerAvroSchemas(schema)
+      log.info(s"Registered avro schema : ${schema.toString(true)}")
 
-    // Create the dataset if not present
-    if (!exists) {
-      HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
-        tblName.get, "archived")
-    }
+      // Convert to RDD[HoodieRecord]
+      val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
+      val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+      val hoodieAllIncomingRecords = genericRecords.map(gr => {
+        val orderingVal = DataSourceUtils.getNestedFieldValAsString(
+          gr, parameters(PRECOMBINE_FIELD_OPT_KEY)).asInstanceOf[Comparable[_]]
+        DataSourceUtils.createHoodieRecord(gr,
+          orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
+      }).toJavaRDD()
 
-    // Create a HoodieWriteClient & issue the write.
-    val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
-      mapAsJavaMap(parameters)
-    )
-
-    val hoodieRecords =
-      if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-        DataSourceUtils.dropDuplicates(
-          jsc,
-          hoodieAllIncomingRecords,
-          mapAsJavaMap(parameters), client.getTimelineServer)
-      } else {
-        hoodieAllIncomingRecords
+      // Handle various save modes
+      if (mode == SaveMode.ErrorIfExists && exists) {
+        throw new HoodieException(s"hoodie dataset at $basePath already exists.")
       }
-
-    if (hoodieRecords.isEmpty()) {
-      log.info("new batch has no new records, skipping...")
-      return (true, common.util.Option.empty())
-    }
-
-    val commitTime = client.startCommit()
-
-    val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
-    // Check for errors and commit the write.
-    val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
-    val writeSuccessful =
-    if (errorCount == 0) {
-      log.info("No errors. Proceeding to commit the write.")
-      val metaMap = parameters.filter(kv =>
-        kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
-      val commitSuccess = if (metaMap.isEmpty) {
-        client.commit(commitTime, writeStatuses)
-      } else {
-        client.commit(commitTime, writeStatuses,
-          common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+      if (mode == SaveMode.Ignore && exists) {
+        log.warn(s"hoodie dataset at $basePath already exists. Ignoring & not performing actual writes.")
+        return (true, common.util.Option.empty())
       }
-
-      if (commitSuccess) {
-        log.info("Commit " + commitTime + " successful!")
+      if (mode == SaveMode.Overwrite && exists) {
+        log.warn(s"hoodie dataset at $basePath already exists. Deleting existing data & overwriting with new data.")
+        fs.delete(basePath, true)
+        exists = false
       }
-      else {
-        log.info("Commit " + commitTime + " failed!")
+
+      // Create the dataset if not present
+      if (!exists) {
+        HoodieTableMetaClient.initTableType(sparkContext.hadoopConfiguration, path.get, storageType,
+          tblName.get, "archived")
       }
 
-      val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
-      val syncHiveSucess = if (hiveSyncEnabled) {
-        log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
-        val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
-        syncHive(basePath, fs, parameters)
-      } else {
-        true
+      // Create a HoodieWriteClient & issue the write.
+      val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName.get,
+        mapAsJavaMap(parameters)
+      )
+
+      val hoodieRecords =
+        if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
+          DataSourceUtils.dropDuplicates(
+            jsc,
+            hoodieAllIncomingRecords,
+            mapAsJavaMap(parameters), client.getTimelineServer)
+        } else {
+          hoodieAllIncomingRecords
+        }
+
+      if (hoodieRecords.isEmpty()) {
+        log.info("new batch has no new records, skipping...")
+        return (true, common.util.Option.empty())
       }
-      client.close()
-      commitSuccess && syncHiveSucess
+      commitTime = client.startCommit()
+      writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, commitTime, operation)
+      // Check for errors and commit the write.
+      val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
+      writeSuccessful =
+        if (errorCount == 0) {
+          log.info("No errors. Proceeding to commit the write.")
+          val metaMap = parameters.filter(kv =>
+            kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
+          val commitSuccess = if (metaMap.isEmpty) {
+            client.commit(commitTime, writeStatuses)
+          } else {
+            client.commit(commitTime, writeStatuses,
+              common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+          }
+
+          if (commitSuccess) {
+            log.info("Commit " + commitTime + " successful!")
+          }
+          else {
+            log.info("Commit " + commitTime + " failed!")
+          }
+
+          val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+          val syncHiveSucess = if (hiveSyncEnabled) {
+            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+            val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+            syncHive(basePath, fs, parameters)
+          } else {
+            true
+          }
+          client.close()
+          commitSuccess && syncHiveSucess
+        } else {
+          log.error(s"$operation failed with ${errorCount} errors :");
+          if (log.isTraceEnabled) {
+            log.trace("Printing out the top 100 errors")
+            writeStatuses.rdd.filter(ws => ws.hasErrors)
+              .take(100)
+              .foreach(ws => {
+                log.trace("Global error :", ws.getGlobalError)
+                if (ws.getErrors.size() > 0) {
+                  ws.getErrors.foreach(kt =>
+                    log.trace(s"Error for key: ${kt._1}", kt._2))
+                }
+              })
+          }
+          false
+        }
     } else {
-      log.error(s"$operation failed with ${errorCount} errors :");
-      if (log.isTraceEnabled) {
-        log.trace("Printing out the top 100 errors")
-        writeStatuses.rdd.filter(ws => ws.hasErrors)
-          .take(100)
-          .foreach(ws => {
-            log.trace("Global error :", ws.getGlobalError)
-            if (ws.getErrors.size() > 0) {
-              ws.getErrors.foreach(kt =>
-                log.trace(s"Error for key: ${kt._1}", kt._2))
-            }
-          })
+
+      // Handle save modes
+      if (mode != SaveMode.Append) {
+        throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
       }
-      false
+
+      val structName = s"${tblName.get}_record"
+      val nameSpace = s"hoodie.${tblName.get}"
+      sparkContext.getConf.registerKryoClasses(
+        Array(classOf[org.apache.avro.generic.GenericData],
+          classOf[org.apache.avro.Schema]))
+
+      // Convert to RDD[HoodieKey]
+      val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
+      val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+      val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
+
+      if (!exists) {
+        throw new HoodieException(s"hoodie dataset at $basePath does not exist")
+      }
+
+      // Create a HoodieWriteClient & issue the delete.
+      val client = DataSourceUtils.createHoodieClient(jsc,
+        Schema.create(Schema.Type.NULL).toString, path.get, tblName.get,
+        mapAsJavaMap(parameters)
+      )
+
+      // Issue deletes
+      commitTime = client.startCommit()
+      writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, commitTime)
+      val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
+      writeSuccessful =
+        if (errorCount == 0) {
+          log.info("No errors. Proceeding to commit the write.")
+          val metaMap = parameters.filter(kv =>
+            kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX_OPT_KEY)))
+          val commitSuccess = if (metaMap.isEmpty) {
+            client.commit(commitTime, writeStatuses)
+          } else {
+            client.commit(commitTime, writeStatuses,
+              common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+          }
+
+          if (commitSuccess) {
+            log.info("Commit " + commitTime + " successful!")
+          }
+          else {
+            log.info("Commit " + commitTime + " failed!")
+          }
+
+          val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
+          val syncHiveSucess = if (hiveSyncEnabled) {
+            log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
+            val fs = FSUtils.getFs(basePath.toString, jsc.hadoopConfiguration)
+            syncHive(basePath, fs, parameters)
+          } else {
+            true
+          }
+          client.close()
+          commitSuccess && syncHiveSucess
+        } else {
+          log.error(s"$operation failed with ${errorCount} errors :");
+          if (log.isTraceEnabled) {
+            log.trace("Printing out the top 100 errors")
+            writeStatuses.rdd.filter(ws => ws.hasErrors)
+              .take(100)
+              .foreach(ws => {
+                log.trace("Global error :", ws.getGlobalError)
+                if (ws.getErrors.size() > 0) {
+                  ws.getErrors.foreach(kt =>
+                    log.trace(s"Error for key: ${kt._1}", kt._2))
+                }
+              })
+          }
+          false
+        }
     }
+
     (writeSuccessful, common.util.Option.ofNullable(commitTime))
   }
 
   /**
-   * Add default options for unspecified write options keys.
-   *
-   * @param parameters
-   * @return
-   */
+    * Add default options for unspecified write options keys.
+    *
+    * @param parameters
+    * @return
+    */
   def parametersWithWriteDefaults(parameters: Map[String, String]): Map[String, String] = {
     Map(OPERATION_OPT_KEY -> DEFAULT_OPERATION_OPT_VAL,
       STORAGE_TYPE_OPT_KEY -> DEFAULT_STORAGE_TYPE_OPT_VAL,
diff --git a/hudi-spark/src/test/java/DataSourceTestUtils.java b/hudi-spark/src/test/java/DataSourceTestUtils.java
index 9f85b53..15fea33 100644
--- a/hudi-spark/src/test/java/DataSourceTestUtils.java
+++ b/hudi-spark/src/test/java/DataSourceTestUtils.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.stream.Collectors;
 import org.apache.hudi.common.TestRawTripPayload;
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 
@@ -42,4 +43,10 @@ public class DataSourceTestUtils {
     return records.stream().map(hr -> convertToString(hr)).filter(os -> os.isPresent()).map(os -> os.get())
         .collect(Collectors.toList());
   }
+
+  public static List<String> convertKeysToStringList(List<HoodieKey> keys) {
+    return keys.stream()
+        .map(hr -> "{\"_row_key\":\"" + hr.getRecordKey() + "\",\"partition\":\"" + hr.getPartitionPath() + "\"}")
+        .collect(Collectors.toList());
+  }
 }
diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java
index 32ae31b..50ac65c 100644
--- a/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -18,6 +18,7 @@
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.DataSourceReadOptions;
@@ -25,7 +26,9 @@ import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieDataSourceHelpers;
 import org.apache.hudi.NonpartitionedKeyGenerator;
 import org.apache.hudi.SimpleKeyGenerator;
+import org.apache.hudi.common.HoodieClientTestUtils;
 import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
@@ -105,16 +108,18 @@ public class HoodieJavaApp {
     HoodieTestDataGenerator dataGen = null;
     if (nonPartitionedTable) {
       // All data goes to base-path
-      dataGen = new HoodieTestDataGenerator(new String[] {""});
+      dataGen = new HoodieTestDataGenerator(new String[]{""});
     } else {
       dataGen = new HoodieTestDataGenerator();
     }
+    List<HoodieRecord> recordsSoFar = new ArrayList<>();
 
     /**
      * Commit with only inserts
      */
     // Generate some input..
-    List<String> records1 = DataSourceTestUtils.convertToStringList(dataGen.generateInserts("001"/* ignore */, 100));
+    recordsSoFar.addAll(dataGen.generateInserts("001"/* ignore */, 100));
+    List<String> records1 = DataSourceTestUtils.convertToStringList(recordsSoFar);
     Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
 
     // Save as hoodie dataset (copy on write)
@@ -152,7 +157,9 @@ public class HoodieJavaApp {
     /**
      * Commit that updates records
      */
-    List<String> records2 = DataSourceTestUtils.convertToStringList(dataGen.generateUpdates("002"/* ignore */, 100));
+    List<HoodieRecord> recordsToBeUpdated = dataGen.generateUpdates("002"/* ignore */, 100);
+    recordsSoFar.addAll(recordsToBeUpdated);
+    List<String> records2 = DataSourceTestUtils.convertToStringList(recordsToBeUpdated);
     Dataset<Row> inputDF2 = spark.read().json(jssc.parallelize(records2, 2));
     writer = inputDF2.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2")
         .option("hoodie.upsert.shuffle.parallelism", "2")
@@ -168,7 +175,31 @@ public class HoodieJavaApp {
     updateHiveSyncConfig(writer);
     writer.save(tablePath);
     String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
-    logger.info("Second commit at instant time :" + commitInstantTime1);
+    logger.info("Second commit at instant time :" + commitInstantTime2);
+
+    /**
+     * Commit that Deletes some records
+     */
+    List<String> deletes = DataSourceTestUtils.convertKeysToStringList(
+        HoodieClientTestUtils
+            .getKeysToDelete(HoodieClientTestUtils.getHoodieKeys(recordsSoFar), 20));
+    Dataset<Row> inputDF3 = spark.read().json(jssc.parallelize(deletes, 2));
+    writer = inputDF3.write().format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2")
+        .option("hoodie.upsert.shuffle.parallelism", "2")
+        .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY(), tableType) // Hoodie Table Type
+        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), "delete")
+        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
+        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
+        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "_row_key")
+        .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
+            nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
+                : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
+        .option(HoodieWriteConfig.TABLE_NAME, tableName).mode(SaveMode.Append);
+
+    updateHiveSyncConfig(writer);
+    writer.save(tablePath);
+    String commitInstantTime3 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+    logger.info("Third commit at instant time :" + commitInstantTime3);
 
     /**
      * Read & do some queries
@@ -200,9 +231,6 @@ public class HoodieJavaApp {
 
   /**
    * Setup configs for syncing to hive
-   * 
-   * @param writer
-   * @return
    */
   private DataFrameWriter<Row> updateHiveSyncConfig(DataFrameWriter<Row> writer) {
     if (enableHiveSync) {
diff --git a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala
index 1f6cd7f..a2a1804 100644
--- a/hudi-spark/src/test/scala/TestDataSourceDefaults.scala
+++ b/hudi-spark/src/test/scala/TestDataSourceDefaults.scala
@@ -16,9 +16,10 @@
  */
 
 import org.apache.avro.generic.GenericRecord
+import org.apache.hudi.common.model.EmptyHoodieRecordPayload
 import org.apache.hudi.common.util.{Option, SchemaTestUtil, TypedProperties}
 import org.apache.hudi.exception.HoodieException
-import org.apache.hudi.{ComplexKeyGenerator, DataSourceWriteOptions, EmptyHoodieRecordPayload, OverwriteWithLatestAvroPayload, SimpleKeyGenerator}
+import org.apache.hudi.{ComplexKeyGenerator, DataSourceWriteOptions, OverwriteWithLatestAvroPayload, SimpleKeyGenerator}
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.scalatest.junit.AssertionsForJUnit