You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@druid.apache.org by GitBox <gi...@apache.org> on 2018/07/09 20:49:30 UTC

[GitHub] jihoonson closed pull request #5983: [Backport] Fix the broken Appenderator contract in KafkaIndexTask

jihoonson closed pull request #5983: [Backport] Fix the broken Appenderator contract in KafkaIndexTask
URL: https://github.com/apache/incubator-druid/pull/5983
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
index d9a7fb20ee8..5d48fe19405 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -31,7 +31,6 @@
 public class KafkaIOConfig implements IOConfig
 {
   private static final boolean DEFAULT_USE_TRANSACTION = true;
-  private static final boolean DEFAULT_PAUSE_AFTER_READ = false;
   private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
 
   private final String baseSequenceName;
@@ -39,7 +38,6 @@
   private final KafkaPartitions endPartitions;
   private final Map<String, String> consumerProperties;
   private final boolean useTransaction;
-  private final boolean pauseAfterRead;
   private final Optional<DateTime> minimumMessageTime;
   private final Optional<DateTime> maximumMessageTime;
   private final boolean skipOffsetGaps;
@@ -51,7 +49,6 @@ public KafkaIOConfig(
       @JsonProperty("endPartitions") KafkaPartitions endPartitions,
       @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
       @JsonProperty("useTransaction") Boolean useTransaction,
-      @JsonProperty("pauseAfterRead") Boolean pauseAfterRead,
       @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
       @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
       @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
@@ -62,7 +59,6 @@ public KafkaIOConfig(
     this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
     this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
     this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
-    this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;
     this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
     this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
     this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS;
@@ -117,12 +113,6 @@ public boolean isUseTransaction()
     return useTransaction;
   }
 
-  @JsonProperty
-  public boolean isPauseAfterRead()
-  {
-    return pauseAfterRead;
-  }
-
   @JsonProperty
   public Optional<DateTime> getMaximumMessageTime()
   {
@@ -150,7 +140,6 @@ public String toString()
            ", endPartitions=" + endPartitions +
            ", consumerProperties=" + consumerProperties +
            ", useTransaction=" + useTransaction +
-           ", pauseAfterRead=" + pauseAfterRead +
            ", minimumMessageTime=" + minimumMessageTime +
            ", maximumMessageTime=" + maximumMessageTime +
            ", skipOffsetGaps=" + skipOffsetGaps +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 62aca366e48..16a67ec6b06 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -33,7 +33,6 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -42,8 +41,7 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.InputRowParser;
@@ -68,7 +66,6 @@
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.collect.Utils;
-import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.parsers.ParseException;
 import io.druid.java.util.emitter.EmittingLogger;
@@ -127,23 +124,21 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -179,7 +174,7 @@
   private final AuthorizerMapper authorizerMapper;
   private final Optional<ChatHandlerProvider> chatHandlerProvider;
 
-  private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>();
+  private final Map<Integer, Long> endOffsets;
   private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
   private final Map<Integer, Long> maxEndOffsets = new HashMap<>();
   private final Map<Integer, Long> lastPersistedOffsets = new ConcurrentHashMap<>();
@@ -192,7 +187,6 @@
   private volatile DateTime startTime;
   private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread)
   private volatile Thread runThread = null;
-  private volatile File sequencesPersistFile = null;
   private final AtomicBoolean stopRequested = new AtomicBoolean(false);
   private final AtomicBoolean publishOnStop = new AtomicBoolean(false);
 
@@ -232,20 +226,17 @@
   private final Object statusLock = new Object();
 
   private volatile boolean pauseRequested = false;
-  private volatile long pauseMillis = 0;
 
   // This value can be tuned in some tests
   private long pollRetryMs = 30000;
 
   private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
-  private final BlockingQueue<SequenceMetadata> publishQueue = new LinkedBlockingQueue<>();
-  private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new CopyOnWriteArrayList<>(); // to prevent concurrency visibility issue
-  private final CountDownLatch waitForPublishes = new CountDownLatch(1);
-  private final AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
+  private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new LinkedList<>();
+  private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new LinkedList<>();
   private final String topic;
 
   private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
-  private ListeningExecutorService publishExecService;
+  private volatile Throwable backgroundThreadException;
   private final boolean useLegacy;
 
   @JsonCreator
@@ -274,7 +265,7 @@ public KafkaIndexTask(
     this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
     this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
     this.authorizerMapper = authorizerMapper;
-    this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap());
+    this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndPartitions().getPartitionOffsetMap());
     this.maxEndOffsets.putAll(endOffsets.entrySet()
                                         .stream()
                                         .collect(Collectors.toMap(
@@ -343,75 +334,6 @@ public KafkaIOConfig getIOConfig()
     return ioConfig;
   }
 
-  private void createAndStartPublishExecutor()
-  {
-    publishExecService = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-driver"));
-    publishExecService.submit(
-        (Runnable) () -> {
-          while (true) {
-            try {
-              final SequenceMetadata sequenceMetadata = publishQueue.take();
-
-              Preconditions.checkNotNull(driver);
-
-              if (sequenceMetadata.isSentinel()) {
-                waitForPublishes.countDown();
-                break;
-              }
-
-              log.info("Publishing segments for sequence [%s]", sequenceMetadata);
-
-              final SegmentsAndMetadata result = driver.publish(
-                  sequenceMetadata.getPublisher(toolbox, ioConfig.isUseTransaction()),
-                  sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(),
-                  ImmutableList.of(sequenceMetadata.getSequenceName())
-              ).get();
-
-              if (result == null) {
-                throw new ISE(
-                    "Transaction failure publishing segments for sequence [%s]",
-                    sequenceMetadata
-                );
-              } else {
-                log.info(
-                    "Published segments[%s] with metadata[%s].",
-                    Joiner.on(", ").join(
-                        result.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList())
-                    ),
-                    Preconditions.checkNotNull(result.getCommitMetadata(), "commitMetadata")
-                );
-              }
-
-              sequences.remove(sequenceMetadata);
-              publishingSequences.remove(sequenceMetadata.getSequenceName());
-              try {
-                persistSequences();
-              }
-              catch (IOException e) {
-                log.error(e, "Unable to persist state, dying");
-                Throwables.propagate(e);
-              }
-
-              final ListenableFuture<SegmentsAndMetadata> handOffFuture = driver.registerHandoff(result);
-              handOffWaitList.add(handOffFuture);
-            }
-            catch (Throwable t) {
-              if ((t instanceof InterruptedException || (t instanceof RejectedExecutionException
-                                                         && t.getCause() instanceof InterruptedException))) {
-                log.warn("Stopping publish thread as we are interrupted, probably we are shutting down");
-              } else {
-                log.makeAlert(t, "Error in publish thread, dying").emit();
-                throwableAtomicReference.set(t);
-              }
-              Futures.allAsList(handOffWaitList).cancel(true);
-              waitForPublishes.countDown();
-              break;
-            }
-          }
-        }
-    );
-  }
-
   @Override
   public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
@@ -426,47 +348,40 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
     status = Status.STARTING;
     this.toolbox = toolbox;
 
-    if (getContext() != null && getContext().get("checkpoints") != null) {
-      log.info("Got checkpoints [%s]", (String) getContext().get("checkpoints"));
-      final TreeMap<Integer, Map<Integer, Long>> checkpoints = toolbox.getObjectMapper().readValue(
-          (String) getContext().get("checkpoints"),
-          new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
-          {
-          }
-      );
-
-      Iterator<Map.Entry<Integer, Map<Integer, Long>>> sequenceOffsets = checkpoints.entrySet().iterator();
-      Map.Entry<Integer, Map<Integer, Long>> previous = sequenceOffsets.next();
-      while (sequenceOffsets.hasNext()) {
-        Map.Entry<Integer, Map<Integer, Long>> current = sequenceOffsets.next();
+    if (!restoreSequences()) {
+      final TreeMap<Integer, Map<Integer, Long>> checkpoints = getCheckPointsFromContext(toolbox, this);
+      if (checkpoints != null) {
+        Iterator<Map.Entry<Integer, Map<Integer, Long>>> sequenceOffsets = checkpoints.entrySet().iterator();
+        Map.Entry<Integer, Map<Integer, Long>> previous = sequenceOffsets.next();
+        while (sequenceOffsets.hasNext()) {
+          Map.Entry<Integer, Map<Integer, Long>> current = sequenceOffsets.next();
+          sequences.add(new SequenceMetadata(
+              previous.getKey(),
+              StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
+              previous.getValue(),
+              current.getValue(),
+              true
+          ));
+          previous = current;
+        }
         sequences.add(new SequenceMetadata(
             previous.getKey(),
             StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
             previous.getValue(),
-            current.getValue(),
-            true
+            maxEndOffsets,
+            false
+        ));
+      } else {
+        sequences.add(new SequenceMetadata(
+            0,
+            StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
+            ioConfig.getStartPartitions().getPartitionOffsetMap(),
+            maxEndOffsets,
+            false
         ));
-        previous = current;
       }
-      sequences.add(new SequenceMetadata(
-          previous.getKey(),
-          StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
-          previous.getValue(),
-          maxEndOffsets,
-          false
-      ));
-    } else {
-      sequences.add(new SequenceMetadata(
-          0,
-          StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
-          ioConfig.getStartPartitions().getPartitionOffsetMap(),
-          maxEndOffsets,
-          false
-      ));
-    }
 
-    sequencesPersistFile = new File(toolbox.getPersistDir(), "sequences.json");
-    restoreSequences();
+    }
     log.info("Starting with sequences:  %s", sequences);
 
     if (chatHandlerProvider.isPresent()) {
@@ -504,15 +419,12 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
         )
     );
 
-    try (
-        final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
-    ) {
+    try (final KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
       toolbox.getDataSegmentServerAnnouncer().announce();
       toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
 
       appenderator = newAppenderator(fireDepartmentMetrics, toolbox);
       driver = newDriver(appenderator, toolbox, fireDepartmentMetrics);
-      createAndStartPublishExecutor();
 
       final String topic = ioConfig.getStartPartitions().getTopic();
 
@@ -602,7 +514,7 @@ public void run()
       status = Status.READING;
       try {
         while (stillReading) {
-          if (possiblyPause(assignment)) {
+          if (possiblyPause()) {
             // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
             // partitions upon resuming. This is safe even if the end offsets have not been modified.
             assignment = assignPartitionsAndSeekToNext(consumer, topic);
@@ -615,8 +527,7 @@ public void run()
           }
 
           // if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
-          if (stopRequested.get() || (sequences.get(sequences.size() - 1).isCheckpointed()
-                                      && !ioConfig.isPauseAfterRead())) {
+          if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
             status = Status.PUBLISHING;
           }
 
@@ -624,11 +535,12 @@ public void run()
             break;
           }
 
-          checkAndMaybeThrowException();
-
-          if (!ioConfig.isPauseAfterRead()) {
-            maybePersistAndPublishSequences(committerSupplier);
+          if (backgroundThreadException != null) {
+            throw new RuntimeException(backgroundThreadException);
           }
+          checkPublishAndHandoffFailure();
+
+          maybePersistAndPublishSequences(committerSupplier);
 
           // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to
           // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
@@ -640,19 +552,17 @@ public void run()
           catch (OffsetOutOfRangeException e) {
             log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
             possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
-            stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+            stillReading = !assignment.isEmpty();
           }
 
           SequenceMetadata sequenceToCheckpoint = null;
           for (ConsumerRecord<byte[], byte[]> record : records) {
-            if (log.isTraceEnabled()) {
-              log.trace(
-                  "Got topic[%s] partition[%d] offset[%,d].",
-                  record.topic(),
-                  record.partition(),
-                  record.offset()
-              );
-            }
+            log.trace(
+                "Got topic[%s] partition[%d] offset[%,d].",
+                record.topic(),
+                record.partition(),
+                record.offset()
+            );
 
             if (record.offset() < endOffsets.get(record.partition())) {
               if (record.offset() != nextOffsets.get(record.partition())) {
@@ -680,24 +590,23 @@ public void run()
                                             : parser.parseBatch(ByteBuffer.wrap(valueBytes));
                 boolean isPersistRequired = false;
 
-                for (InputRow row : rows) {
-                  if (row != null && withinMinMaxRecordTime(row)) {
-                    SequenceMetadata sequenceToUse = null;
-                    for (SequenceMetadata sequence : sequences) {
-                      if (sequence.canHandle(record)) {
-                        sequenceToUse = sequence;
-                      }
-                    }
+                final SequenceMetadata sequenceToUse = sequences
+                    .stream()
+                    .filter(sequenceMetadata -> sequenceMetadata.canHandle(record))
+                    .findFirst()
+                    .orElse(null);
 
-                    if (sequenceToUse == null) {
-                      throw new ISE(
-                          "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
-                          record.partition(),
-                          record.offset(),
-                          sequences
-                      );
-                    }
+                if (sequenceToUse == null) {
+                  throw new ISE(
+                      "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
+                      record.partition(),
+                      record.offset(),
+                      sequences
+                  );
+                }
 
+                for (InputRow row : rows) {
+                  if (row != null && withinMinMaxRecordTime(row)) {
                     final AppenderatorDriverAddResult addResult = driver.add(
                         row,
                         sequenceToUse.getSequenceName(),
@@ -751,7 +660,7 @@ public void onSuccess(@Nullable Object result)
                         public void onFailure(Throwable t)
                         {
                           log.error("Persist failed, dying");
-                          throwableAtomicReference.set(t);
+                          backgroundThreadException = t;
                         }
                       }
                   );
@@ -779,11 +688,11 @@ public void onFailure(Throwable t)
                 && assignment.remove(record.partition())) {
               log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
               assignPartitions(consumer, topic, assignment);
-              stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+              stillReading = !assignment.isEmpty();
             }
           }
 
-          if (sequenceToCheckpoint != null && !ioConfig.isPauseAfterRead()) {
+          if (sequenceToCheckpoint != null && stillReading) {
             Preconditions.checkArgument(
                 sequences.get(sequences.size() - 1)
                          .getSequenceName()
@@ -792,7 +701,7 @@ public void onFailure(Throwable t)
                 sequenceToCheckpoint,
                 sequences
             );
-            requestPause(PAUSE_FOREVER);
+            requestPause();
             if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(
                 getDataSource(),
                 ioConfig.getBaseSequenceName(),
@@ -804,6 +713,11 @@ public void onFailure(Throwable t)
           }
         }
       }
+      catch (Exception e) {
+        // (1) catch all exceptions while reading from kafka
+        log.error(e, "Encountered exception in run() before persisting.");
+        throw e;
+      }
       finally {
         log.info("Persisting all pending data");
         driver.persist(committerSupplier.get()); // persist pending data
@@ -824,16 +738,23 @@ public void onFailure(Throwable t)
           sequenceMetadata.updateAssignments(nextOffsets);
           publishingSequences.add(sequenceMetadata.getSequenceName());
           // persist already done in finally, so directly add to publishQueue
-          publishQueue.add(sequenceMetadata);
+          publishAndRegisterHandoff(sequenceMetadata);
         }
       }
 
-      // add Sentinel SequenceMetadata to indicate end of all sequences
-      publishQueue.add(SequenceMetadata.getSentinelSequenceMetadata());
-      waitForPublishes.await();
-      checkAndMaybeThrowException();
+      if (backgroundThreadException != null) {
+        throw new RuntimeException(backgroundThreadException);
+      }
+
+      // Wait for publish futures to complete.
+      Futures.allAsList(publishWaitList).get();
 
-      List<SegmentsAndMetadata> handedOffList = Lists.newArrayList();
+      // Wait for handoff futures to complete.
+      // Note that every publishing task (created by calling AppenderatorDriver.publish()) has a corresponding
+      // handoffFuture. handoffFuture can throw an exception if 1) the corresponding publishFuture failed or 2) it
+      // failed to persist sequences. It might also return null if handoff failed, but was recoverable.
+      // See publishAndRegisterHandoff() for details.
+      List<SegmentsAndMetadata> handedOffList = Collections.emptyList();
       if (tuningConfig.getHandoffConditionTimeout() == 0) {
         handedOffList = Futures.allAsList(handOffWaitList).get();
       } else {
@@ -842,6 +763,8 @@ public void onFailure(Throwable t)
                                  .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
         }
         catch (TimeoutException e) {
+          // Handoff timeout is not an indexing failure, but a coordination failure. We simply ignore the timeout
+          // exception here.
           log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout())
              .addData("TaskId", this.getId())
              .emit();
@@ -861,8 +784,14 @@ public void onFailure(Throwable t)
           );
         }
       }
+
+      appenderator.close();
     }
     catch (InterruptedException | RejectedExecutionException e) {
+      // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including
+      // the final publishing.
+      Futures.allAsList(publishWaitList).cancel(true);
+      Futures.allAsList(handOffWaitList).cancel(true);
       appenderator.closeNow();
       // handle the InterruptedException that gets wrapped in a RejectedExecutionException
       if (e instanceof RejectedExecutionException
@@ -878,14 +807,14 @@ public void onFailure(Throwable t)
 
       log.info("The task was asked to stop before completing");
     }
+    catch (Exception e) {
+      // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing.
+      Futures.allAsList(publishWaitList).cancel(true);
+      Futures.allAsList(handOffWaitList).cancel(true);
+      appenderator.closeNow();
+      throw e;
+    }
     finally {
-      if (appenderator != null) {
-        if (throwableAtomicReference.get() != null) {
-          appenderator.closeNow();
-        } else {
-          appenderator.close();
-        }
-      }
       if (driver != null) {
         driver.close();
       }
@@ -893,10 +822,6 @@ public void onFailure(Throwable t)
         chatHandlerProvider.get().unregister(getId());
       }
 
-      if (publishExecService != null) {
-        publishExecService.shutdownNow();
-      }
-
       toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
       toolbox.getDataSegmentServerAnnouncer().unannounce();
     }
@@ -947,9 +872,9 @@ private TaskStatus runLegacy(final TaskToolbox toolbox) throws Exception
     );
 
     try (
-            final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
-            final StreamAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
-            final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
+        final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
+        final StreamAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
+        final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
     ) {
       toolbox.getDataSegmentServerAnnouncer().announce();
       toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
@@ -1032,7 +957,7 @@ public void run()
       status = Status.READING;
       try {
         while (stillReading) {
-          if (possiblyPause(assignment)) {
+          if (possiblyPause()) {
             // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
             // partitions upon resuming. This is safe even if the end offsets have not been modified.
             assignment = assignPartitionsAndSeekToNext(consumer, topic);
@@ -1058,7 +983,7 @@ public void run()
           catch (OffsetOutOfRangeException e) {
             log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
             possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
-            stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+            stillReading = !assignment.isEmpty();
           }
 
           for (ConsumerRecord<byte[], byte[]> record : records) {
@@ -1158,7 +1083,7 @@ public void run()
                 && assignment.remove(record.partition())) {
               log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
               assignPartitions(consumer, topic, assignment);
-              stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+              stillReading = !assignment.isEmpty();
             }
           }
         }
@@ -1211,32 +1136,37 @@ public void run()
           sequenceNames.values()
       ).get();
 
+      final List<String> publishedSegments = published.getSegments()
+                                                      .stream()
+                                                      .map(DataSegment::getIdentifier)
+                                                      .collect(Collectors.toList());
+      log.info(
+          "Published segments[%s] with metadata[%s].",
+          publishedSegments,
+          Preconditions.checkNotNull(published.getCommitMetadata(), "commitMetadata")
+      );
+
       final Future<SegmentsAndMetadata> handoffFuture = driver.registerHandoff(published);
-      final SegmentsAndMetadata handedOff;
+      SegmentsAndMetadata handedOff = null;
       if (tuningConfig.getHandoffConditionTimeout() == 0) {
         handedOff = handoffFuture.get();
       } else {
-        handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
+        try {
+          handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
+        }
+        catch (TimeoutException e) {
+          log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout())
+             .addData("TaskId", getId())
+             .emit();
+        }
       }
 
       if (handedOff == null) {
-        throw new ISE("Transaction failure publishing segments, aborting");
+        log.warn("Failed to handoff segments[%s]", publishedSegments);
       } else {
         log.info(
-            "Published segments[%s] with metadata[%s].",
-            Joiner.on(", ").join(
-                Iterables.transform(
-                    handedOff.getSegments(),
-                    new Function<DataSegment, String>()
-                    {
-                      @Override
-                      public String apply(DataSegment input)
-                      {
-                        return input.getIdentifier();
-                      }
-                    }
-                )
-            ),
+            "Handoff completed for segments[%s] with metadata[%s]",
+            handedOff.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()),
             Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata")
         );
       }
@@ -1268,13 +1198,156 @@ public String apply(DataSegment input)
     return success();
   }
 
-  private void checkAndMaybeThrowException()
+  private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException
+  {
+    // Check if any publishFuture failed.
+    final List<ListenableFuture<SegmentsAndMetadata>> publishFinished = publishWaitList
+        .stream()
+        .filter(Future::isDone)
+        .collect(Collectors.toList());
+
+    for (ListenableFuture<SegmentsAndMetadata> publishFuture : publishFinished) {
+      // If publishFuture failed, the below line will throw an exception that is caught by (1), and then (2) or (3).
+      publishFuture.get();
+    }
+
+    publishWaitList.removeAll(publishFinished);
+
+    // Check if any handoffFuture failed.
+    final List<ListenableFuture<SegmentsAndMetadata>> handoffFinished = handOffWaitList
+        .stream()
+        .filter(Future::isDone)
+        .collect(Collectors.toList());
+
+    for (ListenableFuture<SegmentsAndMetadata> handoffFuture : handoffFinished) {
+      // If handoffFuture failed, the below line will throw an exception that is caught by (1), and then (2) or (3).
+      handoffFuture.get();
+    }
+
+    handOffWaitList.removeAll(handoffFinished);
+  }
+
+  private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata)
+  {
+    log.info("Publishing segments for sequence [%s]", sequenceMetadata);
+
+    final ListenableFuture<SegmentsAndMetadata> publishFuture = Futures.transform(
+        driver.publish(
+            sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()),
+            sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(),
+            Collections.singletonList(sequenceMetadata.getSequenceName())
+        ),
+        (Function<SegmentsAndMetadata, SegmentsAndMetadata>) publishedSegmentsAndMetadata -> {
+          if (publishedSegmentsAndMetadata == null) {
+            throw new ISE(
+                "Transaction failure publishing segments for sequence [%s]",
+                sequenceMetadata
+            );
+          } else {
+            return publishedSegmentsAndMetadata;
+          }
+        }
+    );
+    publishWaitList.add(publishFuture);
+
+    // Create a handoffFuture for every publishFuture. The created handoffFuture must fail if publishFuture fails.
+    final SettableFuture<SegmentsAndMetadata> handoffFuture = SettableFuture.create();
+    handOffWaitList.add(handoffFuture);
+
+    Futures.addCallback(
+        publishFuture,
+        new FutureCallback<SegmentsAndMetadata>()
+        {
+          @Override
+          public void onSuccess(SegmentsAndMetadata publishedSegmentsAndMetadata)
+          {
+            log.info(
+                "Published segments[%s] with metadata[%s].",
+                publishedSegmentsAndMetadata.getSegments()
+                                            .stream()
+                                            .map(DataSegment::getIdentifier)
+                                            .collect(Collectors.toList()),
+                Preconditions.checkNotNull(publishedSegmentsAndMetadata.getCommitMetadata(), "commitMetadata")
+            );
+
+            sequences.remove(sequenceMetadata);
+            publishingSequences.remove(sequenceMetadata.getSequenceName());
+            try {
+              persistSequences();
+            }
+            catch (IOException e) {
+              log.error(e, "Unable to persist state, dying");
+              handoffFuture.setException(e);
+              throw new RuntimeException(e);
+            }
+
+            Futures.transform(
+                driver.registerHandoff(publishedSegmentsAndMetadata),
+                new Function<SegmentsAndMetadata, Void>()
+                {
+                  @Nullable
+                  @Override
+                  public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata)
+                  {
+                    if (handoffSegmentsAndMetadata == null) {
+                      log.warn(
+                          "Failed to handoff segments[%s]",
+                          publishedSegmentsAndMetadata.getSegments()
+                                                      .stream()
+                                                      .map(DataSegment::getIdentifier)
+                                                      .collect(Collectors.toList())
+                      );
+                    }
+                    handoffFuture.set(handoffSegmentsAndMetadata);
+                    return null;
+                  }
+                }
+            );
+          }
+
+          @Override
+          public void onFailure(Throwable t)
+          {
+            log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata);
+            handoffFuture.setException(t);
+          }
+        }
+    );
+  }
+
+  private static File getSequencesPersistFile(TaskToolbox toolbox)
+  {
+    return new File(toolbox.getPersistDir(), "sequences.json");
+  }
+
+  private boolean restoreSequences() throws IOException
   {
-    if (throwableAtomicReference.get() != null) {
-      Throwables.propagate(throwableAtomicReference.get());
+    final File sequencesPersistFile = getSequencesPersistFile(toolbox);
+    if (sequencesPersistFile.exists()) {
+      sequences = new CopyOnWriteArrayList<>(
+          toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
+              sequencesPersistFile,
+              new TypeReference<List<SequenceMetadata>>()
+              {
+              }
+          )
+      );
+      return true;
+    } else {
+      return false;
     }
   }
 
+  private synchronized void persistSequences() throws IOException
+  {
+    log.info("Persisting Sequences Metadata [%s]", sequences);
+    toolbox.getObjectMapper().writerWithType(
+        new TypeReference<List<SequenceMetadata>>()
+        {
+        }
+    ).writeValue(getSequencesPersistFile(toolbox), sequences);
+  }
+
   private void maybePersistAndPublishSequences(Supplier<Committer> committerSupplier)
       throws InterruptedException
   {
@@ -1289,7 +1362,7 @@ private void maybePersistAndPublishSequences(Supplier<Committer> committerSuppli
               result,
               sequenceMetadata
           );
-          publishQueue.add(sequenceMetadata);
+          publishAndRegisterHandoff(sequenceMetadata);
         }
         catch (InterruptedException e) {
           log.warn("Interrupted while persisting sequence [%s]", sequenceMetadata);
@@ -1299,73 +1372,194 @@ private void maybePersistAndPublishSequences(Supplier<Committer> committerSuppli
     }
   }
 
-  private void restoreSequences() throws IOException
+  private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic)
   {
-    Preconditions.checkNotNull(sequencesPersistFile);
-    if (sequencesPersistFile.exists()) {
-      sequences = new CopyOnWriteArrayList<>(toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
-          sequencesPersistFile, new TypeReference<List<SequenceMetadata>>()
-          {
-          }));
+    // Initialize consumer assignment.
+    final Set<Integer> assignment = Sets.newHashSet();
+    for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
+      final long endOffset = endOffsets.get(entry.getKey());
+      if (entry.getValue() < endOffset) {
+        assignment.add(entry.getKey());
+      } else if (entry.getValue() == endOffset) {
+        log.info("Finished reading partition[%d].", entry.getKey());
+      } else {
+        throw new ISE(
+            "WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
+            entry.getValue(),
+            endOffset
+        );
+      }
     }
-  }
 
-  private synchronized void persistSequences() throws IOException
-  {
-    log.info("Persisting Sequences Metadata [%s]", sequences);
-    toolbox.getObjectMapper().writerWithType(
-        new TypeReference<List<SequenceMetadata>>()
-        {
-        }
-    ).writeValue(sequencesPersistFile, sequences);
-  }
+    KafkaIndexTask.assignPartitions(consumer, topic, assignment);
 
-  @Override
-  public boolean canRestore()
-  {
-    return true;
+    // Seek to starting offsets.
+    for (final int partition : assignment) {
+      final long offset = nextOffsets.get(partition);
+      log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
+      consumer.seek(new TopicPartition(topic, partition), offset);
+    }
+
+    return assignment;
   }
 
   /**
-   * Authorizes action to be performed on this task's datasource
+   * Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared.
+   * <p/>
+   * Sets status to PAUSED and signals hasPaused so callers can be notified when the pause command has been accepted.
+   * <p/>
    *
-   * @return authorization result
+   * @return true if a pause request was handled, false otherwise
    */
-  private Access authorizationCheck(final HttpServletRequest req, Action action)
+  private boolean possiblyPause() throws InterruptedException
   {
-    ResourceAction resourceAction = new ResourceAction(
-        new Resource(dataSchema.getDataSource(), ResourceType.DATASOURCE),
-        action
-    );
+    pauseLock.lockInterruptibly();
+    try {
+      if (pauseRequested) {
+        status = Status.PAUSED;
+        hasPaused.signalAll();
 
-    Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper);
-    if (!access.isAllowed()) {
-      throw new ForbiddenException(access.toString());
-    }
+        while (pauseRequested) {
+          log.info("Pausing ingestion until resumed");
+          shouldResume.await();
+        }
 
-    return access;
-  }
+        status = Status.READING;
+        shouldResume.signalAll();
+        log.info("Ingestion loop resumed");
+        return true;
+      }
+    }
+    finally {
+      pauseLock.unlock();
+    }
 
-  @VisibleForTesting
-  Appenderator getAppenderator()
-  {
-    return appenderator;
+    return false;
   }
 
-  @Override
-  public void stopGracefully()
+  private void possiblyResetOffsetsOrWait(
+      Map<TopicPartition, Long> outOfRangePartitions,
+      KafkaConsumer<byte[], byte[]> consumer,
+      TaskToolbox taskToolbox
+  ) throws InterruptedException, IOException
   {
-    log.info("Stopping gracefully (status: [%s])", status);
-    stopRequested.set(true);
-
-    synchronized (statusLock) {
-      if (status == Status.PUBLISHING) {
-        runThread.interrupt();
-        return;
+    final Map<TopicPartition, Long> resetPartitions = Maps.newHashMap();
+    boolean doReset = false;
+    if (tuningConfig.isResetOffsetAutomatically()) {
+      for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
+        final TopicPartition topicPartition = outOfRangePartition.getKey();
+        final long nextOffset = outOfRangePartition.getValue();
+        // seek to the beginning to get the least available offset
+        consumer.seekToBeginning(Collections.singletonList(topicPartition));
+        final long leastAvailableOffset = consumer.position(topicPartition);
+        // reset the seek
+        consumer.seek(topicPartition, nextOffset);
+        // Reset consumer offset if resetOffsetAutomatically is set to true
+        // and the current message offset in the kafka partition is more than the
+        // next message offset that we are trying to fetch
+        if (leastAvailableOffset > nextOffset) {
+          doReset = true;
+          resetPartitions.put(topicPartition, nextOffset);
+        }
       }
     }
 
-    try {
+    if (doReset) {
+      sendResetRequestAndWait(resetPartitions, taskToolbox);
+    } else {
+      log.warn("Retrying in %dms", pollRetryMs);
+      pollRetryLock.lockInterruptibly();
+      try {
+        long nanos = TimeUnit.MILLISECONDS.toNanos(pollRetryMs);
+        while (nanos > 0L && !pauseRequested && !stopRequested.get()) {
+          nanos = isAwaitingRetry.awaitNanos(nanos);
+        }
+      }
+      finally {
+        pollRetryLock.unlock();
+      }
+    }
+  }
+
+  private void requestPause()
+  {
+    pauseRequested = true;
+  }
+
+  private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox)
+      throws IOException
+  {
+    Map<Integer, Long> partitionOffsetMap = Maps.newHashMap();
+    for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
+      partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
+    }
+    boolean result = taskToolbox.getTaskActionClient()
+                                .submit(new ResetDataSourceMetadataAction(
+                                    getDataSource(),
+                                    new KafkaDataSourceMetadata(new KafkaPartitions(
+                                        ioConfig.getStartPartitions()
+                                                .getTopic(),
+                                        partitionOffsetMap
+                                    ))
+                                ));
+
+    if (result) {
+      log.makeAlert("Resetting Kafka offsets for datasource [%s]", getDataSource())
+         .addData("partitions", partitionOffsetMap.keySet())
+         .emit();
+      // wait for being killed by supervisor
+      requestPause();
+    } else {
+      log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
+    }
+  }
+
+  @Override
+  public boolean canRestore()
+  {
+    return true;
+  }
+
+  /**
+   * Authorizes action to be performed on this task's datasource
+   *
+   * @return authorization result
+   */
+  private Access authorizationCheck(final HttpServletRequest req, Action action)
+  {
+    ResourceAction resourceAction = new ResourceAction(
+        new Resource(dataSchema.getDataSource(), ResourceType.DATASOURCE),
+        action
+    );
+
+    Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper);
+    if (!access.isAllowed()) {
+      throw new ForbiddenException(access.toString());
+    }
+
+    return access;
+  }
+
+  @VisibleForTesting
+  Appenderator getAppenderator()
+  {
+    return appenderator;
+  }
+
+  @Override
+  public void stopGracefully()
+  {
+    log.info("Stopping gracefully (status: [%s])", status);
+    stopRequested.set(true);
+
+    synchronized (statusLock) {
+      if (status == Status.PUBLISHING) {
+        runThread.interrupt();
+        return;
+      }
+    }
+
+    try {
       if (pauseLock.tryLock(LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
         try {
           if (pauseRequested) {
@@ -1474,25 +1668,23 @@ public Status getStatus()
   @Produces(MediaType.APPLICATION_JSON)
   public Response setEndOffsetsHTTP(
       Map<Integer, Long> offsets,
-      @QueryParam("resume") @DefaultValue("false") final boolean resume,
       @QueryParam("finish") @DefaultValue("true") final boolean finish,
       // this field is only for internal purposes, shouldn't be usually set by users
       @Context final HttpServletRequest req
   ) throws InterruptedException
   {
     authorizationCheck(req, Action.WRITE);
-    return setEndOffsets(offsets, resume, finish);
+    return setEndOffsets(offsets, finish);
   }
 
   public Response setEndOffsets(
       Map<Integer, Long> offsets,
-      final boolean resume,
       final boolean finish // this field is only for internal purposes, shouldn't be usually set by users
   ) throws InterruptedException
   {
     // for backwards compatibility, should be removed from versions greater than 0.12.x
     if (useLegacy) {
-      return setEndOffsetsLegacy(offsets, resume);
+      return setEndOffsetsLegacy(offsets);
     }
 
     if (offsets == null) {
@@ -1520,7 +1712,7 @@ public Response setEndOffsets(
             (latestSequence.getEndOffsets().equals(offsets) && finish)) {
           log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", sequences);
           return Response.ok(offsets).build();
-        } else if (latestSequence.isCheckpointed() && !ioConfig.isPauseAfterRead()) {
+        } else if (latestSequence.isCheckpointed()) {
           return Response.status(Response.Status.BAD_REQUEST)
                          .entity(StringUtils.format(
                              "WTH?! Sequence [%s] has already endOffsets set, cannot set to [%s]",
@@ -1553,13 +1745,12 @@ public Response setEndOffsets(
           log.info("Updating endOffsets from [%s] to [%s]", endOffsets, offsets);
           endOffsets.putAll(offsets);
         } else {
-          Preconditions.checkState(!ioConfig.isPauseAfterRead());
           // create new sequence
           final SequenceMetadata newSequence = new SequenceMetadata(
               latestSequence.getSequenceId() + 1,
               StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1),
               offsets,
-              maxEndOffsets,
+              endOffsets,
               false
           );
           sequences.add(newSequence);
@@ -1569,77 +1760,23 @@ public Response setEndOffsets(
       }
       catch (Exception e) {
         log.error(e, "Unable to set end offsets, dying");
-        throwableAtomicReference.set(e);
-        Throwables.propagate(e);
+        backgroundThreadException = e;
+        // should resume to immediately finish kafka index task as failed
+        resume();
+        return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                       .entity(Throwables.getStackTraceAsString(e))
+                       .build();
       }
       finally {
         pauseLock.unlock();
       }
     }
 
-    if (resume) {
-      resume();
-    }
+    resume();
 
     return Response.ok(offsets).build();
   }
 
-  private Response setEndOffsetsLegacy(
-      Map<Integer, Long> offsets,
-      final boolean resume
-  ) throws InterruptedException
-  {
-    if (offsets == null) {
-      return Response.status(Response.Status.BAD_REQUEST)
-                     .entity("Request body must contain a map of { partition:endOffset }")
-                     .build();
-    } else if (!endOffsets.keySet().containsAll(offsets.keySet())) {
-      return Response.status(Response.Status.BAD_REQUEST)
-                     .entity(
-                         StringUtils.format(
-                             "Request contains partitions not being handled by this task, my partitions: %s",
-                             endOffsets.keySet()
-                         )
-                     )
-                     .build();
-    }
-
-    pauseLock.lockInterruptibly();
-    try {
-      if (!isPaused()) {
-        return Response.status(Response.Status.BAD_REQUEST)
-                       .entity("Task must be paused before changing the end offsets")
-                       .build();
-      }
-
-      for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
-        if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) {
-          return Response.status(Response.Status.BAD_REQUEST)
-                         .entity(
-                             StringUtils.format(
-                                 "End offset must be >= current offset for partition [%s] (current: %s)",
-                                 entry.getKey(),
-                                 nextOffsets.get(entry.getKey())
-                             )
-                         )
-                         .build();
-        }
-      }
-
-      endOffsets.putAll(offsets);
-      log.info("endOffsets changed to %s", endOffsets);
-    }
-    finally {
-      pauseLock.unlock();
-    }
-
-    if (resume) {
-      resume();
-    }
-
-    return Response.ok(endOffsets).build();
-  }
-
   @GET
   @Path("/checkpoints")
   @Produces(MediaType.APPLICATION_JSON)
@@ -1649,7 +1786,7 @@ private Response setEndOffsetsLegacy(
     return getCheckpoints();
   }
 
-  public Map<Integer, Map<Integer, Long>> getCheckpoints()
+  private Map<Integer, Map<Integer, Long>> getCheckpoints()
   {
     TreeMap<Integer, Map<Integer, Long>> result = new TreeMap<>();
     result.putAll(
@@ -1661,8 +1798,6 @@ private Response setEndOffsetsLegacy(
   /**
    * Signals the ingestion loop to pause.
    *
-   * @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely
-   *
    * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the
    * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets
    * in the response body if the task successfully paused
@@ -1671,15 +1806,14 @@ private Response setEndOffsetsLegacy(
   @Path("/pause")
   @Produces(MediaType.APPLICATION_JSON)
   public Response pauseHTTP(
-      @QueryParam("timeout") @DefaultValue("0") final long timeout,
       @Context final HttpServletRequest req
   ) throws InterruptedException
   {
     authorizationCheck(req, Action.WRITE);
-    return pause(timeout);
+    return pause();
   }
 
-  public Response pause(final long timeout) throws InterruptedException
+  public Response pause() throws InterruptedException
   {
     if (!(status == Status.PAUSED || status == Status.READING)) {
       return Response.status(Response.Status.BAD_REQUEST)
@@ -1689,7 +1823,6 @@ public Response pause(final long timeout) throws InterruptedException
 
     pauseLock.lockInterruptibly();
     try {
-      pauseMillis = timeout <= 0 ? PAUSE_FOREVER : timeout;
       pauseRequested = true;
 
       pollRetryLock.lockInterruptibly();
@@ -1764,6 +1897,79 @@ public DateTime getStartTime(@Context final HttpServletRequest req)
     return startTime;
   }
 
+  @Nullable
+  private static TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(
+      TaskToolbox toolbox,
+      KafkaIndexTask task
+  ) throws IOException
+  {
+    final String checkpointsString = task.getContextValue("checkpoints");
+    if (checkpointsString != null) {
+      log.info("Checkpoints [%s]", checkpointsString);
+      return toolbox.getObjectMapper().readValue(
+          checkpointsString,
+          new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
+          {
+          }
+      );
+    } else {
+      return null;
+    }
+  }
+
+  private Response setEndOffsetsLegacy(
+      Map<Integer, Long> offsets
+  ) throws InterruptedException
+  {
+    if (offsets == null) {
+      return Response.status(Response.Status.BAD_REQUEST)
+                     .entity("Request body must contain a map of { partition:endOffset }")
+                     .build();
+    } else if (!endOffsets.keySet().containsAll(offsets.keySet())) {
+      return Response.status(Response.Status.BAD_REQUEST)
+                     .entity(
+                         StringUtils.format(
+                             "Request contains partitions not being handled by this task, my partitions: %s",
+                             endOffsets.keySet()
+                         )
+                     )
+                     .build();
+    }
+
+    pauseLock.lockInterruptibly();
+    try {
+      if (!isPaused()) {
+        return Response.status(Response.Status.BAD_REQUEST)
+                       .entity("Task must be paused before changing the end offsets")
+                       .build();
+      }
+
+      for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
+        if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) {
+          return Response.status(Response.Status.BAD_REQUEST)
+                         .entity(
+                             StringUtils.format(
+                                 "End offset must be >= current offset for partition [%s] (current: %s)",
+                                 entry.getKey(),
+                                 nextOffsets.get(entry.getKey())
+                             )
+                         )
+                         .build();
+        }
+      }
+
+      endOffsets.putAll(offsets);
+      log.info("endOffsets changed to %s", endOffsets);
+    }
+    finally {
+      pauseLock.unlock();
+    }
+
+    resume();
+
+    return Response.ok(endOffsets).build();
+  }
+
   @VisibleForTesting
   FireDepartmentMetrics getFireDepartmentMetrics()
   {
@@ -1775,12 +1981,6 @@ private boolean isPaused()
     return status == Status.PAUSED;
   }
 
-  private void requestPause(long pauseMillis)
-  {
-    this.pauseMillis = pauseMillis;
-    pauseRequested = true;
-  }
-
   private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
   {
     return Appenderators.createRealtime(
@@ -1854,169 +2054,6 @@ private static void assignPartitions(
     );
   }
 
-  private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic)
-  {
-    // Initialize consumer assignment.
-    final Set<Integer> assignment = Sets.newHashSet();
-    for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
-      final long endOffset = endOffsets.get(entry.getKey());
-      if (entry.getValue() < endOffset) {
-        assignment.add(entry.getKey());
-      } else if (entry.getValue() == endOffset) {
-        log.info("Finished reading partition[%d].", entry.getKey());
-      } else {
-        throw new ISE(
-            "WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
-            entry.getValue(),
-            endOffset
-        );
-      }
-    }
-
-    assignPartitions(consumer, topic, assignment);
-
-    // Seek to starting offsets.
-    for (final int partition : assignment) {
-      final long offset = nextOffsets.get(partition);
-      log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
-      consumer.seek(new TopicPartition(topic, partition), offset);
-    }
-
-    return assignment;
-  }
-
-  /**
-   * Checks if the pauseRequested flag was set and if so blocks:
-   * a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared
-   * b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared
-   * <p/>
-   * If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the
-   * pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume
-   * and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal
-   * shouldResume after adjusting pauseMillis for the new value to take effect.
-   * <p/>
-   * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted.
-   * <p/>
-   * Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set.
-   *
-   * @return true if a pause request was handled, false otherwise
-   */
-  private boolean possiblyPause(Set<Integer> assignment) throws InterruptedException
-  {
-    pauseLock.lockInterruptibly();
-    try {
-      if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) {
-        pauseMillis = PAUSE_FOREVER;
-        pauseRequested = true;
-      }
-
-      if (pauseRequested) {
-        status = Status.PAUSED;
-        long nanos = 0;
-        hasPaused.signalAll();
-
-        while (pauseRequested) {
-          if (pauseMillis == PAUSE_FOREVER) {
-            log.info("Pausing ingestion until resumed");
-            shouldResume.await();
-          } else {
-            if (pauseMillis > 0) {
-              log.info("Pausing ingestion for [%,d] ms", pauseMillis);
-              nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis);
-              pauseMillis = 0;
-            }
-            if (nanos <= 0L) {
-              pauseRequested = false; // timeout elapsed
-            }
-            nanos = shouldResume.awaitNanos(nanos);
-          }
-        }
-
-        status = Status.READING;
-        shouldResume.signalAll();
-        log.info("Ingestion loop resumed");
-        return true;
-      }
-    }
-    finally {
-      pauseLock.unlock();
-    }
-
-    return false;
-  }
-
-  private void possiblyResetOffsetsOrWait(
-      Map<TopicPartition, Long> outOfRangePartitions,
-      KafkaConsumer<byte[], byte[]> consumer,
-      TaskToolbox taskToolbox
-  ) throws InterruptedException, IOException
-  {
-    final Map<TopicPartition, Long> resetPartitions = Maps.newHashMap();
-    boolean doReset = false;
-    if (tuningConfig.isResetOffsetAutomatically()) {
-      for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
-        final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
-        consumer.seekToBeginning(Collections.singletonList(topicPartition));
-        final long leastAvailableOffset = consumer.position(topicPartition);
-        // reset the seek
-        consumer.seek(topicPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
-        }
-      }
-    }
-
-    if (doReset) {
-      sendResetRequestAndWait(resetPartitions, taskToolbox);
-    } else {
-      log.warn("Retrying in %dms", pollRetryMs);
-      pollRetryLock.lockInterruptibly();
-      try {
-        long nanos = TimeUnit.MILLISECONDS.toNanos(pollRetryMs);
-        while (nanos > 0L && !pauseRequested && !stopRequested.get()) {
-          nanos = isAwaitingRetry.awaitNanos(nanos);
-        }
-      }
-      finally {
-        pollRetryLock.unlock();
-      }
-    }
-  }
-
-  private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox)
-      throws IOException
-  {
-    Map<Integer, Long> partitionOffsetMap = Maps.newHashMap();
-    for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
-      partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
-    }
-    boolean result = taskToolbox.getTaskActionClient()
-                                .submit(new ResetDataSourceMetadataAction(
-                                    getDataSource(),
-                                    new KafkaDataSourceMetadata(new KafkaPartitions(
-                                        ioConfig.getStartPartitions()
-                                                .getTopic(),
-                                        partitionOffsetMap
-                                    ))
-                                ));
-
-    if (result) {
-      log.makeAlert("Resetting Kafka offsets for datasource [%s]", getDataSource())
-         .addData("partitions", partitionOffsetMap.keySet())
-         .emit();
-      // wait for being killed by supervisor
-      requestPause(PAUSE_FOREVER);
-    } else {
-      log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
-    }
-  }
-
   private boolean withinMinMaxRecordTime(final InputRow row)
   {
     final boolean beforeMinimumMessageTime = ioConfig.getMinimumMessageTime().isPresent()
@@ -2142,7 +2179,7 @@ public boolean isSentinel()
       return sentinel;
     }
 
-    public void setEndOffsets(Map<Integer, Long> newEndOffsets)
+    void setEndOffsets(Map<Integer, Long> newEndOffsets)
     {
       lock.lock();
       try {
@@ -2154,15 +2191,14 @@ public void setEndOffsets(Map<Integer, Long> newEndOffsets)
       }
     }
 
-    public void updateAssignments(Map<Integer, Long> nextPartitionOffset)
+    void updateAssignments(Map<Integer, Long> nextPartitionOffset)
     {
       lock.lock();
       try {
         assignments.clear();
-        nextPartitionOffset.entrySet().forEach(partitionOffset -> {
-          if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
-              > 0) {
-            assignments.add(partitionOffset.getKey());
+        nextPartitionOffset.forEach((key, value) -> {
+          if (Longs.compare(endOffsets.get(key), nextPartitionOffset.get(key)) > 0) {
+            assignments.add(key);
           }
         });
       }
@@ -2171,7 +2207,7 @@ public void updateAssignments(Map<Integer, Long> nextPartitionOffset)
       }
     }
 
-    public boolean isOpen()
+    boolean isOpen()
     {
       return !assignments.isEmpty();
     }
@@ -2180,32 +2216,17 @@ boolean canHandle(ConsumerRecord<byte[], byte[]> record)
     {
       lock.lock();
       try {
+        final Long partitionEndOffset = endOffsets.get(record.partition());
         return isOpen()
-               && endOffsets.get(record.partition()) != null
+               && partitionEndOffset != null
                && record.offset() >= startOffsets.get(record.partition())
-               && record.offset() < endOffsets.get(record.partition());
+               && record.offset() < partitionEndOffset;
       }
       finally {
         lock.unlock();
       }
     }
 
-    private SequenceMetadata()
-    {
-      this.sequenceId = -1;
-      this.sequenceName = null;
-      this.startOffsets = null;
-      this.endOffsets = null;
-      this.assignments = null;
-      this.checkpointed = true;
-      this.sentinel = true;
-    }
-
-    public static SequenceMetadata getSentinelSequenceMetadata()
-    {
-      return new SequenceMetadata();
-    }
-
     @Override
     public String toString()
     {
@@ -2226,8 +2247,7 @@ public String toString()
       }
     }
 
-
-    public Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets)
+    Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets)
     {
       // Set up committer.
       return () ->
@@ -2280,7 +2300,7 @@ public void run()
           };
     }
 
-    public TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean useTransaction)
+    TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTransaction)
     {
       return (segments, commitMetadata) -> {
         final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue(
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
index 6525d127631..1c209805505 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
@@ -29,22 +29,22 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.FullResponseHandler;
-import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.indexer.TaskLocation;
 import io.druid.indexing.common.RetryPolicy;
 import io.druid.indexing.common.RetryPolicyConfig;
 import io.druid.indexing.common.RetryPolicyFactory;
 import io.druid.indexing.common.TaskInfoProvider;
-import io.druid.indexer.TaskLocation;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.IOE;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
 import io.druid.segment.realtime.firehose.ChatHandlerResource;
 import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -166,19 +166,14 @@ public boolean resume(final String id)
 
   public Map<Integer, Long> pause(final String id)
   {
-    return pause(id, 0);
-  }
-
-  public Map<Integer, Long> pause(final String id, final long timeout)
-  {
-    log.debug("Pause task[%s] timeout[%d]", id, timeout);
+    log.debug("Pause task[%s]", id);
 
     try {
       final FullResponseHolder response = submitRequest(
           id,
           HttpMethod.POST,
           "pause",
-          timeout > 0 ? StringUtils.format("timeout=%d", timeout) : null,
+          null,
           true
       );
 
@@ -320,18 +315,17 @@ public DateTime getStartTime(final String id)
   public boolean setEndOffsets(
       final String id,
       final Map<Integer, Long> endOffsets,
-      final boolean resume,
       final boolean finalize
   )
   {
-    log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s] finalize[%s]", id, endOffsets, resume, finalize);
+    log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize);
 
     try {
       final FullResponseHolder response = submitRequest(
           id,
           HttpMethod.POST,
           "offsets/end",
-          StringUtils.format("resume=%s&finish=%s", resume, finalize),
+          StringUtils.format("finish=%s", finalize),
           jsonMapper.writeValueAsBytes(endOffsets),
           true
       );
@@ -374,11 +368,6 @@ public Boolean call() throws Exception
   }
 
   public ListenableFuture<Map<Integer, Long>> pauseAsync(final String id)
-  {
-    return pauseAsync(id, 0);
-  }
-
-  public ListenableFuture<Map<Integer, Long>> pauseAsync(final String id, final long timeout)
   {
     return executorService.submit(
         new Callable<Map<Integer, Long>>()
@@ -386,7 +375,7 @@ public Boolean call() throws Exception
           @Override
           public Map<Integer, Long> call() throws Exception
           {
-            return pause(id, timeout);
+            return pause(id);
           }
         }
     );
@@ -449,7 +438,7 @@ public DateTime call() throws Exception
   }
 
   public ListenableFuture<Boolean> setEndOffsetsAsync(
-      final String id, final Map<Integer, Long> endOffsets, final boolean resume, final boolean finalize
+      final String id, final Map<Integer, Long> endOffsets, final boolean finalize
   )
   {
     return executorService.submit(
@@ -458,7 +447,7 @@ public DateTime call() throws Exception
           @Override
           public Boolean call() throws Exception
           {
-            return setEndOffsets(id, endOffsets, resume, finalize);
+            return setEndOffsets(id, endOffsets, finalize);
           }
         }
     );
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index f79b297d064..454deffe886 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -1517,7 +1517,7 @@ private void checkTaskDuration() throws InterruptedException, ExecutionException
 
               log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
               for (final String taskId : setEndOffsetTaskIds) {
-                setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, true, finalize));
+                setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
               }
 
               List<Boolean> results = Futures.successfulAsList(setEndOffsetFutures)
@@ -1764,7 +1764,6 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc
         new KafkaPartitions(ioConfig.getTopic(), endPartitions),
         consumerProperties,
         true,
-        false,
         minimumMessageTime,
         maximumMessageTime,
         ioConfig.isSkipOffsetGaps()
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
index 49a9b90033d..22a792f7e6f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -71,8 +71,7 @@ public void testSerdeWithDefaults() throws Exception
     Assert.assertEquals("mytopic", config.getEndPartitions().getTopic());
     Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
-    Assert.assertEquals(true, config.isUseTransaction());
-    Assert.assertEquals(false, config.isPauseAfterRead());
+    Assert.assertTrue(config.isUseTransaction());
     Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent());
     Assert.assertFalse("maximumMessageTime", config.getMaximumMessageTime().isPresent());
     Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
@@ -88,7 +87,6 @@ public void testSerdeWithNonDefaults() throws Exception
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n"
                      + "  \"skipOffsetGaps\": true\n"
@@ -109,8 +107,7 @@ public void testSerdeWithNonDefaults() throws Exception
     Assert.assertEquals("mytopic", config.getEndPartitions().getTopic());
     Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
-    Assert.assertEquals(false, config.isUseTransaction());
-    Assert.assertEquals(true, config.isPauseAfterRead());
+    Assert.assertFalse(config.isUseTransaction());
     Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getMinimumMessageTime().get());
     Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get());
     Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
@@ -125,7 +122,6 @@ public void testBaseSequenceNameRequired() throws Exception
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -145,7 +141,6 @@ public void testStartPartitionsRequired() throws Exception
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -165,7 +160,6 @@ public void testEndPartitionsRequired() throws Exception
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -185,7 +179,6 @@ public void testConsumerPropertiesRequired() throws Exception
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -206,7 +199,6 @@ public void testStartAndEndTopicMatch() throws Exception
                      + "  \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -227,7 +219,6 @@ public void testStartAndEndPartitionSetMatch() throws Exception
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -248,7 +239,6 @@ public void testEndOffsetGreaterThanStart() throws Exception
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java
index 2834cc838a8..b63563b8eda 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java
@@ -27,17 +27,17 @@
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.FullResponseHandler;
-import io.druid.java.util.http.client.response.FullResponseHolder;
-import io.druid.indexing.common.TaskInfoProvider;
 import io.druid.indexer.TaskLocation;
+import io.druid.indexing.common.TaskInfoProvider;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMockSupport;
@@ -56,6 +56,7 @@
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -139,13 +140,13 @@ public void testNoTaskLocation() throws Exception
     Assert.assertEquals(false, client.stop(TEST_ID, true));
     Assert.assertEquals(false, client.resume(TEST_ID));
     Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID));
-    Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID, 10));
+    Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID));
     Assert.assertEquals(KafkaIndexTask.Status.NOT_STARTED, client.getStatus(TEST_ID));
     Assert.assertEquals(null, client.getStartTime(TEST_ID));
     Assert.assertEquals(ImmutableMap.of(), client.getCurrentOffsets(TEST_ID, true));
     Assert.assertEquals(ImmutableMap.of(), client.getEndOffsets(TEST_ID));
-    Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), false, true));
-    Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), true, true));
+    Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true));
+    Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true));
 
     verifyAll();
   }
@@ -431,33 +432,6 @@ public void testPause() throws Exception
     Assert.assertEquals(10, (long) results.get(1));
   }
 
-  @Test
-  public void testPauseWithTimeout() throws Exception
-  {
-    Capture<Request> captured = Capture.newInstance();
-    expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2);
-    expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes();
-    expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
-        Futures.immediateFuture(responseHolder)
-    );
-    replayAll();
-
-    Map<Integer, Long> results = client.pause(TEST_ID, 101);
-    verifyAll();
-
-    Request request = captured.getValue();
-    Assert.assertEquals(HttpMethod.POST, request.getMethod());
-    Assert.assertEquals(
-        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/pause?timeout=101"),
-        request.getUrl()
-    );
-    Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
-
-    Assert.assertEquals(2, results.size());
-    Assert.assertEquals(1, (long) results.get(0));
-    Assert.assertEquals(10, (long) results.get(1));
-  }
-
   @Test
   public void testPauseWithSubsequentGetOffsets() throws Exception
   {
@@ -544,13 +518,13 @@ public void testSetEndOffsets() throws Exception
     );
     replayAll();
 
-    client.setEndOffsets(TEST_ID, endOffsets, false, true);
+    client.setEndOffsets(TEST_ID, endOffsets, true);
     verifyAll();
 
     Request request = captured.getValue();
     Assert.assertEquals(HttpMethod.POST, request.getMethod());
     Assert.assertEquals(
-        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=false&finish=true"),
+        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"),
         request.getUrl()
     );
     Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@@ -569,13 +543,13 @@ public void testSetEndOffsetsAndResume() throws Exception
     );
     replayAll();
 
-    client.setEndOffsets(TEST_ID, endOffsets, true, true);
+    client.setEndOffsets(TEST_ID, endOffsets, true);
     verifyAll();
 
     Request request = captured.getValue();
     Assert.assertEquals(HttpMethod.POST, request.getMethod());
     Assert.assertEquals(
-        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=true&finish=true"),
+        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"),
         request.getUrl()
     );
     Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@@ -723,39 +697,6 @@ public void testPauseAsync() throws Exception
     }
   }
 
-  @Test
-  public void testPauseAsyncWithTimeout() throws Exception
-  {
-    final int numRequests = TEST_IDS.size();
-    Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
-    expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
-    expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
-    expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
-        Futures.immediateFuture(responseHolder)
-    ).times(numRequests);
-    replayAll();
-
-    List<URL> expectedUrls = Lists.newArrayList();
-    List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
-    for (String testId : TEST_IDS) {
-      expectedUrls.add(new URL(StringUtils.format(URL_FORMATTER, TEST_HOST, TEST_PORT, testId, "pause?timeout=9")));
-      futures.add(client.pauseAsync(testId, 9));
-    }
-
-    List<Map<Integer, Long>> responses = Futures.allAsList(futures).get();
-
-    verifyAll();
-    List<Request> requests = captured.getValues();
-
-    Assert.assertEquals(numRequests, requests.size());
-    Assert.assertEquals(numRequests, responses.size());
-    for (int i = 0; i < numRequests; i++) {
-      Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod());
-      Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
-      Assert.assertEquals(Maps.newLinkedHashMap(ImmutableMap.of(0, 1L)), responses.get(i));
-    }
-  }
-
   @Test
   public void testGetStatusAsync() throws Exception
   {
@@ -909,9 +850,9 @@ public void testSetEndOffsetsAsync() throws Exception
           TEST_HOST,
           TEST_PORT,
           testId,
-          StringUtils.format("offsets/end?resume=%s&finish=%s", false, true)
+          StringUtils.format("offsets/end?finish=%s", true)
       )));
-      futures.add(client.setEndOffsetsAsync(testId, endOffsets, false, true));
+      futures.add(client.setEndOffsetsAsync(testId, endOffsets, true));
     }
 
     List<Boolean> responses = Futures.allAsList(futures).get();
@@ -950,11 +891,11 @@ public void testSetEndOffsetsAsyncWithResume() throws Exception
                   TEST_HOST,
                   TEST_PORT,
                   testId,
-                  "offsets/end?resume=true&finish=true"
+                  "offsets/end?finish=true"
               )
           )
       );
-      futures.add(client.setEndOffsetsAsync(testId, endOffsets, true, true));
+      futures.add(client.setEndOffsetsAsync(testId, endOffsets, true));
     }
 
     List<Boolean> responses = Futures.allAsList(futures).get();
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 45f3003638f..4232b582217 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -38,10 +38,6 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.core.NoopEmitter;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.client.cache.CacheConfig;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.impl.DimensionsSpec;
@@ -85,6 +81,10 @@
 import io.druid.java.util.common.logger.Logger;
 import io.druid.java.util.common.parsers.JSONPathFieldSpec;
 import io.druid.java.util.common.parsers.JSONPathSpec;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.core.NoopEmitter;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.math.expr.ExprMacroTable;
 import io.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import io.druid.metadata.EntryExistsException;
@@ -350,7 +350,6 @@ public void testRunAfterDataInserted() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -392,7 +391,6 @@ public void testRunBeforeDataInserted() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -468,7 +466,6 @@ public void testIncrementalHandOff() throws Exception
             endPartitions,
             consumerProps,
             true,
-            false,
             null,
             null,
             false
@@ -481,7 +478,7 @@ public void testIncrementalHandOff() throws Exception
     final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getCurrentOffsets());
     Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets) || checkpoint2.getPartitionOffsetMap()
                                                                                                .equals(currentOffsets));
-    task.setEndOffsets(currentOffsets, true, false);
+    task.setEndOffsets(currentOffsets, false);
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
     Assert.assertEquals(1, checkpointRequestsHash.size());
@@ -536,7 +533,6 @@ public void testRunWithMinimumMessageTime() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             DateTimes.of("2010"),
             null,
             false
@@ -590,7 +586,6 @@ public void testRunWithMaximumMessageTime() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             DateTimes.of("2010"),
             false
@@ -654,7 +649,6 @@ public void testRunWithTransformSpec() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -714,7 +708,6 @@ public void testRunOnNothing() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -755,7 +748,6 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -807,7 +799,6 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -817,10 +808,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio
     final ListenableFuture<TaskStatus> future = runTask(task);
 
     // Wait for task to exit
-    Assert.assertEquals(
-        isIncrementalHandoffSupported ? TaskState.SUCCESS : TaskState.FAILED,
-        future.get().getStatusCode()
-    );
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
     // Check metrics
     Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
@@ -861,7 +849,6 @@ public void testReportParseExceptions() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -894,7 +881,6 @@ public void testRunReplicas() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -908,7 +894,6 @@ public void testRunReplicas() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -962,7 +947,6 @@ public void testRunConflicting() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -976,7 +960,6 @@ public void testRunConflicting() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1031,7 +1014,6 @@ public void testRunConflictingWithoutTransactions() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             false,
-            false,
             null,
             null,
             false
@@ -1045,7 +1027,6 @@ public void testRunConflictingWithoutTransactions() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
             kafkaServer.consumerProperties(),
             false,
-            false,
             null,
             null,
             false
@@ -1105,7 +1086,6 @@ public void testRunOneTaskTwoPartitions() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1170,7 +1150,6 @@ public void testRunTwoTasksTwoPartitions() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1184,7 +1163,6 @@ public void testRunTwoTasksTwoPartitions() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1240,7 +1218,6 @@ public void testRestore() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1277,7 +1254,6 @@ public void testRestore() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1329,7 +1305,6 @@ public void testRunWithPauseAndResume() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1354,7 +1329,7 @@ public void testRunWithPauseAndResume() throws Exception
     Assert.assertEquals(KafkaIndexTask.Status.READING, task.getStatus());
 
     Map<Integer, Long> currentOffsets = objectMapper.readValue(
-        task.pause(0).getEntity().toString(),
+        task.pause().getEntity().toString(),
         new TypeReference<Map<Integer, Long>>()
         {
         }
@@ -1402,93 +1377,6 @@ public void testRunWithPauseAndResume() throws Exception
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
   }
 
-  @Test(timeout = 60_000L)
-  public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
-  {
-    final KafkaIndexTask task = createTask(
-        null,
-        new KafkaIOConfig(
-            "sequence0",
-            new KafkaPartitions(topic, ImmutableMap.of(0, 1L)),
-            new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
-            kafkaServer.consumerProperties(),
-            true,
-            true,
-            null,
-            null,
-            false
-        )
-    );
-
-    final ListenableFuture<TaskStatus> future = runTask(task);
-
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
-
-    while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
-      Thread.sleep(25);
-    }
-
-    // reached the end of the assigned offsets and paused instead of publishing
-    Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
-    Assert.assertEquals(ImmutableMap.of(0, 3L), task.getEndOffsets());
-    Map<Integer, Long> newEndOffsets = ImmutableMap.of(0, 4L);
-    task.setEndOffsets(newEndOffsets, false, true);
-    Assert.assertEquals(newEndOffsets, task.getEndOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-    task.resume();
-
-    while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
-      Thread.sleep(25);
-    }
-
-    // reached the end of the updated offsets and paused
-    Assert.assertEquals(newEndOffsets, task.getCurrentOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
-    // try again but with resume flag == true
-    newEndOffsets = ImmutableMap.of(0, 7L);
-    task.setEndOffsets(newEndOffsets, true, true);
-    Assert.assertEquals(newEndOffsets, task.getEndOffsets());
-    Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
-    while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
-      Thread.sleep(25);
-    }
-
-    Assert.assertEquals(newEndOffsets, task.getCurrentOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
-    task.resume();
-
-    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
-
-    // Check metrics
-    Assert.assertEquals(4, task.getFireDepartmentMetrics().processed());
-    Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable());
-    Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway());
-
-    // Check published metadata
-    SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
-    SegmentDescriptor desc2 = SD(task, "2010/P1D", 0);
-    SegmentDescriptor desc3 = SD(task, "2011/P1D", 0);
-    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
-    Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 7L))),
-        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
-    );
-
-    // Check segments in deep storage
-    Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc1));
-    Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc2));
-    Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc3));
-  }
-
   @Test(timeout = 30_000L)
   public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
   {
@@ -1500,7 +1388,6 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1513,7 +1400,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
       Thread.sleep(2000);
     }
 
-    task.pause(0);
+    task.pause();
 
     while (!task.getStatus().equals(KafkaIndexTask.Status.PAUSED)) {
       Thread.sleep(25);
@@ -1539,7 +1426,6 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva
             new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1593,7 +1479,6 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c429265c697..60a47f0d58c 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -254,7 +254,6 @@ public void testNoInitialState() throws Exception
     Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey"));
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
-    Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());
     Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
     Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
     Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
@@ -1046,7 +1045,6 @@ public void testBeginPublishAndQueueNextTasks() throws Exception
         taskClient.setEndOffsetsAsync(
             EasyMock.contains("sequenceName-0"),
             EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
-            EasyMock.eq(true),
             EasyMock.eq(true)
         )
     ).andReturn(Futures.immediateFuture(true)).times(2);
@@ -1074,7 +1072,6 @@ public void testBeginPublishAndQueueNextTasks() throws Exception
       KafkaIOConfig taskConfig = kafkaIndexTask.getIOConfig();
       Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
       Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
-      Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());
 
       Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic());
       Assert.assertEquals(10L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
@@ -1164,7 +1161,6 @@ public void testDiscoverExistingPublishingTask() throws Exception
     Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey"));
     Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName());
     Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction());
-    Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead());
 
     // check that the new task was created with starting offsets matching where the publishing task finished
     Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic());
@@ -1255,7 +1251,6 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation()
     Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey"));
     Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName());
     Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction());
-    Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead());
 
     // check that the new task was created with starting offsets matching where the publishing task finished
     Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic());
@@ -1570,7 +1565,6 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception
         taskClient.setEndOffsetsAsync(
             EasyMock.contains("sequenceName-0"),
             EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
-            EasyMock.eq(true),
             EasyMock.eq(true)
         )
     ).andReturn(Futures.<Boolean>immediateFailedFuture(new RuntimeException())).times(2);
@@ -1695,7 +1689,7 @@ public void testStopGracefully() throws Exception
     expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
     expect(taskClient.pauseAsync("id2"))
         .andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
-    expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true, true))
+    expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true))
         .andReturn(Futures.immediateFuture(true));
     taskQueue.shutdown("id3");
     expectLastCall().times(2);
@@ -2125,7 +2119,6 @@ private KafkaIndexTask createKafkaIndexTask(
             endPartitions,
             ImmutableMap.<String, String>of(),
             true,
-            false,
             minimumMessageTime,
             maximumMessageTime,
             false
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index 8e18d725e43..aeb05781ec5 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -179,6 +179,7 @@ default int getPriority()
    */
   TaskStatus run(TaskToolbox toolbox) throws Exception;
 
+  @Nullable
   Map<String, Object> getContext();
 
   @Nullable
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
index 72a7f8b325c..74a1011abc5 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
@@ -23,12 +23,13 @@
 import io.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
 public class SegmentsAndMetadata
 {
-  private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(ImmutableList.<DataSegment>of(), null);
+  private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(Collections.emptyList(), null);
 
   private final Object commitMetadata;
   private final ImmutableList<DataSegment> segments;
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 6d1109202ea..33b823df88f 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -23,7 +23,6 @@
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -221,7 +220,7 @@ public Object persist(final Committer committer) throws InterruptedException
       throw e;
     }
     catch (Exception e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@druid.apache.org
For additional commands, e-mail: dev-help@druid.apache.org