Posted to dev@druid.apache.org by ji...@apache.org on 2018/07/09 20:49:31 UTC

[incubator-druid] branch 0.12.2 updated: Fix the broken Appenderator contract in KafkaIndexTask (#5905) (#5983)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.12.2
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.12.2 by this push:
     new c637b3e  Fix the broken Appenderator contract in KafkaIndexTask (#5905) (#5983)
c637b3e is described below

commit c637b3eb4a823f1f3c64a6366979df10db34f3aa
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Mon Jul 9 13:49:28 2018 -0700

    Fix the broken Appenderator contract in KafkaIndexTask (#5905) (#5983)
    
    * Fix broken Appenderator contract in KafkaIndexTask
    
    * fix build
    
    * add publishFuture
    
    * reuse sequenceToUse if possible
---
 .../io/druid/indexing/kafka/KafkaIOConfig.java     |   11 -
 .../io/druid/indexing/kafka/KafkaIndexTask.java    | 1068 ++++++++++----------
 .../druid/indexing/kafka/KafkaIndexTaskClient.java |   37 +-
 .../indexing/kafka/supervisor/KafkaSupervisor.java |    3 +-
 .../io/druid/indexing/kafka/KafkaIOConfigTest.java |   14 +-
 .../indexing/kafka/KafkaIndexTaskClientTest.java   |   93 +-
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |  131 +--
 .../kafka/supervisor/KafkaSupervisorTest.java      |    9 +-
 .../java/io/druid/indexing/common/task/Task.java   |    1 +
 .../realtime/appenderator/SegmentsAndMetadata.java |    3 +-
 .../appenderator/StreamAppenderatorDriver.java     |    3 +-
 11 files changed, 590 insertions(+), 783 deletions(-)
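
For readers skimming the patch: the core change removes the single publish-driver thread and its blocking queue, and instead creates one publish future per sequence, each paired with a SettableFuture for handoff that is guaranteed to fail whenever its publish fails. The sketch below is a hypothetical, minimal illustration of that future-chaining pattern using Guava; the class and method names (PublishHandoffSketch, publishAndRegisterHandoff, awaitAll) and the string "segments" stand-ins are invented for the example and are not the actual Druid code.

    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import com.google.common.util.concurrent.SettableFuture;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;

    public class PublishHandoffSketch
    {
      private final ListeningExecutorService exec =
          MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

      private final List<ListenableFuture<String>> publishWaitList = new ArrayList<>();
      private final List<ListenableFuture<String>> handOffWaitList = new ArrayList<>();

      // Stands in for driver.publish(): returns a future of the published "segments".
      private ListenableFuture<String> publish(String sequenceName)
      {
        return exec.submit(() -> "segments-for-" + sequenceName);
      }

      public void publishAndRegisterHandoff(String sequenceName)
      {
        final ListenableFuture<String> publishFuture = publish(sequenceName);
        publishWaitList.add(publishFuture);

        // One handoff future per publish future; it must fail if the publish fails.
        final SettableFuture<String> handoffFuture = SettableFuture.create();
        handOffWaitList.add(handoffFuture);

        Futures.addCallback(
            publishFuture,
            new FutureCallback<String>()
            {
              @Override
              public void onSuccess(String published)
              {
                // The real task registers handoff with the driver here; the sketch
                // simply completes the paired handoff future.
                handoffFuture.set(published + "-handed-off");
              }

              @Override
              public void onFailure(Throwable t)
              {
                // Propagate publish failures into the handoff future.
                handoffFuture.setException(t);
              }
            },
            MoreExecutors.directExecutor()
        );
      }

      public void awaitAll() throws Exception
      {
        // Mirrors the run() flow: wait for all publishes, then all handoffs.
        Futures.allAsList(publishWaitList).get();
        Futures.allAsList(handOffWaitList).get();
        exec.shutdown();
      }

      public static void main(String[] args) throws Exception
      {
        PublishHandoffSketch sketch = new PublishHandoffSketch();
        sketch.publishAndRegisterHandoff("seq_0");
        sketch.publishAndRegisterHandoff("seq_1");
        sketch.awaitAll();
        System.out.println("all sequences published and handed off");
      }
    }

In the real KafkaIndexTask shown in the diff below, the publish future comes from driver.publish(), the handoff future is completed from driver.registerHandoff(), and run() waits on publishWaitList before handOffWaitList.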

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
index d9a7fb2..5d48fe1 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -31,7 +31,6 @@ import java.util.Map;
 public class KafkaIOConfig implements IOConfig
 {
   private static final boolean DEFAULT_USE_TRANSACTION = true;
-  private static final boolean DEFAULT_PAUSE_AFTER_READ = false;
   private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false;
 
   private final String baseSequenceName;
@@ -39,7 +38,6 @@ public class KafkaIOConfig implements IOConfig
   private final KafkaPartitions endPartitions;
   private final Map<String, String> consumerProperties;
   private final boolean useTransaction;
-  private final boolean pauseAfterRead;
   private final Optional<DateTime> minimumMessageTime;
   private final Optional<DateTime> maximumMessageTime;
   private final boolean skipOffsetGaps;
@@ -51,7 +49,6 @@ public class KafkaIOConfig implements IOConfig
       @JsonProperty("endPartitions") KafkaPartitions endPartitions,
       @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
       @JsonProperty("useTransaction") Boolean useTransaction,
-      @JsonProperty("pauseAfterRead") Boolean pauseAfterRead,
       @JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
       @JsonProperty("maximumMessageTime") DateTime maximumMessageTime,
       @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps
@@ -62,7 +59,6 @@ public class KafkaIOConfig implements IOConfig
     this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
     this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
     this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
-    this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;
     this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
     this.maximumMessageTime = Optional.fromNullable(maximumMessageTime);
     this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS;
@@ -118,12 +114,6 @@ public class KafkaIOConfig implements IOConfig
   }
 
   @JsonProperty
-  public boolean isPauseAfterRead()
-  {
-    return pauseAfterRead;
-  }
-
-  @JsonProperty
   public Optional<DateTime> getMaximumMessageTime()
   {
     return maximumMessageTime;
@@ -150,7 +140,6 @@ public class KafkaIOConfig implements IOConfig
            ", endPartitions=" + endPartitions +
            ", consumerProperties=" + consumerProperties +
            ", useTransaction=" + useTransaction +
-           ", pauseAfterRead=" + pauseAfterRead +
            ", minimumMessageTime=" + minimumMessageTime +
            ", maximumMessageTime=" + maximumMessageTime +
            ", skipOffsetGaps=" + skipOffsetGaps +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index 62aca36..16a67ec 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -33,7 +33,6 @@ import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -42,8 +41,7 @@ import com.google.common.primitives.Longs;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.InputRowParser;
@@ -68,7 +66,6 @@ import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.collect.Utils;
-import io.druid.java.util.common.concurrent.Execs;
 import io.druid.java.util.common.guava.Sequence;
 import io.druid.java.util.common.parsers.ParseException;
 import io.druid.java.util.emitter.EmittingLogger;
@@ -127,23 +124,21 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -179,7 +174,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   private final AuthorizerMapper authorizerMapper;
   private final Optional<ChatHandlerProvider> chatHandlerProvider;
 
-  private final Map<Integer, Long> endOffsets = new ConcurrentHashMap<>();
+  private final Map<Integer, Long> endOffsets;
   private final Map<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
   private final Map<Integer, Long> maxEndOffsets = new HashMap<>();
   private final Map<Integer, Long> lastPersistedOffsets = new ConcurrentHashMap<>();
@@ -192,7 +187,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   private volatile DateTime startTime;
   private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread)
   private volatile Thread runThread = null;
-  private volatile File sequencesPersistFile = null;
   private final AtomicBoolean stopRequested = new AtomicBoolean(false);
   private final AtomicBoolean publishOnStop = new AtomicBoolean(false);
 
@@ -232,20 +226,17 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   private final Object statusLock = new Object();
 
   private volatile boolean pauseRequested = false;
-  private volatile long pauseMillis = 0;
 
   // This value can be tuned in some tests
   private long pollRetryMs = 30000;
 
   private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
-  private final BlockingQueue<SequenceMetadata> publishQueue = new LinkedBlockingQueue<>();
-  private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new CopyOnWriteArrayList<>(); // to prevent concurrency visibility issue
-  private final CountDownLatch waitForPublishes = new CountDownLatch(1);
-  private final AtomicReference<Throwable> throwableAtomicReference = new AtomicReference<>();
+  private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new LinkedList<>();
+  private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new LinkedList<>();
   private final String topic;
 
   private volatile CopyOnWriteArrayList<SequenceMetadata> sequences;
-  private ListeningExecutorService publishExecService;
+  private volatile Throwable backgroundThreadException;
   private final boolean useLegacy;
 
   @JsonCreator
@@ -274,7 +265,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
     this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider);
     this.authorizerMapper = authorizerMapper;
-    this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap());
+    this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndPartitions().getPartitionOffsetMap());
     this.maxEndOffsets.putAll(endOffsets.entrySet()
                                         .stream()
                                         .collect(Collectors.toMap(
@@ -343,75 +334,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     return ioConfig;
   }
 
-  private void createAndStartPublishExecutor()
-  {
-    publishExecService = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-driver"));
-    publishExecService.submit(
-        (Runnable) () -> {
-          while (true) {
-            try {
-              final SequenceMetadata sequenceMetadata = publishQueue.take();
-
-              Preconditions.checkNotNull(driver);
-
-              if (sequenceMetadata.isSentinel()) {
-                waitForPublishes.countDown();
-                break;
-              }
-
-              log.info("Publishing segments for sequence [%s]", sequenceMetadata);
-
-              final SegmentsAndMetadata result = driver.publish(
-                  sequenceMetadata.getPublisher(toolbox, ioConfig.isUseTransaction()),
-                  sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(),
-                  ImmutableList.of(sequenceMetadata.getSequenceName())
-              ).get();
-
-              if (result == null) {
-                throw new ISE(
-                    "Transaction failure publishing segments for sequence [%s]",
-                    sequenceMetadata
-                );
-              } else {
-                log.info(
-                    "Published segments[%s] with metadata[%s].",
-                    Joiner.on(", ").join(
-                        result.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList())
-                    ),
-                    Preconditions.checkNotNull(result.getCommitMetadata(), "commitMetadata")
-                );
-              }
-
-              sequences.remove(sequenceMetadata);
-              publishingSequences.remove(sequenceMetadata.getSequenceName());
-              try {
-                persistSequences();
-              }
-              catch (IOException e) {
-                log.error(e, "Unable to persist state, dying");
-                Throwables.propagate(e);
-              }
-
-              final ListenableFuture<SegmentsAndMetadata> handOffFuture = driver.registerHandoff(result);
-              handOffWaitList.add(handOffFuture);
-            }
-            catch (Throwable t) {
-              if ((t instanceof InterruptedException || (t instanceof RejectedExecutionException
-                                                         && t.getCause() instanceof InterruptedException))) {
-                log.warn("Stopping publish thread as we are interrupted, probably we are shutting down");
-              } else {
-                log.makeAlert(t, "Error in publish thread, dying").emit();
-                throwableAtomicReference.set(t);
-              }
-              Futures.allAsList(handOffWaitList).cancel(true);
-              waitForPublishes.countDown();
-              break;
-            }
-          }
-        }
-    );
-  }
-
   @Override
   public TaskStatus run(final TaskToolbox toolbox) throws Exception
   {
@@ -426,47 +348,40 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     status = Status.STARTING;
     this.toolbox = toolbox;
 
-    if (getContext() != null && getContext().get("checkpoints") != null) {
-      log.info("Got checkpoints [%s]", (String) getContext().get("checkpoints"));
-      final TreeMap<Integer, Map<Integer, Long>> checkpoints = toolbox.getObjectMapper().readValue(
-          (String) getContext().get("checkpoints"),
-          new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
-          {
-          }
-      );
-
-      Iterator<Map.Entry<Integer, Map<Integer, Long>>> sequenceOffsets = checkpoints.entrySet().iterator();
-      Map.Entry<Integer, Map<Integer, Long>> previous = sequenceOffsets.next();
-      while (sequenceOffsets.hasNext()) {
-        Map.Entry<Integer, Map<Integer, Long>> current = sequenceOffsets.next();
+    if (!restoreSequences()) {
+      final TreeMap<Integer, Map<Integer, Long>> checkpoints = getCheckPointsFromContext(toolbox, this);
+      if (checkpoints != null) {
+        Iterator<Map.Entry<Integer, Map<Integer, Long>>> sequenceOffsets = checkpoints.entrySet().iterator();
+        Map.Entry<Integer, Map<Integer, Long>> previous = sequenceOffsets.next();
+        while (sequenceOffsets.hasNext()) {
+          Map.Entry<Integer, Map<Integer, Long>> current = sequenceOffsets.next();
+          sequences.add(new SequenceMetadata(
+              previous.getKey(),
+              StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
+              previous.getValue(),
+              current.getValue(),
+              true
+          ));
+          previous = current;
+        }
         sequences.add(new SequenceMetadata(
             previous.getKey(),
             StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
             previous.getValue(),
-            current.getValue(),
-            true
+            maxEndOffsets,
+            false
+        ));
+      } else {
+        sequences.add(new SequenceMetadata(
+            0,
+            StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
+            ioConfig.getStartPartitions().getPartitionOffsetMap(),
+            maxEndOffsets,
+            false
         ));
-        previous = current;
       }
-      sequences.add(new SequenceMetadata(
-          previous.getKey(),
-          StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()),
-          previous.getValue(),
-          maxEndOffsets,
-          false
-      ));
-    } else {
-      sequences.add(new SequenceMetadata(
-          0,
-          StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0),
-          ioConfig.getStartPartitions().getPartitionOffsetMap(),
-          maxEndOffsets,
-          false
-      ));
-    }
 
-    sequencesPersistFile = new File(toolbox.getPersistDir(), "sequences.json");
-    restoreSequences();
+    }
     log.info("Starting with sequences:  %s", sequences);
 
     if (chatHandlerProvider.isPresent()) {
@@ -504,15 +419,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
         )
     );
 
-    try (
-        final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
-    ) {
+    try (final KafkaConsumer<byte[], byte[]> consumer = newConsumer()) {
       toolbox.getDataSegmentServerAnnouncer().announce();
       toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
 
       appenderator = newAppenderator(fireDepartmentMetrics, toolbox);
       driver = newDriver(appenderator, toolbox, fireDepartmentMetrics);
-      createAndStartPublishExecutor();
 
       final String topic = ioConfig.getStartPartitions().getTopic();
 
@@ -602,7 +514,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       status = Status.READING;
       try {
         while (stillReading) {
-          if (possiblyPause(assignment)) {
+          if (possiblyPause()) {
             // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
             // partitions upon resuming. This is safe even if the end offsets have not been modified.
             assignment = assignPartitionsAndSeekToNext(consumer, topic);
@@ -615,8 +527,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
           }
 
           // if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
-          if (stopRequested.get() || (sequences.get(sequences.size() - 1).isCheckpointed()
-                                      && !ioConfig.isPauseAfterRead())) {
+          if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
             status = Status.PUBLISHING;
           }
 
@@ -624,11 +535,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
             break;
           }
 
-          checkAndMaybeThrowException();
-
-          if (!ioConfig.isPauseAfterRead()) {
-            maybePersistAndPublishSequences(committerSupplier);
+          if (backgroundThreadException != null) {
+            throw new RuntimeException(backgroundThreadException);
           }
+          checkPublishAndHandoffFailure();
+
+          maybePersistAndPublishSequences(committerSupplier);
 
           // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to
           // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
@@ -640,19 +552,17 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
           catch (OffsetOutOfRangeException e) {
             log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
             possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
-            stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+            stillReading = !assignment.isEmpty();
           }
 
           SequenceMetadata sequenceToCheckpoint = null;
           for (ConsumerRecord<byte[], byte[]> record : records) {
-            if (log.isTraceEnabled()) {
-              log.trace(
-                  "Got topic[%s] partition[%d] offset[%,d].",
-                  record.topic(),
-                  record.partition(),
-                  record.offset()
-              );
-            }
+            log.trace(
+                "Got topic[%s] partition[%d] offset[%,d].",
+                record.topic(),
+                record.partition(),
+                record.offset()
+            );
 
             if (record.offset() < endOffsets.get(record.partition())) {
               if (record.offset() != nextOffsets.get(record.partition())) {
@@ -680,24 +590,23 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
                                             : parser.parseBatch(ByteBuffer.wrap(valueBytes));
                 boolean isPersistRequired = false;
 
-                for (InputRow row : rows) {
-                  if (row != null && withinMinMaxRecordTime(row)) {
-                    SequenceMetadata sequenceToUse = null;
-                    for (SequenceMetadata sequence : sequences) {
-                      if (sequence.canHandle(record)) {
-                        sequenceToUse = sequence;
-                      }
-                    }
+                final SequenceMetadata sequenceToUse = sequences
+                    .stream()
+                    .filter(sequenceMetadata -> sequenceMetadata.canHandle(record))
+                    .findFirst()
+                    .orElse(null);
 
-                    if (sequenceToUse == null) {
-                      throw new ISE(
-                          "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
-                          record.partition(),
-                          record.offset(),
-                          sequences
-                      );
-                    }
+                if (sequenceToUse == null) {
+                  throw new ISE(
+                      "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
+                      record.partition(),
+                      record.offset(),
+                      sequences
+                  );
+                }
 
+                for (InputRow row : rows) {
+                  if (row != null && withinMinMaxRecordTime(row)) {
                     final AppenderatorDriverAddResult addResult = driver.add(
                         row,
                         sequenceToUse.getSequenceName(),
@@ -751,7 +660,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
                         public void onFailure(Throwable t)
                         {
                           log.error("Persist failed, dying");
-                          throwableAtomicReference.set(t);
+                          backgroundThreadException = t;
                         }
                       }
                   );
@@ -779,11 +688,11 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
                 && assignment.remove(record.partition())) {
               log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
               assignPartitions(consumer, topic, assignment);
-              stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+              stillReading = !assignment.isEmpty();
             }
           }
 
-          if (sequenceToCheckpoint != null && !ioConfig.isPauseAfterRead()) {
+          if (sequenceToCheckpoint != null && stillReading) {
             Preconditions.checkArgument(
                 sequences.get(sequences.size() - 1)
                          .getSequenceName()
@@ -792,7 +701,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
                 sequenceToCheckpoint,
                 sequences
             );
-            requestPause(PAUSE_FOREVER);
+            requestPause();
             if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction(
                 getDataSource(),
                 ioConfig.getBaseSequenceName(),
@@ -804,6 +713,11 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
           }
         }
       }
+      catch (Exception e) {
+        // (1) catch all exceptions while reading from kafka
+        log.error(e, "Encountered exception in run() before persisting.");
+        throw e;
+      }
       finally {
         log.info("Persisting all pending data");
         driver.persist(committerSupplier.get()); // persist pending data
@@ -824,16 +738,23 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
           sequenceMetadata.updateAssignments(nextOffsets);
           publishingSequences.add(sequenceMetadata.getSequenceName());
           // persist already done in finally, so directly add to publishQueue
-          publishQueue.add(sequenceMetadata);
+          publishAndRegisterHandoff(sequenceMetadata);
         }
       }
 
-      // add Sentinel SequenceMetadata to indicate end of all sequences
-      publishQueue.add(SequenceMetadata.getSentinelSequenceMetadata());
-      waitForPublishes.await();
-      checkAndMaybeThrowException();
+      if (backgroundThreadException != null) {
+        throw new RuntimeException(backgroundThreadException);
+      }
+
+      // Wait for publish futures to complete.
+      Futures.allAsList(publishWaitList).get();
 
-      List<SegmentsAndMetadata> handedOffList = Lists.newArrayList();
+      // Wait for handoff futures to complete.
+      // Note that every publishing task (created by calling AppenderatorDriver.publish()) has a corresponding
+      // handoffFuture. handoffFuture can throw an exception if 1) the corresponding publishFuture failed or 2) it
+      // failed to persist sequences. It might also return null if handoff failed, but was recoverable.
+      // See publishAndRegisterHandoff() for details.
+      List<SegmentsAndMetadata> handedOffList = Collections.emptyList();
       if (tuningConfig.getHandoffConditionTimeout() == 0) {
         handedOffList = Futures.allAsList(handOffWaitList).get();
       } else {
@@ -842,6 +763,8 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
                                  .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
         }
         catch (TimeoutException e) {
+          // Handoff timeout is not an indexing failure but a coordination failure. We simply ignore the timeout
+          // exception here.
           log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout())
              .addData("TaskId", this.getId())
              .emit();
@@ -861,8 +784,14 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
           );
         }
       }
+
+      appenderator.close();
     }
     catch (InterruptedException | RejectedExecutionException e) {
+      // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including
+      // the final publishing.
+      Futures.allAsList(publishWaitList).cancel(true);
+      Futures.allAsList(handOffWaitList).cancel(true);
       appenderator.closeNow();
       // handle the InterruptedException that gets wrapped in a RejectedExecutionException
       if (e instanceof RejectedExecutionException
@@ -878,14 +807,14 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
 
       log.info("The task was asked to stop before completing");
     }
+    catch (Exception e) {
+      // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing.
+      Futures.allAsList(publishWaitList).cancel(true);
+      Futures.allAsList(handOffWaitList).cancel(true);
+      appenderator.closeNow();
+      throw e;
+    }
     finally {
-      if (appenderator != null) {
-        if (throwableAtomicReference.get() != null) {
-          appenderator.closeNow();
-        } else {
-          appenderator.close();
-        }
-      }
       if (driver != null) {
         driver.close();
       }
@@ -893,10 +822,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
         chatHandlerProvider.get().unregister(getId());
       }
 
-      if (publishExecService != null) {
-        publishExecService.shutdownNow();
-      }
-
       toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
       toolbox.getDataSegmentServerAnnouncer().unannounce();
     }
@@ -947,9 +872,9 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     );
 
     try (
-            final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
-            final StreamAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
-            final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
+        final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
+        final StreamAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics);
+        final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
     ) {
       toolbox.getDataSegmentServerAnnouncer().announce();
       toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
@@ -1032,7 +957,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       status = Status.READING;
       try {
         while (stillReading) {
-          if (possiblyPause(assignment)) {
+          if (possiblyPause()) {
             // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
             // partitions upon resuming. This is safe even if the end offsets have not been modified.
             assignment = assignPartitionsAndSeekToNext(consumer, topic);
@@ -1058,7 +983,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
           catch (OffsetOutOfRangeException e) {
             log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
             possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
-            stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+            stillReading = !assignment.isEmpty();
           }
 
           for (ConsumerRecord<byte[], byte[]> record : records) {
@@ -1158,7 +1083,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
                 && assignment.remove(record.partition())) {
               log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
               assignPartitions(consumer, topic, assignment);
-              stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
+              stillReading = !assignment.isEmpty();
             }
           }
         }
@@ -1211,32 +1136,37 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
           sequenceNames.values()
       ).get();
 
+      final List<String> publishedSegments = published.getSegments()
+                                                      .stream()
+                                                      .map(DataSegment::getIdentifier)
+                                                      .collect(Collectors.toList());
+      log.info(
+          "Published segments[%s] with metadata[%s].",
+          publishedSegments,
+          Preconditions.checkNotNull(published.getCommitMetadata(), "commitMetadata")
+      );
+
       final Future<SegmentsAndMetadata> handoffFuture = driver.registerHandoff(published);
-      final SegmentsAndMetadata handedOff;
+      SegmentsAndMetadata handedOff = null;
       if (tuningConfig.getHandoffConditionTimeout() == 0) {
         handedOff = handoffFuture.get();
       } else {
-        handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
+        try {
+          handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
+        }
+        catch (TimeoutException e) {
+          log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout())
+             .addData("TaskId", getId())
+             .emit();
+        }
       }
 
       if (handedOff == null) {
-        throw new ISE("Transaction failure publishing segments, aborting");
+        log.warn("Failed to handoff segments[%s]", publishedSegments);
       } else {
         log.info(
-            "Published segments[%s] with metadata[%s].",
-            Joiner.on(", ").join(
-                Iterables.transform(
-                    handedOff.getSegments(),
-                    new Function<DataSegment, String>()
-                    {
-                      @Override
-                      public String apply(DataSegment input)
-                      {
-                        return input.getIdentifier();
-                      }
-                    }
-                )
-            ),
+            "Handoff completed for segments[%s] with metadata[%s]",
+            handedOff.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()),
             Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata")
         );
       }
@@ -1268,13 +1198,156 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     return success();
   }
 
-  private void checkAndMaybeThrowException()
+  private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException
+  {
+    // Check if any publishFuture failed.
+    final List<ListenableFuture<SegmentsAndMetadata>> publishFinished = publishWaitList
+        .stream()
+        .filter(Future::isDone)
+        .collect(Collectors.toList());
+
+    for (ListenableFuture<SegmentsAndMetadata> publishFuture : publishFinished) {
+      // If publishFuture failed, the line below will throw an exception that is caught by (1), and then by (2) or (3).
+      publishFuture.get();
+    }
+
+    publishWaitList.removeAll(publishFinished);
+
+    // Check if any handoffFuture failed.
+    final List<ListenableFuture<SegmentsAndMetadata>> handoffFinished = handOffWaitList
+        .stream()
+        .filter(Future::isDone)
+        .collect(Collectors.toList());
+
+    for (ListenableFuture<SegmentsAndMetadata> handoffFuture : handoffFinished) {
+      // If handoffFuture failed, the line below will throw an exception that is caught by (1), and then by (2) or (3).
+      handoffFuture.get();
+    }
+
+    handOffWaitList.removeAll(handoffFinished);
+  }
+
+  private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata)
+  {
+    log.info("Publishing segments for sequence [%s]", sequenceMetadata);
+
+    final ListenableFuture<SegmentsAndMetadata> publishFuture = Futures.transform(
+        driver.publish(
+            sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()),
+            sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(),
+            Collections.singletonList(sequenceMetadata.getSequenceName())
+        ),
+        (Function<SegmentsAndMetadata, SegmentsAndMetadata>) publishedSegmentsAndMetadata -> {
+          if (publishedSegmentsAndMetadata == null) {
+            throw new ISE(
+                "Transaction failure publishing segments for sequence [%s]",
+                sequenceMetadata
+            );
+          } else {
+            return publishedSegmentsAndMetadata;
+          }
+        }
+    );
+    publishWaitList.add(publishFuture);
+
+    // Create a handoffFuture for every publishFuture. The created handoffFuture must fail if publishFuture fails.
+    final SettableFuture<SegmentsAndMetadata> handoffFuture = SettableFuture.create();
+    handOffWaitList.add(handoffFuture);
+
+    Futures.addCallback(
+        publishFuture,
+        new FutureCallback<SegmentsAndMetadata>()
+        {
+          @Override
+          public void onSuccess(SegmentsAndMetadata publishedSegmentsAndMetadata)
+          {
+            log.info(
+                "Published segments[%s] with metadata[%s].",
+                publishedSegmentsAndMetadata.getSegments()
+                                            .stream()
+                                            .map(DataSegment::getIdentifier)
+                                            .collect(Collectors.toList()),
+                Preconditions.checkNotNull(publishedSegmentsAndMetadata.getCommitMetadata(), "commitMetadata")
+            );
+
+            sequences.remove(sequenceMetadata);
+            publishingSequences.remove(sequenceMetadata.getSequenceName());
+            try {
+              persistSequences();
+            }
+            catch (IOException e) {
+              log.error(e, "Unable to persist state, dying");
+              handoffFuture.setException(e);
+              throw new RuntimeException(e);
+            }
+
+            Futures.transform(
+                driver.registerHandoff(publishedSegmentsAndMetadata),
+                new Function<SegmentsAndMetadata, Void>()
+                {
+                  @Nullable
+                  @Override
+                  public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata)
+                  {
+                    if (handoffSegmentsAndMetadata == null) {
+                      log.warn(
+                          "Failed to handoff segments[%s]",
+                          publishedSegmentsAndMetadata.getSegments()
+                                                      .stream()
+                                                      .map(DataSegment::getIdentifier)
+                                                      .collect(Collectors.toList())
+                      );
+                    }
+                    handoffFuture.set(handoffSegmentsAndMetadata);
+                    return null;
+                  }
+                }
+            );
+          }
+
+          @Override
+          public void onFailure(Throwable t)
+          {
+            log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata);
+            handoffFuture.setException(t);
+          }
+        }
+    );
+  }
+
+  private static File getSequencesPersistFile(TaskToolbox toolbox)
+  {
+    return new File(toolbox.getPersistDir(), "sequences.json");
+  }
+
+  private boolean restoreSequences() throws IOException
   {
-    if (throwableAtomicReference.get() != null) {
-      Throwables.propagate(throwableAtomicReference.get());
+    final File sequencesPersistFile = getSequencesPersistFile(toolbox);
+    if (sequencesPersistFile.exists()) {
+      sequences = new CopyOnWriteArrayList<>(
+          toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
+              sequencesPersistFile,
+              new TypeReference<List<SequenceMetadata>>()
+              {
+              }
+          )
+      );
+      return true;
+    } else {
+      return false;
     }
   }
 
+  private synchronized void persistSequences() throws IOException
+  {
+    log.info("Persisting Sequences Metadata [%s]", sequences);
+    toolbox.getObjectMapper().writerWithType(
+        new TypeReference<List<SequenceMetadata>>()
+        {
+        }
+    ).writeValue(getSequencesPersistFile(toolbox), sequences);
+  }
+
   private void maybePersistAndPublishSequences(Supplier<Committer> committerSupplier)
       throws InterruptedException
   {
@@ -1289,7 +1362,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
               result,
               sequenceMetadata
           );
-          publishQueue.add(sequenceMetadata);
+          publishAndRegisterHandoff(sequenceMetadata);
         }
         catch (InterruptedException e) {
           log.warn("Interrupted while persisting sequence [%s]", sequenceMetadata);
@@ -1299,73 +1372,194 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     }
   }
 
-  private void restoreSequences() throws IOException
+  private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic)
   {
-    Preconditions.checkNotNull(sequencesPersistFile);
-    if (sequencesPersistFile.exists()) {
-      sequences = new CopyOnWriteArrayList<>(toolbox.getObjectMapper().<List<SequenceMetadata>>readValue(
-          sequencesPersistFile, new TypeReference<List<SequenceMetadata>>()
-          {
-          }));
+    // Initialize consumer assignment.
+    final Set<Integer> assignment = Sets.newHashSet();
+    for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
+      final long endOffset = endOffsets.get(entry.getKey());
+      if (entry.getValue() < endOffset) {
+        assignment.add(entry.getKey());
+      } else if (entry.getValue() == endOffset) {
+        log.info("Finished reading partition[%d].", entry.getKey());
+      } else {
+        throw new ISE(
+            "WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
+            entry.getValue(),
+            endOffset
+        );
+      }
     }
-  }
 
-  private synchronized void persistSequences() throws IOException
-  {
-    log.info("Persisting Sequences Metadata [%s]", sequences);
-    toolbox.getObjectMapper().writerWithType(
-        new TypeReference<List<SequenceMetadata>>()
-        {
-        }
-    ).writeValue(sequencesPersistFile, sequences);
-  }
+    KafkaIndexTask.assignPartitions(consumer, topic, assignment);
 
-  @Override
-  public boolean canRestore()
-  {
-    return true;
+    // Seek to starting offsets.
+    for (final int partition : assignment) {
+      final long offset = nextOffsets.get(partition);
+      log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
+      consumer.seek(new TopicPartition(topic, partition), offset);
+    }
+
+    return assignment;
   }
 
   /**
-   * Authorizes action to be performed on this task's datasource
+   * Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared.
+   * <p/>
+   * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted.
+   * <p/>
    *
-   * @return authorization result
+   * @return true if a pause request was handled, false otherwise
    */
-  private Access authorizationCheck(final HttpServletRequest req, Action action)
+  private boolean possiblyPause() throws InterruptedException
   {
-    ResourceAction resourceAction = new ResourceAction(
-        new Resource(dataSchema.getDataSource(), ResourceType.DATASOURCE),
-        action
-    );
+    pauseLock.lockInterruptibly();
+    try {
+      if (pauseRequested) {
+        status = Status.PAUSED;
+        hasPaused.signalAll();
 
-    Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper);
-    if (!access.isAllowed()) {
-      throw new ForbiddenException(access.toString());
-    }
+        while (pauseRequested) {
+          log.info("Pausing ingestion until resumed");
+          shouldResume.await();
+        }
 
-    return access;
-  }
+        status = Status.READING;
+        shouldResume.signalAll();
+        log.info("Ingestion loop resumed");
+        return true;
+      }
+    }
+    finally {
+      pauseLock.unlock();
+    }
 
-  @VisibleForTesting
-  Appenderator getAppenderator()
-  {
-    return appenderator;
+    return false;
   }
 
-  @Override
-  public void stopGracefully()
+  private void possiblyResetOffsetsOrWait(
+      Map<TopicPartition, Long> outOfRangePartitions,
+      KafkaConsumer<byte[], byte[]> consumer,
+      TaskToolbox taskToolbox
+  ) throws InterruptedException, IOException
   {
-    log.info("Stopping gracefully (status: [%s])", status);
-    stopRequested.set(true);
-
-    synchronized (statusLock) {
-      if (status == Status.PUBLISHING) {
-        runThread.interrupt();
-        return;
+    final Map<TopicPartition, Long> resetPartitions = Maps.newHashMap();
+    boolean doReset = false;
+    if (tuningConfig.isResetOffsetAutomatically()) {
+      for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
+        final TopicPartition topicPartition = outOfRangePartition.getKey();
+        final long nextOffset = outOfRangePartition.getValue();
+        // seek to the beginning to get the least available offset
+        consumer.seekToBeginning(Collections.singletonList(topicPartition));
+        final long leastAvailableOffset = consumer.position(topicPartition);
+        // reset the seek
+        consumer.seek(topicPartition, nextOffset);
+        // Reset consumer offset if resetOffsetAutomatically is set to true
+        // and the current message offset in the kafka partition is more than the
+        // next message offset that we are trying to fetch
+        if (leastAvailableOffset > nextOffset) {
+          doReset = true;
+          resetPartitions.put(topicPartition, nextOffset);
+        }
       }
     }
 
-    try {
+    if (doReset) {
+      sendResetRequestAndWait(resetPartitions, taskToolbox);
+    } else {
+      log.warn("Retrying in %dms", pollRetryMs);
+      pollRetryLock.lockInterruptibly();
+      try {
+        long nanos = TimeUnit.MILLISECONDS.toNanos(pollRetryMs);
+        while (nanos > 0L && !pauseRequested && !stopRequested.get()) {
+          nanos = isAwaitingRetry.awaitNanos(nanos);
+        }
+      }
+      finally {
+        pollRetryLock.unlock();
+      }
+    }
+  }
+
+  private void requestPause()
+  {
+    pauseRequested = true;
+  }
+
+  private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox)
+      throws IOException
+  {
+    Map<Integer, Long> partitionOffsetMap = Maps.newHashMap();
+    for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
+      partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
+    }
+    boolean result = taskToolbox.getTaskActionClient()
+                                .submit(new ResetDataSourceMetadataAction(
+                                    getDataSource(),
+                                    new KafkaDataSourceMetadata(new KafkaPartitions(
+                                        ioConfig.getStartPartitions()
+                                                .getTopic(),
+                                        partitionOffsetMap
+                                    ))
+                                ));
+
+    if (result) {
+      log.makeAlert("Resetting Kafka offsets for datasource [%s]", getDataSource())
+         .addData("partitions", partitionOffsetMap.keySet())
+         .emit();
+      // wait for being killed by supervisor
+      requestPause();
+    } else {
+      log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
+    }
+  }
+
+  @Override
+  public boolean canRestore()
+  {
+    return true;
+  }
+
+  /**
+   * Authorizes action to be performed on this task's datasource
+   *
+   * @return authorization result
+   */
+  private Access authorizationCheck(final HttpServletRequest req, Action action)
+  {
+    ResourceAction resourceAction = new ResourceAction(
+        new Resource(dataSchema.getDataSource(), ResourceType.DATASOURCE),
+        action
+    );
+
+    Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper);
+    if (!access.isAllowed()) {
+      throw new ForbiddenException(access.toString());
+    }
+
+    return access;
+  }
+
+  @VisibleForTesting
+  Appenderator getAppenderator()
+  {
+    return appenderator;
+  }
+
+  @Override
+  public void stopGracefully()
+  {
+    log.info("Stopping gracefully (status: [%s])", status);
+    stopRequested.set(true);
+
+    synchronized (statusLock) {
+      if (status == Status.PUBLISHING) {
+        runThread.interrupt();
+        return;
+      }
+    }
+
+    try {
       if (pauseLock.tryLock(LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
         try {
           if (pauseRequested) {
@@ -1474,25 +1668,23 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   @Produces(MediaType.APPLICATION_JSON)
   public Response setEndOffsetsHTTP(
       Map<Integer, Long> offsets,
-      @QueryParam("resume") @DefaultValue("false") final boolean resume,
       @QueryParam("finish") @DefaultValue("true") final boolean finish,
       // this field is only for internal purposes, shouldn't be usually set by users
       @Context final HttpServletRequest req
   ) throws InterruptedException
   {
     authorizationCheck(req, Action.WRITE);
-    return setEndOffsets(offsets, resume, finish);
+    return setEndOffsets(offsets, finish);
   }
 
   public Response setEndOffsets(
       Map<Integer, Long> offsets,
-      final boolean resume,
       final boolean finish // this field is only for internal purposes, shouldn't be usually set by users
   ) throws InterruptedException
   {
     // for backwards compatibility, should be removed from versions greater than 0.12.x
     if (useLegacy) {
-      return setEndOffsetsLegacy(offsets, resume);
+      return setEndOffsetsLegacy(offsets);
     }
 
     if (offsets == null) {
@@ -1520,7 +1712,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
             (latestSequence.getEndOffsets().equals(offsets) && finish)) {
           log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", sequences);
           return Response.ok(offsets).build();
-        } else if (latestSequence.isCheckpointed() && !ioConfig.isPauseAfterRead()) {
+        } else if (latestSequence.isCheckpointed()) {
           return Response.status(Response.Status.BAD_REQUEST)
                          .entity(StringUtils.format(
                              "WTH?! Sequence [%s] has already endOffsets set, cannot set to [%s]",
@@ -1553,13 +1745,12 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
           log.info("Updating endOffsets from [%s] to [%s]", endOffsets, offsets);
           endOffsets.putAll(offsets);
         } else {
-          Preconditions.checkState(!ioConfig.isPauseAfterRead());
           // create new sequence
           final SequenceMetadata newSequence = new SequenceMetadata(
               latestSequence.getSequenceId() + 1,
               StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1),
               offsets,
-              maxEndOffsets,
+              endOffsets,
               false
           );
           sequences.add(newSequence);
@@ -1569,77 +1760,23 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       }
       catch (Exception e) {
         log.error(e, "Unable to set end offsets, dying");
-        throwableAtomicReference.set(e);
-        Throwables.propagate(e);
+        backgroundThreadException = e;
+        // resume the task so that it immediately finishes as failed
+        resume();
+        return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+                       .entity(Throwables.getStackTraceAsString(e))
+                       .build();
       }
       finally {
         pauseLock.unlock();
       }
     }
 
-    if (resume) {
-      resume();
-    }
+    resume();
 
     return Response.ok(offsets).build();
   }
 
-  private Response setEndOffsetsLegacy(
-      Map<Integer, Long> offsets,
-      final boolean resume
-  ) throws InterruptedException
-  {
-    if (offsets == null) {
-      return Response.status(Response.Status.BAD_REQUEST)
-                     .entity("Request body must contain a map of { partition:endOffset }")
-                     .build();
-    } else if (!endOffsets.keySet().containsAll(offsets.keySet())) {
-      return Response.status(Response.Status.BAD_REQUEST)
-                     .entity(
-                         StringUtils.format(
-                             "Request contains partitions not being handled by this task, my partitions: %s",
-                             endOffsets.keySet()
-                         )
-                     )
-                     .build();
-    }
-
-    pauseLock.lockInterruptibly();
-    try {
-      if (!isPaused()) {
-        return Response.status(Response.Status.BAD_REQUEST)
-                       .entity("Task must be paused before changing the end offsets")
-                       .build();
-      }
-
-      for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
-        if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) {
-          return Response.status(Response.Status.BAD_REQUEST)
-                         .entity(
-                             StringUtils.format(
-                                 "End offset must be >= current offset for partition [%s] (current: %s)",
-                                 entry.getKey(),
-                                 nextOffsets.get(entry.getKey())
-                             )
-                         )
-                         .build();
-        }
-      }
-
-      endOffsets.putAll(offsets);
-      log.info("endOffsets changed to %s", endOffsets);
-    }
-    finally {
-      pauseLock.unlock();
-    }
-
-    if (resume) {
-      resume();
-    }
-
-    return Response.ok(endOffsets).build();
-  }
-
   @GET
   @Path("/checkpoints")
   @Produces(MediaType.APPLICATION_JSON)
@@ -1649,7 +1786,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     return getCheckpoints();
   }
 
-  public Map<Integer, Map<Integer, Long>> getCheckpoints()
+  private Map<Integer, Map<Integer, Long>> getCheckpoints()
   {
     TreeMap<Integer, Map<Integer, Long>> result = new TreeMap<>();
     result.putAll(
@@ -1661,8 +1798,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   /**
    * Signals the ingestion loop to pause.
    *
-   * @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely
-   *
    * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the
    * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets
    * in the response body if the task successfully paused
@@ -1671,15 +1806,14 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
   @Path("/pause")
   @Produces(MediaType.APPLICATION_JSON)
   public Response pauseHTTP(
-      @QueryParam("timeout") @DefaultValue("0") final long timeout,
       @Context final HttpServletRequest req
   ) throws InterruptedException
   {
     authorizationCheck(req, Action.WRITE);
-    return pause(timeout);
+    return pause();
   }
 
-  public Response pause(final long timeout) throws InterruptedException
+  public Response pause() throws InterruptedException
   {
     if (!(status == Status.PAUSED || status == Status.READING)) {
       return Response.status(Response.Status.BAD_REQUEST)
@@ -1689,7 +1823,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
 
     pauseLock.lockInterruptibly();
     try {
-      pauseMillis = timeout <= 0 ? PAUSE_FOREVER : timeout;
       pauseRequested = true;
 
       pollRetryLock.lockInterruptibly();
@@ -1764,6 +1897,79 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     return startTime;
   }
 
+  @Nullable
+  private static TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(
+      TaskToolbox toolbox,
+      KafkaIndexTask task
+  ) throws IOException
+  {
+    final String checkpointsString = task.getContextValue("checkpoints");
+    if (checkpointsString != null) {
+      log.info("Checkpoints [%s]", checkpointsString);
+      return toolbox.getObjectMapper().readValue(
+          checkpointsString,
+          new TypeReference<TreeMap<Integer, Map<Integer, Long>>>()
+          {
+          }
+      );
+    } else {
+      return null;
+    }
+  }
+
+  private Response setEndOffsetsLegacy(
+      Map<Integer, Long> offsets
+  ) throws InterruptedException
+  {
+    if (offsets == null) {
+      return Response.status(Response.Status.BAD_REQUEST)
+                     .entity("Request body must contain a map of { partition:endOffset }")
+                     .build();
+    } else if (!endOffsets.keySet().containsAll(offsets.keySet())) {
+      return Response.status(Response.Status.BAD_REQUEST)
+                     .entity(
+                         StringUtils.format(
+                             "Request contains partitions not being handled by this task, my partitions: %s",
+                             endOffsets.keySet()
+                         )
+                     )
+                     .build();
+    }
+
+    pauseLock.lockInterruptibly();
+    try {
+      if (!isPaused()) {
+        return Response.status(Response.Status.BAD_REQUEST)
+                       .entity("Task must be paused before changing the end offsets")
+                       .build();
+      }
+
+      for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
+        if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) {
+          return Response.status(Response.Status.BAD_REQUEST)
+                         .entity(
+                             StringUtils.format(
+                                 "End offset must be >= current offset for partition [%s] (current: %s)",
+                                 entry.getKey(),
+                                 nextOffsets.get(entry.getKey())
+                             )
+                         )
+                         .build();
+        }
+      }
+
+      endOffsets.putAll(offsets);
+      log.info("endOffsets changed to %s", endOffsets);
+    }
+    finally {
+      pauseLock.unlock();
+    }
+
+    resume();
+
+    return Response.ok(endOffsets).build();
+  }
+
   @VisibleForTesting
   FireDepartmentMetrics getFireDepartmentMetrics()
   {
@@ -1775,12 +1981,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     return status == Status.PAUSED;
   }
 
-  private void requestPause(long pauseMillis)
-  {
-    this.pauseMillis = pauseMillis;
-    pauseRequested = true;
-  }
-
   private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
   {
     return Appenderators.createRealtime(
@@ -1854,169 +2054,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     );
   }
 
-  private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic)
-  {
-    // Initialize consumer assignment.
-    final Set<Integer> assignment = Sets.newHashSet();
-    for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
-      final long endOffset = endOffsets.get(entry.getKey());
-      if (entry.getValue() < endOffset) {
-        assignment.add(entry.getKey());
-      } else if (entry.getValue() == endOffset) {
-        log.info("Finished reading partition[%d].", entry.getKey());
-      } else {
-        throw new ISE(
-            "WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
-            entry.getValue(),
-            endOffset
-        );
-      }
-    }
-
-    assignPartitions(consumer, topic, assignment);
-
-    // Seek to starting offsets.
-    for (final int partition : assignment) {
-      final long offset = nextOffsets.get(partition);
-      log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
-      consumer.seek(new TopicPartition(topic, partition), offset);
-    }
-
-    return assignment;
-  }
-
-  /**
-   * Checks if the pauseRequested flag was set and if so blocks:
-   * a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared
-   * b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared
-   * <p/>
-   * If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the
-   * pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume
-   * and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal
-   * shouldResume after adjusting pauseMillis for the new value to take effect.
-   * <p/>
-   * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted.
-   * <p/>
-   * Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set.
-   *
-   * @return true if a pause request was handled, false otherwise
-   */
-  private boolean possiblyPause(Set<Integer> assignment) throws InterruptedException
-  {
-    pauseLock.lockInterruptibly();
-    try {
-      if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) {
-        pauseMillis = PAUSE_FOREVER;
-        pauseRequested = true;
-      }
-
-      if (pauseRequested) {
-        status = Status.PAUSED;
-        long nanos = 0;
-        hasPaused.signalAll();
-
-        while (pauseRequested) {
-          if (pauseMillis == PAUSE_FOREVER) {
-            log.info("Pausing ingestion until resumed");
-            shouldResume.await();
-          } else {
-            if (pauseMillis > 0) {
-              log.info("Pausing ingestion for [%,d] ms", pauseMillis);
-              nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis);
-              pauseMillis = 0;
-            }
-            if (nanos <= 0L) {
-              pauseRequested = false; // timeout elapsed
-            }
-            nanos = shouldResume.awaitNanos(nanos);
-          }
-        }
-
-        status = Status.READING;
-        shouldResume.signalAll();
-        log.info("Ingestion loop resumed");
-        return true;
-      }
-    }
-    finally {
-      pauseLock.unlock();
-    }
-
-    return false;
-  }
-
-  private void possiblyResetOffsetsOrWait(
-      Map<TopicPartition, Long> outOfRangePartitions,
-      KafkaConsumer<byte[], byte[]> consumer,
-      TaskToolbox taskToolbox
-  ) throws InterruptedException, IOException
-  {
-    final Map<TopicPartition, Long> resetPartitions = Maps.newHashMap();
-    boolean doReset = false;
-    if (tuningConfig.isResetOffsetAutomatically()) {
-      for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
-        final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
-        consumer.seekToBeginning(Collections.singletonList(topicPartition));
-        final long leastAvailableOffset = consumer.position(topicPartition);
-        // reset the seek
-        consumer.seek(topicPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
-        }
-      }
-    }
-
-    if (doReset) {
-      sendResetRequestAndWait(resetPartitions, taskToolbox);
-    } else {
-      log.warn("Retrying in %dms", pollRetryMs);
-      pollRetryLock.lockInterruptibly();
-      try {
-        long nanos = TimeUnit.MILLISECONDS.toNanos(pollRetryMs);
-        while (nanos > 0L && !pauseRequested && !stopRequested.get()) {
-          nanos = isAwaitingRetry.awaitNanos(nanos);
-        }
-      }
-      finally {
-        pollRetryLock.unlock();
-      }
-    }
-  }
-
-  private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox)
-      throws IOException
-  {
-    Map<Integer, Long> partitionOffsetMap = Maps.newHashMap();
-    for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
-      partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
-    }
-    boolean result = taskToolbox.getTaskActionClient()
-                                .submit(new ResetDataSourceMetadataAction(
-                                    getDataSource(),
-                                    new KafkaDataSourceMetadata(new KafkaPartitions(
-                                        ioConfig.getStartPartitions()
-                                                .getTopic(),
-                                        partitionOffsetMap
-                                    ))
-                                ));
-
-    if (result) {
-      log.makeAlert("Resetting Kafka offsets for datasource [%s]", getDataSource())
-         .addData("partitions", partitionOffsetMap.keySet())
-         .emit();
-      // wait for being killed by supervisor
-      requestPause(PAUSE_FOREVER);
-    } else {
-      log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
-    }
-  }
-
   private boolean withinMinMaxRecordTime(final InputRow row)
   {
     final boolean beforeMinimumMessageTime = ioConfig.getMinimumMessageTime().isPresent()
@@ -2142,7 +2179,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       return sentinel;
     }
 
-    public void setEndOffsets(Map<Integer, Long> newEndOffsets)
+    void setEndOffsets(Map<Integer, Long> newEndOffsets)
     {
       lock.lock();
       try {
@@ -2154,15 +2191,14 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       }
     }
 
-    public void updateAssignments(Map<Integer, Long> nextPartitionOffset)
+    void updateAssignments(Map<Integer, Long> nextPartitionOffset)
     {
       lock.lock();
       try {
         assignments.clear();
-        nextPartitionOffset.entrySet().forEach(partitionOffset -> {
-          if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
-              > 0) {
-            assignments.add(partitionOffset.getKey());
+        nextPartitionOffset.forEach((key, value) -> {
+          if (Longs.compare(endOffsets.get(key), nextPartitionOffset.get(key)) > 0) {
+            assignments.add(key);
           }
         });
       }
@@ -2171,7 +2207,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       }
     }
 
-    public boolean isOpen()
+    boolean isOpen()
     {
       return !assignments.isEmpty();
     }
@@ -2180,32 +2216,17 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
     {
       lock.lock();
       try {
+        final Long partitionEndOffset = endOffsets.get(record.partition());
         return isOpen()
-               && endOffsets.get(record.partition()) != null
+               && partitionEndOffset != null
                && record.offset() >= startOffsets.get(record.partition())
-               && record.offset() < endOffsets.get(record.partition());
+               && record.offset() < partitionEndOffset;
       }
       finally {
         lock.unlock();
       }
     }
 
-    private SequenceMetadata()
-    {
-      this.sequenceId = -1;
-      this.sequenceName = null;
-      this.startOffsets = null;
-      this.endOffsets = null;
-      this.assignments = null;
-      this.checkpointed = true;
-      this.sentinel = true;
-    }
-
-    public static SequenceMetadata getSentinelSequenceMetadata()
-    {
-      return new SequenceMetadata();
-    }
-
     @Override
     public String toString()
     {
@@ -2226,8 +2247,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
       }
     }
 
-
-    public Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets)
+    Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets)
     {
       // Set up committer.
       return () ->
@@ -2280,7 +2300,7 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler
           };
     }
 
-    public TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean useTransaction)
+    TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTransaction)
     {
       return (segments, commitMetadata) -> {
         final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue(
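
A note on the pause changes above: with the timed pause and pauseAfterRead removed, pause() only flips the pauseRequested flag, and the ingestion loop blocks on a condition until resume() signals it. The following is a minimal sketch of that handshake, not the actual task code; the class and field names (PauseGate, ackTimeoutMillis) are illustrative.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class PauseGate
{
  private final ReentrantLock pauseLock = new ReentrantLock();
  private final Condition hasPaused = pauseLock.newCondition();
  private final Condition shouldResume = pauseLock.newCondition();
  private boolean pauseRequested = false;

  // HTTP handler side: request a pause and wait briefly for the loop to acknowledge it.
  boolean pause(long ackTimeoutMillis) throws InterruptedException
  {
    pauseLock.lockInterruptibly();
    try {
      pauseRequested = true;
      // Returns false if the loop has not acknowledged within the timeout
      // (the real endpoint answers 202 Accepted in that case).
      return hasPaused.await(ackTimeoutMillis, TimeUnit.MILLISECONDS);
    }
    finally {
      pauseLock.unlock();
    }
  }

  // HTTP handler side: clear the pause flag and wake the ingestion loop.
  void resume()
  {
    pauseLock.lock();
    try {
      pauseRequested = false;
      shouldResume.signalAll();
    }
    finally {
      pauseLock.unlock();
    }
  }

  // Ingestion loop side, called between polls: block indefinitely while a pause is requested.
  void possiblyPause() throws InterruptedException
  {
    pauseLock.lockInterruptibly();
    try {
      while (pauseRequested) {
        hasPaused.signalAll();   // lets pause() return to its caller
        shouldResume.await();    // no timed wait any more; only resume() wakes the loop
      }
    }
    finally {
      pauseLock.unlock();
    }
  }
}
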
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
index 6525d12..1c20980 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java
@@ -29,22 +29,22 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.FullResponseHandler;
-import io.druid.java.util.http.client.response.FullResponseHolder;
+import io.druid.indexer.TaskLocation;
 import io.druid.indexing.common.RetryPolicy;
 import io.druid.indexing.common.RetryPolicyConfig;
 import io.druid.indexing.common.RetryPolicyFactory;
 import io.druid.indexing.common.TaskInfoProvider;
-import io.druid.indexer.TaskLocation;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.IOE;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.concurrent.Execs;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
 import io.druid.segment.realtime.firehose.ChatHandlerResource;
 import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.handler.codec.http.HttpMethod;
@@ -166,19 +166,14 @@ public class KafkaIndexTaskClient
 
   public Map<Integer, Long> pause(final String id)
   {
-    return pause(id, 0);
-  }
-
-  public Map<Integer, Long> pause(final String id, final long timeout)
-  {
-    log.debug("Pause task[%s] timeout[%d]", id, timeout);
+    log.debug("Pause task[%s]", id);
 
     try {
       final FullResponseHolder response = submitRequest(
           id,
           HttpMethod.POST,
           "pause",
-          timeout > 0 ? StringUtils.format("timeout=%d", timeout) : null,
+          null,
           true
       );
 
@@ -320,18 +315,17 @@ public class KafkaIndexTaskClient
   public boolean setEndOffsets(
       final String id,
       final Map<Integer, Long> endOffsets,
-      final boolean resume,
       final boolean finalize
   )
   {
-    log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s] finalize[%s]", id, endOffsets, resume, finalize);
+    log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize);
 
     try {
       final FullResponseHolder response = submitRequest(
           id,
           HttpMethod.POST,
           "offsets/end",
-          StringUtils.format("resume=%s&finish=%s", resume, finalize),
+          StringUtils.format("finish=%s", finalize),
           jsonMapper.writeValueAsBytes(endOffsets),
           true
       );
@@ -375,18 +369,13 @@ public class KafkaIndexTaskClient
 
   public ListenableFuture<Map<Integer, Long>> pauseAsync(final String id)
   {
-    return pauseAsync(id, 0);
-  }
-
-  public ListenableFuture<Map<Integer, Long>> pauseAsync(final String id, final long timeout)
-  {
     return executorService.submit(
         new Callable<Map<Integer, Long>>()
         {
           @Override
           public Map<Integer, Long> call() throws Exception
           {
-            return pause(id, timeout);
+            return pause(id);
           }
         }
     );
@@ -449,7 +438,7 @@ public class KafkaIndexTaskClient
   }
 
   public ListenableFuture<Boolean> setEndOffsetsAsync(
-      final String id, final Map<Integer, Long> endOffsets, final boolean resume, final boolean finalize
+      final String id, final Map<Integer, Long> endOffsets, final boolean finalize
   )
   {
     return executorService.submit(
@@ -458,7 +447,7 @@ public class KafkaIndexTaskClient
           @Override
           public Boolean call() throws Exception
           {
-            return setEndOffsets(id, endOffsets, resume, finalize);
+            return setEndOffsets(id, endOffsets, finalize);
           }
         }
     );
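
For callers of KafkaIndexTaskClient, the changes above reduce a checkpoint to two calls: pause(id) with no timeout, then setEndOffsets(id, offsets, finalize) with no resume flag, since the task resumes itself once the offsets are accepted. A hedged usage sketch follows; the client and taskId arguments and the error handling shown are assumptions, not code from this patch.

import com.google.common.collect.ImmutableMap;
import io.druid.indexing.kafka.KafkaIndexTaskClient;
import java.util.Map;

class CheckpointSketch
{
  // Illustrative only; retries and logging are omitted.
  static void checkpoint(KafkaIndexTaskClient client, String taskId)
  {
    // Pause is now indefinite; the task stays paused until told otherwise.
    Map<Integer, Long> currentOffsets = client.pause(taskId);

    // No resume flag: once the end offsets are accepted the task resumes on its own.
    // finalize=true marks these as the final offsets so the task can publish and exit.
    boolean accepted = client.setEndOffsets(taskId, ImmutableMap.copyOf(currentOffsets), true);

    if (!accepted) {
      // If the request was rejected, a caller could fall back to resuming the task explicitly.
      client.resume(taskId);
    }
  }
}
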
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index f79b297..454deff 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -1517,7 +1517,7 @@ public class KafkaSupervisor implements Supervisor
 
               log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets);
               for (final String taskId : setEndOffsetTaskIds) {
-                setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, true, finalize));
+                setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize));
               }
 
               List<Boolean> results = Futures.successfulAsList(setEndOffsetFutures)
@@ -1764,7 +1764,6 @@ public class KafkaSupervisor implements Supervisor
         new KafkaPartitions(ioConfig.getTopic(), endPartitions),
         consumerProperties,
         true,
-        false,
         minimumMessageTime,
         maximumMessageTime,
         ioConfig.isSkipOffsetGaps()
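
The KafkaIOConfig constructor invoked here loses its pauseAfterRead argument, so every call site shrinks by one parameter. Below is a sketch of a construction with the new arity, mirroring the updated test call sites; the topic, offsets, and consumer properties are placeholder values.

import com.google.common.collect.ImmutableMap;
import io.druid.indexing.kafka.KafkaIOConfig;
import io.druid.indexing.kafka.KafkaPartitions;

class IoConfigSketch
{
  static KafkaIOConfig example()
  {
    return new KafkaIOConfig(
        "sequence0",                                             // baseSequenceName
        new KafkaPartitions("mytopic", ImmutableMap.of(0, 0L)),  // startPartitions
        new KafkaPartitions("mytopic", ImmutableMap.of(0, 5L)),  // endPartitions
        ImmutableMap.of("bootstrap.servers", "localhost:9092"),  // consumerProperties
        true,                                                    // useTransaction
        null,                                                    // minimumMessageTime
        null,                                                    // maximumMessageTime
        false                                                    // skipOffsetGaps
    );
  }
}
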
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
index 49a9b90..22a792f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java
@@ -71,8 +71,7 @@ public class KafkaIOConfigTest
     Assert.assertEquals("mytopic", config.getEndPartitions().getTopic());
     Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
-    Assert.assertEquals(true, config.isUseTransaction());
-    Assert.assertEquals(false, config.isPauseAfterRead());
+    Assert.assertTrue(config.isUseTransaction());
     Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent());
     Assert.assertFalse("maximumMessageTime", config.getMaximumMessageTime().isPresent());
     Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps());
@@ -88,7 +87,6 @@ public class KafkaIOConfigTest
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n"
                      + "  \"skipOffsetGaps\": true\n"
@@ -109,8 +107,7 @@ public class KafkaIOConfigTest
     Assert.assertEquals("mytopic", config.getEndPartitions().getTopic());
     Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap());
     Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties());
-    Assert.assertEquals(false, config.isUseTransaction());
-    Assert.assertEquals(true, config.isPauseAfterRead());
+    Assert.assertFalse(config.isUseTransaction());
     Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getMinimumMessageTime().get());
     Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get());
     Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps());
@@ -125,7 +122,6 @@ public class KafkaIOConfigTest
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -145,7 +141,6 @@ public class KafkaIOConfigTest
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -165,7 +160,6 @@ public class KafkaIOConfigTest
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -185,7 +179,6 @@ public class KafkaIOConfigTest
                      + "  \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n"
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -206,7 +199,6 @@ public class KafkaIOConfigTest
                      + "  \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -227,7 +219,6 @@ public class KafkaIOConfigTest
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
@@ -248,7 +239,6 @@ public class KafkaIOConfigTest
                      + "  \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n"
                      + "  \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n"
                      + "  \"useTransaction\": false,\n"
-                     + "  \"pauseAfterRead\": true,\n"
                      + "  \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n"
                      + "  \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n"
                      + "}";
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java
index 2834cc8..b63563b 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java
@@ -27,17 +27,17 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import io.druid.java.util.http.client.HttpClient;
-import io.druid.java.util.http.client.Request;
-import io.druid.java.util.http.client.response.FullResponseHandler;
-import io.druid.java.util.http.client.response.FullResponseHolder;
-import io.druid.indexing.common.TaskInfoProvider;
 import io.druid.indexer.TaskLocation;
+import io.druid.indexing.common.TaskInfoProvider;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.IAE;
 import io.druid.java.util.common.StringUtils;
+import io.druid.java.util.http.client.HttpClient;
+import io.druid.java.util.http.client.Request;
+import io.druid.java.util.http.client.response.FullResponseHandler;
+import io.druid.java.util.http.client.response.FullResponseHolder;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMockSupport;
@@ -56,6 +56,7 @@ import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.net.URL;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -139,13 +140,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     Assert.assertEquals(false, client.stop(TEST_ID, true));
     Assert.assertEquals(false, client.resume(TEST_ID));
     Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID));
-    Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID, 10));
+    Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID));
     Assert.assertEquals(KafkaIndexTask.Status.NOT_STARTED, client.getStatus(TEST_ID));
     Assert.assertEquals(null, client.getStartTime(TEST_ID));
     Assert.assertEquals(ImmutableMap.of(), client.getCurrentOffsets(TEST_ID, true));
     Assert.assertEquals(ImmutableMap.of(), client.getEndOffsets(TEST_ID));
-    Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), false, true));
-    Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.<Integer, Long>of(), true, true));
+    Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true));
+    Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true));
 
     verifyAll();
   }
@@ -432,33 +433,6 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
   }
 
   @Test
-  public void testPauseWithTimeout() throws Exception
-  {
-    Capture<Request> captured = Capture.newInstance();
-    expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2);
-    expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes();
-    expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
-        Futures.immediateFuture(responseHolder)
-    );
-    replayAll();
-
-    Map<Integer, Long> results = client.pause(TEST_ID, 101);
-    verifyAll();
-
-    Request request = captured.getValue();
-    Assert.assertEquals(HttpMethod.POST, request.getMethod());
-    Assert.assertEquals(
-        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/pause?timeout=101"),
-        request.getUrl()
-    );
-    Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
-
-    Assert.assertEquals(2, results.size());
-    Assert.assertEquals(1, (long) results.get(0));
-    Assert.assertEquals(10, (long) results.get(1));
-  }
-
-  @Test
   public void testPauseWithSubsequentGetOffsets() throws Exception
   {
     Capture<Request> captured = Capture.newInstance();
@@ -544,13 +518,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     );
     replayAll();
 
-    client.setEndOffsets(TEST_ID, endOffsets, false, true);
+    client.setEndOffsets(TEST_ID, endOffsets, true);
     verifyAll();
 
     Request request = captured.getValue();
     Assert.assertEquals(HttpMethod.POST, request.getMethod());
     Assert.assertEquals(
-        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=false&finish=true"),
+        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"),
         request.getUrl()
     );
     Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@@ -569,13 +543,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
     );
     replayAll();
 
-    client.setEndOffsets(TEST_ID, endOffsets, true, true);
+    client.setEndOffsets(TEST_ID, endOffsets, true);
     verifyAll();
 
     Request request = captured.getValue();
     Assert.assertEquals(HttpMethod.POST, request.getMethod());
     Assert.assertEquals(
-        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=true&finish=true"),
+        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"),
         request.getUrl()
     );
     Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@@ -724,39 +698,6 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
   }
 
   @Test
-  public void testPauseAsyncWithTimeout() throws Exception
-  {
-    final int numRequests = TEST_IDS.size();
-    Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
-    expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
-    expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
-    expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
-        Futures.immediateFuture(responseHolder)
-    ).times(numRequests);
-    replayAll();
-
-    List<URL> expectedUrls = Lists.newArrayList();
-    List<ListenableFuture<Map<Integer, Long>>> futures = Lists.newArrayList();
-    for (String testId : TEST_IDS) {
-      expectedUrls.add(new URL(StringUtils.format(URL_FORMATTER, TEST_HOST, TEST_PORT, testId, "pause?timeout=9")));
-      futures.add(client.pauseAsync(testId, 9));
-    }
-
-    List<Map<Integer, Long>> responses = Futures.allAsList(futures).get();
-
-    verifyAll();
-    List<Request> requests = captured.getValues();
-
-    Assert.assertEquals(numRequests, requests.size());
-    Assert.assertEquals(numRequests, responses.size());
-    for (int i = 0; i < numRequests; i++) {
-      Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod());
-      Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
-      Assert.assertEquals(Maps.newLinkedHashMap(ImmutableMap.of(0, 1L)), responses.get(i));
-    }
-  }
-
-  @Test
   public void testGetStatusAsync() throws Exception
   {
     final int numRequests = TEST_IDS.size();
@@ -909,9 +850,9 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
           TEST_HOST,
           TEST_PORT,
           testId,
-          StringUtils.format("offsets/end?resume=%s&finish=%s", false, true)
+          StringUtils.format("offsets/end?finish=%s", true)
       )));
-      futures.add(client.setEndOffsetsAsync(testId, endOffsets, false, true));
+      futures.add(client.setEndOffsetsAsync(testId, endOffsets, true));
     }
 
     List<Boolean> responses = Futures.allAsList(futures).get();
@@ -950,11 +891,11 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
                   TEST_HOST,
                   TEST_PORT,
                   testId,
-                  "offsets/end?resume=true&finish=true"
+                  "offsets/end?finish=true"
               )
           )
       );
-      futures.add(client.setEndOffsetsAsync(testId, endOffsets, true, true));
+      futures.add(client.setEndOffsetsAsync(testId, endOffsets, true));
     }
 
     List<Boolean> responses = Futures.allAsList(futures).get();
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 45f3003..4232b58 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -38,10 +38,6 @@ import com.google.common.io.Files;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import io.druid.java.util.emitter.EmittingLogger;
-import io.druid.java.util.emitter.core.NoopEmitter;
-import io.druid.java.util.emitter.service.ServiceEmitter;
-import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.client.cache.CacheConfig;
 import io.druid.client.cache.MapCache;
 import io.druid.data.input.impl.DimensionsSpec;
@@ -85,6 +81,10 @@ import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.logger.Logger;
 import io.druid.java.util.common.parsers.JSONPathFieldSpec;
 import io.druid.java.util.common.parsers.JSONPathSpec;
+import io.druid.java.util.emitter.EmittingLogger;
+import io.druid.java.util.emitter.core.NoopEmitter;
+import io.druid.java.util.emitter.service.ServiceEmitter;
+import io.druid.java.util.metrics.MonitorScheduler;
 import io.druid.math.expr.ExprMacroTable;
 import io.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
 import io.druid.metadata.EntryExistsException;
@@ -350,7 +350,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -392,7 +391,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -468,7 +466,6 @@ public class KafkaIndexTaskTest
             endPartitions,
             consumerProps,
             true,
-            false,
             null,
             null,
             false
@@ -481,7 +478,7 @@ public class KafkaIndexTaskTest
     final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getCurrentOffsets());
     Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets) || checkpoint2.getPartitionOffsetMap()
                                                                                                .equals(currentOffsets));
-    task.setEndOffsets(currentOffsets, true, false);
+    task.setEndOffsets(currentOffsets, false);
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
     Assert.assertEquals(1, checkpointRequestsHash.size());
@@ -536,7 +533,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             DateTimes.of("2010"),
             null,
             false
@@ -590,7 +586,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             DateTimes.of("2010"),
             false
@@ -654,7 +649,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -714,7 +708,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -755,7 +748,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -807,7 +799,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -817,10 +808,7 @@ public class KafkaIndexTaskTest
     final ListenableFuture<TaskStatus> future = runTask(task);
 
     // Wait for task to exit
-    Assert.assertEquals(
-        isIncrementalHandoffSupported ? TaskState.SUCCESS : TaskState.FAILED,
-        future.get().getStatusCode()
-    );
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
     // Check metrics
     Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
@@ -861,7 +849,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -894,7 +881,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -908,7 +894,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -962,7 +947,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -976,7 +960,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1031,7 +1014,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             false,
-            false,
             null,
             null,
             false
@@ -1045,7 +1027,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
             kafkaServer.consumerProperties(),
             false,
-            false,
             null,
             null,
             false
@@ -1105,7 +1086,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1170,7 +1150,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1184,7 +1163,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1240,7 +1218,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1277,7 +1254,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1329,7 +1305,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1354,7 +1329,7 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(KafkaIndexTask.Status.READING, task.getStatus());
 
     Map<Integer, Long> currentOffsets = objectMapper.readValue(
-        task.pause(0).getEntity().toString(),
+        task.pause().getEntity().toString(),
         new TypeReference<Map<Integer, Long>>()
         {
         }
@@ -1402,93 +1377,6 @@ public class KafkaIndexTaskTest
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
   }
 
-  @Test(timeout = 60_000L)
-  public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
-  {
-    final KafkaIndexTask task = createTask(
-        null,
-        new KafkaIOConfig(
-            "sequence0",
-            new KafkaPartitions(topic, ImmutableMap.of(0, 1L)),
-            new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
-            kafkaServer.consumerProperties(),
-            true,
-            true,
-            null,
-            null,
-            false
-        )
-    );
-
-    final ListenableFuture<TaskStatus> future = runTask(task);
-
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
-
-    while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
-      Thread.sleep(25);
-    }
-
-    // reached the end of the assigned offsets and paused instead of publishing
-    Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
-    Assert.assertEquals(ImmutableMap.of(0, 3L), task.getEndOffsets());
-    Map<Integer, Long> newEndOffsets = ImmutableMap.of(0, 4L);
-    task.setEndOffsets(newEndOffsets, false, true);
-    Assert.assertEquals(newEndOffsets, task.getEndOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-    task.resume();
-
-    while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
-      Thread.sleep(25);
-    }
-
-    // reached the end of the updated offsets and paused
-    Assert.assertEquals(newEndOffsets, task.getCurrentOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
-    // try again but with resume flag == true
-    newEndOffsets = ImmutableMap.of(0, 7L);
-    task.setEndOffsets(newEndOffsets, true, true);
-    Assert.assertEquals(newEndOffsets, task.getEndOffsets());
-    Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
-    while (task.getStatus() != KafkaIndexTask.Status.PAUSED) {
-      Thread.sleep(25);
-    }
-
-    Assert.assertEquals(newEndOffsets, task.getCurrentOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
-
-    task.resume();
-
-    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
-
-    // Check metrics
-    Assert.assertEquals(4, task.getFireDepartmentMetrics().processed());
-    Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable());
-    Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway());
-
-    // Check published metadata
-    SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
-    SegmentDescriptor desc2 = SD(task, "2010/P1D", 0);
-    SegmentDescriptor desc3 = SD(task, "2011/P1D", 0);
-    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
-    Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 7L))),
-        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
-    );
-
-    // Check segments in deep storage
-    Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc1));
-    Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc2));
-    Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc3));
-  }
-
   @Test(timeout = 30_000L)
   public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
   {
@@ -1500,7 +1388,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1513,7 +1400,7 @@ public class KafkaIndexTaskTest
       Thread.sleep(2000);
     }
 
-    task.pause(0);
+    task.pause();
 
     while (!task.getStatus().equals(KafkaIndexTask.Status.PAUSED)) {
       Thread.sleep(25);
@@ -1539,7 +1426,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
@@ -1593,7 +1479,6 @@ public class KafkaIndexTaskTest
             new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
             kafkaServer.consumerProperties(),
             true,
-            false,
             null,
             null,
             false
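
Against the task object itself the same flow looks like the sketch below, mirroring the updated calls in these tests. The task and objectMapper arguments are assumed to come from a test fixture, and finish=false is shown as the intermediate-checkpoint case.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.druid.indexing.kafka.KafkaIndexTask;
import java.util.Map;

class TaskPauseSketch
{
  // Illustrative only: mirrors the updated test calls above.
  static void checkpointTask(KafkaIndexTask task, ObjectMapper objectMapper) throws Exception
  {
    // pause() no longer takes a timeout; the response entity carries the current offsets.
    Map<Integer, Long> currentOffsets = objectMapper.readValue(
        task.pause().getEntity().toString(),
        new TypeReference<Map<Integer, Long>>() {}
    );

    // No resume flag; finish=false here marks an intermediate checkpoint
    // rather than the task's final end offsets.
    task.setEndOffsets(ImmutableMap.copyOf(currentOffsets), false);
  }
}
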
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c429265..60a47f0 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -254,7 +254,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey"));
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
-    Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());
     Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
     Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
     Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps());
@@ -1046,7 +1045,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
         taskClient.setEndOffsetsAsync(
             EasyMock.contains("sequenceName-0"),
             EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
-            EasyMock.eq(true),
             EasyMock.eq(true)
         )
     ).andReturn(Futures.immediateFuture(true)).times(2);
@@ -1074,7 +1072,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
       KafkaIOConfig taskConfig = kafkaIndexTask.getIOConfig();
       Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
       Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
-      Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());
 
       Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic());
       Assert.assertEquals(10L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0));
@@ -1164,7 +1161,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey"));
     Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName());
     Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction());
-    Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead());
 
     // check that the new task was created with starting offsets matching where the publishing task finished
     Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic());
@@ -1255,7 +1251,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
     Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey"));
     Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName());
     Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction());
-    Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead());
 
     // check that the new task was created with starting offsets matching where the publishing task finished
     Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic());
@@ -1570,7 +1565,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
         taskClient.setEndOffsetsAsync(
             EasyMock.contains("sequenceName-0"),
             EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)),
-            EasyMock.eq(true),
             EasyMock.eq(true)
         )
     ).andReturn(Futures.<Boolean>immediateFailedFuture(new RuntimeException())).times(2);
@@ -1695,7 +1689,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
     expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes();
     expect(taskClient.pauseAsync("id2"))
         .andReturn(Futures.immediateFuture((Map<Integer, Long>) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L)));
-    expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true, true))
+    expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true))
         .andReturn(Futures.immediateFuture(true));
     taskQueue.shutdown("id3");
     expectLastCall().times(2);
@@ -2125,7 +2119,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
             endPartitions,
             ImmutableMap.<String, String>of(),
             true,
-            false,
             minimumMessageTime,
             maximumMessageTime,
             false
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
index 8e18d72..aeb0578 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java
@@ -179,6 +179,7 @@ public interface Task
    */
   TaskStatus run(TaskToolbox toolbox) throws Exception;
 
+  @Nullable
   Map<String, Object> getContext();
 
   @Nullable
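
With getContext() now annotated @Nullable, callers have to tolerate a missing context map, much as getCheckPointsFromContext() above tolerates a missing "checkpoints" entry. A small null-safe helper, purely illustrative:

import io.druid.indexing.common.task.Task;

import javax.annotation.Nullable;
import java.util.Map;

class TaskContextSketch
{
  // Hypothetical helper: read a context value without assuming the context map exists.
  @Nullable
  static Object getContextValueOrNull(Task task, String key)
  {
    final Map<String, Object> context = task.getContext();
    return context == null ? null : context.get(key);
  }
}
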
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
index 72a7f8b..74a1011 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java
@@ -23,12 +23,13 @@ import com.google.common.collect.ImmutableList;
 import io.druid.timeline.DataSegment;
 
 import javax.annotation.Nullable;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 
 public class SegmentsAndMetadata
 {
-  private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(ImmutableList.<DataSegment>of(), null);
+  private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(Collections.emptyList(), null);
 
   private final Object commitMetadata;
   private final ImmutableList<DataSegment> segments;
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 6d11092..33b823d 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -221,7 +220,7 @@ public class StreamAppenderatorDriver extends BaseAppenderatorDriver
       throw e;
     }
     catch (Exception e) {
-      throw Throwables.propagate(e);
+      throw new RuntimeException(e);
     }
   }
 

