Posted to commits@hudi.apache.org by vb...@apache.org on 2020/08/05 14:50:23 UTC

[hudi] branch master updated: [HUDI-575] Spark Streaming with async compaction support (#1752)

This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a2429f  [HUDI-575] Spark Streaming with async compaction support (#1752)
7a2429f is described below

commit 7a2429f5ba45e3c3c6f4e167fdc5b79be716febb
Author: Balaji Varadarajan <ba...@robinhood.com>
AuthorDate: Wed Aug 5 07:50:15 2020 -0700

    [HUDI-575] Spark Streaming with async compaction support (#1752)
---
 .../apache/hudi/async/AbstractAsyncService.java    |  20 +-
 .../org/apache/hudi/async/AsyncCompactService.java | 161 ++++++++++++
 .../java/org/apache/hudi/client}/Compactor.java    |   4 +-
 .../common/testutils/HoodieTestDataGenerator.java  |  20 ++
 .../org/apache/hudi/integ/HoodieTestHiveBase.java  |   4 +-
 .../java/org/apache/hudi/integ/ITTestBase.java     |   2 +
 .../org/apache/hudi/integ/ITTestHoodieSanity.java  |  48 ++--
 hudi-spark/run_hoodie_app.sh                       |   2 +-
 ...un_hoodie_app.sh => run_hoodie_generate_app.sh} |   2 +-
 ...n_hoodie_app.sh => run_hoodie_streaming_app.sh} |   4 +-
 .../main/java/org/apache/hudi/DataSourceUtils.java |  11 +-
 .../async/SparkStreamingAsyncCompactService.java   |  35 +++
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   4 +
 .../main/scala/org/apache/hudi/DefaultSource.scala |   4 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 277 ++++++++++++---------
 .../org/apache/hudi/HoodieStreamingSink.scala      | 104 ++++++--
 hudi-spark/src/test/java/HoodieJavaApp.java        |   3 +
 .../src/test/java/HoodieJavaStreamingApp.java      | 212 +++++++++++++---
 .../functional/HoodieSparkSqlWriterSuite.scala     |   3 +-
 .../apache/hudi/functional/TestDataSource.scala    |  41 ++-
 .../deltastreamer/HoodieDeltaStreamer.java         | 109 +-------
 .../deltastreamer/SchedulerConfGenerator.java      |   5 +-
 22 files changed, 763 insertions(+), 312 deletions(-)
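
Before the diff itself, a minimal sketch of how a Spark structured streaming job might exercise the feature this commit adds. Only the option key hoodie.datasource.compaction.async.enable (and its default of "true") comes from the DataSourceOptions.scala change below; the rate source, the schema columns, the paths, the "hudi" format short name and the remaining write options are illustrative assumptions, not part of the patch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.streaming.Trigger

object HoodieStreamingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hoodie-async-compaction-sketch")
      .getOrCreate()

    // Any streaming source works; the built-in rate source keeps the sketch self-contained.
    val input = spark.readStream.format("rate").load()
      .withColumnRenamed("value", "key")
      .withColumn("partitionpath", lit("2020/08/05"))

    val query = input.writeStream
      .format("hudi") // or the full datasource name, org.apache.hudi
      .option("hoodie.table.name", "streaming_mor_table")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.recordkey.field", "key")
      .option("hoodie.datasource.write.partitionpath.field", "partitionpath")
      .option("hoodie.datasource.write.precombine.field", "timestamp")
      // New in this commit; it defaults to true, so MOR streaming writes compact asynchronously.
      .option("hoodie.datasource.compaction.async.enable", "true")
      .option("checkpointLocation", "/tmp/hoodie/streaming/checkpoint")
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start("/tmp/hoodie/streaming/sample-table")

    query.awaitTermination()
  }
}

With this in place, HoodieStreamingSink (changed below) starts a daemon async compaction service alongside the micro-batch writes instead of compacting inline.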

diff --git a/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
index 7ac236d..714fa60 100644
--- a/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
+++ b/hudi-client/src/main/java/org/apache/hudi/async/AbstractAsyncService.java
@@ -48,9 +48,16 @@ public abstract class AbstractAsyncService implements Serializable {
   private transient ExecutorService executor;
   // Future tracking delta-sync/compaction
   private transient CompletableFuture future;
+  // Run in daemon mode
+  private final boolean runInDaemonMode;
 
   protected AbstractAsyncService() {
+    this(false);
+  }
+
+  protected AbstractAsyncService(boolean runInDaemonMode) {
     shutdownRequested = false;
+    this.runInDaemonMode = runInDaemonMode;
   }
 
   protected boolean isShutdownRequested() {
@@ -129,7 +136,11 @@ public abstract class AbstractAsyncService implements Serializable {
    */
   private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
     LOG.info("Submitting monitor thread !!");
-    Executors.newSingleThreadExecutor().submit(() -> {
+    Executors.newSingleThreadExecutor(r -> {
+      Thread t = new Thread(r, "Monitor Thread");
+      t.setDaemon(isRunInDaemonMode());
+      return t;
+    }).submit(() -> {
       boolean error = false;
       try {
         LOG.info("Monitoring thread(s) !!");
@@ -137,18 +148,21 @@ public abstract class AbstractAsyncService implements Serializable {
       } catch (ExecutionException ex) {
         LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
         error = true;
-        shutdown(false);
       } catch (InterruptedException ie) {
         LOG.error("Got interrupted Monitoring threads", ie);
         error = true;
-        shutdown(false);
       } finally {
         // Mark as shutdown
         shutdown = true;
         if (null != onShutdownCallback) {
           onShutdownCallback.apply(error);
         }
+        shutdown(false);
       }
     });
   }
+
+  public boolean isRunInDaemonMode() {
+    return runInDaemonMode;
+  }
 }
diff --git a/hudi-client/src/main/java/org/apache/hudi/async/AsyncCompactService.java b/hudi-client/src/main/java/org/apache/hudi/async/AsyncCompactService.java
new file mode 100644
index 0000000..1c39bb6
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/async/AsyncCompactService.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.Compactor;
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.IntStream;
+
+/**
+ * Async Compactor Service that runs in a separate thread. Currently, only one compactor is allowed to run at any time.
+ */
+public class AsyncCompactService extends AbstractAsyncService {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);
+
+  /**
+   * This is the job pool used by async compaction.
+   */
+  public static final String COMPACT_POOL_NAME = "hoodiecompact";
+
+  private final int maxConcurrentCompaction;
+  private transient Compactor compactor;
+  private transient JavaSparkContext jssc;
+  private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
+  private transient ReentrantLock queueLock = new ReentrantLock();
+  private transient Condition consumed = queueLock.newCondition();
+
+  public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
+    this(jssc, client, false);
+  }
+
+  public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client, boolean runInDaemonMode) {
+    super(runInDaemonMode);
+    this.jssc = jssc;
+    this.compactor = new Compactor(client, jssc);
+    this.maxConcurrentCompaction = 1;
+  }
+
+  /**
+   * Enqueues new Pending compaction.
+   */
+  public void enqueuePendingCompaction(HoodieInstant instant) {
+    pendingCompactions.add(instant);
+  }
+
+  /**
+   * Wait till the number of outstanding pending compactions reduces to the passed-in value.
+   *
+   * @param numPendingCompactions Maximum pending compactions allowed
+   * @throws InterruptedException
+   */
+  public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException {
+    try {
+      queueLock.lock();
+      while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) {
+        consumed.await();
+      }
+    } finally {
+      queueLock.unlock();
+    }
+  }
+
+  /**
+   * Fetch Next pending compaction if available.
+   *
+   * @return
+   * @throws InterruptedException
+   */
+  private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
+    LOG.info("Compactor waiting up to 10 seconds for next instant for compaction");
+    HoodieInstant instant = pendingCompactions.poll(10, TimeUnit.SECONDS);
+    if (instant != null) {
+      try {
+        queueLock.lock();
+        // Signal waiting thread
+        consumed.signal();
+      } finally {
+        queueLock.unlock();
+      }
+    }
+    return instant;
+  }
+
+  /**
+   * Start Compaction Service.
+   */
+  @Override
+  protected Pair<CompletableFuture, ExecutorService> startService() {
+    ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction,
+        r -> {
+        Thread t = new Thread(r, "async_compact_thread");
+        t.setDaemon(isRunInDaemonMode());
+        return t;
+      });
+    return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
+      try {
+        // Set Compactor Pool Name for allowing users to prioritize compaction
+        LOG.info("Setting Spark Pool name for compaction to " + COMPACT_POOL_NAME);
+        jssc.setLocalProperty("spark.scheduler.pool", COMPACT_POOL_NAME);
+
+        while (!isShutdownRequested()) {
+          final HoodieInstant instant = fetchNextCompactionInstant();
+
+          if (null != instant) {
+            LOG.info("Starting Compaction for instant " + instant);
+            compactor.compact(instant);
+            LOG.info("Finished Compaction for instant " + instant);
+          }
+        }
+        LOG.info("Compactor shutting down properly!!");
+      } catch (InterruptedException ie) {
+        LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
+      } catch (IOException e) {
+        LOG.error("Compactor executor failed", e);
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+      return true;
+    }, executor)).toArray(CompletableFuture[]::new)), executor);
+  }
+
+
+  /**
+   * Check whether compactor thread needs to be stopped.
+   * @return
+   */
+  protected boolean shouldStopCompactor() {
+    return false;
+  }
+}
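
For readers of the new class above, a hedged Scala sketch of how AsyncCompactService is driven, mirroring what HoodieStreamingSink does later in this commit: construct it against a write client, start it with a shutdown callback, and enqueue compaction instants that have already been scheduled on the timeline. The write client, instant time and logging here are placeholders.

import java.util.function.Function

import org.apache.hudi.async.AsyncCompactService
import org.apache.hudi.client.HoodieWriteClient
import org.apache.hudi.common.model.HoodieRecordPayload
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
import org.apache.spark.api.java.JavaSparkContext

object AsyncCompactionDriverSketch {
  def start(jssc: JavaSparkContext,
            client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
            scheduledInstantTime: String): AsyncCompactService = {
    // Daemon mode keeps the compactor from blocking JVM shutdown; the streaming
    // subclass SparkStreamingAsyncCompactService hard-codes this to true.
    val service = new AsyncCompactService(jssc, client, true)
    service.start(new Function[java.lang.Boolean, java.lang.Boolean] {
      override def apply(errored: java.lang.Boolean): java.lang.Boolean = {
        // Invoked by the monitor thread once the compactor exits or fails.
        println(s"Async compactor shut down, errored=$errored")
        true
      }
    })
    // Hand over a compaction that is already scheduled (REQUESTED) on the timeline;
    // the service polls this queue and runs each compaction on a dedicated thread.
    service.enqueuePendingCompaction(
      new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, scheduledInstantTime))
    service
  }
}

The streaming sink tears this down through shutdown(force) in its reset path, as shown further down in this commit.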
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java b/hudi-client/src/main/java/org/apache/hudi/client/Compactor.java
similarity index 94%
rename from hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
rename to hudi-client/src/main/java/org/apache/hudi/client/Compactor.java
index 4c23537..1f11620 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/Compactor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/Compactor.java
@@ -16,10 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.utilities.deltastreamer;
+package org.apache.hudi.client;
 
-import org.apache.hudi.client.HoodieWriteClient;
-import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 1ead5ff..6b6074b 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -599,6 +599,26 @@ public class HoodieTestDataGenerator {
     return updates;
   }
 
+  /**
+   * Generate update for each record in the dataset.
+   * @param instantTime
+   * @return
+   * @throws IOException
+   */
+  public List<HoodieRecord> generateUpdatesForAllRecords(String instantTime) {
+    List<HoodieRecord> updates = new ArrayList<>();
+    Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
+    existingKeys.values().forEach(kp -> {
+      try {
+        HoodieRecord record = generateUpdateRecord(kp.key, instantTime);
+        updates.add(record);
+      } catch (IOException ioe) {
+        throw new HoodieIOException(ioe.getMessage(), ioe);
+      }
+    });
+    return updates;
+  }
+
   public List<HoodieRecord> generateUpdatesAsPerSchema(String commitTime, Integer n, String schemaStr) {
     return generateUniqueUpdatesStream(commitTime, n, schemaStr).collect(Collectors.toList());
   }
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
index 170ca60..95e4c01 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/HoodieTestHiveBase.java
@@ -72,8 +72,8 @@ public class HoodieTestHiveBase extends ITTestBase {
     }
 
     // Run Hoodie Java App
-    String cmd = String.format("%s %s --hive-sync --table-path %s  --hive-url %s  --table-type %s  --hive-table %s" +
-        " --commit-type %s  --table-name %s", HOODIE_JAVA_APP, "HoodieJavaGenerateApp", hdfsUrl, HIVE_SERVER_JDBC_URL,
+    String cmd = String.format("%s --hive-sync --table-path %s  --hive-url %s  --table-type %s  --hive-table %s" +
+        " --commit-type %s  --table-name %s", HOODIE_GENERATE_APP, hdfsUrl, HIVE_SERVER_JDBC_URL,
         tableType, hiveTableName, commitType, hoodieTableName);
     if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
       cmd = cmd + " --use-multi-partition-keys";
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
index a5e6ed9..3ffa0cf 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestBase.java
@@ -60,6 +60,8 @@ public abstract class ITTestBase {
   protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
   protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
   protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh";
+  protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_generate_app.sh";
+  protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_streaming_app.sh";
   protected static final String HUDI_HADOOP_BUNDLE =
       HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hadoop-mr-bundle.jar";
   protected static final String HUDI_HIVE_SYNC_BUNDLE =
diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
index 4b586a3..c7787a7 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java
@@ -23,11 +23,12 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-
 /**
  * Smoke tests to run as part of verification.
  */
@@ -37,27 +38,31 @@ public class ITTestHoodieSanity extends ITTestBase {
     SINGLE_KEY_PARTITIONED, MULTI_KEYS_PARTITIONED, NON_PARTITIONED,
   }
 
-  @Test
+  @ParameterizedTest
+  @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
   /**
    * A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie with single partition key data-set
    * and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
    * console.
    */
-  public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception {
+  public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable(String command) throws Exception {
     String hiveTableName = "docker_hoodie_single_partition_key_cow_test";
-    testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED);
+    testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
+        PartitionType.SINGLE_KEY_PARTITIONED);
     dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
   }
 
-  @Test
+  @ParameterizedTest
+  @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
   /**
    * A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie with multiple partition-keys
    * data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query
    * in hive console.
    */
-  public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception {
+  public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable(String command) throws Exception {
     String hiveTableName = "docker_hoodie_multi_partition_key_cow_test";
-    testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.MULTI_KEYS_PARTITIONED);
+    testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.COPY_ON_WRITE.name(),
+        PartitionType.MULTI_KEYS_PARTITIONED);
     dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
   }
 
@@ -73,27 +78,31 @@ public class ITTestHoodieSanity extends ITTestBase {
     dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
   }
 
-  @Test
+  @ParameterizedTest
+  @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
   /**
    * A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with single partition key data-set
    * and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
    * console.
    */
-  public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable() throws Exception {
+  public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable(String command) throws Exception {
     String hiveTableName = "docker_hoodie_single_partition_key_mor_test";
-    testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.SINGLE_KEY_PARTITIONED);
+    testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
+        PartitionType.SINGLE_KEY_PARTITIONED);
     dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
   }
 
-  @Test
+  @ParameterizedTest
+  @ValueSource(strings = { HOODIE_JAVA_APP, HOODIE_JAVA_STREAMING_APP })
   /**
    * A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with multiple partition-keys
    * data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query
    * in hive console.
    */
-  public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable() throws Exception {
+  public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable(String command) throws Exception {
     String hiveTableName = "docker_hoodie_multi_partition_key_mor_test";
-    testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.MULTI_KEYS_PARTITIONED);
+    testRunHoodieJavaApp(command, hiveTableName, HoodieTableType.MERGE_ON_READ.name(),
+        PartitionType.MULTI_KEYS_PARTITIONED);
     dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
   }
 
@@ -114,7 +123,7 @@ public class ITTestHoodieSanity extends ITTestBase {
    * Hive integration and upsert functionality is checked by running a count query in hive console. TODO: Add
    * spark-shell test-case
    */
-  public void testRunHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType)
+  public void testRunHoodieJavaApp(String command, String hiveTableName, String tableType, PartitionType partitionType)
       throws Exception {
 
     String hdfsPath = "/" + hiveTableName;
@@ -137,13 +146,13 @@ public class ITTestHoodieSanity extends ITTestBase {
     // Run Hoodie Java App
     String cmd;
     if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) {
-      cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+      cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
           + " --table-type " + tableType + " --hive-table " + hiveTableName;
     } else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
-      cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+      cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
           + " --table-type " + tableType + " --hive-table " + hiveTableName + " --use-multi-partition-keys";
     } else {
-      cmd = HOODIE_JAVA_APP + " HoodieJavaApp --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+      cmd = command + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
           + " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned";
     }
     executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);
@@ -182,6 +191,11 @@ public class ITTestHoodieSanity extends ITTestBase {
         "Expecting 280 rows to be present in the new table");
   }
 
+  public void testRunHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType)
+      throws Exception {
+    testRunHoodieJavaApp(HOODIE_JAVA_APP, hiveTableName, tableType, partitionType);
+  }
+
   private void dropHiveTables(String hiveTableName, String tableType) throws Exception {
     if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
       executeHiveCommand("drop table if exists " + hiveTableName + "_rt");
diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_app.sh
index e2acc6c..7c63e74 100755
--- a/hudi-spark/run_hoodie_app.sh
+++ b/hudi-spark/run_hoodie_app.sh
@@ -37,4 +37,4 @@ fi
 OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'`
 #TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests
 echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@"
-java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS "$@"
+java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaApp "$@"
diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_generate_app.sh
similarity index 98%
copy from hudi-spark/run_hoodie_app.sh
copy to hudi-spark/run_hoodie_generate_app.sh
index e2acc6c..a4b4090 100755
--- a/hudi-spark/run_hoodie_app.sh
+++ b/hudi-spark/run_hoodie_generate_app.sh
@@ -37,4 +37,4 @@ fi
 OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'`
 #TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests
 echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@"
-java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS "$@"
+java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaGenerateApp "$@"
diff --git a/hudi-spark/run_hoodie_app.sh b/hudi-spark/run_hoodie_streaming_app.sh
similarity index 95%
copy from hudi-spark/run_hoodie_app.sh
copy to hudi-spark/run_hoodie_streaming_app.sh
index e2acc6c..01f1a4e 100755
--- a/hudi-spark/run_hoodie_app.sh
+++ b/hudi-spark/run_hoodie_streaming_app.sh
@@ -36,5 +36,5 @@ fi
 
 OTHER_JARS=`ls -1 $DIR/target/lib/*jar | grep -v '*avro*-1.' | tr '\n' ':'`
 #TODO - Need to move TestDataGenerator and HoodieJavaApp out of tests
-echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS $@"
-java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS "$@"
+echo "Running command : java -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaStreamingApp $@"
+java -Xmx1G -cp $DIR/target/test-classes/:$DIR/../hudi-client/target/test-classes/:${HADOOP_CONF_DIR}:$HUDI_JAR:${CLIENT_JAR}:$OTHER_JARS HoodieJavaStreamingApp "$@"
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index 3345204..5647e65 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -226,11 +226,16 @@ public class DataSourceUtils {
   }
 
   public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
-                                                     String tblName, Map<String, String> parameters) {
-
+      String tblName, Map<String, String> parameters) {
+    boolean asyncCompact = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY()));
     // inline compaction is on by default for MOR
-    boolean inlineCompact = parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY())
+    boolean inlineCompact = !asyncCompact && parameters.get(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY())
         .equals(DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL());
+    return createHoodieClient(jssc, schemaStr, basePath, tblName, parameters, inlineCompact);
+  }
+
+  public static HoodieWriteClient createHoodieClient(JavaSparkContext jssc, String schemaStr, String basePath,
+      String tblName, Map<String, String> parameters, boolean inlineCompact) {
 
     // insert/bulk-insert combining to be true, if filtering for duplicates
     boolean combineInserts = Boolean.parseBoolean(parameters.get(DataSourceWriteOptions.INSERT_DROP_DUPS_OPT_KEY()));
diff --git a/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java b/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java
new file mode 100644
index 0000000..ae0ad73
--- /dev/null
+++ b/hudi-spark/src/main/java/org/apache/hudi/async/SparkStreamingAsyncCompactService.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.async;
+
+import org.apache.hudi.client.HoodieWriteClient;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Async Compaction Service used by Structured Streaming. Here, async compaction runs in daemon mode so that it does
+ * not block shutdown of the Spark application.
+ */
+public class SparkStreamingAsyncCompactService extends AsyncCompactService {
+
+  private static final long serialVersionUID = 1L;
+
+  public SparkStreamingAsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
+    super(jssc, client, true);
+  }
+}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 8a8f87f..c505ec4 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -281,4 +281,8 @@ object DataSourceWriteOptions {
   val DEFAULT_HIVE_ASSUME_DATE_PARTITION_OPT_VAL = "false"
   val DEFAULT_USE_PRE_APACHE_INPUT_FORMAT_OPT_VAL = "false"
   val DEFAULT_HIVE_USE_JDBC_OPT_VAL = "true"
+
+  // Async Compaction - Enabled by default for MOR
+  val ASYNC_COMPACT_ENABLE_KEY = "hoodie.datasource.compaction.async.enable"
+  val DEFAULT_ASYNC_COMPACT_ENABLE_VAL = "true"
 }
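
The HoodieJavaApp test changes later in this commit set the new option to "false" for their batch writes. A corresponding hedged sketch of a batch upsert that keeps the pre-existing inline-compaction behaviour for MERGE_ON_READ tables; the DataFrame, table name, paths and the other option keys are illustrative assumptions.

import org.apache.spark.sql.{DataFrame, SaveMode}

object HoodieBatchUpsertSketch {
  def upsertWithInlineCompaction(df: DataFrame, basePath: String): Unit = {
    df.write
      .format("hudi") // or the full datasource name, org.apache.hudi
      .option("hoodie.table.name", "batch_mor_table")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.recordkey.field", "key")
      .option("hoodie.datasource.write.precombine.field", "timestamp")
      // Added in this commit with a default of "true"; setting it to "false" lets
      // DataSourceUtils fall back to inline compaction for MOR tables, as before.
      .option("hoodie.datasource.compaction.async.enable", "false")
      .mode(SaveMode.Append)
      .save(basePath)
  }
}
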
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
index fbdd4ea..e26c1c8 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -18,6 +18,8 @@
 package org.apache.hudi
 
 import org.apache.hudi.DataSourceReadOptions._
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hadoop.HoodieROTablePathFilter
 import org.apache.log4j.LogManager
@@ -103,10 +105,8 @@ class DefaultSource extends RelationProvider
                               mode: SaveMode,
                               optParams: Map[String, String],
                               df: DataFrame): BaseRelation = {
-
     val parameters = HoodieSparkSqlWriter.parametersWithWriteDefaults(optParams)
     HoodieSparkSqlWriter.write(sqlContext, mode, parameters, df)
-
     new HudiEmptyRelation(sqlContext, df.schema)
   }
 
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 05ef863..479005b 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -21,6 +21,7 @@ import java.util
 
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hudi.DataSourceWriteOptions._
@@ -29,7 +30,7 @@ import org.apache.hudi.client.{HoodieWriteClient, WriteStatus}
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieRecordPayload, HoodieTableType}
-import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieException
@@ -49,7 +50,13 @@ private[hudi] object HoodieSparkSqlWriter {
   def write(sqlContext: SQLContext,
             mode: SaveMode,
             parameters: Map[String, String],
-            df: DataFrame): (Boolean, common.util.Option[String]) = {
+            df: DataFrame,
+            hoodieTableConfig: Option[HoodieTableConfig] = Option.empty,
+            hoodieWriteClient: Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty,
+            asyncCompactionTriggerFn: Option[Function1[HoodieWriteClient[HoodieRecordPayload[Nothing]], Unit]] = Option.empty
+           )
+  : (Boolean, common.util.Option[String], common.util.Option[String],
+    HoodieWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = {
 
     val sparkContext = sqlContext.sparkContext
     val path = parameters.get("path")
@@ -84,113 +91,134 @@ private[hudi] object HoodieSparkSqlWriter {
     val instantTime = HoodieActiveTimeline.createNewInstantTime()
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     var exists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
-
-    if (exists && mode == SaveMode.Append) {
-      val existingTableName = new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig.getTableName
-      if (!existingTableName.equals(tblName)) {
-        throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
-      }
+    var tableConfig : HoodieTableConfig = if (exists) {
+      hoodieTableConfig.getOrElse(
+        new HoodieTableMetaClient(sparkContext.hadoopConfiguration, path.get).getTableConfig)
+    } else {
+      null
     }
 
-    val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
-      if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
-      // register classes & schemas
-      val structName = s"${tblName}_record"
-      val nameSpace = s"hoodie.${tblName}"
-      sparkContext.getConf.registerKryoClasses(
-        Array(classOf[org.apache.avro.generic.GenericData],
-          classOf[org.apache.avro.Schema]))
-      val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
-      sparkContext.getConf.registerAvroSchemas(schema)
-      log.info(s"Registered avro schema : ${schema.toString(true)}")
-
-      // Convert to RDD[HoodieRecord]
-      val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-      val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-      val hoodieAllIncomingRecords = genericRecords.map(gr => {
-        val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
-            .asInstanceOf[Comparable[_]]
-        DataSourceUtils.createHoodieRecord(gr,
-          orderingVal, keyGenerator.getKey(gr), parameters(PAYLOAD_CLASS_OPT_KEY))
-      }).toJavaRDD()
-
-      // Handle various save modes
-      if (mode == SaveMode.ErrorIfExists && exists) {
-        throw new HoodieException(s"hoodie table at $basePath already exists.")
-      }
-      if (mode == SaveMode.Ignore && exists) {
-        log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
-        (true, common.util.Option.empty())
-      }
-      if (mode == SaveMode.Overwrite && exists) {
-        log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
-        fs.delete(basePath, true)
-        exists = false
-      }
-
-      // Create the table if not present
-      if (!exists) {
-        //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
-        HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration, path.get, HoodieTableType.valueOf(tableType),
-          tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
-      }
-
-      // Create a HoodieWriteClient & issue the write.
-      val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get, tblName,
-        mapAsJavaMap(parameters)
-      )
-
-      val hoodieRecords =
-        if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-          DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
-        } else {
-          hoodieAllIncomingRecords
-        }
-
-      if (hoodieRecords.isEmpty()) {
-        log.info("new batch has no new records, skipping...")
-        (true, common.util.Option.empty())
-      }
-      client.startCommitWithTime(instantTime)
-      val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
-      (writeStatuses, client)
+    if (mode == SaveMode.Ignore && exists) {
+      log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
+      (false, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
     } else {
-
-      // Handle save modes
-      if (mode != SaveMode.Append) {
-        throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
+      if (exists && mode == SaveMode.Append) {
+        val existingTableName = tableConfig.getTableName
+        if (!existingTableName.equals(tblName)) {
+          throw new HoodieException(s"hoodie table with name $existingTableName already exist at $basePath")
+        }
       }
+      val (writeStatuses, writeClient: HoodieWriteClient[HoodieRecordPayload[Nothing]]) =
+        if (!operation.equalsIgnoreCase(DELETE_OPERATION_OPT_VAL)) {
+          // register classes & schemas
+          val structName = s"${tblName}_record"
+          val nameSpace = s"hoodie.${tblName}"
+          sparkContext.getConf.registerKryoClasses(
+            Array(classOf[org.apache.avro.generic.GenericData],
+              classOf[org.apache.avro.Schema]))
+          val schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+          sparkContext.getConf.registerAvroSchemas(schema)
+          log.info(s"Registered avro schema : ${schema.toString(true)}")
+
+          // Convert to RDD[HoodieRecord]
+          val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
+          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+          val hoodieAllIncomingRecords = genericRecords.map(gr => {
+            val orderingVal = HoodieAvroUtils.getNestedFieldVal(gr, parameters(PRECOMBINE_FIELD_OPT_KEY), false)
+              .asInstanceOf[Comparable[_]]
+            DataSourceUtils.createHoodieRecord(gr,
+              orderingVal, keyGenerator.getKey(gr),
+              parameters(PAYLOAD_CLASS_OPT_KEY))
+          }).toJavaRDD()
+
+          // Handle various save modes
+          if (mode == SaveMode.ErrorIfExists && exists) {
+            throw new HoodieException(s"hoodie table at $basePath already exists.")
+          }
+
+          if (mode == SaveMode.Overwrite && exists) {
+            log.warn(s"hoodie table at $basePath already exists. Deleting existing data & overwriting with new data.")
+            fs.delete(basePath, true)
+            exists = false
+          }
+
+          // Create the table if not present
+          if (!exists) {
+            //FIXME(bootstrap): bootstrapIndexClass needs to be set when bootstrap index class is integrated.
+            val tableMetaClient = HoodieTableMetaClient.initTableTypeWithBootstrap(sparkContext.hadoopConfiguration,
+              path.get, HoodieTableType.valueOf(tableType),
+              tblName, "archived", parameters(PAYLOAD_CLASS_OPT_KEY), null, null, null)
+            tableConfig = tableMetaClient.getTableConfig
+          }
+
+          // Create a HoodieWriteClient & issue the write.
+          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, schema.toString, path.get,
+            tblName, mapAsJavaMap(parameters)
+          )).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+
+          if (asyncCompactionTriggerFn.isDefined &&
+            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+            asyncCompactionTriggerFn.get.apply(client)
+          }
+
+          val hoodieRecords =
+            if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
+              DataSourceUtils.dropDuplicates(jsc, hoodieAllIncomingRecords, mapAsJavaMap(parameters))
+            } else {
+              hoodieAllIncomingRecords
+            }
 
-      val structName = s"${tblName}_record"
-      val nameSpace = s"hoodie.${tblName}"
-      sparkContext.getConf.registerKryoClasses(
-        Array(classOf[org.apache.avro.generic.GenericData],
-          classOf[org.apache.avro.Schema]))
-
-      // Convert to RDD[HoodieKey]
-      val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
-      val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
-      val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
-
-      if (!exists) {
-        throw new HoodieException(s"hoodie table at $basePath does not exist")
-      }
+          if (hoodieRecords.isEmpty()) {
+            log.info("new batch has no new records, skipping...")
+            (true, common.util.Option.empty())
+          }
+          client.startCommitWithTime(instantTime)
+          val writeStatuses = DataSourceUtils.doWriteOperation(client, hoodieRecords, instantTime, operation)
+          (writeStatuses, client)
+        } else {
 
-      // Create a HoodieWriteClient & issue the delete.
-      val client = DataSourceUtils.createHoodieClient(jsc,
-        Schema.create(Schema.Type.NULL).toString, path.get, tblName,
-        mapAsJavaMap(parameters)
-      )
+          // Handle save modes
+          if (mode != SaveMode.Append) {
+            throw new HoodieException(s"Append is the only save mode applicable for $operation operation")
+          }
+
+          val structName = s"${tblName}_record"
+          val nameSpace = s"hoodie.${tblName}"
+          sparkContext.getConf.registerKryoClasses(
+            Array(classOf[org.apache.avro.generic.GenericData],
+              classOf[org.apache.avro.Schema]))
+
+          // Convert to RDD[HoodieKey]
+          val keyGenerator = DataSourceUtils.createKeyGenerator(toProperties(parameters))
+          val genericRecords: RDD[GenericRecord] = AvroConversionUtils.createRdd(df, structName, nameSpace)
+          val hoodieKeysToDelete = genericRecords.map(gr => keyGenerator.getKey(gr)).toJavaRDD()
+
+          if (!exists) {
+            throw new HoodieException(s"hoodie table at $basePath does not exist")
+          }
+
+          // Create a HoodieWriteClient & issue the delete.
+          val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
+            Schema.create(Schema.Type.NULL).toString, path.get, tblName,
+            mapAsJavaMap(parameters))).asInstanceOf[HoodieWriteClient[HoodieRecordPayload[Nothing]]]
+
+          if (asyncCompactionTriggerFn.isDefined &&
+            isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())) {
+            asyncCompactionTriggerFn.get.apply(client)
+          }
+
+          // Issue deletes
+          client.startCommitWithTime(instantTime)
+          val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
+          (writeStatuses, client)
+        }
 
-      // Issue deletes
-      client.startCommitWithTime(instantTime)
-      val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysToDelete, instantTime)
-      (writeStatuses, client)
+      // Check for errors and commit the write.
+      val (writeSuccessful, compactionInstant) =
+        commitAndPerformPostOperations(writeStatuses, parameters, writeClient, tableConfig, instantTime, basePath,
+          operation, jsc)
+      (writeSuccessful, common.util.Option.ofNullable(instantTime), compactionInstant, writeClient, tableConfig)
     }
-
-    // Check for errors and commit the write.
-    val writeSuccessful = checkWriteStatus(writeStatuses, parameters, writeClient, instantTime, basePath, operation, jsc)
-    (writeSuccessful, common.util.Option.ofNullable(instantTime))
   }
 
   /**
@@ -222,7 +250,8 @@ private[hudi] object HoodieSparkSqlWriter {
       HIVE_PARTITION_FIELDS_OPT_KEY -> DEFAULT_HIVE_PARTITION_FIELDS_OPT_VAL,
       HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
       HIVE_STYLE_PARTITIONING_OPT_KEY -> DEFAULT_HIVE_STYLE_PARTITIONING_OPT_VAL,
-      HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL
+      HIVE_USE_JDBC_OPT_KEY -> DEFAULT_HIVE_USE_JDBC_OPT_VAL,
+      ASYNC_COMPACT_ENABLE_KEY -> DEFAULT_ASYNC_COMPACT_ENABLE_VAL
     ) ++ translateStorageTypeToTableType(parameters)
   }
 
@@ -258,13 +287,14 @@ private[hudi] object HoodieSparkSqlWriter {
     hiveSyncConfig
   }
 
-  private def checkWriteStatus(writeStatuses: JavaRDD[WriteStatus],
-                               parameters: Map[String, String],
-                               client: HoodieWriteClient[_],
-                               instantTime: String,
-                               basePath: Path,
-                               operation: String,
-                               jsc: JavaSparkContext): Boolean = {
+  private def commitAndPerformPostOperations(writeStatuses: JavaRDD[WriteStatus],
+                                             parameters: Map[String, String],
+                                             client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
+                                             tableConfig: HoodieTableConfig,
+                                             instantTime: String,
+                                             basePath: Path,
+                                             operation: String,
+                                             jsc: JavaSparkContext): (Boolean, common.util.Option[java.lang.String]) = {
     val errorCount = writeStatuses.rdd.filter(ws => ws.hasErrors).count()
     if (errorCount == 0) {
       log.info("No errors. Proceeding to commit the write.")
@@ -284,6 +314,15 @@ private[hudi] object HoodieSparkSqlWriter {
         log.info("Commit " + instantTime + " failed!")
       }
 
+      val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
+      val compactionInstant : common.util.Option[java.lang.String] =
+      if (asyncCompactionEnabled) {
+        client.scheduleCompaction(common.util.Option.of(new util.HashMap[String, String](mapAsJavaMap(metaMap))))
+      } else {
+        common.util.Option.empty()
+      }
+
+      log.info(s"Compaction Scheduled is $compactionInstant")
       val hiveSyncEnabled = parameters.get(HIVE_SYNC_ENABLED_OPT_KEY).exists(r => r.toBoolean)
       val syncHiveSucess = if (hiveSyncEnabled) {
         log.info("Syncing to Hive Metastore (URL: " + parameters(HIVE_URL_OPT_KEY) + ")")
@@ -292,8 +331,12 @@ private[hudi] object HoodieSparkSqlWriter {
       } else {
         true
       }
-      client.close()
-      commitSuccess && syncHiveSucess
+
+      log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
+      if (!asyncCompactionEnabled) {
+        client.close()
+      }
+      (commitSuccess && syncHiveSucess, compactionInstant)
     } else {
       log.error(s"$operation failed with $errorCount errors :")
       if (log.isTraceEnabled) {
@@ -308,6 +351,18 @@ private[hudi] object HoodieSparkSqlWriter {
             }
           })
       }
+      (false, common.util.Option.empty())
+    }
+  }
+
+  private def isAsyncCompactionEnabled(client: HoodieWriteClient[HoodieRecordPayload[Nothing]],
+                                       tableConfig: HoodieTableConfig,
+                                       parameters: Map[String, String], configuration: Configuration) : Boolean = {
+    log.info(s"Config.isInlineCompaction ? ${client.getConfig.isInlineCompaction}")
+    if (!client.getConfig.isInlineCompaction
+      && parameters.get(ASYNC_COMPACT_ENABLE_KEY).exists(r => r.toBoolean)) {
+      tableConfig.getTableType == HoodieTableType.MERGE_ON_READ
+    } else {
       false
     }
   }
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala b/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
index 9f18a2e..1600ab0 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieStreamingSink.scala
@@ -16,13 +16,25 @@
  */
 package org.apache.hudi
 
+import java.lang
+import java.util.function.{Function, Supplier}
+
+import org.apache.hudi.async.{AsyncCompactService, SparkStreamingAsyncCompactService}
+import org.apache.hudi.client.HoodieWriteClient
+import org.apache.hudi.common.model.HoodieRecordPayload
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.timeline.HoodieInstant.State
+import org.apache.hudi.common.table.timeline.{HoodieInstant, HoodieTimeline}
+import org.apache.hudi.common.util.CompactionUtils
 import org.apache.hudi.exception.HoodieCorruptedDataException
 import org.apache.log4j.LogManager
+import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.execution.streaming.Sink
-import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryListener}
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
 import scala.util.{Failure, Success, Try}
+import scala.collection.JavaConversions._
 
 class HoodieStreamingSink(sqlContext: SQLContext,
                           options: Map[String, String],
@@ -38,6 +50,8 @@ class HoodieStreamingSink(sqlContext: SQLContext,
   private val retryIntervalMs = options(DataSourceWriteOptions.STREAMING_RETRY_INTERVAL_MS_OPT_KEY).toLong
   private val ignoreFailedBatch = options(DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY).toBoolean
 
+  private var isAsyncCompactorServiceShutdownAbnormally = false
+
   private val mode =
     if (outputMode == OutputMode.Append()) {
       SaveMode.Append
@@ -45,39 +59,54 @@ class HoodieStreamingSink(sqlContext: SQLContext,
       SaveMode.Overwrite
     }
 
-  override def addBatch(batchId: Long, data: DataFrame): Unit = {
+  private var asyncCompactorService : AsyncCompactService = _
+  private var writeClient : Option[HoodieWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty
+  private var hoodieTableConfig : Option[HoodieTableConfig] = Option.empty
+
+  override def addBatch(batchId: Long, data: DataFrame): Unit = this.synchronized {
+    if (isAsyncCompactorServiceShutdownAbnormally)  {
+      throw new IllegalStateException("Async Compactor shutdown unexpectedly")
+    }
+
     retry(retryCnt, retryIntervalMs)(
       Try(
         HoodieSparkSqlWriter.write(
-          sqlContext,
-          mode,
-          options,
-          data)
+          sqlContext, mode, options, data, hoodieTableConfig, writeClient, Some(triggerAsyncCompactor))
       ) match {
-        case Success((true, commitOps)) =>
+        case Success((true, commitOps, compactionInstantOps, client, tableConfig)) =>
           log.info(s"Micro batch id=$batchId succeeded"
             + (commitOps.isPresent match {
                 case true => s" for commit=${commitOps.get()}"
                 case _ => s" with no new commits"
             }))
-          Success((true, commitOps))
+          writeClient = Some(client)
+          hoodieTableConfig = Some(tableConfig)
+          if (compactionInstantOps.isPresent) {
+            asyncCompactorService.enqueuePendingCompaction(
+              new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, compactionInstantOps.get()))
+          }
+          Success((true, commitOps, compactionInstantOps))
         case Failure(e) =>
           // clean up persist rdds in the write process
           data.sparkSession.sparkContext.getPersistentRDDs
             .foreach {
               case (id, rdd) =>
-                rdd.unpersist()
+                try {
+                  rdd.unpersist()
+                } catch {
+                  case t: Exception => log.warn("Got exception trying to unpersist rdd", t)
+                }
             }
-          log.error(s"Micro batch id=$batchId threw following expection: ", e)
+          log.error(s"Micro batch id=$batchId threw following exception: ", e)
           if (ignoreFailedBatch) {
             log.info(s"Ignore the exception and move on streaming as per " +
               s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration")
-            Success((true, None))
+            Success((true, None, None))
           } else {
             if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
             Failure(e)
           }
-        case Success((false, commitOps)) =>
+        case Success((false, commitOps, compactionInstantOps, client, tableConfig)) =>
           log.error(s"Micro batch id=$batchId ended up with errors"
             + (commitOps.isPresent match {
               case true =>  s" for commit=${commitOps.get()}"
@@ -86,7 +115,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
           if (ignoreFailedBatch) {
             log.info(s"Ignore the errors and move on streaming as per " +
               s"${DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH_OPT_KEY} configuration")
-            Success((true, None))
+            Success((true, None, None))
           } else {
             if (retryCnt > 1) log.info(s"Retrying the failed micro batch id=$batchId ...")
             Failure(new HoodieCorruptedDataException(s"Micro batch id=$batchId ended up with errors"))
@@ -100,6 +129,7 @@ class HoodieStreamingSink(sqlContext: SQLContext,
           // spark sometimes hangs upon exceptions and keep on hold of the executors
           // this is to force exit upon errors / exceptions and release all executors
           // will require redeployment / supervise mode to restart the streaming
+          reset(true)
           System.exit(1)
         }
       case Success(_) =>
@@ -112,11 +142,55 @@ class HoodieStreamingSink(sqlContext: SQLContext,
   @annotation.tailrec
   private def retry[T](n: Int, waitInMillis: Long)(fn: => Try[T]): Try[T] = {
     fn match {
-      case x: util.Success[T] => x
+      case x: Success[T] =>
+        x
       case _ if n > 1 =>
         Thread.sleep(waitInMillis)
         retry(n - 1, waitInMillis * 2)(fn)
-      case f => f
+      case f =>
+        reset(false)
+        f
+    }
+  }
+
+  protected def triggerAsyncCompactor(client: HoodieWriteClient[HoodieRecordPayload[Nothing]]): Unit = {
+    if (null == asyncCompactorService) {
+      log.info("Triggering Async compaction !!")
+      asyncCompactorService = new SparkStreamingAsyncCompactService(new JavaSparkContext(sqlContext.sparkContext),
+        client)
+      asyncCompactorService.start(new Function[java.lang.Boolean, java.lang.Boolean] {
+        override def apply(errored: lang.Boolean): lang.Boolean = {
+          log.info(s"Async Compactor shutdown. Errored ? $errored")
+          isAsyncCompactorServiceShutdownAbnormally = errored
+          reset(false)
+          log.info("Done resetting write client.")
+          true
+        }
+      })
+
+      // Add Shutdown Hook
+      Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
+        override def run(): Unit = reset(true)
+      }))
+
+      // First time, scan .hoodie folder and get all pending compactions
+      val metaClient = new HoodieTableMetaClient(sqlContext.sparkContext.hadoopConfiguration,
+        client.getConfig.getBasePath)
+      val pendingInstants :java.util.List[HoodieInstant] =
+        CompactionUtils.getPendingCompactionInstantTimes(metaClient)
+      pendingInstants.foreach((h : HoodieInstant) => asyncCompactorService.enqueuePendingCompaction(h))
+    }
+  }
+
+  private def reset(force: Boolean) : Unit = this.synchronized {
+    if (asyncCompactorService != null) {
+      asyncCompactorService.shutdown(force)
+      asyncCompactorService = null
+    }
+
+    if (writeClient.isDefined) {
+      writeClient.get.close()
+      writeClient = Option.empty
     }
   }
 }
diff --git a/hudi-spark/src/test/java/HoodieJavaApp.java b/hudi-spark/src/test/java/HoodieJavaApp.java
index 2cf36f9..6eda051 100644
--- a/hudi-spark/src/test/java/HoodieJavaApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaApp.java
@@ -151,6 +151,7 @@ public class HoodieJavaApp {
         .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(),
             nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
                 : SimpleKeyGenerator.class.getCanonicalName())
+        .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "false")
         // This will remove any existing data at path below, and create a
         .mode(SaveMode.Overwrite);
 
@@ -177,6 +178,7 @@ public class HoodieJavaApp {
             nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
                 : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
+        .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "false")
         .option(HoodieWriteConfig.TABLE_NAME, tableName).mode(SaveMode.Append);
 
     updateHiveSyncConfig(writer);
@@ -202,6 +204,7 @@ public class HoodieJavaApp {
             nonPartitionedTable ? NonpartitionedKeyGenerator.class.getCanonicalName()
                 : SimpleKeyGenerator.class.getCanonicalName()) // Add Key Extractor
         .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
+        .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "false")
         .option(HoodieWriteConfig.TABLE_NAME, tableName).mode(SaveMode.Append);
 
     updateHiveSyncConfig(writer);
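
The batch writes above pin the new async-compaction switch to "false" so MOR compaction stays inline and the test remains deterministic. Below is a minimal Scala sketch of the same knob on a plain DataFrame upsert; the table name, field names and path are illustrative and assume data shaped like the test records.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.spark.sql.{DataFrame, SaveMode}

object BatchUpsertSketch {
  // Batch upsert into a MERGE_ON_READ table with async compaction disabled, so compaction
  // runs inline with the write after every delta commit.
  def upsertBatch(df: DataFrame, basePath: String): Unit =
    df.write.format("org.apache.hudi")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY, "false")
      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_test_tbl")
      .mode(SaveMode.Append)
      .save(basePath)
}
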
diff --git a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
index 977ae09..500189d 100644
--- a/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
+++ b/hudi-spark/src/test/java/HoodieJavaStreamingApp.java
@@ -16,12 +16,18 @@
  * limitations under the License.
  */
 
+import java.util.stream.Collectors;
 import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.HoodieDataSourceHelpers;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 
 import com.beust.jcommander.JCommander;
@@ -43,6 +49,7 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import org.apache.spark.sql.streaming.StreamingQuery;
 
 import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
 
@@ -52,14 +59,14 @@ import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrin
 public class HoodieJavaStreamingApp {
 
   @Parameter(names = {"--table-path", "-p"}, description = "path for Hoodie sample table")
-  private String tablePath = "file:///tmp/hoodie/streaming/sample-table";
+  private String tablePath = "/tmp/hoodie/streaming/sample-table";
 
   @Parameter(names = {"--streaming-source-path", "-ssp"}, description = "path for streaming source file folder")
-  private String streamingSourcePath = "file:///tmp/hoodie/streaming/source";
+  private String streamingSourcePath = "/tmp/hoodie/streaming/source";
 
   @Parameter(names = {"--streaming-checkpointing-path", "-scp"},
       description = "path for streaming checkpointing folder")
-  private String streamingCheckpointingPath = "file:///tmp/hoodie/streaming/checkpoint";
+  private String streamingCheckpointingPath = "/tmp/hoodie/streaming/checkpoint";
 
   @Parameter(names = {"--streaming-duration-in-ms", "-sdm"},
       description = "time in millisecond for the streaming duration")
@@ -106,7 +113,15 @@ public class HoodieJavaStreamingApp {
       cmd.usage();
       System.exit(1);
     }
-    cli.run();
+    int errStatus = 0;
+    try {
+      cli.run();
+    } catch (Exception ex) {
+      LOG.error("Got error running app ", ex);
+      errStatus = -1;
+    } finally {
+      System.exit(errStatus);
+    }
   }
 
   /**
@@ -132,38 +147,118 @@ public class HoodieJavaStreamingApp {
     List<String> records1 = recordsToStrings(dataGen.generateInserts("001", 100));
     Dataset<Row> inputDF1 = spark.read().json(jssc.parallelize(records1, 2));
 
-    List<String> records2 = recordsToStrings(dataGen.generateUpdates("002", 100));
-
+    List<String> records2 = recordsToStrings(dataGen.generateUpdatesForAllRecords("002"));
     Dataset<Row> inputDF2 = spark.read().json(jssc.parallelize(records2, 2));
 
-    // setup the input for streaming
-    Dataset<Row> streamingInput = spark.readStream().schema(inputDF1.schema()).json(streamingSourcePath);
 
+    String ckptPath = streamingCheckpointingPath + "/stream1";
+    String srcPath = streamingSourcePath + "/stream1";
+    fs.mkdirs(new Path(ckptPath));
+    fs.mkdirs(new Path(srcPath));
+
+    // setup the input for streaming
+    Dataset<Row> streamingInput = spark.readStream().schema(inputDF1.schema()).json(srcPath + "/*");
 
     // start streaming and showing
     ExecutorService executor = Executors.newFixedThreadPool(2);
+    int numInitialCommits = 0;
 
     // thread for Spark structured streaming
-    Future<Void> streamFuture = executor.submit(() -> {
-      LOG.info("===== Streaming Starting =====");
-      stream(streamingInput);
-      LOG.info("===== Streaming Ends =====");
-      return null;
-    });
-
-    // thread for adding data to the streaming source and showing results over time
-    Future<Void> showFuture = executor.submit(() -> {
-      LOG.info("===== Showing Starting =====");
-      show(spark, fs, inputDF1, inputDF2);
-      LOG.info("===== Showing Ends =====");
-      return null;
-    });
-
-    // let the threads run
-    streamFuture.get();
-    showFuture.get();
-
-    executor.shutdown();
+    try {
+      Future<Void> streamFuture = executor.submit(() -> {
+        LOG.info("===== Streaming Starting =====");
+        stream(streamingInput, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL(), ckptPath);
+        LOG.info("===== Streaming Ends =====");
+        return null;
+      });
+
+      // thread for adding data to the streaming source and showing results over time
+      Future<Integer> showFuture = executor.submit(() -> {
+        LOG.info("===== Showing Starting =====");
+        int numCommits = addInputAndValidateIngestion(spark, fs, srcPath, 0, 100, inputDF1, inputDF2, true);
+        LOG.info("===== Showing Ends =====");
+        return numCommits;
+      });
+
+      // let the threads run
+      streamFuture.get();
+      numInitialCommits = showFuture.get();
+    } finally {
+      executor.shutdownNow();
+    }
+
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jssc.hadoopConfiguration(), tablePath);
+    if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+      // Ensure we have successfully completed one compaction commit
+      ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().getInstants().count() == 1);
+    } else {
+      ValidationUtils.checkArgument(metaClient.getActiveTimeline().getCommitTimeline().getInstants().count() >= 1);
+    }
+
+    // Deletes Stream
+    // Need to restart application to ensure spark does not assume there are multiple streams active.
+    spark.close();
+    SparkSession newSpark = SparkSession.builder().appName("Hoodie Spark Streaming APP")
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").master("local[1]").getOrCreate();
+    jssc = new JavaSparkContext(newSpark.sparkContext());
+    String ckptPath2 = streamingCheckpointingPath + "/stream2";
+    String srcPath2 = srcPath + "/stream2";
+    fs.mkdirs(new Path(ckptPath2));
+    fs.mkdirs(new Path(srcPath2));
+    Dataset<Row> delStreamingInput = newSpark.readStream().schema(inputDF1.schema()).json(srcPath2 + "/*");
+    List<String> deletes = recordsToStrings(dataGen.generateUniqueUpdates("002", 20));
+    Dataset<Row> inputDF3 = newSpark.read().json(jssc.parallelize(deletes, 2));
+    executor = Executors.newFixedThreadPool(2);
+
+    // thread for Spark structured streaming
+    try {
+      Future<Void> streamFuture = executor.submit(() -> {
+        LOG.info("===== Streaming Starting =====");
+        stream(delStreamingInput, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL(), ckptPath2);
+        LOG.info("===== Streaming Ends =====");
+        return null;
+      });
+
+      final int numCommits = numInitialCommits;
+      // thread for adding data to the streaming source and showing results over time
+      Future<Void> showFuture = executor.submit(() -> {
+        LOG.info("===== Showing Starting =====");
+        addInputAndValidateIngestion(newSpark, fs, srcPath2, numCommits, 80, inputDF3, null, false);
+        LOG.info("===== Showing Ends =====");
+        return null;
+      });
+
+      // let the threads run
+      streamFuture.get();
+      showFuture.get();
+    } finally {
+      executor.shutdown();
+    }
+  }
+
+  private void waitTillNCommits(FileSystem fs, int numCommits, int timeoutSecs, int sleepSecsAfterEachRun)
+      throws InterruptedException {
+    long beginTime = System.currentTimeMillis();
+    long currTime = beginTime;
+    long timeoutMsecs = timeoutSecs * 1000;
+
+    while ((currTime - beginTime) < timeoutMsecs) {
+      try {
+        HoodieTimeline timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath);
+        LOG.info("Timeline :" + timeline.getInstants().collect(Collectors.toList()));
+        if (timeline.countInstants() >= numCommits) {
+          return;
+        }
+        HoodieTableMetaClient metaClient = new HoodieTableMetaClient(fs.getConf(), tablePath, true);
+        System.out.println("Instants :" + metaClient.getActiveTimeline().getInstants().collect(Collectors.toList()));
+      } catch (TableNotFoundException te) {
+        LOG.info("Got table not found exception. Retrying");
+      } finally {
+        Thread.sleep(sleepSecsAfterEachRun * 1000);
+        currTime = System.currentTimeMillis();
+      }
+    }
+    throw new IllegalStateException("Timed out waiting for " + numCommits + " commits to appear in " + tablePath);
   }
 
   /**
@@ -175,23 +270,40 @@ public class HoodieJavaStreamingApp {
    * @param inputDF2
    * @throws Exception
    */
-  public void show(SparkSession spark, FileSystem fs, Dataset<Row> inputDF1, Dataset<Row> inputDF2) throws Exception {
-    inputDF1.write().mode(SaveMode.Append).json(streamingSourcePath);
+  public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, String srcPath,
+      int initialCommits, int expRecords,
+      Dataset<Row> inputDF1, Dataset<Row> inputDF2, boolean instantTimeValidation) throws Exception {
+    inputDF1.write().mode(SaveMode.Append).json(srcPath);
+
+    int numExpCommits = initialCommits + 1;
     // wait for spark streaming to process one microbatch
-    Thread.sleep(3000);
+    waitTillNCommits(fs, numExpCommits, 180, 3);
     String commitInstantTime1 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
     LOG.info("First commit at instant time :" + commitInstantTime1);
 
-    inputDF2.write().mode(SaveMode.Append).json(streamingSourcePath);
-    // wait for spark streaming to process one microbatch
-    Thread.sleep(3000);
-    String commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
-    LOG.info("Second commit at instant time :" + commitInstantTime2);
+    String commitInstantTime2 = commitInstantTime1;
+    if (null != inputDF2) {
+      numExpCommits += 1;
+      inputDF2.write().mode(SaveMode.Append).json(srcPath);
+      // wait for spark streaming to process one microbatch
+      Thread.sleep(3000);
+      waitTillNCommits(fs, numExpCommits, 180, 3);
+      commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+      LOG.info("Second commit at instant time :" + commitInstantTime2);
+    }
+
+    if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+      numExpCommits += 1;
+      // Wait for compaction to also finish and track latest timestamp as commit timestamp
+      waitTillNCommits(fs, numExpCommits, 180, 3);
+      commitInstantTime2 = HoodieDataSourceHelpers.latestCommit(fs, tablePath);
+      LOG.info("Compaction commit at instant time :" + commitInstantTime2);
+    }
 
     /**
      * Read & do some queries
      */
-    Dataset<Row> hoodieROViewDF = spark.read().format("org.apache.hudi")
+    Dataset<Row> hoodieROViewDF = spark.read().format("hudi")
         // pass any path glob, can include hoodie & non-hoodie
         // datasets
         .load(tablePath + "/*/*/*/*");
@@ -200,11 +312,24 @@ public class HoodieJavaStreamingApp {
     // all trips whose fare amount was greater than 2.
     spark.sql("select fare.amount, begin_lon, begin_lat, timestamp from hoodie_ro where fare.amount > 2.0").show();
 
+    if (instantTimeValidation) {
+      System.out.println("Showing all records. Latest Instant Time =" + commitInstantTime2);
+      spark.sql("select * from hoodie_ro").show(200, false);
+      long numRecordsAtInstant2 =
+          spark.sql("select * from hoodie_ro where _hoodie_commit_time = " + commitInstantTime2).count();
+      ValidationUtils.checkArgument(numRecordsAtInstant2 == expRecords,
+          "Expecting " + expRecords + " records, Got " + numRecordsAtInstant2);
+    }
+
+    long numRecords = spark.sql("select * from hoodie_ro").count();
+    ValidationUtils.checkArgument(numRecords == expRecords,
+        "Expecting " + expRecords + " records, Got " + numRecords);
+
     if (tableType.equals(HoodieTableType.COPY_ON_WRITE.name())) {
       /**
        * Consume incrementally, only changes in commit 2 above. Currently only supported for COPY_ON_WRITE TABLE
        */
-      Dataset<Row> hoodieIncViewDF = spark.read().format("org.apache.hudi")
+      Dataset<Row> hoodieIncViewDF = spark.read().format("hudi")
           .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL())
           // Only changes in write 2 above
           .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(), commitInstantTime1)
@@ -214,6 +339,7 @@ public class HoodieJavaStreamingApp {
       LOG.info("You will only see records from : " + commitInstantTime2);
       hoodieIncViewDF.groupBy(hoodieIncViewDF.col("_hoodie_commit_time")).count().show();
     }
+    return numExpCommits;
   }
 
   /**
@@ -222,19 +348,23 @@ public class HoodieJavaStreamingApp {
    * @param streamingInput
    * @throws Exception
    */
-  public void stream(Dataset<Row> streamingInput) throws Exception {
+  public void stream(Dataset<Row> streamingInput, String operationType, String checkpointLocation) throws Exception {
 
     DataStreamWriter<Row> writer = streamingInput.writeStream().format("org.apache.hudi")
         .option("hoodie.insert.shuffle.parallelism", "2").option("hoodie.upsert.shuffle.parallelism", "2")
+        .option(DataSourceWriteOptions.OPERATION_OPT_KEY(), operationType)
         .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), tableType)
         .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "_row_key")
         .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), "partition")
         .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "timestamp")
-        .option(HoodieWriteConfig.TABLE_NAME, tableName).option("checkpointLocation", streamingCheckpointingPath)
+        .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
+        .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY(), "true")
+        .option(HoodieWriteConfig.TABLE_NAME, tableName).option("checkpointLocation", checkpointLocation)
         .outputMode(OutputMode.Append());
 
     updateHiveSyncConfig(writer);
-    writer.trigger(new ProcessingTime(500)).start(tablePath).awaitTermination(streamingDurationInMs);
+    StreamingQuery query = writer.trigger(new ProcessingTime(500)).start(tablePath);
+    query.awaitTermination(streamingDurationInMs);
   }
 
   /**
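
The reworked stream() above now takes the operation type and checkpoint location as arguments and turns async compaction on, so compactions execute in the background while the stream keeps committing delta files. A roughly equivalent Scala sketch follows; the table name and trigger interval are illustrative, and it assumes a streaming DataFrame with the test schema.

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}

object StreamingUpsertSketch {
  // Continuous upserts into a MERGE_ON_READ table; the Hudi sink schedules compactions and a
  // background service executes them while the stream keeps producing delta commits.
  def startStream(streamingInput: DataFrame, tablePath: String, checkpointPath: String): StreamingQuery =
    streamingInput.writeStream.format("org.apache.hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "partition")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS_PROP, "1")
      .option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE_KEY, "true")
      .option(HoodieWriteConfig.TABLE_NAME, "hoodie_streaming_tbl")
      .option("checkpointLocation", checkpointPath)
      .outputMode(OutputMode.Append())
      .trigger(Trigger.ProcessingTime(500))
      .start(tablePath)
}
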
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 7f26481..feeec96 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -50,7 +50,8 @@ class HoodieSparkSqlWriterSuite extends FunSuite with Matchers {
     try {
       val sqlContext = session.sqlContext
       val options = Map("path" -> "hoodie/test/path", HoodieWriteConfig.TABLE_NAME -> "hoodie_test_tbl")
-      val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options, session.emptyDataFrame))
+      val e = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.ErrorIfExists, options,
+        session.emptyDataFrame))
       assert(e.getMessage.contains("spark.serializer"))
     } finally {
       session.stop()
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
index 07e8704..48582c1 100644
--- a/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
+++ b/hudi-spark/src/test/scala/org/apache/hudi/functional/TestDataSource.scala
@@ -17,12 +17,16 @@
 
 package org.apache.hudi.functional
 
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator
 import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.TableNotFoundException
 import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
+import org.apache.log4j.LogManager
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime}
@@ -39,6 +43,7 @@ import scala.concurrent.{Await, Future}
  * Basic tests on the spark datasource
  */
 class TestDataSource {
+  private val log = LogManager.getLogger(getClass)
 
   var spark: SparkSession = null
   var dataGen: HoodieTestDataGenerator = null
@@ -214,7 +219,7 @@ class TestDataSource {
     assertEquals(hoodieIncViewDF2.count(), insert2NewKeyCnt)
   }
 
-  //@Test (TODO: re-enable after fixing noisyness)
+  @Test
   def testStructuredStreaming(): Unit = {
     fs.delete(new Path(basePath), true)
     val sourcePath = basePath + "/source"
@@ -254,7 +259,7 @@ class TestDataSource {
     val f2 = Future {
       inputDF1.write.mode(SaveMode.Append).json(sourcePath)
       // wait for spark streaming to process one microbatch
-      Thread.sleep(3000)
+      val currNumCommits = waitTillAtleastNCommits(fs, destPath, 1, 120, 5);
       assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, destPath, "000"))
       val commitInstantTime1: String = HoodieDataSourceHelpers.latestCommit(fs, destPath)
       // Read RO View
@@ -264,9 +269,8 @@ class TestDataSource {
 
       inputDF2.write.mode(SaveMode.Append).json(sourcePath)
       // wait for spark streaming to process one microbatch
-      Thread.sleep(10000)
+      waitTillAtleastNCommits(fs, destPath, currNumCommits + 1, 120, 5);
       val commitInstantTime2: String = HoodieDataSourceHelpers.latestCommit(fs, destPath)
-
       assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, "000").size())
       // Read RO View
       val hoodieROViewDF2 = spark.read.format("org.apache.hudi")
@@ -299,8 +303,35 @@ class TestDataSource {
       assertEquals(1, countsPerCommit.length)
       assertEquals(commitInstantTime2, countsPerCommit(0).get(0))
     }
-
     Await.result(Future.sequence(Seq(f1, f2)), Duration.Inf)
+  }
 
+  @throws[InterruptedException]
+  private def waitTillAtleastNCommits(fs: FileSystem, tablePath: String,
+                               numCommits: Int, timeoutSecs: Int, sleepSecsAfterEachRun: Int): Int = {
+    val beginTime = System.currentTimeMillis
+    var currTime = beginTime
+    val timeoutMsecs = timeoutSecs * 1000
+    var numInstants = 0
+    var success: Boolean = false
+    while ({!success && (currTime - beginTime) < timeoutMsecs}) try {
+      val timeline = HoodieDataSourceHelpers.allCompletedCommitsCompactions(fs, tablePath)
+      log.info("Timeline :" + timeline.getInstants.toArray)
+      if (timeline.countInstants >= numCommits) {
+        numInstants = timeline.countInstants
+        success = true
+      }
+      val metaClient = new HoodieTableMetaClient(fs.getConf, tablePath, true)
+    } catch {
+      case te: TableNotFoundException =>
+        log.info("Got table not found exception. Retrying")
+    } finally {
+      Thread.sleep(sleepSecsAfterEachRun * 1000)
+      currTime = System.currentTimeMillis
+    }
+    if (!success) {
+      throw new IllegalStateException("Timed-out waiting for " + numCommits + " commits to appear in " + tablePath)
+    }
+    numInstants
   }
 }
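
The polling helper above replaces fixed sleeps: the test only proceeds once the completed timeline shows the expected number of commits (including the compaction commit for MOR). Once it returns, the incremental read the test asserts on looks roughly like the sketch below; the path and begin instant are placeholders.

import org.apache.hudi.DataSourceReadOptions
import org.apache.spark.sql.{DataFrame, SparkSession}

object IncrementalReadSketch {
  // Incremental query: only records committed after `sinceInstant` are returned,
  // which is what the counts-per-commit assertions rely on.
  def readIncremental(spark: SparkSession, tablePath: String, sinceInstant: String): DataFrame =
    spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, sinceInstant)
      .load(tablePath)
}
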
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 05a1bbc..3bc51c9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -18,8 +18,9 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
-import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.async.AbstractAsyncService;
+import org.apache.hudi.async.AsyncCompactService;
+import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
 import org.apache.hudi.common.config.TypedProperties;
@@ -62,15 +63,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.IntStream;
 
 /**
  * A utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target
@@ -97,6 +92,8 @@ public class HoodieDeltaStreamer implements Serializable {
 
   private final Option<BootstrapExecutor> bootstrapExecutor;
 
+  public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
+
   public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
     this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
         jssc.hadoopConfiguration(), null);
@@ -559,8 +556,8 @@ public class HoodieDeltaStreamer implements Serializable {
         boolean error = false;
         if (cfg.isAsyncCompactionEnabled()) {
           // set Scheduler Pool.
-          LOG.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME);
-          jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
+          LOG.info("Setting Spark Pool name for delta-sync to " + DELTASYNC_POOL_NAME);
+          jssc.setLocalProperty("spark.scheduler.pool", DELTASYNC_POOL_NAME);
         }
         try {
           while (!isShutdownRequested()) {
@@ -661,100 +658,6 @@ public class HoodieDeltaStreamer implements Serializable {
     }
   }
 
-  /**
-   * Async Compactor Service that runs in separate thread. Currently, only one compactor is allowed to run at any time.
-   */
-  public static class AsyncCompactService extends AbstractAsyncService {
-
-    private static final long serialVersionUID = 1L;
-    private final int maxConcurrentCompaction;
-    private transient Compactor compactor;
-    private transient JavaSparkContext jssc;
-    private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
-    private transient ReentrantLock queueLock = new ReentrantLock();
-    private transient Condition consumed = queueLock.newCondition();
-
-    public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
-      this.jssc = jssc;
-      this.compactor = new Compactor(client, jssc);
-      this.maxConcurrentCompaction = 1;
-    }
-
-    /**
-     * Enqueues new Pending compaction.
-     */
-    public void enqueuePendingCompaction(HoodieInstant instant) {
-      pendingCompactions.add(instant);
-    }
-
-    /**
-     * Wait till outstanding pending compactions reduces to the passed in value.
-     *
-     * @param numPendingCompactions Maximum pending compactions allowed
-     * @throws InterruptedException
-     */
-    public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException {
-      try {
-        queueLock.lock();
-        while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) {
-          consumed.await();
-        }
-      } finally {
-        queueLock.unlock();
-      }
-    }
-
-    /**
-     * Fetch Next pending compaction if available.
-     *
-     * @return
-     * @throws InterruptedException
-     */
-    private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
-      LOG.info("Compactor waiting for next instant for compaction upto 60 seconds");
-      HoodieInstant instant = pendingCompactions.poll(60, TimeUnit.SECONDS);
-      if (instant != null) {
-        try {
-          queueLock.lock();
-          // Signal waiting thread
-          consumed.signal();
-        } finally {
-          queueLock.unlock();
-        }
-      }
-      return instant;
-    }
-
-    /**
-     * Start Compaction Service.
-     */
-    @Override
-    protected Pair<CompletableFuture, ExecutorService> startService() {
-      ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction);
-      return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
-        try {
-          // Set Compactor Pool Name for allowing users to prioritize compaction
-          LOG.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME);
-          jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME);
-
-          while (!isShutdownRequested()) {
-            final HoodieInstant instant = fetchNextCompactionInstant();
-            if (null != instant) {
-              compactor.compact(instant);
-            }
-          }
-          LOG.info("Compactor shutting down properly!!");
-        } catch (InterruptedException ie) {
-          LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
-        } catch (IOException e) {
-          LOG.error("Compactor executor failed", e);
-          throw new HoodieIOException(e.getMessage(), e);
-        }
-        return true;
-      }, executor)).toArray(CompletableFuture[]::new)), executor);
-    }
-  }
-
   public DeltaSyncService getDeltaSyncService() {
     return deltaSyncService.get();
   }
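
The AsyncCompactService inner class removed above now lives in the shared org.apache.hudi.async package (note the new import), so both the delta streamer and the Spark streaming sink drive the same service. For illustration only, here is a stripped-down Scala sketch of the enqueue/poll pattern it is built around, not the actual Hudi class.

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

// Ingestion enqueues instants of pending compactions; a single worker thread polls the queue
// with a timeout and runs one compaction at a time until a shutdown is requested.
class AsyncCompactionSketch(runCompaction: String => Unit) {
  private val pending = new LinkedBlockingQueue[String]()
  @volatile private var shutdownRequested = false
  private val executor = Executors.newSingleThreadExecutor()

  def enqueue(compactionInstant: String): Unit = pending.add(compactionInstant)

  def start(): Unit = executor.submit(new Runnable {
    override def run(): Unit = {
      while (!shutdownRequested) {
        val instant = pending.poll(60, TimeUnit.SECONDS) // wait up to 60s for the next instant
        if (instant != null) runCompaction(instant)
      }
    }
  })

  def shutdown(): Unit = {
    shutdownRequested = true
    executor.shutdownNow()
  }
}
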
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
index 54fcf68..f5f1f38 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/SchedulerConfGenerator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities.deltastreamer;
 
+import org.apache.hudi.async.AsyncCompactService;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.util.Option;
 
@@ -41,8 +42,8 @@ public class SchedulerConfGenerator {
 
   private static final Logger LOG = LogManager.getLogger(SchedulerConfGenerator.class);
 
-  public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
-  public static final String COMPACT_POOL_NAME = "hoodiecompact";
+  public static final String DELTASYNC_POOL_NAME = HoodieDeltaStreamer.DELTASYNC_POOL_NAME;
+  public static final String COMPACT_POOL_NAME = AsyncCompactService.COMPACT_POOL_NAME;
   public static final String SPARK_SCHEDULER_MODE_KEY = "spark.scheduler.mode";
   public static final String SPARK_SCHEDULER_FAIR_MODE = "FAIR";
   public static final String SPARK_SCHEDULER_ALLOCATION_FILE_KEY = "spark.scheduler.allocation.file";
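
The pool names above only matter when Spark runs with the FAIR scheduler, so that delta-sync and compaction jobs share executors instead of starving each other. A hedged Scala sketch of the user-side wiring; the allocation-file path and master are placeholders, while the property names and pool names are the ones referenced above.

import org.apache.spark.sql.SparkSession

object SchedulerPoolsSketch {
  def main(args: Array[String]): Unit = {
    // Run Spark in FAIR scheduling mode with an allocation file that defines the
    // "hoodiedeltasync" and "hoodiecompact" pools.
    val spark = SparkSession.builder()
      .appName("hudi-async-compaction-demo")
      .master("local[2]") // illustrative; use the real cluster master
      .config("spark.scheduler.mode", "FAIR")
      .config("spark.scheduler.allocation.file", "/path/to/hudi-scheduler-pools.xml") // placeholder path
      .getOrCreate()

    // Ingestion work runs in its own pool; the compaction service sets "hoodiecompact" the same way.
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "hoodiedeltasync")
  }
}
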