You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/02 04:20:49 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-721] Remove additional ack. Simplify watermark manager (remov…

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f42c5f8  [GOBBLIN-721] Remove additional ack. Simplify watermark manager (remov…
f42c5f8 is described below

commit f42c5f8e565f96f96adfc35b8c2ebed0c8c2e903
Author: Shirshanka Das <sd...@linkedin.com>
AuthorDate: Mon Apr 1 21:20:41 2019 -0700

    [GOBBLIN-721] Remove additional ack. Simplify watermark manager (remov…
    
    Closes #2588 from shirshanka/stream-fix
---
 .../writer/InstrumentedDataWriterDecorator.java    |  12 -
 .../apache/gobblin/writer/AsyncWriterManager.java  |  10 -
 .../writer/MultiWriterWatermarkManager.java        | 159 ----------
 .../gobblin/writer/WatermarkAwareWriter.java       |  24 +-
 .../writer/WatermarkAwareWriterWrapper.java        |   8 -
 .../writer/MultiWriterWatermarkManagerTest.java    | 350 ---------------------
 .../org/apache/gobblin/writer/ConsoleWriter.java   |  24 +-
 .../gobblin/writer/PartitionedDataWriter.java      |  37 ---
 .../gobblin/writer/PartitionedWriterTest.java      |  43 ---
 .../writer/ElasticsearchRestWriter.java            |  12 -
 .../gobblin/runtime/StreamModelTaskRunner.java     |   3 +-
 .../main/java/org/apache/gobblin/runtime/Task.java |  37 +--
 .../java/org/apache/gobblin/runtime/fork/Fork.java |   2 -
 .../apache/gobblin/runtime/TaskContinuousTest.java | 241 +++++++-------
 14 files changed, 134 insertions(+), 828 deletions(-)

diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
index efc349f..af49d15 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/instrumented/writer/InstrumentedDataWriterDecorator.java
@@ -143,18 +143,6 @@ public class InstrumentedDataWriterDecorator<D> extends InstrumentedDataWriterBa
   }
 
   @Override
-  public Map<String, CheckpointableWatermark> getCommittableWatermark() {
-    Preconditions.checkState(isWatermarkCapable());
-    return watermarkAwareWriter.get().getCommittableWatermark();
-  }
-
-  @Override
-  public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
-    Preconditions.checkState(isWatermarkCapable());
-    return watermarkAwareWriter.get().getUnacknowledgedWatermark();
-  }
-
-  @Override
   public ControlMessageHandler getMessageHandler() {
     return this.embeddedWriter.getMessageHandler();
   }
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
index a599753..4f690d1 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/AsyncWriterManager.java
@@ -117,16 +117,6 @@ public class AsyncWriterManager<D> implements WatermarkAwareWriter<D>, DataWrite
     return true;
   }
 
-  @Override
-  public Map<String, CheckpointableWatermark> getCommittableWatermark() {
-    throw new UnsupportedOperationException("This writer does not keep track of committed watermarks");
-  }
-
-  @Override
-  public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
-    throw new UnsupportedOperationException("This writer does not keep track of uncommitted watermarks");
-  }
-
   /**
    * A class to store attempts at writing a record
    **/
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/MultiWriterWatermarkManager.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/MultiWriterWatermarkManager.java
deleted file mode 100644
index c4993f7..0000000
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/MultiWriterWatermarkManager.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gobblin.writer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-
-import lombok.Getter;
-import lombok.ToString;
-
-import org.apache.gobblin.source.extractor.CheckpointableWatermark;
-import org.apache.gobblin.util.ExecutorsUtils;
-
-
-/**
- * Responsible for managing continuous commit of watermarks.
- * Periodically fetches watermarks from WatermarkAwareWriters and commits them to WatermarkStorage.
- * TODO: Add metrics monitoring
- */
-public class MultiWriterWatermarkManager implements WatermarkManager {
-
-
-  private final Queue<WatermarkAwareWriter> _watermarkAwareWriters;
-  private final WatermarkStorage _watermarkStorage;
-  private final long _commitIntervalMillis;
-  private final ScheduledExecutorService _watermarkCommitThreadPool;
-  private final Logger _logger;
-  private final RetrievalStatus _retrievalStatus;
-  private final CommitStatus _commitStatus;
-
-  @VisibleForTesting
-  final Runnable _watermarkCommitter = new
-
-      Runnable() {
-        @Override
-        public void run() {
-          long startTime = System.nanoTime();
-          Map<String, CheckpointableWatermark> watermarksToCommit = null;
-          try {
-            _retrievalStatus.onAttempt();
-            WatermarkTracker watermarkTracker = new MultiWriterWatermarkTracker();
-            for (WatermarkAwareWriter writer : _watermarkAwareWriters) {
-              Map<String, CheckpointableWatermark> writerWatermarks = writer.getCommittableWatermark();
-              _logger.debug("Retrieved from writer {} : watermark {} ", writer.getClass().getName(), writerWatermarks);
-              watermarkTracker.committedWatermarks(writerWatermarks);
-            }
-            watermarksToCommit = watermarkTracker.getAllCommitableWatermarks();
-            _retrievalStatus.onSuccess(watermarksToCommit);
-          }
-          catch (Exception e) {
-            _retrievalStatus.onFailure(e);
-            _logger.error("Failed to get watermark", e);
-          }
-          // Prevent multiple commits concurrently
-          synchronized (this) {
-            if (watermarksToCommit != null && !watermarksToCommit.isEmpty()) {
-              try {
-                _commitStatus.onAttempt();
-                _logger.info("Will commit watermark {}", watermarksToCommit.toString());
-                //TODO: Not checking if this watermark has already been committed successfully.
-                _watermarkStorage.commitWatermarks(watermarksToCommit.values());
-                _commitStatus.onSuccess(watermarksToCommit);
-              } catch (Exception e) {
-                _commitStatus.onFailure(e, watermarksToCommit);
-                _logger.error("Failed to write watermark", e);
-              }
-            } else {
-              _logger.info("Nothing to commit");
-            }
-          }
-          long duration = (System.nanoTime() - startTime)/1000000;
-          _logger.info("Duration of run {} milliseconds", duration);
-        }
-      };
-
-
-  public MultiWriterWatermarkManager(WatermarkStorage storage, long commitIntervalMillis, Optional<Logger> logger) {
-    Preconditions.checkArgument(storage != null, "WatermarkStorage cannot be null");
-    _watermarkAwareWriters = new ConcurrentLinkedQueue<>();
-    _watermarkStorage = storage;
-    _commitIntervalMillis = commitIntervalMillis;
-    _logger = logger.or(LoggerFactory.getLogger(MultiWriterWatermarkManager.class));
-    _watermarkCommitThreadPool = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(logger,
-        Optional.of("WatermarkManager-%d")));
-    _retrievalStatus = new RetrievalStatus();
-    _commitStatus = new CommitStatus();
-  }
-
-  public void registerWriter(WatermarkAwareWriter dataWriter) {
-      _watermarkAwareWriters.add(dataWriter);
-      _logger.info("Registered a watermark aware writer {}", dataWriter.getClass().getName());
-  }
-
-  @Override
-  public void start() {
-    _watermarkCommitThreadPool
-        .scheduleWithFixedDelay(_watermarkCommitter, 0, _commitIntervalMillis, TimeUnit.MILLISECONDS);
-  }
-
-  @Override
-  public void close()
-      throws IOException {
-    _logger.info("Watermark committer closing");
-    _watermarkCommitThreadPool.shutdown();
-    try {
-      long startTime = System.nanoTime();
-      _watermarkCommitThreadPool.awaitTermination(1000, TimeUnit.MILLISECONDS);
-      long duration = (System.nanoTime() - startTime)/ 1000000;
-      _logger.info("Duration of termination wait was {} milliseconds", duration);
-    }
-    catch (InterruptedException ie) {
-      throw new IOException("Interrupted while waiting for committer to shutdown", ie);
-    }
-    finally {
-      // final watermark commit
-      _logger.info("Watermark committer: one last commit before shutting down");
-      _watermarkCommitter.run();
-    }
-  }
-
-  @Override
-  public CommitStatus getCommitStatus() {
-    return _commitStatus;
-  }
-
-  @Override
-  public RetrievalStatus getRetrievalStatus() {
-    return _retrievalStatus;
-  }
-}
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriter.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriter.java
index 581fd0f..bec90a4 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriter.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriter.java
@@ -35,28 +35,18 @@ public interface WatermarkAwareWriter<D> extends DataWriter<D> {
    *
    * @return true if the writer can support watermark-bearing record envelopes
    */
-  boolean isWatermarkCapable();
+  default boolean isWatermarkCapable() {
+    return true;
+  }
 
   /**
    * Write a record (possibly asynchronously), ack the envelope on success.
    * @param recordEnvelope: a container for the record and the acknowledgable watermark
    * @throws IOException: if this write (or preceding write failures) have caused a fatal exception.
    */
-  void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException;
-
-  /**
-   * @return A Watermark per source that can safely be committed because all records associated with it
-   * and earlier watermarks have been committed to the destination. Return empty if no such watermark exists.
-   *
-   */
-  @Deprecated
-  Map<String, CheckpointableWatermark> getCommittableWatermark();
-
-  /**
-   *
-   * @return The lowest watermark out of all pending write requests
-   */
-  @Deprecated
-  Map<String, CheckpointableWatermark> getUnacknowledgedWatermark();
+  default void writeEnvelope(RecordEnvelope<D> recordEnvelope) throws IOException {
+    write(recordEnvelope.getRecord());
+    recordEnvelope.ack();
+  }
 
 }
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriterWrapper.java b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriterWrapper.java
index a2edb3b..686e178 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriterWrapper.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/writer/WatermarkAwareWriterWrapper.java
@@ -43,12 +43,4 @@ public abstract class WatermarkAwareWriterWrapper<D> extends WriterWrapper<D> im
     watermarkAwareWriter.get().writeEnvelope(recordEnvelope);
   }
 
-  public final Map<String, CheckpointableWatermark> getCommittableWatermark() {
-    return watermarkAwareWriter.get().getCommittableWatermark();
-  }
-
-  public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
-    return watermarkAwareWriter.get().getUnacknowledgedWatermark();
-  }
-
 }
diff --git a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/MultiWriterWatermarkManagerTest.java b/gobblin-core-base/src/test/java/org/apache/gobblin/writer/MultiWriterWatermarkManagerTest.java
deleted file mode 100644
index fd59bc6..0000000
--- a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/MultiWriterWatermarkManagerTest.java
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gobblin.writer;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import com.google.common.base.Optional;
-
-import org.apache.gobblin.source.extractor.CheckpointableWatermark;
-import org.apache.gobblin.source.extractor.DefaultCheckpointableWatermark;
-import org.apache.gobblin.stream.RecordEnvelope;
-import org.apache.gobblin.source.extractor.extract.LongWatermark;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.*;
-
-
-@Test
-public class MultiWriterWatermarkManagerTest {
-
-
-  @Test
-  public void testConstructor() {
-
-    WatermarkStorage watermarkStorage = null;
-    try {
-      MultiWriterWatermarkManager watermarkManager = new MultiWriterWatermarkManager(watermarkStorage, 0, Optional.<Logger>absent());
-      Assert.fail("Should have thrown an exception");
-    } catch (Exception e) {
-
-    }
-  }
-
-  /**
-   * Test that when no sources are registered, no side effects are observed
-   */
-  @Test
-  public void testNoRegisteredSource()
-      throws IOException, InterruptedException {
-
-    WatermarkStorage mockWatermarkStorage = mock(WatermarkStorage.class);
-    MultiWriterWatermarkManager watermarkManager = new MultiWriterWatermarkManager(mockWatermarkStorage, 1000, Optional.<Logger>absent());
-    try {
-      watermarkManager.start();
-    } catch (Exception e) {
-      Assert.fail("Should not throw exception", e);
-    }
-
-    Thread.sleep(2000);
-
-    watermarkManager.close();
-    verify(mockWatermarkStorage, times(0)).commitWatermarks(any(Iterable.class));
-
-    MultiWriterWatermarkManager.CommitStatus watermarkMgrStatus = watermarkManager.getCommitStatus();
-    Assert.assertTrue(watermarkMgrStatus.getLastCommittedWatermarks().isEmpty(),
-        "Last committed watermarks should be empty");
-    Assert.assertEquals(watermarkMgrStatus.getLastWatermarkCommitSuccessTimestampMillis(), 0 ,
-        "Last committed watermark timestamp should be 0");
-
-  }
-
-  /**
-   * Test that when we have commits failing to watermark storage, the manager continues to try
-   * at every interval and keeps track of the exception it is seeing.
-   */
-  @Test
-  public void testFailingWatermarkStorage()
-      throws IOException, InterruptedException {
-
-    WatermarkStorage reallyBadWatermarkStorage = mock(WatermarkStorage.class);
-    IOException exceptionToThrow = new IOException("Failed to write coz the programmer told me to");
-
-    doThrow(exceptionToThrow).when(reallyBadWatermarkStorage).commitWatermarks(any(Iterable.class));
-
-
-    long commitInterval = 1000;
-
-    MultiWriterWatermarkManager
-        watermarkManager = new MultiWriterWatermarkManager(reallyBadWatermarkStorage, commitInterval, Optional.<Logger>absent());
-
-    WatermarkAwareWriter mockWriter = mock(WatermarkAwareWriter.class);
-    CheckpointableWatermark watermark = new DefaultCheckpointableWatermark("default", new LongWatermark(0));
-    when(mockWriter.getCommittableWatermark()).thenReturn(Collections.singletonMap("default", watermark));
-    watermarkManager.registerWriter(mockWriter);
-    try {
-      watermarkManager.start();
-    } catch (Exception e) {
-      Assert.fail("Should not throw exception", e);
-    }
-
-    Thread.sleep(commitInterval * 2 + (commitInterval/2)); // sleep for 2.5 iterations
-    watermarkManager.close();
-    int expectedCalls = 3; // 2 calls from iterations, 1 additional attempt due to close
-    verify(reallyBadWatermarkStorage, atLeast(expectedCalls)).commitWatermarks(any(Iterable.class));
-    Assert.assertEquals(watermarkManager.getCommitStatus().getLastCommitException(), exceptionToThrow,
-        "Testing tracking of failed exceptions");
-
-  }
-
-
-  private WatermarkAwareWriter getFlakyWatermarkWriter(final long failEvery) {
-    WatermarkAwareWriter mockWatermarkWriter = new WatermarkAwareWriter() {
-
-      private long watermark = 0;
-
-      @Override
-      public boolean isWatermarkCapable() {
-        return true;
-      }
-
-      @Override
-      public void writeEnvelope(RecordEnvelope recordEnvelope)
-          throws IOException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Map<String, CheckpointableWatermark> getCommittableWatermark() {
-        watermark++;
-        if (watermark % failEvery == 0) {
-          throw new RuntimeException("Failed because you asked me to");
-        }
-        return Collections.singletonMap("default",
-            (CheckpointableWatermark) new DefaultCheckpointableWatermark("default", new LongWatermark(watermark)));
-      }
-
-      @Override
-      public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
-        return null;
-      }
-
-      @Override
-      public void write(Object record)
-          throws IOException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void commit()
-          throws IOException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void cleanup()
-          throws IOException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public long recordsWritten() {
-        return 0;
-      }
-
-      @Override
-      public long bytesWritten()
-          throws IOException {
-        return 0;
-      }
-
-      @Override
-      public void close()
-          throws IOException {
-
-      }
-    };
-
-    return mockWatermarkWriter;
-
-  }
-
-  /**
-   * Test that in the presence of flaky Watermark writers, we continue to log retrieval status correctly
-   */
-  @Test
-  public void testRetrievalStatus()
-      throws InterruptedException, IOException {
-
-    WatermarkStorage mockWatermarkStorage = mock(WatermarkStorage.class);
-
-    MultiWriterWatermarkManager watermarkManager = new MultiWriterWatermarkManager(mockWatermarkStorage, 1000, Optional.<Logger>absent());
-
-    watermarkManager.registerWriter(getFlakyWatermarkWriter(2));
-
-
-    try {
-      watermarkManager.start();
-    } catch (Exception e) {
-      Assert.fail("Should not throw exception", e);
-    }
-
-    Thread.sleep(2000);
-
-    watermarkManager.close();
-
-    MultiWriterWatermarkManager.RetrievalStatus retrievalStatus = watermarkManager.getRetrievalStatus();
-    Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalAttemptTimestampMillis() > 0);
-    Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalSuccessTimestampMillis() > 0);
-    Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalFailureTimestampMillis() > 0);
-    System.out.println(retrievalStatus);
-
-  }
-
-  /**
-   * Test that in the presence of intermittent commit successes and failures, we continue to make progress
-   */
-  @Test
-  public void testFlakyWatermarkStorage()
-      throws IOException, InterruptedException {
-
-    final int failEvery = 2;
-
-    WatermarkStorage mockWatermarkStorage = new WatermarkStorage() {
-      private int watermarkInstance = 0;
-      private List<CheckpointableWatermark> checkpointed = new ArrayList<>();
-      @Override
-      public void commitWatermarks(java.lang.Iterable<CheckpointableWatermark> watermarks)
-          throws IOException {
-        ++watermarkInstance;
-        if (watermarkInstance % failEvery == 0) {
-          throw new IOException("Failed to write");
-        } else {
-          checkpointed.clear();
-          for (CheckpointableWatermark watermark: watermarks) {
-            checkpointed.add(watermark);
-          }
-        }
-      }
-
-      @Override
-      public Map<String, CheckpointableWatermark> getCommittedWatermarks(
-          Class<? extends CheckpointableWatermark> watermarkClass, Iterable<String> sourcePartitions)
-          throws IOException {
-        return null;
-      }
-    };
-
-
-    WatermarkAwareWriter mockWatermarkWriter = new WatermarkAwareWriter() {
-
-      private long watermark = 0;
-
-      @Override
-      public boolean isWatermarkCapable() {
-        return true;
-      }
-
-      @Override
-      public void writeEnvelope(RecordEnvelope recordEnvelope)
-          throws IOException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public Map<String, CheckpointableWatermark> getCommittableWatermark() {
-        watermark++;
-        return Collections.singletonMap("default",
-            (CheckpointableWatermark) new DefaultCheckpointableWatermark("default", new LongWatermark(watermark)));
-      }
-
-      @Override
-      public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
-        return null;
-      }
-
-      @Override
-      public void write(Object record)
-          throws IOException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void commit()
-          throws IOException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void cleanup()
-          throws IOException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public long recordsWritten() {
-        return 0;
-      }
-
-      @Override
-      public long bytesWritten()
-          throws IOException {
-        return 0;
-      }
-
-      @Override
-      public void close()
-          throws IOException {
-
-      }
-    };
-
-    MultiWriterWatermarkManager watermarkManager = new MultiWriterWatermarkManager(mockWatermarkStorage, 1000, Optional.<Logger>absent());
-
-    watermarkManager.registerWriter(mockWatermarkWriter);
-
-
-    try {
-      watermarkManager.start();
-    } catch (Exception e) {
-      Assert.fail("Should not throw exception", e);
-    }
-
-    Thread.sleep(2000);
-
-    watermarkManager.close();
-
-    MultiWriterWatermarkManager.CommitStatus commitStatus = watermarkManager.getCommitStatus();
-    System.out.println(commitStatus);
-    MultiWriterWatermarkManager.RetrievalStatus retrievalStatus = watermarkManager.getRetrievalStatus();
-    Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalAttemptTimestampMillis() > 0);
-    Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalSuccessTimestampMillis() > 0);
-    Assert.assertTrue(retrievalStatus.getLastWatermarkRetrievalFailureTimestampMillis() == 0);
-    System.out.println(retrievalStatus);
-
-  }
-
-}
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/ConsoleWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/ConsoleWriter.java
index 858bff8..a846e0a 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/ConsoleWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/ConsoleWriter.java
@@ -78,29 +78,6 @@ public class ConsoleWriter<D> implements WatermarkAwareWriter<D> {
     log.debug("Close called");
   }
 
-  @Override
-  public boolean isWatermarkCapable() {
-    return true;
-  }
-
-
-  @Override
-  public void writeEnvelope(RecordEnvelope<D> recordEnvelope)
-      throws IOException {
-    write(recordEnvelope.getRecord());
-    recordEnvelope.ack();
-  }
-
-  @Override
-  public Map<String, CheckpointableWatermark> getCommittableWatermark() {
-    throw new UnsupportedOperationException("This writer does not keep track of committed watermarks");
-  }
-
-  @Override
-  public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
-    throw new UnsupportedOperationException("This writer does not keep track of uncommitted watermarks");
-  }
-
   /**
    * Flush console output
    */
@@ -108,5 +85,6 @@ public class ConsoleWriter<D> implements WatermarkAwareWriter<D> {
   public void flush() throws IOException {
     System.out.flush();
   }
+
 }
 
diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
index cdfc11b..3dad0ef 100644
--- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
+++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java
@@ -305,43 +305,6 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
   }
 
   @Override
-  public Map<String, CheckpointableWatermark> getCommittableWatermark() {
-    // The committable watermark from a collection of commitable and unacknowledged watermarks is the highest
-    // committable watermark that is less than the lowest unacknowledged watermark
-
-    WatermarkTracker watermarkTracker = new MultiWriterWatermarkTracker();
-    for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
-      if (entry.getValue() instanceof WatermarkAwareWriter) {
-        Map<String, CheckpointableWatermark> commitableWatermarks =
-            ((WatermarkAwareWriter) entry.getValue()).getCommittableWatermark();
-        if (!commitableWatermarks.isEmpty()) {
-          watermarkTracker.committedWatermarks(commitableWatermarks);
-        }
-
-        Map<String, CheckpointableWatermark> unacknowledgedWatermark =
-            ((WatermarkAwareWriter) entry.getValue()).getUnacknowledgedWatermark();
-        if (!unacknowledgedWatermark.isEmpty()) {
-          watermarkTracker.unacknowledgedWatermarks(unacknowledgedWatermark);
-        }
-      }
-    }
-    return watermarkTracker.getAllCommitableWatermarks(); //TODO: Change this to use List of committables instead
-  }
-
-  @Override
-  public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
-    WatermarkTracker watermarkTracker = new MultiWriterWatermarkTracker();
-    for (Map.Entry<GenericRecord, DataWriter<D>> entry : this.partitionWriters.asMap().entrySet()) {
-      Map<String, CheckpointableWatermark> unacknowledgedWatermark =
-          ((WatermarkAwareWriter) entry.getValue()).getUnacknowledgedWatermark();
-      if (!unacknowledgedWatermark.isEmpty()) {
-        watermarkTracker.unacknowledgedWatermarks(unacknowledgedWatermark);
-      }
-    }
-    return watermarkTracker.getAllUnacknowledgedWatermarks();
-  }
-
-  @Override
   public ControlMessageHandler getMessageHandler() {
     return this.controlMessageHandler;
   }
diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
index 20984ef..ddab5c2 100644
--- a/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
+++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/PartitionedWriterTest.java
@@ -140,49 +140,6 @@ public class PartitionedWriterTest {
     Assert.assertEquals(action.getType(), TestPartitionAwareWriterBuilder.Actions.COMMIT);
   }
 
-  @Test
-  public void testWatermarkComputation() throws IOException {
-    testWatermarkComputation(0L, 1L, 0L);
-    testWatermarkComputation(1L, 0L, null);
-    testWatermarkComputation(0L, 0L, null);
-    testWatermarkComputation(20L, 1L, null);
-  }
-
-  public void testWatermarkComputation(Long committed, Long unacknowledged, Long expected) throws IOException {
-    State state = new State();
-    state.setProp(ConfigurationKeys.WRITER_PARTITIONER_CLASS, TestPartitioner.class.getCanonicalName());
-
-    String defaultSource = "default";
-
-    WatermarkAwareWriter mockDataWriter = mock(WatermarkAwareWriter.class);
-    when(mockDataWriter.isWatermarkCapable()).thenReturn(true);
-    when(mockDataWriter.getCommittableWatermark()).thenReturn(Collections.singletonMap(defaultSource,
-        new DefaultCheckpointableWatermark(defaultSource, new LongWatermark(committed))));
-    when(mockDataWriter.getUnacknowledgedWatermark()).thenReturn(Collections.singletonMap(defaultSource,
-        new DefaultCheckpointableWatermark(defaultSource, new LongWatermark(unacknowledged))));
-
-    PartitionAwareDataWriterBuilder builder = mock(PartitionAwareDataWriterBuilder.class);
-    when(builder.validatePartitionSchema(any(Schema.class))).thenReturn(true);
-    when(builder.forPartition(any(GenericRecord.class))).thenReturn(builder);
-    when(builder.withWriterId(any(String.class))).thenReturn(builder);
-    when(builder.build()).thenReturn(mockDataWriter);
-
-    PartitionedDataWriter writer = new PartitionedDataWriter<String, String>(builder, state);
-
-    RecordEnvelope<String> recordEnvelope = new RecordEnvelope<String>("0");
-    recordEnvelope.addCallBack(
-        new AcknowledgableWatermark(new DefaultCheckpointableWatermark(defaultSource, new LongWatermark(0))));
-    writer.writeEnvelope(recordEnvelope);
-
-    Map<String, CheckpointableWatermark> watermark = writer.getCommittableWatermark();
-    System.out.println(watermark.toString());
-    if (expected == null) {
-      Assert.assertTrue(watermark.isEmpty(), "Expected watermark to be absent");
-    } else {
-      Assert.assertTrue(watermark.size() == 1);
-      Assert.assertEquals((long) expected, ((LongWatermark) watermark.values().iterator().next().getWatermark()).getValue());
-    }
-  }
 
   @Test
   public void testControlMessageHandler() throws IOException {
diff --git a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
index 7cd77da..9d99bc6 100644
--- a/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
+++ b/gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/ElasticsearchRestWriter.java
@@ -22,37 +22,25 @@ import java.io.IOException;
 import java.nio.file.Paths;
 import java.security.KeyStore;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.math3.util.Pair;
 import org.apache.gobblin.password.PasswordManager;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.Batch;
 import org.apache.gobblin.writer.BatchAsyncDataWriter;
-import org.apache.gobblin.writer.GenericWriteResponse;
 import org.apache.gobblin.writer.WriteCallback;
 import org.apache.gobblin.writer.WriteResponse;
 import org.apache.http.HttpHost;
-import org.apache.http.client.config.RequestConfig;
 import org.apache.http.conn.ssl.NoopHostnameVerifier;
 import org.apache.http.impl.nio.reactor.IOReactorConfig;
 import org.apache.http.ssl.SSLContextBuilder;
 import org.apache.http.ssl.SSLContexts;
-import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestClientBuilder;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.xcontent.XContentType;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.typesafe.config.Config;
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
index b20aeda..f490997 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/StreamModelTaskRunner.java
@@ -72,7 +72,6 @@ public class StreamModelTaskRunner {
   private final Optional<WatermarkManager> watermarkManager;
   private final Optional<WatermarkStorage> watermarkStorage;
   private final Map<Optional<Fork>, Optional<Future<?>>> forks;
-  private final String watermarkingStrategy;
 
   protected void run() throws Exception {
     long maxWaitInMinute = taskState.getPropAsLong(ConfigurationKeys.FORK_MAX_WAIT_MININUTES, ConfigurationKeys.DEFAULT_FORK_MAX_WAIT_MININUTES);
@@ -140,7 +139,7 @@ public class StreamModelTaskRunner {
         Fork fork = new Fork(this.taskContext, forkedStream.getGlobalMetadata().getSchema(), forkedStreams.getForkedStreams().size(), fidx, this.taskMode);
         fork.consumeRecordStream(forkedStream);
         this.forks.put(Optional.of(fork), Optional.of(Futures.immediateFuture(null)));
-        this.task.configureStreamingFork(fork, this.watermarkingStrategy);
+        this.task.configureStreamingFork(fork);
       }
     }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
index 8ddd1d4..db89a17 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/Task.java
@@ -87,7 +87,6 @@ import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.writer.AcknowledgableWatermark;
 import org.apache.gobblin.writer.DataWriter;
 import org.apache.gobblin.writer.FineGrainedWatermarkTracker;
-import org.apache.gobblin.writer.MultiWriterWatermarkManager;
 import org.apache.gobblin.writer.TrackerBasedWatermarkManager;
 import org.apache.gobblin.writer.WatermarkAwareWriter;
 import org.apache.gobblin.writer.WatermarkManager;
@@ -149,7 +148,6 @@ public class Task implements TaskIFace {
   private final InstrumentedExtractorBase extractor;
   private final RowLevelPolicyChecker rowChecker;
   private final ExecutionModel taskMode;
-  private final String watermarkingStrategy;
   private final Optional<WatermarkManager> watermarkManager;
   private final Optional<FineGrainedWatermarkTracker> watermarkTracker;
   private final Optional<WatermarkStorage> watermarkStorage;
@@ -235,8 +233,6 @@ public class Task implements TaskIFace {
 
     // Setup Streaming constructs
 
-    this.watermarkingStrategy = "FineGrain"; // TODO: Configure
-
     if (isStreamingTask()) {
       Extractor underlyingExtractor = this.taskContext.getRawSourceExtractor();
       if (!(underlyingExtractor instanceof StreamingExtractor)) {
@@ -259,18 +255,11 @@ public class Task implements TaskIFace {
       long commitIntervalMillis = ConfigUtils.getLong(config,
           TaskConfigurationKeys.STREAMING_WATERMARK_COMMIT_INTERVAL_MILLIS,
           TaskConfigurationKeys.DEFAULT_STREAMING_WATERMARK_COMMIT_INTERVAL_MILLIS);
-      if (watermarkingStrategy.equals("FineGrain")) { // TODO: Configure
-        this.watermarkTracker = Optional.of(this.closer.register(new FineGrainedWatermarkTracker(config)));
-        this.watermarkManager = Optional.of((WatermarkManager) this.closer.register(
-            new TrackerBasedWatermarkManager(this.watermarkStorage.get(), this.watermarkTracker.get(),
-                commitIntervalMillis, Optional.of(this.LOG))));
+      this.watermarkTracker = Optional.of(this.closer.register(new FineGrainedWatermarkTracker(config)));
+      this.watermarkManager = Optional.of((WatermarkManager) this.closer.register(
+          new TrackerBasedWatermarkManager(this.watermarkStorage.get(), this.watermarkTracker.get(),
+              commitIntervalMillis, Optional.of(this.LOG))));
 
-      } else {
-        // writer-based watermarking
-        this.watermarkManager = Optional.of((WatermarkManager) this.closer.register(
-            new MultiWriterWatermarkManager(this.watermarkStorage.get(), commitIntervalMillis, Optional.of(this.LOG))));
-        this.watermarkTracker = Optional.absent();
-      }
     } else {
       this.watermarkManager = Optional.absent();
       this.watermarkTracker = Optional.absent();
@@ -368,7 +357,7 @@ public class Task implements TaskIFace {
       } else {
         new StreamModelTaskRunner(this, this.taskState, this.closer, this.taskContext, this.extractor,
             this.converter, this.recordStreamProcessors, this.rowChecker, this.taskExecutor, this.taskMode, this.shutdownRequested,
-            this.watermarkTracker, this.watermarkManager, this.watermarkStorage, this.forks, this.watermarkingStrategy).run();
+            this.watermarkTracker, this.watermarkManager, this.watermarkStorage, this.forks).run();
       }
 
       LOG.info("Extracted " + this.recordsPulled + " data records");
@@ -434,7 +423,7 @@ public class Task implements TaskIFace {
           AsynchronousFork fork = closer.register(
               new AsynchronousFork(this.taskContext, schema instanceof Copyable ? ((Copyable) schema).copy() : schema,
                   branches, i, this.taskMode));
-          configureStreamingFork(fork, watermarkingStrategy);
+          configureStreamingFork(fork);
           // Run the Fork
           this.forks.put(Optional.<Fork>of(fork), Optional.<Future<?>>of(this.taskExecutor.submit(fork)));
         } else {
@@ -445,10 +434,11 @@ public class Task implements TaskIFace {
       SynchronousFork fork = closer.register(
           new SynchronousFork(this.taskContext, schema instanceof Copyable ? ((Copyable) schema).copy() : schema,
               branches, 0, this.taskMode));
-      configureStreamingFork(fork, watermarkingStrategy);
+      configureStreamingFork(fork);
       this.forks.put(Optional.<Fork>of(fork), Optional.<Future<?>> of(this.taskExecutor.submit(fork)));
     }
 
+    LOG.info("Task mode streaming = " + isStreamingTask());
     if (isStreamingTask()) {
 
       // Start watermark manager and tracker
@@ -531,14 +521,13 @@ public class Task implements TaskIFace {
     }
   }
 
-  protected void configureStreamingFork(Fork fork, String watermarkingStrategy) throws IOException {
+  protected void configureStreamingFork(Fork fork) throws IOException {
     if (isStreamingTask()) {
       DataWriter forkWriter = fork.getWriter();
-      if (forkWriter instanceof WatermarkAwareWriter) {
-        if (watermarkingStrategy.equals("WriterBased")) {
-          ((MultiWriterWatermarkManager) this.watermarkManager.get()).registerWriter((WatermarkAwareWriter) forkWriter);
-        }
-      } else {
+      boolean isWaterMarkAwareWriter = (forkWriter instanceof WatermarkAwareWriter)
+          && ((WatermarkAwareWriter) forkWriter).isWatermarkCapable();
+
+      if (!isWaterMarkAwareWriter) {
         String errorMessage = String.format("The Task is configured to run in continuous mode, "
             + "but the writer %s is not a WatermarkAwareWriter", forkWriter.getClass().getName());
         LOG.error(errorMessage);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
index 68f4536..7ce2bdf 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/fork/Fork.java
@@ -496,8 +496,6 @@ public class Fork<S, D> implements Closeable, FinalState, RecordStreamConsumer<S
                 recordEnvelope.withRecord(convertedRecord));
           }
         }
-        // ack this fork's processing done
-        recordEnvelope.ack();
       } else {
         buildWriterIfNotPresent();
 
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
index ef57684..97da4df 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/TaskContinuousTest.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.runtime;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.gobblin.runtime.util.TaskMetrics;
+import org.apache.gobblin.util.TestUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.testng.Assert;
@@ -58,8 +58,6 @@ import org.apache.gobblin.source.extractor.Extractor;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.source.extractor.StreamingExtractor;
 import org.apache.gobblin.source.extractor.extract.LongWatermark;
-import org.apache.gobblin.source.workunit.Extract;
-import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.gobblin.writer.DataWriter;
 import org.apache.gobblin.writer.WatermarkAwareWriter;
@@ -220,6 +218,15 @@ public class TaskContinuousTest {
     public void start(WatermarkStorage watermarkStorage) {
 
     }
+
+    @Override
+    public void shutdown() throws JobShutdownException {
+      try {
+        this.close();
+      } catch (Exception e) {
+        throw new JobShutdownException("Failed to close extractor during shutdown");
+      }
+    }
   }
 
 
@@ -252,90 +259,87 @@ public class TaskContinuousTest {
   public void testContinuousTaskOneRecord()
       throws Exception {
 
-    ArrayList<Object> recordCollector = new ArrayList<>(100);
+    for (Boolean taskExecutionSync: new Boolean[]{true, false}) {
 
-    String testRecord = "hello";
+      ArrayList<Object> recordCollector = new ArrayList<>(100);
 
-    OneRecordExtractor oneRecordExtractor =
-        new OneRecordExtractor(testRecord);
+      String testRecord = "hello";
 
+      OneRecordExtractor oneRecordExtractor = new OneRecordExtractor(testRecord);
 
-    TaskContext mockTaskContext =
-        getMockTaskContext(recordCollector, oneRecordExtractor);
-
-    // Create a mock TaskPublisher
-    TaskPublisher mockTaskPublisher = mock(TaskPublisher.class);
-    when(mockTaskPublisher.canPublish()).thenReturn(TaskPublisher.PublisherState.SUCCESS);
-    when(mockTaskContext.getTaskPublisher(any(TaskState.class), any(TaskLevelPolicyCheckResults.class))).thenReturn(mockTaskPublisher);
+      TaskContext mockTaskContext = getMockTaskContext(recordCollector, oneRecordExtractor, taskExecutionSync);
 
-    // Create a mock TaskStateTracker
-    TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
+      // Create a mock TaskPublisher
+      TaskPublisher mockTaskPublisher = mock(TaskPublisher.class);
+      when(mockTaskPublisher.canPublish()).thenReturn(TaskPublisher.PublisherState.SUCCESS);
+      when(mockTaskContext.getTaskPublisher(any(TaskState.class), any(TaskLevelPolicyCheckResults.class))).thenReturn(mockTaskPublisher);
 
-    // Create a TaskExecutor - a real TaskExecutor must be created so a Fork is run in a separate thread
-    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+      // Create a mock TaskStateTracker
+      TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
 
-    // Create the Task
-    Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor, Optional.<CountDownLatch>absent());
+      // Create a TaskExecutor - a real TaskExecutor must be created so a Fork is run in a separate thread
+      TaskExecutor taskExecutor = new TaskExecutor(new Properties());
 
-    ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(Optional.of(log)));
+      // Create the Task
+      Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor, Optional.<CountDownLatch>absent());
 
-    taskRunner.execute(task);
+      ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(Optional.of(log)));
 
+      taskRunner.execute(task);
 
-    // Let the task run for 2 seconds
-    int sleepIterations = 2;
-    int currentIteration = 0;
-    WatermarkStorage mockWatermarkStorage = mockTaskContext.getWatermarkStorage();
-    Map<String, CheckpointableWatermark> externalWatermarkStorage;
-    while (currentIteration < sleepIterations) {
-      Thread.sleep(1000);
-      currentIteration++;
-      externalWatermarkStorage =
-          mockWatermarkStorage.getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList
-          .of("default"));
-      if (!externalWatermarkStorage.isEmpty()) {
-        for (CheckpointableWatermark watermark : externalWatermarkStorage.values()) {
-          log.info("Observed committed watermark: {}", watermark);
+      // Let the task run for 2 seconds
+      int sleepIterations = 2;
+      int currentIteration = 0;
+      WatermarkStorage mockWatermarkStorage = mockTaskContext.getWatermarkStorage();
+      Map<String, CheckpointableWatermark> externalWatermarkStorage;
+      while (currentIteration < sleepIterations) {
+        Thread.sleep(1000);
+        currentIteration++;
+        externalWatermarkStorage =
+            mockWatermarkStorage.getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"));
+        if (!externalWatermarkStorage.isEmpty()) {
+          for (CheckpointableWatermark watermark : externalWatermarkStorage.values()) {
+            log.info("Observed committed watermark: {}", watermark);
+          }
+          log.info("Task progress: {}", task.getProgress());
+          // Ensure that watermarks seem reasonable at each step
+          Assert.assertTrue(oneRecordExtractor.validateWatermarks(false, externalWatermarkStorage));
         }
-        log.info("Task progress: {}", task.getProgress());
-        // Ensure that watermarks seem reasonable at each step
-        Assert.assertTrue(oneRecordExtractor.validateWatermarks(false, externalWatermarkStorage));
       }
-    }
-
-    // Let's try to shutdown the task
-    task.shutdown();
-    log.info("Shutting down task now");
-    boolean success = task.awaitShutdown(3000);
-    Assert.assertTrue(success, "Task should shutdown in 3 seconds");
-    log.info("Task done waiting to shutdown {}", success);
 
-    externalWatermarkStorage =
-        mockWatermarkStorage.getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList
-            .of("0"));
+      // Let's try to shutdown the task
+      task.shutdown();
+      log.info("Shutting down task now");
+      boolean success = task.awaitShutdown(3000);
+      Assert.assertTrue(success, "Task should shutdown in 3 seconds");
+      log.info("Task done waiting to shutdown {}", success);
 
-    // Ensure that committed watermarks match exactly the input rows because we shutdown in an orderly manner.
-    Assert.assertTrue(oneRecordExtractor.validateWatermarks(true, externalWatermarkStorage));
+      externalWatermarkStorage =
+          mockWatermarkStorage.getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("0"));
 
-    // Ensure that the record made it to the writer correctly
-    Assert.assertEquals(recordCollector.size(), 1);
-    Assert.assertEquals(recordCollector.get(0), testRecord);
+      // Ensure that committed watermarks match exactly the input rows because we shutdown in an orderly manner.
+      Assert.assertTrue(oneRecordExtractor.validateWatermarks(true, externalWatermarkStorage));
 
+      // Ensure that the record made it to the writer correctly
+      Assert.assertEquals(recordCollector.size(), 1);
+      Assert.assertEquals(recordCollector.get(0), testRecord);
 
-    task.commit();
+      task.commit();
+      Assert.assertTrue(mockTaskContext.getTaskState().getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL);
 
-    // Shutdown the executor
-    taskRunner.shutdown();
-    taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS);
+      // Shutdown the executor
+      taskRunner.shutdown();
+      taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS);
+    }
 
 
   }
 
   private TaskContext getMockTaskContext(ArrayList<Object> recordCollector,
-      Extractor mockExtractor)
+      Extractor mockExtractor, Boolean taskExecutionSync)
       throws Exception {
 
-    TaskState taskState = getStreamingTaskState();
+    TaskState taskState = getStreamingTaskState(taskExecutionSync);
     // Create a mock RowLevelPolicyChecker
     RowLevelPolicyChecker mockRowLevelPolicyChecker =
         new RowLevelPolicyChecker(Lists.newArrayList(), "stateId", FileSystem.getLocal(new Configuration()));
@@ -364,14 +368,15 @@ public class TaskContinuousTest {
     return mockTaskContext;
   }
 
-  private TaskState getStreamingTaskState() {
-    WorkUnitState workUnitState = new WorkUnitState(WorkUnit.create(
-        new Extract(Extract.TableType.SNAPSHOT_ONLY, this.getClass().getName(), this.getClass().getSimpleName())));
+  private TaskState getStreamingTaskState(Boolean taskExecutionSync) {
+    WorkUnitState workUnitState = TestUtils.createTestWorkUnitState();
+
     workUnitState.setProp(ConfigurationKeys.TASK_KEY_KEY, "1234");
 
     TaskState taskState = new TaskState(workUnitState);
     taskState.setProp(ConfigurationKeys.METRICS_ENABLED_KEY, Boolean.toString(false));
     taskState.setProp(TaskConfigurationKeys.TASK_EXECUTION_MODE, ExecutionModel.STREAMING.name());
+    taskState.setProp(ConfigurationKeys.TASK_SYNCHRONOUS_EXECUTION_MODEL_KEY, Boolean.toString(taskExecutionSync));
     taskState.setJobId("1234");
     taskState.setTaskId("testContinuousTaskId");
     return taskState;
@@ -387,70 +392,65 @@ public class TaskContinuousTest {
   @Test
   public void testContinuousTask()
       throws Exception {
-    // Create a TaskState
-    TaskState taskState = getStreamingTaskState();
 
-    ArrayList<Object> recordCollector = new ArrayList<>(100);
-    long perRecordExtractLatencyMillis = 1000; // 1 second per record
+    for (Boolean taskExecutionSync: new Boolean[]{true, false}) {
+      ArrayList<Object> recordCollector = new ArrayList<>(100);
+      long perRecordExtractLatencyMillis = 1000; // 1 second per record
 
+      ContinuousExtractor continuousExtractor = new ContinuousExtractor(perRecordExtractLatencyMillis);
 
+      TaskContext mockTaskContext = getMockTaskContext(recordCollector, continuousExtractor, taskExecutionSync);
 
-    ContinuousExtractor continuousExtractor =
-        new ContinuousExtractor(perRecordExtractLatencyMillis);
+      // Create a mock TaskStateTracker
+      TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
 
-    TaskContext mockTaskContext = getMockTaskContext(recordCollector, continuousExtractor);
+      // Create a TaskExecutor - a real TaskExecutor must be created so a Fork is run in a separate thread
+      TaskExecutor taskExecutor = new TaskExecutor(new Properties());
 
+      // Create the Task
+      Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor, Optional.<CountDownLatch>absent());
 
-    // Create a mock TaskStateTracker
-    TaskStateTracker mockTaskStateTracker = mock(TaskStateTracker.class);
+      ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(Optional.of(log)));
 
-    // Create a TaskExecutor - a real TaskExecutor must be created so a Fork is run in a separate thread
-    TaskExecutor taskExecutor = new TaskExecutor(new Properties());
+      taskRunner.execute(task);
 
-    // Create the Task
-    Task task = new Task(mockTaskContext, mockTaskStateTracker, taskExecutor, Optional.<CountDownLatch>absent());
+      // Let the task run for 10 seconds
+      int sleepIterations = 10;
+      int currentIteration = 0;
 
-    ScheduledExecutorService taskRunner = new ScheduledThreadPoolExecutor(1, ExecutorsUtils.newThreadFactory(Optional.of(log)));
-
-    taskRunner.execute(task);
-
-
-    // Let the task run for 10 seconds
-    int sleepIterations = 10;
-    int currentIteration = 0;
-
-    while (currentIteration < sleepIterations) {
-      Thread.sleep(1000);
-      currentIteration++;
-      Map<String, CheckpointableWatermark> externalWatermarkStorage = mockTaskContext.getWatermarkStorage()
-          .getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"));
-      if (!externalWatermarkStorage.isEmpty()) {
-        for (CheckpointableWatermark watermark : externalWatermarkStorage.values()) {
-          log.info("Observed committed watermark: {}", watermark);
+      while (currentIteration < sleepIterations) {
+        Thread.sleep(1000);
+        currentIteration++;
+        Map<String, CheckpointableWatermark> externalWatermarkStorage = mockTaskContext.getWatermarkStorage()
+            .getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"));
+        if (!externalWatermarkStorage.isEmpty()) {
+          for (CheckpointableWatermark watermark : externalWatermarkStorage.values()) {
+            log.info("Observed committed watermark: {}", watermark);
+          }
+          log.info("Task progress: {}", task.getProgress());
+          // Ensure that watermarks seem reasonable at each step
+          Assert.assertTrue(continuousExtractor.validateWatermarks(false, externalWatermarkStorage));
         }
-        log.info("Task progress: {}", task.getProgress());
-        // Ensure that watermarks seem reasonable at each step
-        Assert.assertTrue(continuousExtractor.validateWatermarks(false, externalWatermarkStorage));
       }
-    }
 
-    // Let's try to shutdown the task
-    task.shutdown();
-    log.info("Shutting down task now");
-    boolean success = task.awaitShutdown(30000);
-    Assert.assertTrue(success, "Task should shutdown in 3 seconds");
-    log.info("Task done waiting to shutdown {}", success);
+      // Let's try to shutdown the task
+      task.shutdown();
+      log.info("Shutting down task now");
+      boolean success = task.awaitShutdown(30000);
+      Assert.assertTrue(success, "Task should shutdown in 3 seconds");
+      log.info("Task done waiting to shutdown {}", success);
 
-    // Ensure that committed watermarks match exactly the input rows because we shutdown in an orderly manner.
-    Assert.assertTrue(continuousExtractor.validateWatermarks(true, mockTaskContext.getWatermarkStorage()
-        .getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"))));
+      // Ensure that committed watermarks match exactly the input rows because we shutdown in an orderly manner.
+      Assert.assertTrue(continuousExtractor.validateWatermarks(true, mockTaskContext.getWatermarkStorage()
+          .getCommittedWatermarks(CheckpointableWatermark.class, ImmutableList.of("default"))));
 
-    task.commit();
-
-    // Shutdown the executor
-    taskRunner.shutdown();
-    taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS);
+      task.commit();
 
+      Assert.assertTrue(mockTaskContext.getTaskState().getWorkingState() == WorkUnitState.WorkingState.SUCCESSFUL);
+      // Shutdown the executor
+      taskRunner.shutdown();
+      taskRunner.awaitTermination(100, TimeUnit.MILLISECONDS);
+    }
 
   }
 
@@ -491,23 +491,6 @@ public class TaskContinuousTest {
         }
 
         @Override
-        public Map<String, CheckpointableWatermark> getCommittableWatermark() {
-          CheckpointableWatermark committable = lastWatermark.get();
-          if (committable != null) {
-            Map<String, CheckpointableWatermark> singletonMap = new HashMap<>();
-            singletonMap.put(source.get(), committable);
-            return singletonMap;
-          } else {
-            return Collections.EMPTY_MAP;
-          }
-        }
-
-        @Override
-        public Map<String, CheckpointableWatermark> getUnacknowledgedWatermark() {
-          return Collections.EMPTY_MAP;
-        }
-
-        @Override
         public void commit()
             throws IOException {