Posted to commits@druid.apache.org by "panhongan (via GitHub)" <gi...@apache.org> on 2023/08/13 07:07:54 UTC

[PR] Fix sequence change race condition (druid)

panhongan opened a new pull request, #14805:
URL: https://github.com/apache/druid/pull/14805





Re: [PR] Fix sequence change race condition (druid)

Posted by "panhongan (via GitHub)" <gi...@apache.org>.
panhongan commented on PR #14805:
URL: https://github.com/apache/druid/pull/14805#issuecomment-1846584771

   > Hi @panhongan. It seems like the new tests pass with or without the changes. Could you please try adding more deterministic tests if possible?
   
   @AmatyaAvadhanula @abhishekagarwal87 I improved the unit test. I also see that https://github.com/apache/druid/pull/14995 was merged to fix the same issue. In my view, `synchronized` may be a more straightforward way to fix the race condition.
   Is there any chance my PR could still be merged? Thanks.
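
   For illustration, a minimal sketch of the `synchronized` approach, assuming a holder object that is mutated concurrently by the task's run loop and by HTTP handlers. The class and member names below are hypothetical, not taken from the Druid codebase:

   ```java
   // Hypothetical sketch: one monitor guards every read-modify-write of the
   // sequence list, so callers cannot interleave between the membership
   // check and the mutation.
   import java.util.ArrayList;
   import java.util.List;

   class SequenceHolder
   {
     private final List<String> sequences = new ArrayList<>();

     synchronized void addSequence(String sequenceName)
     {
       if (!sequences.contains(sequenceName)) { // check ...
         sequences.add(sequenceName);           // ... and act, atomically
       }
     }

     synchronized List<String> snapshot()
     {
       return new ArrayList<>(sequences); // copy taken under the same lock
     }
   }
   ```

   The appeal of this style is that thread safety stays local to the holder class rather than depending on every call site taking the right lock.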




Re: [PR] Fix sequence change race condition (druid)

Posted by "pranavbhole (via GitHub)" <gi...@apache.org>.
pranavbhole commented on PR #14805:
URL: https://github.com/apache/druid/pull/14805#issuecomment-1676427770

   @panhongan Thank you for the PR. Could you please elaborate on the issue?




Re: [PR] Fix sequence change race condition (druid)

Posted by "panhongan (via GitHub)" <gi...@apache.org>.
panhongan closed pull request #14805: Fix sequence change race condition
URL: https://github.com/apache/druid/pull/14805




Re: [PR] Fix sequence change race condition (druid)

Posted by "panhongan (via GitHub)" <gi...@apache.org>.
panhongan commented on PR #14805:
URL: https://github.com/apache/druid/pull/14805#issuecomment-1751520982

   @pranavbhole @abhishekagarwal87 Could you please help review and merge this PR?




Re: [PR] Fix sequence change race condition (druid)

Posted by "panhongan (via GitHub)" <gi...@apache.org>.
panhongan commented on PR #14805:
URL: https://github.com/apache/druid/pull/14805#issuecomment-1749304696

   > Is it the same as #14995? cc @AmatyaAvadhanula
   
   Yes, it's the same issue, but there was a race condition, and it needs to be handled safely.
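
   To make the hazard concrete: it is a check-then-act pattern, where two threads can both pass the check before either one acts. A schematic example, with illustrative names rather than actual Druid code:

   ```java
   // Schematic check-then-act race; names are illustrative, not Druid code.
   class OffsetTracker
   {
     private long endOffset = 0;

     // UNSAFE: threads A and B can both pass the check before either writes,
     // so a larger concurrent update can be overwritten by a smaller one.
     void maybeAdvanceUnsafe(long candidate)
     {
       if (candidate > endOffset) { // check
         endOffset = candidate;     // act
       }
     }

     // SAFE: one lock spans both the check and the act, so updates serialize.
     synchronized void maybeAdvanceSafe(long candidate)
     {
       if (candidate > endOffset) {
         endOffset = candidate;
       }
     }
   }
   ```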




Re: [PR] Fix sequence change race condition (druid)

Posted by "kfaraz (via GitHub)" <gi...@apache.org>.
kfaraz commented on PR #14805:
URL: https://github.com/apache/druid/pull/14805#issuecomment-1858723039

   @panhongan , do you still see the issue happening after #14995 was merged?
   You could try updating this PR to just contain the tests that verify the fix that was done in #14995.




Re: [PR] Fix sequence change race condition (druid)

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #14805:
URL: https://github.com/apache/druid/pull/14805#issuecomment-1951499685

   This pull request has been marked as stale due to 60 days of inactivity.
   It will be closed in 4 weeks if no further activity occurs. If you think
   that's incorrect or this pull request should instead be reviewed, please simply
   write any comment. Even if closed, you can still revive the PR at any time or
   discuss it on the dev@druid.apache.org list.
   Thank you for your contributions.




Re: [PR] Fix sequence change race condition (druid)

Posted by "github-code-scanning[bot] (via GitHub)" <gi...@apache.org>.
github-code-scanning[bot] commented on code in PR #14805:
URL: https://github.com/apache/druid/pull/14805#discussion_r1347762477


##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java:
##########
@@ -0,0 +1,855 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.datatype.joda.JodaModule;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.inject.Binder;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.data.input.impl.DimensionSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.TaskState;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.LockGranularity;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.indexing.overlord.DataSourceMetadata;
+import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.emitter.core.NoopEmitter;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
+import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.scan.ScanQuery;
+import org.apache.druid.query.scan.ScanQueryConfig;
+import org.apache.druid.query.scan.ScanQueryEngine;
+import org.apache.druid.query.scan.ScanQueryQueryToolChest;
+import org.apache.druid.query.scan.ScanQueryRunnerFactory;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.query.timeseries.TimeseriesQueryEngine;
+import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest;
+import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.incremental.AppendableIndexSpec;
+import org.apache.druid.segment.incremental.RowMeters;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+
+@SuppressWarnings("unchecked")
+@RunWith(Parameterized.class)
+public class SeekableStreamIndexTaskRunnerTest extends SeekableStreamIndexTaskTestBase
+{
+
+  private static final String STREAM = "stream";
+
+  private static final String DATASOURCE = "test_ds";
+
+  private static final String MESSAGE = "{\"id\": 1, \"age\": 10, \"timestamp\":\"2023-09-01T00:00:00.000\"}";
+
+  private static final String BASE_PERSIST_DIR = "./tmp";
+
+  private static RecordSupplier<String, String, ByteEntity> recordSupplier;
+
+  private static ServiceEmitter emitter;
+
+  private static SeekableStreamIndexTaskRunner taskRunner;
+
+  public SeekableStreamIndexTaskRunnerTest(LockGranularity lockGranularity)
+  {
+    super(lockGranularity);
+  }
+
+  @Parameterized.Parameters(name = "{0}")
+  public static Iterable<Object[]> constructorFeeder()
+  {
+    return ImmutableList.of(
+            new Object[]{LockGranularity.TIME_CHUNK},
+            new Object[]{LockGranularity.SEGMENT}
+    );
+  }
+
+  @BeforeClass
+  public static void setupClass()
+  {
+    emitter = new ServiceEmitter(
+            "service",
+            "host",
+            new NoopEmitter()
+    );
+    emitter.start();
+    EmittingLogger.registerEmitter(emitter);
+
+    taskExec = MoreExecutors.listeningDecorator(
+            Executors.newCachedThreadPool(
+                    Execs.makeThreadFactory("runner-task-test-%d")
+            )
+    );
+  }
+
+  @Before
+  public void setup() throws IOException
+  {
+    reportsFile = File.createTempFile("IndexTaskTestReports-" + System.currentTimeMillis(), "json");
+    recordSupplier = new TestRecordSupplier();
+
+    TestUtils testUtils = new TestUtils();
+    final ObjectMapper objectMapper = testUtils.getTestObjectMapper();
+
+    for (Module module : new TestIndexTaskModule().getJacksonModules()) {
+      objectMapper.registerModule(module);
+    }
+
+    makeToolboxFactory(testUtils, emitter, false);
+  }
+
+  @After
+  public void tearDownTest() throws IOException
+  {
+    synchronized (runningTasks) {
+      for (Task task : runningTasks) {
+        task.stopGracefully(toolboxFactory.build(task).getConfig());
+      }
+
+      runningTasks.clear();
+    }
+
+    reportsFile.delete();
+    FileUtils.deleteDirectory(new File(BASE_PERSIST_DIR));
+    destroyToolboxFactory();
+  }
+
+  @Test
+  public void testRunTaskWithoutIntermediateHandOff() throws ExecutionException, InterruptedException
+  {
+    TestSeekableStreamIndexTaskIOConfig taskIoConfig = new TestSeekableStreamIndexTaskIOConfig(
+            0,
+            STREAM,
+            new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.singletonMap("0", "10"), Collections.emptySet()),
+            new SeekableStreamEndSequenceNumbers<>(STREAM, Collections.singletonMap("0", "20")),
+            null,
+            null,
+            null,
+            new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false)
+            );
+
+    TestSeekableStreamIndexTaskTuningConfig taskTuningConfig = new TestSeekableStreamIndexTaskTuningConfig(
+            null,
+            null,
+            10L,
+            false,
+            null,
+            null,
+            Period.seconds(1),
+            new File(BASE_PERSIST_DIR),
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+    );
+
+    SeekableStreamIndexTask task = new TestSeekableStreamIndexTask(
+            "id1",
+            null,
+            getDataSchema(),
+            taskTuningConfig,
+            taskIoConfig,
+            null,
+            "0"
+    );
+
+    taskRunner = new TestSeekableStreamIndexTaskRunner(task, getDataSchema().getParser(), task.authorizerMapper, LockGranularity.TIME_CHUNK);
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    Thread.sleep(5 * 1000L);
+
+    Assert.assertEquals(0, countEvents(task));
+    Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.READING, task.getRunner().getStatus());
+
+    taskRunner.pause();
+    taskRunner.setEndOffsets(Collections.singletonMap("0", "11"), true);
+
+    Thread.sleep(5 * 1000L); // wait for publishing segment
+    taskRunner.stopGracefully();
+
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+    verifyTaskMetrics(task, RowMeters.with().bytes(MESSAGE.length()).unparseable(0).totalProcessed(1));
+
+    publishedDescriptors();
+    publishedSegments();
+  }
+
+  @Test
+  public void testRunTaskWithIntermediateHandOff() throws ExecutionException, InterruptedException
+  {
+    TestSeekableStreamIndexTaskIOConfig taskIoConfig = new TestSeekableStreamIndexTaskIOConfig(
+            0,
+            STREAM,
+            new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.singletonMap("0", "10"), Collections.emptySet()),
+            new SeekableStreamEndSequenceNumbers<>(STREAM, Collections.singletonMap("0", "20")),
+            null,
+            null,
+            null,
+            new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false)
+    );
+
+    TestSeekableStreamIndexTaskTuningConfig taskTuningConfig = new TestSeekableStreamIndexTaskTuningConfig(
+            null,
+            null,
+            10L,
+            false,
+            null,
+            null,
+            Period.seconds(1),
+            new File(BASE_PERSIST_DIR),
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            Period.seconds(5),
+            null,
+            null,
+            null
+    );
+
+
+    SeekableStreamIndexTask task = new TestSeekableStreamIndexTask(
+            "id1",
+            null,
+            getDataSchema(),
+            taskTuningConfig,
+            taskIoConfig,
+            null,
+            "0"
+    );
+
+    taskRunner = new TestSeekableStreamIndexTaskRunner(task, getDataSchema().getParser(), task.authorizerMapper, LockGranularity.TIME_CHUNK);
+
+    final ListenableFuture<TaskStatus> future = runTask(task);
+    Thread.sleep(10 * 1000L); // > intermediateHandoffPeriod
+
+    Assert.assertEquals(0, countEvents(task));
+    Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.PAUSED, task.getRunner().getStatus());
+
+    // taskRunner.pause();
+    taskRunner.setEndOffsets(Collections.singletonMap("0", "11"), true);
+    // taskRunner.possiblyResetDataSourceMetadata(this.toolbox, null, Collections.emptySet());
+
+    Thread.sleep(5 * 1000L); // wait for publishing segment
+    taskRunner.stopGracefully();
+
+    // Wait for task to exit
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
+    verifyTaskMetrics(task, RowMeters.with().bytes(MESSAGE.length()).unparseable(0).totalProcessed(1));
+
+    publishedDescriptors();
+    publishedSegments();
+  }
+
+  @Override
+  protected QueryRunnerFactoryConglomerate makeQueryRunnerConglomerate()
+  {
+    return new DefaultQueryRunnerFactoryConglomerate(
+            ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder()
+                    .put(
+                            TimeseriesQuery.class,
+                            new TimeseriesQueryRunnerFactory(
+                                new TimeseriesQueryQueryToolChest(),
+                                new TimeseriesQueryEngine(),
+                                (query, future) -> {
+                                   // do nothing
+                                }
+                            )
+                    ).put(
+                            ScanQuery.class,
+                            new ScanQueryRunnerFactory(
+                                new ScanQueryQueryToolChest(
+                                    new ScanQueryConfig(),
+                                    new DefaultGenericQueryMetricsFactory()
+                                ),
+                                new ScanQueryEngine(),
+                                new ScanQueryConfig()
+                            )
+                    )
+                    .build()
+    );
+  }
+
+  public static class TestRecordSupplier implements RecordSupplier<String, String, ByteEntity>
+  {
+    @Override
+    public void assign(Set<StreamPartition<String>> streamPartitions)
+    {
+    }
+
+    @Override
+    public void seek(StreamPartition<String> partition, String sequenceNumber)
+    {
+    }
+
+    @Override
+    public void seekToEarliest(Set<StreamPartition<String>> streamPartitions)
+    {
+    }
+
+    @Override
+    public void seekToLatest(Set<StreamPartition<String>> streamPartitions)
+    {
+    }
+
+    @Override
+    public Collection<StreamPartition<String>> getAssignment()
+    {
+      return Collections.singletonList(new StreamPartition<>(STREAM, "0"));
+    }
+
+    @Override
+    public @NotNull List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long timeout)
+    {
+      return Collections.emptyList();
+    }
+
+    @Override
+    public String getLatestSequenceNumber(StreamPartition<String> partition)
+    {
+      return "10";
+    }
+
+    @Override
+    public String getEarliestSequenceNumber(StreamPartition<String> partition)
+    {
+      return "10";
+    }
+
+    @Override
+    public boolean isOffsetAvailable(StreamPartition<String> partition, OrderedSequenceNumber<String> offset)
+    {
+      return false;
+    }
+
+    @Override
+    public String getPosition(StreamPartition<String> partition)
+    {
+      return "10";
+    }
+
+    @Override
+    public Set<String> getPartitionIds(String stream)
+    {
+      return Sets.newHashSet("0");
+    }
+
+    @Override
+    public void close()
+    {
+    }
+  }
+
+  public static class TestSeekableStreamSupervisorTuningConfig implements SeekableStreamSupervisorTuningConfig
+  {
+    @Override
+    public Integer getWorkerThreads()
+    {
+      return 1;
+    }
+
+    @Override
+    public Long getChatRetries()
+    {
+      return 1L;
+    }
+
+    @Override
+    public Duration getHttpTimeout()
+    {
+      return new Period("PT1M").toStandardDuration();
+    }
+
+    @Override
+    public Duration getShutdownTimeout()
+    {
+      return new Period("PT1S").toStandardDuration();
+    }
+
+    @Override
+    public Duration getRepartitionTransitionDuration()
+    {
+      return new Period("PT2M").toStandardDuration();
+    }
+
+    @Override
+    public Duration getOffsetFetchPeriod()
+    {
+      return new Period("PT5M").toStandardDuration();
+    }
+
+    @Override
+    public TestSeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
+    {
+      return new TestSeekableStreamIndexTaskTuningConfig(
+                null,
+                null,
+                10L,
+                false,
+                null,
+                null,
+                Period.seconds(1),
+                new File("./tmp"),
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                null,
+                Period.seconds(5),
+                null,
+                null,
+                null
+      );
+    }
+  }
+
+  public static class TestSeekableStreamIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig
+  {
+    @JsonCreator
+    public TestSeekableStreamIndexTaskTuningConfig(
+        @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
+        @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
+        @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+        @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
+        @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+        @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
+        @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
+        @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
+        @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
+        @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
+        @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists,
+        @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions,
+        @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout,
+        @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically,
+        @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+        @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
+        @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
+        @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
+        @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
+    )
+    {
+      super(
+                appendableIndexSpec,
+                maxRowsInMemory,
+                maxBytesInMemory,
+                skipBytesInMemoryOverheadCheck,
+                maxRowsPerSegment,
+                maxTotalRows,
+                intermediatePersistPeriod,
+                basePersistDirectory,
+                maxPendingPersists,
+                indexSpec,
+                indexSpecForIntermediatePersists,
+                reportParseExceptions,
+                handoffConditionTimeout,
+                resetOffsetAutomatically,
+                false,
+                segmentWriteOutMediumFactory,
+                intermediateHandoffPeriod,
+                logParseExceptions,
+                maxParseExceptions,
+                maxSavedParseExceptions
+      );
+    }
+
+    @Override
+    public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
+    {
+      return new TestSeekableStreamIndexTaskTuningConfig(
+                getAppendableIndexSpec(),
+                getMaxRowsInMemory(),
+                getMaxBytesInMemory(),
+                isSkipBytesInMemoryOverheadCheck(),
+                getMaxRowsPerSegment(),
+                getMaxTotalRows(),
+                getIntermediatePersistPeriod(),
+                dir,
+                getMaxPendingPersists(),

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [SeekableStreamIndexTaskTuningConfig.getMaxPendingPersists](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/5871)



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java:
##########
+  @Before
+  public void setup() throws IOException
+  {
+    reportsFile = File.createTempFile("IndexTaskTestReports-" + System.currentTimeMillis(), "json");

Review Comment:
   ## Local information disclosure in a temporary directory
   
   Local information disclosure vulnerability due to use of file readable by other local users.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/5874)
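
   A common remediation for this finding is to create the file through `java.nio.file.Files`, which on POSIX file systems restricts the temporary file to the owning user. A sketch of that change, for illustration only and not part of this PR:

   ```java
   // Sketch only: Files.createTempFile creates the file with owner-only
   // permissions on POSIX systems, unlike File.createTempFile, whose
   // permissions depend on the process umask.
   import java.io.File;
   import java.io.IOException;
   import java.nio.file.Files;

   class ReportsFileFactory
   {
     static File createReportsFile() throws IOException
     {
       return Files
           .createTempFile("IndexTaskTestReports-" + System.currentTimeMillis(), "json")
           .toFile();
     }
   }
   ```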



+                resetOffsetAutomatically,
+                false,
+                segmentWriteOutMediumFactory,
+                intermediateHandoffPeriod,
+                logParseExceptions,
+                maxParseExceptions,
+                maxSavedParseExceptions
+      );
+    }
+
+    @Override
+    public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
+    {
+      return new TestSeekableStreamIndexTaskTuningConfig(
+                getAppendableIndexSpec(),
+                getMaxRowsInMemory(),
+                getMaxBytesInMemory(),
+                isSkipBytesInMemoryOverheadCheck(),
+                getMaxRowsPerSegment(),
+                getMaxTotalRows(),
+                getIntermediatePersistPeriod(),
+                dir,
+                getMaxPendingPersists(),
+                getIndexSpec(),
+                getIndexSpecForIntermediatePersists(),
+                isReportParseExceptions(),
+                getHandoffConditionTimeout(),
+                isResetOffsetAutomatically(),
+                getSegmentWriteOutMediumFactory(),
+                getIntermediateHandoffPeriod(),
+                isLogParseExceptions(),
+                getMaxParseExceptions(),
+                getMaxSavedParseExceptions()
+        );
+    }
+
+    @Override
+    public String toString()
+    {
+      return "TestSeekableStreamIndexTaskTuningConfig{" +
+                "maxRowsInMemory=" + getMaxRowsInMemory() +
+                ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
+                ", maxTotalRows=" + getMaxTotalRows() +
+                ", maxBytesInMemory=" + getMaxBytesInMemory() +
+                ", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
+                ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
+                ", maxPendingPersists=" + getMaxPendingPersists() +

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [SeekableStreamIndexTaskTuningConfig.getMaxPendingPersists](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/5872)
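   
   A minimal sketch of one possible fix (not part of this PR): drop the deprecated getter from the diagnostic string and keep only non-deprecated accessors.
   
   ```java
   // Hypothetical rework of the toString() fragment flagged above. It simply omits
   // the call to the deprecated getMaxPendingPersists(), so the alert no longer fires.
   @Override
   public String toString()
   {
     return "TestSeekableStreamIndexTaskTuningConfig{" +
            "maxRowsInMemory=" + getMaxRowsInMemory() +
            ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
            ", maxBytesInMemory=" + getMaxBytesInMemory() +
            ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
            '}';
   }
   ```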



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java:
##########
+                ", indexSpec=" + getIndexSpec() +
+                ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() +
+                ", reportParseExceptions=" + isReportParseExceptions() +
+                ", handoffConditionTimeout=" + getHandoffConditionTimeout() +
+                ", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
+                ", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() +
+                ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
+                ", logParseExceptions=" + isLogParseExceptions() +
+                ", maxParseExceptions=" + getMaxParseExceptions() +
+                ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
+                '}';
+    }
+  }
+
+  public static class TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
+  {
+    @JsonCreator
+    public TestSeekableStreamIndexTask(
+            @JsonProperty("id") String id,
+            @JsonProperty("resource") TaskResource taskResource,
+            @JsonProperty("dataSchema") DataSchema dataSchema,
+            @JsonProperty("tuningConfig") TestSeekableStreamIndexTaskTuningConfig tuningConfig,
+            @JsonProperty("ioConfig") TestSeekableStreamIndexTaskIOConfig ioConfig,
+            @JsonProperty("context") Map<String, Object> context,
+            @JsonProperty("groupId") String groupId
+    )
+    {
+      super(id,
+              taskResource,
+              dataSchema,
+              tuningConfig,
+              ioConfig,
+              context,
+              groupId
+      );
+    }
+
+    @Override
+    protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner()
+    {
+      return taskRunner;
+    }
+
+    @Override
+    protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier(final TaskToolbox toolbox)
+    {
+      return recordSupplier;
+    }
+
+    @Override
+    public String getType()
+    {
+      return "index_test";
+    }
+  }
+
+  private static DataSchema getDataSchema()
+  {
+    List<DimensionSchema> dimensions = new ArrayList<>();
+    dimensions.add(StringDimensionSchema.create("id"));
+
+    return new DataSchema(
+            DATASOURCE,
+            new TimestampSpec("timestamp", "iso", null),
+            new DimensionsSpec(dimensions),
+            new AggregatorFactory[]{
+                    // new CountAggregatorFactory("count")
+            },
+            new UniformGranularitySpec(
+                    Granularities.HOUR,
+                    Granularities.MINUTE,
+                    false,
+                    ImmutableList.of()
+            ),
+            null
+    );
+  }
+
+  public static class TestSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner<String, String, ByteEntity>
+  {
+    public TestSeekableStreamIndexTaskRunner(SeekableStreamIndexTask<String, String, ByteEntity> task, InputRowParser<ByteBuffer> parser, AuthorizerMapper authorizerMapper, LockGranularity lockGranularityToUse)
+    {
+      super(task, parser, authorizerMapper, lockGranularityToUse);
+    }
+
+    @Override
+    protected boolean isEndOfShard(String seqNum)
+    {
+      return false;
+    }
+
+    @Override
+    protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(TaskToolbox toolbox, String checkpointsString)
+    {
+      return null;
+    }
+
+    @Override
+    protected String getNextStartOffset(String sequenceNumber)
+    {
+      return sequenceNumber;
+    }
+
+    @Override
+    protected SeekableStreamEndSequenceNumbers<String, String> deserializePartitionsFromMetadata(ObjectMapper mapper, Object object)
+    {
+      return mapper.convertValue(object, mapper.getTypeFactory().constructParametrizedType(
+                SeekableStreamEndSequenceNumbers.class,
+                SeekableStreamEndSequenceNumbers.class,
+                String.class,
+                String.class
+      ));

Review Comment:
   ## Deprecated method or constructor invocation
   
   Invoking [TypeFactory.constructParametrizedType](1) should be avoided because it has been deprecated.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/5873)
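   
   Jackson's non-deprecated replacement is TypeFactory.constructParametricType, which takes the parameterized class followed by its type arguments. A minimal sketch of the adjusted override (not part of this PR):
   
   ```java
   @Override
   protected SeekableStreamEndSequenceNumbers<String, String> deserializePartitionsFromMetadata(
       ObjectMapper mapper,
       Object object
   )
   {
     // constructParametricType builds the same JavaType as the deprecated
     // constructParametrizedType call, without the redundant second class argument.
     return mapper.convertValue(
         object,
         mapper.getTypeFactory().constructParametricType(
             SeekableStreamEndSequenceNumbers.class,
             String.class,
             String.class
         )
     );
   }
   ```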



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java:
##########
+    }
+
+    @Override
+    protected @NotNull List<OrderedPartitionableRecord<String, String, ByteEntity>> getRecords(RecordSupplier<String, String, ByteEntity> recordSupplier, TaskToolbox toolbox)
+    {
+      return Collections.singletonList(new OrderedPartitionableRecord<>(STREAM, "0", "11", Collections.singletonList(new ByteEntity(MESSAGE.getBytes(StandardCharsets.UTF_8)))));
+    }
+
+    @Override
+    protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadata(SeekableStreamSequenceNumbers<String, String> partitions)
+    {
+      return new TestSeekableStreamDataSourceMetadata(partitions);
+    }
+
+    @Override
+    protected OrderedSequenceNumber<String> createSequenceNumber(String sequenceNumber)
+    {
+      return new TestSequenceNumber(sequenceNumber);
+    }
+
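+    // No-op: resetting datasource metadata is not exercised by these tests.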
+    @Override
+    protected void possiblyResetDataSourceMetadata(TaskToolbox toolbox, RecordSupplier<String, String, ByteEntity> recordSupplier, Set<StreamPartition<String>> assignment)
+    {
+    }
+
+    @Override
+    protected boolean isEndOffsetExclusive()
+    {
+      return false;
+    }
+
+    @Override
+    protected TypeReference<List<SequenceMetadata<String, String>>> getSequenceMetadataTypeReference()
+    {
+      return new TypeReference<List<SequenceMetadata<String, String>>>()
+      {
+      };
+    }
+  }
+
+  private static class TestSequenceNumber extends OrderedSequenceNumber<String>

Review Comment:
   ## Inconsistent compareTo
   
   This class declares compareTo but inherits equals, so the two can disagree: instances may compare as equal via compareTo without being equals.
   
   [Show more details](https://github.com/apache/druid/security/code-scanning/5875)
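   
   One way to make the two consistent (a minimal sketch, not part of this PR; it assumes OrderedSequenceNumber exposes the wrapped sequence number via get() and that java.util.Objects is imported) is to override equals and hashCode so they agree with compareTo:
   
   ```java
   @Override
   public boolean equals(Object o)
   {
     if (this == o) {
       return true;
     }
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
     // Equal exactly when compareTo reports the two sequence numbers as equal.
     return compareTo((TestSequenceNumber) o) == 0;
   }
   
   @Override
   public int hashCode()
   {
     // Hash the wrapped value so objects that are equal hash equally.
     return Objects.hash(get());
   }
   ```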





Re: [PR] Fix sequence change race condition (druid)

Posted by "AmatyaAvadhanula (via GitHub)" <gi...@apache.org>.
AmatyaAvadhanula commented on PR #14805:
URL: https://github.com/apache/druid/pull/14805#issuecomment-1774443281

   Hi @panhongan. It seems like the new tests pass with or without the changes. Could you please try adding more deterministic tests if possible?
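   
   One generic way to make such a race deterministic (a sketch with hypothetical names, not code from this PR) is to gate the two racing operations on latches so the problematic interleaving is forced on every run:
   
   ```java
   import java.util.concurrent.CountDownLatch;
   
   // Start the "sequence swap" first, then hold it mid-flight until the racing
   // publish has run, so the suspect interleaving happens every time.
   CountDownLatch swapStarted = new CountDownLatch(1);
   CountDownLatch publishDone = new CountDownLatch(1);
   
   Thread swapper = new Thread(() -> {
     swapStarted.countDown();        // signal: swap has begun
     try {
       publishDone.await();          // pause the swap until the publish runs
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       return;
     }
     // ... complete the sequence change here ...
   });
   swapper.start();
   
   swapStarted.await();              // wait for the swap to be in progress
   // ... run the publish path that races with the swap ...
   publishDone.countDown();          // let the swap finish
   swapper.join();
   // ... assert the final offsets/metadata are consistent ...
   ```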




Re: [PR] Fix sequence change race condition (druid)

Posted by "abhishekagarwal87 (via GitHub)" <gi...@apache.org>.
abhishekagarwal87 commented on PR #14805:
URL: https://github.com/apache/druid/pull/14805#issuecomment-1735808700

   Is it the same as https://github.com/apache/druid/pull/14995? cc @AmatyaAvadhanula 

