You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fj...@apache.org on 2019/05/23 16:25:41 UTC

[incubator-druid] branch master updated: Remove LegacyKafkaIndexTaskRunner (#7735)

This is an automated email from the ASF dual-hosted git repository.

fjy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new eff2be4  Remove LegacyKafkaIndexTaskRunner (#7735)
eff2be4 is described below

commit eff2be4f8f9d7aad0f01516f5425f3ebccaa006c
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Thu May 23 09:25:35 2019 -0700

    Remove LegacyKafkaIndexTaskRunner (#7735)
---
 .../druid/indexing/kafka/KafkaIndexTask.java       |   32 +-
 .../indexing/kafka/LegacyKafkaIndexTaskRunner.java | 1239 --------------------
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   82 +-
 .../indexing/kinesis/KinesisIndexTaskTest.java     |    2 -
 .../supervisor/SeekableStreamSupervisor.java       |    2 -
 5 files changed, 19 insertions(+), 1338 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 314f99c..b0b0c6b 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -28,7 +28,6 @@ import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
-import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -129,28 +128,15 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<Integer, Long>
   @Override
   protected SeekableStreamIndexTaskRunner<Integer, Long> createTaskRunner()
   {
-    if (context != null && context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) != null
-        && ((boolean) context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED))) {
-      //noinspection unchecked
-      return new IncrementalPublishingKafkaIndexTaskRunner(
-          this,
-          dataSchema.getParser(),
-          authorizerMapper,
-          chatHandlerProvider,
-          savedParseExceptions,
-          rowIngestionMetersFactory
-      );
-    } else {
-      //noinspection unchecked
-      return new LegacyKafkaIndexTaskRunner(
-          this,
-          dataSchema.getParser(),
-          authorizerMapper,
-          chatHandlerProvider,
-          savedParseExceptions,
-          rowIngestionMetersFactory
-      );
-    }
+    //noinspection unchecked
+    return new IncrementalPublishingKafkaIndexTaskRunner(
+        this,
+        dataSchema.getParser(),
+        authorizerMapper,
+        chatHandlerProvider,
+        savedParseExceptions,
+        rowIngestionMetersFactory
+    );
   }
 
   @Override
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
deleted file mode 100644
index 5b186c4..0000000
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ /dev/null
@@ -1,1239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.kafka;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
-import org.apache.druid.data.input.Committer;
-import org.apache.druid.data.input.InputRow;
-import org.apache.druid.data.input.impl.InputRowParser;
-import org.apache.druid.discovery.DiscoveryDruidNode;
-import org.apache.druid.discovery.LookupNodeService;
-import org.apache.druid.discovery.NodeType;
-import org.apache.druid.indexer.IngestionState;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
-import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
-import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
-import org.apache.druid.indexing.common.TaskReport;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction;
-import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction;
-import org.apache.druid.indexing.common.stats.RowIngestionMeters;
-import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
-import org.apache.druid.indexing.common.task.IndexTaskUtils;
-import org.apache.druid.indexing.common.task.RealtimeIndexTask;
-import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
-import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
-import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
-import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
-import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
-import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
-import org.apache.druid.indexing.seekablestream.SequenceMetadata;
-import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
-import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
-import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
-import org.apache.druid.indexing.seekablestream.common.StreamPartition;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.collect.Utils;
-import org.apache.druid.java.util.common.parsers.ParseException;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.segment.indexing.RealtimeIOConfig;
-import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.segment.realtime.FireDepartmentMetrics;
-import org.apache.druid.segment.realtime.appenderator.Appenderator;
-import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
-import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
-import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata;
-import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
-import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
-import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
-import org.apache.druid.server.security.Access;
-import org.apache.druid.server.security.Action;
-import org.apache.druid.server.security.AuthorizerMapper;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.utils.CircularBuffer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
-import org.apache.kafka.common.TopicPartition;
-import org.joda.time.DateTime;
-
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-/**
- * Kafka index task runner which doesn't support incremental segment publishing. We keep this to support rolling update.
- * This class will be removed in a future release.
- */
-@Deprecated
-public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner<Integer, Long>
-{
-  private static final EmittingLogger log = new EmittingLogger(LegacyKafkaIndexTaskRunner.class);
-  private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
-
-  private final ConcurrentMap<Integer, Long> endOffsets = new ConcurrentHashMap<>();
-  private final ConcurrentMap<Integer, Long> nextOffsets = new ConcurrentHashMap<>();
-
-  // The pause lock and associated conditions are to support coordination between the Jetty threads and the main
-  // ingestion loop. The goal is to provide callers of the API a guarantee that if pause() returns successfully
-  // the ingestion loop has been stopped at the returned offsets and will not ingest any more data until resumed. The
-  // fields are used as follows (every step requires acquiring [pauseLock]):
-  //   Pausing:
-  //   - In pause(), [pauseRequested] is set to true and then execution waits for [status] to change to PAUSED, with the
-  //     condition checked when [hasPaused] is signalled.
-  //   - In possiblyPause() called from the main loop, if [pauseRequested] is true, [status] is set to PAUSED,
-  //     [hasPaused] is signalled, and execution pauses until [pauseRequested] becomes false, either by being set or by
-  //     the [pauseMillis] timeout elapsing. [pauseRequested] is checked when [shouldResume] is signalled.
-  //   Resuming:
-  //   - In resume(), [pauseRequested] is set to false, [shouldResume] is signalled, and execution waits for [status] to
-  //     change to something other than PAUSED, with the condition checked when [shouldResume] is signalled.
-  //   - In possiblyPause(), when [shouldResume] is signalled, if [pauseRequested] has become false the pause loop ends,
-  //     [status] is changed to STARTING and [shouldResume] is signalled.
-
-  private final Lock pauseLock = new ReentrantLock();
-  private final Condition hasPaused = pauseLock.newCondition();
-  private final Condition shouldResume = pauseLock.newCondition();
-
-  private final AtomicBoolean stopRequested = new AtomicBoolean(false);
-  private final AtomicBoolean publishOnStop = new AtomicBoolean(false);
-
-  // [statusLock] is used to synchronize the Jetty thread calling stopGracefully() with the main run thread. It prevents
-  // the main run thread from switching into a publishing state while the stopGracefully() thread thinks it's still in
-  // a pre-publishing state. This is important because stopGracefully() will try to use the [stopRequested] flag to stop
-  // the main thread where possible, but this flag is not honored once publishing has begun so in this case we must
-  // interrupt the thread. The lock ensures that if the run thread is about to transition into publishing state, it
-  // blocks until after stopGracefully() has set [stopRequested] and then does a final check on [stopRequested] before
-  // transitioning to publishing state.
-  private final Object statusLock = new Object();
-
-  private final Lock pollRetryLock = new ReentrantLock();
-  private final Condition isAwaitingRetry = pollRetryLock.newCondition();
-
-  private final KafkaIndexTask task;
-  private final KafkaIndexTaskIOConfig ioConfig;
-  private final KafkaIndexTaskTuningConfig tuningConfig;
-  private final InputRowParser<ByteBuffer> parser;
-  private final AuthorizerMapper authorizerMapper;
-  private final Optional<ChatHandlerProvider> chatHandlerProvider;
-  private final CircularBuffer<Throwable> savedParseExceptions;
-  private final RowIngestionMeters rowIngestionMeters;
-
-  private volatile DateTime startTime;
-  private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread)
-  private volatile ObjectMapper objectMapper;
-  private volatile Thread runThread;
-  private volatile Appenderator appenderator;
-  private volatile StreamAppenderatorDriver driver;
-  private volatile FireDepartmentMetrics fireDepartmentMetrics;
-  private volatile IngestionState ingestionState;
-
-  private volatile boolean pauseRequested;
-
-  LegacyKafkaIndexTaskRunner(
-      KafkaIndexTask task,
-      InputRowParser<ByteBuffer> parser,
-      AuthorizerMapper authorizerMapper,
-      Optional<ChatHandlerProvider> chatHandlerProvider,
-      CircularBuffer<Throwable> savedParseExceptions,
-      RowIngestionMetersFactory rowIngestionMetersFactory
-  )
-  {
-    super(
-        task,
-        parser,
-        authorizerMapper,
-        chatHandlerProvider,
-        savedParseExceptions,
-        rowIngestionMetersFactory
-    );
-    this.task = task;
-    this.ioConfig = task.getIOConfig();
-    this.tuningConfig = task.getTuningConfig();
-    this.parser = parser;
-    this.authorizerMapper = authorizerMapper;
-    this.chatHandlerProvider = chatHandlerProvider;
-    this.savedParseExceptions = savedParseExceptions;
-    this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters();
-
-    this.endOffsets.putAll(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
-    this.ingestionState = IngestionState.NOT_STARTED;
-  }
-
-  @Override
-  public TaskStatus run(TaskToolbox toolbox)
-  {
-    try {
-      return runInternal(toolbox);
-    }
-    catch (Exception e) {
-      log.error(e, "Encountered exception while running task.");
-      final String errorMsg = Throwables.getStackTraceAsString(e);
-      toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(errorMsg));
-      return TaskStatus.failure(
-          task.getId(),
-          errorMsg
-      );
-    }
-  }
-
-  @Override
-  public Appenderator getAppenderator()
-  {
-    return appenderator;
-  }
-
-  @Override
-  public RowIngestionMeters getRowIngestionMeters()
-  {
-    return rowIngestionMeters;
-  }
-
-  private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
-  {
-    log.info("Starting up!");
-    startTime = DateTimes.nowUtc();
-    status = Status.STARTING;
-    objectMapper = toolbox.getObjectMapper();
-
-    if (chatHandlerProvider.isPresent()) {
-      log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
-      chatHandlerProvider.get().register(task.getId(), this, false);
-    } else {
-      log.warn("No chat handler detected");
-    }
-
-    runThread = Thread.currentThread();
-
-    // Set up FireDepartmentMetrics
-    final FireDepartment fireDepartmentForMetrics = new FireDepartment(
-        task.getDataSchema(),
-        new RealtimeIOConfig(null, null, null),
-        null
-    );
-    fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
-    toolbox.getMonitorScheduler()
-           .addMonitor(TaskRealtimeMetricsMonitorBuilder.build(task, fireDepartmentForMetrics, rowIngestionMeters));
-
-    final String lookupTier = task.getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER);
-    LookupNodeService lookupNodeService = lookupTier == null ?
-                                          toolbox.getLookupNodeService() :
-                                          new LookupNodeService(lookupTier);
-    DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode(
-        toolbox.getDruidNode(),
-        NodeType.PEON,
-        ImmutableMap.of(
-            toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(),
-            lookupNodeService.getName(), lookupNodeService
-        )
-    );
-
-    ingestionState = IngestionState.BUILD_SEGMENTS;
-
-    try (
-        final Appenderator appenderator0 = task.newAppenderator(fireDepartmentMetrics, toolbox);
-        final StreamAppenderatorDriver driver = task.newDriver(appenderator0, toolbox, fireDepartmentMetrics);
-        final KafkaConsumer<byte[], byte[]> consumer = task.newConsumer()
-    ) {
-      toolbox.getDataSegmentServerAnnouncer().announce();
-      toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode);
-
-      appenderator = appenderator0;
-
-      final String topic = ioConfig.getStartSequenceNumbers().getStream();
-
-      // Start up, set up initial offsets.
-      final Object restoredMetadata = driver.startJob();
-      if (restoredMetadata == null) {
-        nextOffsets.putAll(ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
-      } else {
-        final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
-        final SeekableStreamEndSequenceNumbers<Integer, Long> restoredNextPartitions = toolbox.getObjectMapper().convertValue(
-            restoredMetadataMap.get(METADATA_NEXT_PARTITIONS),
-            toolbox.getObjectMapper().getTypeFactory().constructParametrizedType(
-                SeekableStreamStartSequenceNumbers.class,
-                SeekableStreamStartSequenceNumbers.class,
-                Integer.class,
-                Long.class
-            )
-        );
-        nextOffsets.putAll(restoredNextPartitions.getPartitionSequenceNumberMap());
-
-        // Sanity checks.
-        if (!restoredNextPartitions.getStream().equals(ioConfig.getStartSequenceNumbers().getStream())) {
-          throw new ISE(
-              "WTF?! Restored topic[%s] but expected topic[%s]",
-              restoredNextPartitions.getStream(),
-              ioConfig.getStartSequenceNumbers().getStream()
-          );
-        }
-
-        if (!nextOffsets.keySet().equals(ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet())) {
-          throw new ISE(
-              "WTF?! Restored partitions[%s] but expected partitions[%s]",
-              nextOffsets.keySet(),
-              ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet()
-          );
-        }
-      }
-
-      // Set up sequenceNames.
-      final Map<Integer, String> sequenceNames = new HashMap<>();
-      for (Integer partitionNum : nextOffsets.keySet()) {
-        sequenceNames.put(partitionNum, StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), partitionNum));
-      }
-
-      // Set up committer.
-      final Supplier<Committer> committerSupplier = new Supplier<Committer>()
-      {
-        @Override
-        public Committer get()
-        {
-          final Map<Integer, Long> snapshot = ImmutableMap.copyOf(nextOffsets);
-
-          return new Committer()
-          {
-            @Override
-            public Object getMetadata()
-            {
-              return ImmutableMap.of(
-                  METADATA_NEXT_PARTITIONS,
-                  new SeekableStreamEndSequenceNumbers<>(
-                      ioConfig.getStartSequenceNumbers().getStream(),
-                      snapshot
-                  )
-              );
-            }
-
-            @Override
-            public void run()
-            {
-              // Do nothing.
-            }
-          };
-        }
-      };
-
-      Set<Integer> assignment = assignPartitionsAndSeekToNext(consumer, topic);
-
-      // Main loop.
-      // Could eventually support leader/follower mode (for keeping replicas more in sync)
-      boolean stillReading = !assignment.isEmpty();
-      status = Status.READING;
-      try {
-        while (stillReading) {
-          if (possiblyPause()) {
-            // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
-            // partitions upon resuming. This is safe even if the end offsets have not been modified.
-            assignment = assignPartitionsAndSeekToNext(consumer, topic);
-
-            if (assignment.isEmpty()) {
-              log.info("All partitions have been fully read");
-              publishOnStop.set(true);
-              stopRequested.set(true);
-            }
-          }
-
-          if (stopRequested.get()) {
-            break;
-          }
-
-          // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to
-          // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
-          // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
-          ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
-          try {
-            records = consumer.poll(task.getIOConfig().getPollTimeout());
-          }
-          catch (OffsetOutOfRangeException e) {
-            log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
-            possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
-            stillReading = !assignment.isEmpty();
-          }
-
-          for (ConsumerRecord<byte[], byte[]> record : records) {
-            if (log.isTraceEnabled()) {
-              log.trace(
-                  "Got topic[%s] partition[%d] offset[%,d].",
-                  record.topic(),
-                  record.partition(),
-                  record.offset()
-              );
-            }
-
-            if (record.offset() < endOffsets.get(record.partition())) {
-
-              try {
-                final byte[] valueBytes = record.value();
-                final List<InputRow> rows = valueBytes == null
-                                            ? Utils.nullableListOf((InputRow) null)
-                                            : parser.parseBatch(ByteBuffer.wrap(valueBytes));
-                boolean isPersistRequired = false;
-                final Map<String, Set<SegmentIdWithShardSpec>> segmentsToMoveOut = new HashMap<>();
-
-                for (InputRow row : rows) {
-                  if (row != null && task.withinMinMaxRecordTime(row)) {
-                    final String sequenceName = sequenceNames.get(record.partition());
-                    final AppenderatorDriverAddResult addResult = driver.add(
-                        row,
-                        sequenceName,
-                        committerSupplier,
-                        false,
-                        false
-                    );
-
-                    if (addResult.isOk()) {
-                      // If the number of rows in the segment exceeds the threshold after adding a row,
-                      // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
-                      if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
-                        segmentsToMoveOut.computeIfAbsent(sequenceName, k -> new HashSet<>())
-                                         .add(addResult.getSegmentIdentifier());
-                      }
-                      isPersistRequired |= addResult.isPersistRequired();
-                    } else {
-                      // Failure to allocate segment puts determinism at risk, bail out to be safe.
-                      // May want configurable behavior here at some point.
-                      // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
-                      throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
-                    }
-
-                    if (addResult.getParseException() != null) {
-                      handleParseException(addResult.getParseException(), record);
-                    } else {
-                      rowIngestionMeters.incrementProcessed();
-                    }
-                  } else {
-                    rowIngestionMeters.incrementThrownAway();
-                  }
-                }
-
-                if (isPersistRequired) {
-                  driver.persist(committerSupplier.get());
-                }
-                segmentsToMoveOut.forEach((String sequence, Set<SegmentIdWithShardSpec> segments) -> {
-                  driver.moveSegmentOut(sequence, new ArrayList<>(segments));
-                });
-              }
-              catch (ParseException e) {
-                handleParseException(e, record);
-              }
-
-              nextOffsets.put(record.partition(), record.offset() + 1);
-            }
-
-            if (nextOffsets.get(record.partition()) >= (endOffsets.get(record.partition()))
-                && assignment.remove(record.partition())) {
-              log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
-              KafkaIndexTask.assignPartitions(consumer, topic, assignment);
-              stillReading = !assignment.isEmpty();
-            }
-          }
-        }
-        ingestionState = IngestionState.COMPLETED;
-      }
-      catch (Exception e) {
-        log.error(e, "Encountered exception in runLegacy() before persisting.");
-        throw e;
-      }
-      finally {
-        driver.persist(committerSupplier.get()); // persist pending data
-      }
-
-      synchronized (statusLock) {
-        if (stopRequested.get() && !publishOnStop.get()) {
-          throw new InterruptedException("Stopping without publishing");
-        }
-
-        status = Status.PUBLISHING;
-      }
-
-      final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> {
-        final SeekableStreamEndSequenceNumbers<Integer, Long> finalPartitions = toolbox.getObjectMapper().convertValue(
-            ((Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_NEXT_PARTITIONS),
-            toolbox.getObjectMapper()
-                   .getTypeFactory()
-                   .constructParametrizedType(
-                       SeekableStreamEndSequenceNumbers.class,
-                       SeekableStreamEndSequenceNumbers.class,
-                       Integer.class,
-                       Long.class
-                   )
-        );
-
-        // Sanity check, we should only be publishing things that match our desired end state.
-        if (!endOffsets.equals(finalPartitions.getPartitionSequenceNumberMap())) {
-          throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata);
-        }
-
-        final SegmentTransactionalInsertAction action;
-
-        if (ioConfig.isUseTransaction()) {
-          action = new SegmentTransactionalInsertAction(
-              segments,
-              new KafkaDataSourceMetadata(ioConfig.getStartSequenceNumbers()),
-              new KafkaDataSourceMetadata(finalPartitions)
-          );
-        } else {
-          action = new SegmentTransactionalInsertAction(segments, null, null);
-        }
-
-        log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());
-
-        return toolbox.getTaskActionClient().submit(action);
-      };
-
-      // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting
-      // for hand off. See KafkaSupervisorIOConfig.completionTimeout.
-      final SegmentsAndMetadata published = driver.publish(
-          publisher,
-          committerSupplier.get(),
-          sequenceNames.values()
-      ).get();
-
-      List<?> publishedSegmentIds = Lists.transform(published.getSegments(), DataSegment::getId);
-      log.info(
-          "Published segments %s with metadata[%s].",
-          publishedSegmentIds,
-          Preconditions.checkNotNull(published.getCommitMetadata(), "commitMetadata")
-      );
-
-      final Future<SegmentsAndMetadata> handoffFuture = driver.registerHandoff(published);
-      SegmentsAndMetadata handedOff = null;
-      if (tuningConfig.getHandoffConditionTimeout() == 0) {
-        handedOff = handoffFuture.get();
-      } else {
-        try {
-          handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
-        }
-        catch (TimeoutException e) {
-          log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout())
-             .addData("TaskId", task.getId())
-             .emit();
-        }
-      }
-
-      if (handedOff == null) {
-        log.warn("Failed to handoff segments %s", publishedSegmentIds);
-      } else {
-        log.info(
-            "Handoff completed for segments %s with metadata[%s]",
-            Lists.transform(handedOff.getSegments(), DataSegment::getId),
-            Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata")
-        );
-      }
-    }
-    catch (InterruptedException | RejectedExecutionException e) {
-      // handle the InterruptedException that gets wrapped in a RejectedExecutionException
-      if (e instanceof RejectedExecutionException
-          && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) {
-        throw e;
-      }
-
-      // if we were interrupted because we were asked to stop, handle the exception and return success, else rethrow
-      if (!stopRequested.get()) {
-        Thread.currentThread().interrupt();
-        throw e;
-      }
-
-      log.info("The task was asked to stop before completing");
-    }
-    finally {
-      if (chatHandlerProvider.isPresent()) {
-        chatHandlerProvider.get().unregister(task.getId());
-      }
-
-      toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode);
-      toolbox.getDataSegmentServerAnnouncer().unannounce();
-    }
-
-    toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(null));
-    return TaskStatus.success(
-        task.getId(),
-        null
-    );
-  }
-
-  @Override
-  protected boolean isEndOfShard(Long seqNum)
-  {
-    return false;
-  }
-
-  @Override
-  public TypeReference<List<SequenceMetadata<Integer, Long>>> getSequenceMetadataTypeReference()
-  {
-    return new TypeReference<List<SequenceMetadata<Integer, Long>>>()
-    {
-    };
-  }
-
-  @Nonnull
-  @Override
-  protected List<OrderedPartitionableRecord<Integer, Long>> getRecords(
-      RecordSupplier<Integer, Long> recordSupplier, TaskToolbox toolbox
-  )
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  private Set<Integer> assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic)
-  {
-    // Initialize consumer assignment.
-    final Set<Integer> assignment = new HashSet<>();
-    for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
-      final long endOffset = endOffsets.get(entry.getKey());
-      if (entry.getValue() < endOffset) {
-        assignment.add(entry.getKey());
-      } else if (entry.getValue() == endOffset) {
-        log.info("Finished reading partition[%d].", entry.getKey());
-      } else {
-        throw new ISE(
-            "WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
-            entry.getValue(),
-            endOffset
-        );
-      }
-    }
-
-    KafkaIndexTask.assignPartitions(consumer, topic, assignment);
-
-    // Seek to starting offsets.
-    for (final int partition : assignment) {
-      final long offset = nextOffsets.get(partition);
-      log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
-      consumer.seek(new TopicPartition(topic, partition), offset);
-    }
-
-    return assignment;
-  }
-
-  /**
-   * Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared.
-   * <p/>
-   * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted.
-   * <p/>
-   *
-   * @return true if a pause request was handled, false otherwise
-   */
-  private boolean possiblyPause() throws InterruptedException
-  {
-    pauseLock.lockInterruptibly();
-    try {
-      if (pauseRequested) {
-        status = Status.PAUSED;
-        hasPaused.signalAll();
-
-        while (pauseRequested) {
-          log.info("Pausing ingestion until resumed");
-          shouldResume.await();
-        }
-
-        status = Status.READING;
-        shouldResume.signalAll();
-        log.info("Ingestion loop resumed");
-        return true;
-      }
-    }
-    finally {
-      pauseLock.unlock();
-    }
-
-    return false;
-  }
-
-  @Override
-  protected void possiblyResetDataSourceMetadata(
-      TaskToolbox toolbox,
-      RecordSupplier<Integer, Long> recordSupplier,
-      Set<StreamPartition<Integer>> assignment
-  )
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected boolean isEndOffsetExclusive()
-  {
-    return true;
-  }
-
-  @Override
-  protected SeekableStreamEndSequenceNumbers<Integer, Long> deserializePartitionsFromMetadata(
-      ObjectMapper mapper,
-      Object object
-  )
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  private void possiblyResetOffsetsOrWait(
-      Map<TopicPartition, Long> outOfRangePartitions,
-      KafkaConsumer<byte[], byte[]> consumer,
-      TaskToolbox taskToolbox
-  ) throws InterruptedException, IOException
-  {
-    final Map<TopicPartition, Long> resetPartitions = new HashMap<>();
-    boolean doReset = false;
-    if (tuningConfig.isResetOffsetAutomatically()) {
-      for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
-        final TopicPartition topicPartition = outOfRangePartition.getKey();
-        final long nextOffset = outOfRangePartition.getValue();
-        // seek to the beginning to get the least available offset
-        consumer.seekToBeginning(Collections.singletonList(topicPartition));
-        final long leastAvailableOffset = consumer.position(topicPartition);
-        // reset the seek
-        consumer.seek(topicPartition, nextOffset);
-        // Reset consumer offset if resetOffsetAutomatically is set to true
-        // and the current message offset in the kafka partition is more than the
-        // next message offset that we are trying to fetch
-        if (leastAvailableOffset > nextOffset) {
-          doReset = true;
-          resetPartitions.put(topicPartition, nextOffset);
-        }
-      }
-    }
-
-    if (doReset) {
-      sendResetRequestAndWaitLegacy(resetPartitions, taskToolbox);
-    } else {
-      log.warn("Retrying in %dms", task.getPollRetryMs());
-      pollRetryLock.lockInterruptibly();
-      try {
-        long nanos = TimeUnit.MILLISECONDS.toNanos(task.getPollRetryMs());
-        while (nanos > 0L && !pauseRequested && !stopRequested.get()) {
-          nanos = isAwaitingRetry.awaitNanos(nanos);
-        }
-      }
-      finally {
-        pollRetryLock.unlock();
-      }
-    }
-  }
-
-  private void sendResetRequestAndWaitLegacy(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox)
-      throws IOException
-  {
-    Map<Integer, Long> partitionOffsetMap = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
-      partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
-    }
-    boolean result = taskToolbox.getTaskActionClient()
-                                .submit(new ResetDataSourceMetadataAction(
-                                    task.getDataSource(),
-                                    new KafkaDataSourceMetadata(
-                                        new SeekableStreamStartSequenceNumbers<>(
-                                            ioConfig.getStartSequenceNumbers().getStream(),
-                                            partitionOffsetMap,
-                                            Collections.emptySet()
-                                        )
-                                    )
-                                ));
-
-    if (result) {
-      log.makeAlert("Resetting Kafka offsets for datasource [%s]", task.getDataSource())
-         .addData("partitions", partitionOffsetMap.keySet())
-         .emit();
-      // wait for being killed by supervisor
-      requestPause();
-    } else {
-      log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
-    }
-  }
-
-  private void requestPause()
-  {
-    pauseRequested = true;
-  }
-
-  @Override
-  protected Long getNextStartOffset(Long sequenceNumber)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  private void handleParseException(ParseException pe, ConsumerRecord<byte[], byte[]> record)
-  {
-    if (pe.isFromPartiallyValidRow()) {
-      rowIngestionMeters.incrementProcessedWithError();
-    } else {
-      rowIngestionMeters.incrementUnparseable();
-    }
-
-    if (tuningConfig.isLogParseExceptions()) {
-      log.error(
-          pe,
-          "Encountered parse exception on row from partition[%d] offset[%d]",
-          record.partition(),
-          record.offset()
-      );
-    }
-
-    if (savedParseExceptions != null) {
-      savedParseExceptions.add(pe);
-    }
-
-    if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError()
-        > tuningConfig.getMaxParseExceptions()) {
-      log.error("Max parse exceptions exceeded, terminating task...");
-      throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
-    }
-  }
-
-  private Map<String, TaskReport> getTaskCompletionReports(@Nullable String errorMsg)
-  {
-    return TaskReport.buildTaskReports(
-        new IngestionStatsAndErrorsTaskReport(
-            task.getId(),
-            new IngestionStatsAndErrorsTaskReportData(
-                ingestionState,
-                getTaskCompletionUnparseableEvents(),
-                getTaskCompletionRowStats(),
-                errorMsg
-            )
-        )
-    );
-  }
-
-  private Map<String, Object> getTaskCompletionUnparseableEvents()
-  {
-    Map<String, Object> unparseableEventsMap = new HashMap<>();
-    List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(
-        savedParseExceptions
-    );
-    if (buildSegmentsParseExceptionMessages != null) {
-      unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages);
-    }
-    return unparseableEventsMap;
-  }
-
-  private Map<String, Object> getTaskCompletionRowStats()
-  {
-    Map<String, Object> metrics = new HashMap<>();
-    metrics.put(
-        RowIngestionMeters.BUILD_SEGMENTS,
-        rowIngestionMeters.getTotals()
-    );
-    return metrics;
-  }
-
-  @Override
-  public void stopGracefully()
-  {
-    log.info("Stopping gracefully (status: [%s])", status);
-    stopRequested.set(true);
-
-    synchronized (statusLock) {
-      if (status == Status.PUBLISHING) {
-        runThread.interrupt();
-        return;
-      }
-    }
-
-    try {
-      if (pauseLock.tryLock(SeekableStreamIndexTask.LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
-        try {
-          if (pauseRequested) {
-            pauseRequested = false;
-            shouldResume.signalAll();
-          }
-        }
-        finally {
-          pauseLock.unlock();
-        }
-      } else {
-        log.warn("While stopping: failed to acquire pauseLock before timeout, interrupting run thread");
-        runThread.interrupt();
-        return;
-      }
-
-      if (pollRetryLock.tryLock(SeekableStreamIndexTask.LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
-        try {
-          isAwaitingRetry.signalAll();
-        }
-        finally {
-          pollRetryLock.unlock();
-        }
-      } else {
-        log.warn("While stopping: failed to acquire pollRetryLock before timeout, interrupting run thread");
-        runThread.interrupt();
-      }
-    }
-    catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Authorizes action to be performed on this task's datasource
-   *
-   * @return authorization result
-   */
-  private Access authorizationCheck(final HttpServletRequest req, Action action)
-  {
-    return IndexTaskUtils.datasourceAuthorizationCheck(req, action, task.getDataSource(), authorizerMapper);
-  }
-
-  @Override
-  @POST
-  @Path("/stop")
-  public Response stop(@Context final HttpServletRequest req)
-  {
-    authorizationCheck(req, Action.WRITE);
-    stopGracefully();
-    return Response.status(Response.Status.OK).build();
-  }
-
-  @Override
-  @GET
-  @Path("/status")
-  @Produces(MediaType.APPLICATION_JSON)
-  public Status getStatusHTTP(@Context final HttpServletRequest req)
-  {
-    authorizationCheck(req, Action.READ);
-    return status;
-  }
-
-  @Override
-  public Status getStatus()
-  {
-    return status;
-  }
-
-  @Override
-  @GET
-  @Path("/offsets/current")
-  @Produces(MediaType.APPLICATION_JSON)
-  public Map<Integer, Long> getCurrentOffsets(@Context final HttpServletRequest req)
-  {
-    authorizationCheck(req, Action.READ);
-    return getCurrentOffsets();
-  }
-
-  @Override
-  public ConcurrentMap<Integer, Long> getCurrentOffsets()
-  {
-    return nextOffsets;
-  }
-
-  @Override
-  @GET
-  @Path("/offsets/end")
-  @Produces(MediaType.APPLICATION_JSON)
-  public Map<Integer, Long> getEndOffsetsHTTP(@Context final HttpServletRequest req)
-  {
-    authorizationCheck(req, Action.READ);
-    return getEndOffsets();
-  }
-
-  @Override
-  public Map<Integer, Long> getEndOffsets()
-  {
-    return endOffsets;
-  }
-
-  @Override
-  public Response setEndOffsets(Map<Integer, Long> sequenceNumbers, boolean finish) throws InterruptedException
-  {
-    // finish is not used in this mode
-    return setEndOffsets(sequenceNumbers);
-  }
-
-  @POST
-  @Path("/offsets/end")
-  @Consumes(MediaType.APPLICATION_JSON)
-  @Produces(MediaType.APPLICATION_JSON)
-  public Response setEndOffsetsHTTP(
-      Map<Integer, Long> offsets,
-      @Context final HttpServletRequest req
-  ) throws InterruptedException
-  {
-    authorizationCheck(req, Action.WRITE);
-    return setEndOffsets(offsets);
-  }
-
-  @Override
-  @GET
-  @Path("/rowStats")
-  @Produces(MediaType.APPLICATION_JSON)
-  public Response getRowStats(
-      @Context final HttpServletRequest req
-  )
-  {
-    authorizationCheck(req, Action.READ);
-    Map<String, Object> returnMap = new HashMap<>();
-    Map<String, Object> totalsMap = new HashMap<>();
-    Map<String, Object> averagesMap = new HashMap<>();
-
-    totalsMap.put(
-        RowIngestionMeters.BUILD_SEGMENTS,
-        rowIngestionMeters.getTotals()
-    );
-    averagesMap.put(
-        RowIngestionMeters.BUILD_SEGMENTS,
-        rowIngestionMeters.getMovingAverages()
-    );
-
-    returnMap.put("movingAverages", averagesMap);
-    returnMap.put("totals", totalsMap);
-    return Response.ok(returnMap).build();
-  }
-
-  @Override
-  @GET
-  @Path("/unparseableEvents")
-  @Produces(MediaType.APPLICATION_JSON)
-  public Response getUnparseableEvents(
-      @Context final HttpServletRequest req
-  )
-  {
-    authorizationCheck(req, Action.READ);
-    List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
-    return Response.ok(events).build();
-  }
-
-  public Response setEndOffsets(
-      Map<Integer, Long> offsets
-  ) throws InterruptedException
-  {
-    if (offsets == null) {
-      return Response.status(Response.Status.BAD_REQUEST)
-                     .entity("Request body must contain a map of { partition:endOffset }")
-                     .build();
-    } else if (!endOffsets.keySet().containsAll(offsets.keySet())) {
-      return Response.status(Response.Status.BAD_REQUEST)
-                     .entity(
-                         StringUtils.format(
-                             "Request contains partitions not being handled by this task, my partitions: %s",
-                             endOffsets.keySet()
-                         )
-                     )
-                     .build();
-    }
-
-    pauseLock.lockInterruptibly();
-    try {
-      if (!isPaused()) {
-        return Response.status(Response.Status.BAD_REQUEST)
-                       .entity("Task must be paused before changing the end offsets")
-                       .build();
-      }
-
-      for (Map.Entry<Integer, Long> entry : offsets.entrySet()) {
-        if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) {
-          return Response.status(Response.Status.BAD_REQUEST)
-                         .entity(
-                             StringUtils.format(
-                                 "End offset must be >= current offset for partition [%s] (current: %s)",
-                                 entry.getKey(),
-                                 nextOffsets.get(entry.getKey())
-                             )
-                         )
-                         .build();
-        }
-      }
-
-      endOffsets.putAll(offsets);
-      log.info("endOffsets changed to %s", endOffsets);
-    }
-    finally {
-      pauseLock.unlock();
-    }
-
-    resume();
-
-    return Response.ok(endOffsets).build();
-  }
-
-  private boolean isPaused()
-  {
-    return status == Status.PAUSED;
-  }
-
-  /**
-   * Signals the ingestion loop to pause.
-   *
-   * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the
-   * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets
-   * in the response body if the task successfully paused
-   */
-  @Override
-  @POST
-  @Path("/pause")
-  @Produces(MediaType.APPLICATION_JSON)
-  public Response pauseHTTP(
-      @Context final HttpServletRequest req
-  ) throws InterruptedException
-  {
-    authorizationCheck(req, Action.WRITE);
-    return pause();
-  }
-
-  @Override
-  public Response pause() throws InterruptedException
-  {
-    if (!(status == Status.PAUSED || status == Status.READING)) {
-      return Response.status(Response.Status.BAD_REQUEST)
-                     .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", status))
-                     .build();
-    }
-
-    pauseLock.lockInterruptibly();
-    try {
-      pauseRequested = true;
-
-      pollRetryLock.lockInterruptibly();
-      try {
-        isAwaitingRetry.signalAll();
-      }
-      finally {
-        pollRetryLock.unlock();
-      }
-
-      if (isPaused()) {
-        shouldResume.signalAll(); // kick the monitor so it re-awaits with the new pauseMillis
-      }
-
-      long nanos = TimeUnit.SECONDS.toNanos(2);
-      while (!isPaused()) {
-        if (nanos <= 0L) {
-          return Response.status(Response.Status.ACCEPTED)
-                         .entity("Request accepted but task has not yet paused")
-                         .build();
-        }
-        nanos = hasPaused.awaitNanos(nanos);
-      }
-    }
-    finally {
-      pauseLock.unlock();
-    }
-
-    try {
-      return Response.ok().entity(objectMapper.writeValueAsString(getCurrentOffsets())).build();
-    }
-    catch (JsonProcessingException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  @Override
-  @POST
-  @Path("/resume")
-  public Response resumeHTTP(@Context final HttpServletRequest req) throws InterruptedException
-  {
-    authorizationCheck(req, Action.WRITE);
-    resume();
-    return Response.status(Response.Status.OK).build();
-  }
-
-  @Override
-  public void resume() throws InterruptedException
-  {
-    pauseLock.lockInterruptibly();
-    try {
-      pauseRequested = false;
-      shouldResume.signalAll();
-
-      long nanos = TimeUnit.SECONDS.toNanos(5);
-      while (isPaused()) {
-        if (nanos <= 0L) {
-          throw new RuntimeException("Resume command was not accepted within 5 seconds");
-        }
-        nanos = shouldResume.awaitNanos(nanos);
-      }
-    }
-    finally {
-      pauseLock.unlock();
-    }
-  }
-
-  @Override
-  protected SeekableStreamDataSourceMetadata<Integer, Long> createDataSourceMetadata(
-      SeekableStreamSequenceNumbers<Integer, Long> partitions
-  )
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected OrderedSequenceNumber<Long> createSequenceNumber(Long sequenceNumber)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  @GET
-  @Path("/time/start")
-  @Produces(MediaType.APPLICATION_JSON)
-  public DateTime getStartTime(@Context final HttpServletRequest req)
-  {
-    authorizationCheck(req, Action.WRITE);
-    return startTime;
-  }
-
-  @Nullable
-  @Override
-  protected TreeMap<Integer, Map<Integer, Long>> getCheckPointsFromContext(
-      TaskToolbox toolbox,
-      String checkpointsString
-  )
-  {
-    throw new UnsupportedOperationException();
-  }
-
-}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 4c9b39b..783af47 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -140,8 +140,6 @@ import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
@@ -165,8 +163,6 @@ import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -191,7 +187,6 @@ import java.util.concurrent.TimeoutException;
 
 import static org.apache.druid.query.QueryPlus.wrap;
 
-@RunWith(Parameterized.class)
 public class KafkaIndexTaskTest
 {
   private static final Logger log = new Logger(KafkaIndexTaskTest.class);
@@ -228,24 +223,10 @@ public class KafkaIndexTaskTest
   private File directory;
   private String topic;
   private List<ProducerRecord<byte[], byte[]>> records;
-  private final boolean isIncrementalHandoffSupported;
   private final Set<Integer> checkpointRequestsHash = new HashSet<>();
   private File reportsFile;
   private RowIngestionMetersFactory rowIngestionMetersFactory;
 
-  // This should be removed in versions greater that 0.12.x
-  // isIncrementalHandoffSupported should always be set to true in those later versions
-  @Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}")
-  public static Iterable<Object[]> constructorFeeder()
-  {
-    return ImmutableList.of(new Object[]{true}, new Object[]{false});
-  }
-
-  public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported)
-  {
-    this.isIncrementalHandoffSupported = isIncrementalHandoffSupported;
-  }
-
   private static final DataSchema DATA_SCHEMA = new DataSchema(
       "test_ds",
       OBJECT_MAPPER.convertValue(
@@ -502,9 +483,6 @@ public class KafkaIndexTaskTest
   @Test(timeout = 60_000L)
   public void testIncrementalHandOff() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     final String baseSequenceName = "sequence0";
     // as soon as any segment has more than one record, incremental publishing should happen
     maxRowsPerSegment = 2;
@@ -608,9 +586,6 @@ public class KafkaIndexTaskTest
   @Test(timeout = 60_000L)
   public void testIncrementalHandOffMaxTotalRows() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     final String baseSequenceName = "sequence0";
     // incremental publish should happen every 3 records
     maxRowsPerSegment = Integer.MAX_VALUE;
@@ -763,9 +738,6 @@ public class KafkaIndexTaskTest
   @Test(timeout = 60_000L)
   public void testTimeBasedIncrementalHandOff() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     final String baseSequenceName = "sequence0";
     // as soon as any segment hits maxRowsPerSegment or intermediateHandoffPeriod, incremental publishing should happen
     maxRowsPerSegment = Integer.MAX_VALUE;
@@ -853,9 +825,6 @@ public class KafkaIndexTaskTest
   @Test(timeout = 60_000L)
   public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     records = generateSinglePartitionRecords(topic);
 
     final String baseSequenceName = "sequence0";
@@ -1675,9 +1644,7 @@ public class KafkaIndexTaskTest
     // desc3 will not be created in KafkaIndexTask (0.12.x) as it does not create per Kafka partition Druid segments
     SegmentDescriptor desc3 = sd(task, "2011/P1D", 1);
     SegmentDescriptor desc4 = sd(task, "2012/P1D", 0);
-    Assert.assertEquals(isIncrementalHandoffSupported
-                        ? ImmutableSet.of(desc1, desc2, desc4)
-                        : ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc4), publishedDescriptors());
     Assert.assertEquals(
         new KafkaDataSourceMetadata(
             new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L, 1, 2L))
@@ -1691,12 +1658,8 @@ public class KafkaIndexTaskTest
 
     // Check desc2/desc3 without strong ordering because two partitions are interleaved nondeterministically
     Assert.assertEquals(
-        isIncrementalHandoffSupported
-        ? ImmutableSet.of(ImmutableList.of("d", "e", "h"))
-        : ImmutableSet.of(ImmutableList.of("d", "e"), ImmutableList.of("h")),
-        isIncrementalHandoffSupported
-        ? ImmutableSet.of(readSegmentColumn("dim1", desc2))
-        : ImmutableSet.of(readSegmentColumn("dim1", desc2), readSegmentColumn("dim1", desc3))
+        ImmutableSet.of(ImmutableList.of("d", "e", "h")),
+        ImmutableSet.of(readSegmentColumn("dim1", desc2))
     );
   }
 
@@ -1867,10 +1830,6 @@ public class KafkaIndexTaskTest
   @Test(timeout = 60_000L)
   public void testRestoreAfterPersistingSequences() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
-
     records = generateSinglePartitionRecords(topic);
     maxRowsPerSegment = 2;
     Map<String, Object> consumerProps = kafkaServer.consumerProperties();
@@ -2136,12 +2095,6 @@ public class KafkaIndexTaskTest
   @Test(timeout = 60_000L)
   public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
   {
-    // This tests the case when a replacement task is created in place of a failed test
-    // which has done some incremental handoffs, thus the context will contain starting
-    // sequence offsets from which the task should start reading and ignore the start offsets
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     // Insert data
     insertData();
 
@@ -2344,9 +2297,6 @@ public class KafkaIndexTaskTest
   @Test(timeout = 60_000L)
   public void testCanStartFromLaterThanEarliestOffset() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     final String baseSequenceName = "sequence0";
     maxRowsPerSegment = Integer.MAX_VALUE;
     maxTotalRows = null;
@@ -2542,17 +2492,13 @@ public class KafkaIndexTaskTest
         maxParseExceptions,
         maxSavedParseExceptions
     );
-    if (isIncrementalHandoffSupported) {
-      context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
-
-      if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
-        final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
-        checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
-        final String checkpointsJson = OBJECT_MAPPER
-            .writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
-            .writeValueAsString(checkpoints);
-        context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
-      }
+    if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
+      final TreeMap<Integer, Map<Integer, Long>> checkpoints = new TreeMap<>();
+      checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
+      final String checkpointsJson = OBJECT_MAPPER
+          .writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
+          .writeValueAsString(checkpoints);
+      context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
     }
 
     final KafkaIndexTask task = new KafkaIndexTask(
@@ -2736,14 +2682,6 @@ public class KafkaIndexTaskTest
     final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
     dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
     final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
-    SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
-    {
-      @Override
-      public List<StorageLocationConfig> getLocations()
-      {
-        return new ArrayList<>();
-      }
-    };
     toolboxFactory = new TaskToolboxFactory(
         taskConfig,
         taskActionClientFactory,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index a04556d..ef5c254 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2664,8 +2664,6 @@ public class KinesisIndexTaskTest extends EasyMockSupport
   ) throws JsonProcessingException
   {
     if (context != null) {
-      context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
-
       if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
         final TreeMap<Integer, Map<String, String>> checkpoints = new TreeMap<>();
         checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 966b76c..755f630 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -121,7 +121,6 @@ import java.util.stream.Stream;
  */
 public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType> implements Supervisor
 {
-  public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
   public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
 
   private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
@@ -2752,7 +2751,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
   protected Map<String, Object> createBaseTaskContexts()
   {
     final Map<String, Object> contexts = new HashMap<>();
-    contexts.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
     if (spec.getContext() != null) {
       contexts.putAll(spec.getContext());
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org