You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by am...@apache.org on 2022/10/28 10:49:16 UTC

[druid] branch master updated: Remove skip ignorable shards (#13221)

This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cbda66d96 Remove skip ignorable shards (#13221)
9cbda66d96 is described below

commit 9cbda66d96797a69b6fcdd7c0cc9cf20a7a5d5b7
Author: AmatyaAvadhanula <am...@imply.io>
AuthorDate: Fri Oct 28 16:19:01 2022 +0530

    Remove skip ignorable shards (#13221)
    
    * Revert "Improve kinesis task assignment after resharding (#12235)"
    
    This reverts commit 1ec57cb935bd0b04d3123dfbb26a962a984422c7.
---
 .../indexing/kinesis/KinesisRecordSupplier.java    | 29 ++--------
 .../kinesis/supervisor/KinesisSupervisor.java      | 65 ----------------------
 .../supervisor/KinesisSupervisorTuningConfig.java  | 13 +----
 .../kinesis/KinesisIndexTaskTuningConfigTest.java  |  1 -
 .../kinesis/KinesisRecordSupplierTest.java         | 40 -------------
 .../kinesis/supervisor/KinesisSupervisorTest.java  | 55 ------------------
 .../seekablestream/common/RecordSupplier.java      |  4 +-
 .../supervisor/SeekableStreamSupervisor.java       | 26 +--------
 8 files changed, 10 insertions(+), 223 deletions(-)

diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index f4a1f074fa..f0645f8f82 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -76,6 +76,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -742,7 +743,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
     });
   }
 
-  public Set<Shard> getShards(String stream)
+  private Set<Shard> getShards(String stream)
   {
     if (useListShards) {
       return getShardsUsingListShards(stream);
@@ -782,7 +783,8 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
    * This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
    *
    * @param stream name of stream
-   * @return Immutable set of shards
+   *
+   * @return Set of Shard ids
    */
   private Set<Shard> getShardsUsingListShards(String stream)
   {
@@ -803,11 +805,11 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
   public Set<String> getPartitionIds(String stream)
   {
     return wrapExceptions(() -> {
-      ImmutableSet.Builder<String> partitionIds = ImmutableSet.builder();
+      Set<String> partitionIds = new TreeSet<>();
       for (Shard shard : getShards(stream)) {
         partitionIds.add(shard.getShardId());
       }
-      return partitionIds.build();
+      return partitionIds;
     });
   }
 
@@ -870,25 +872,6 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
                              .anyMatch(fetch -> (fetch != null && !fetch.isDone()));
   }
 
-  /**
-   * Fetches records from the specified shard to determine if it is empty.
-   * @param stream to which shard belongs
-   * @param shardId of the closed shard
-   * @return true if the closed shard is empty, false otherwise.
-   */
-  public boolean isClosedShardEmpty(String stream, String shardId)
-  {
-    String shardIterator = kinesis.getShardIterator(stream,
-                                                    shardId,
-                                                    ShardIteratorType.TRIM_HORIZON.toString())
-                                  .getShardIterator();
-    GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator)
-                                                       .withLimit(1);
-    GetRecordsResult shardData = kinesis.getRecords(request);
-
-    return shardData.getRecords().isEmpty() && shardData.getNextShardIterator() == null;
-  }
-
   /**
    * Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call
    * {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index e506893034..010b671ff1 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -19,12 +19,10 @@
 
 package org.apache.druid.indexing.kinesis.supervisor;
 
-import com.amazonaws.services.kinesis.model.Shard;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
 import org.apache.druid.common.aws.AWSCredentialsConfig;
 import org.apache.druid.common.utils.IdUtils;
 import org.apache.druid.data.input.impl.ByteEntity;
@@ -66,7 +64,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.stream.Collectors;
 
 /**
@@ -91,11 +88,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
   private final AWSCredentialsConfig awsCredentialsConfig;
   private volatile Map<String, Long> currentPartitionTimeLag;
 
-  // Maintain sets of currently closed shards to find ignorable (closed and empty) shards
-  // Poll closed shards once and store the result to avoid redundant costly calls to kinesis
-  private final Set<String> emptyClosedShardIds = new TreeSet<>();
-  private final Set<String> nonEmptyClosedShardIds = new TreeSet<>();
-
   public KinesisSupervisor(
       final TaskStorage taskStorage,
       final TaskMaster taskMaster,
@@ -425,52 +417,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
     return true;
   }
 
-  @Override
-  protected boolean shouldSkipIgnorablePartitions()
-  {
-    return spec.getSpec().getTuningConfig().isSkipIgnorableShards();
-  }
-
-  /**
-   * A kinesis shard is considered to be an ignorable partition if it is both closed and empty
-   * @return set of shards ignorable by kinesis ingestion
-   */
-  @Override
-  protected Set<String> computeIgnorablePartitionIds()
-  {
-    updateClosedShardCache();
-    return ImmutableSet.copyOf(emptyClosedShardIds);
-  }
-
-  private synchronized void updateClosedShardCache()
-  {
-    final KinesisRecordSupplier kinesisRecordSupplier = (KinesisRecordSupplier) recordSupplier;
-    final String stream = spec.getSource();
-    final Set<Shard> allActiveShards = kinesisRecordSupplier.getShards(stream);
-    final Set<String> activeClosedShards = allActiveShards.stream()
-                                                          .filter(shard -> isShardClosed(shard))
-                                                          .map(Shard::getShardId)
-                                                          .collect(Collectors.toSet());
-
-    // clear stale shards
-    emptyClosedShardIds.retainAll(activeClosedShards);
-    nonEmptyClosedShardIds.retainAll(activeClosedShards);
-
-    for (String closedShardId : activeClosedShards) {
-      // Try to utilize cache
-      if (emptyClosedShardIds.contains(closedShardId) || nonEmptyClosedShardIds.contains(closedShardId)) {
-        continue;
-      }
-
-      // Check if it is closed using kinesis and add to cache
-      if (kinesisRecordSupplier.isClosedShardEmpty(stream, closedShardId)) {
-        emptyClosedShardIds.add(closedShardId);
-      } else {
-        nonEmptyClosedShardIds.add(closedShardId);
-      }
-    }
-  }
-
   @Override
   protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithExpiredPartitions(
       SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> expiredPartitionIds
@@ -536,15 +482,4 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
 
     return new KinesisDataSourceMetadata(newSequences);
   }
-
-  /**
-   * A shard is considered closed iff it has an ending sequence number.
-   *
-   * @param shard to be checked
-   * @return if shard is closed
-   */
-  private boolean isShardClosed(Shard shard)
-  {
-    return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
-  }
 }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
index ba8f4311f0..3df3bed162 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
@@ -41,7 +41,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
   private final Duration repartitionTransitionDuration;
   private final Duration offsetFetchPeriod;
   private final boolean useListShards;
-  private final boolean skipIgnorableShards;
 
   public static KinesisSupervisorTuningConfig defaultConfig()
   {
@@ -77,7 +76,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
         null,
         null,
         null,
-        null,
         null
     );
   }
@@ -114,8 +112,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
       @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
       @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration,
       @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
-      @JsonProperty("useListShards") Boolean useListShards,
-      @JsonProperty("skipIgnorableShards") Boolean skipIgnorableShards
+      @JsonProperty("useListShards") Boolean useListShards
   )
   {
     super(
@@ -163,7 +160,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
         DEFAULT_OFFSET_FETCH_PERIOD
     );
     this.useListShards = (useListShards != null ? useListShards : false);
-    this.skipIgnorableShards = (skipIgnorableShards != null ? skipIgnorableShards : false);
   }
 
   @Override
@@ -220,12 +216,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
     return useListShards;
   }
 
-  @JsonProperty
-  public boolean isSkipIgnorableShards()
-  {
-    return skipIgnorableShards;
-  }
-
   @Override
   public String toString()
   {
@@ -259,7 +249,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
            ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
            ", repartitionTransitionDuration=" + getRepartitionTransitionDuration() +
            ", useListShards=" + isUseListShards() +
-           ", skipIgnorableShards=" + isSkipIgnorableShards() +
            '}';
   }
 
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 61db57449d..acb599856a 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -310,7 +310,6 @@ public class KinesisIndexTaskTuningConfigTest
         null,
         null,
         null,
-        null,
         null
     );
     KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index 07ac822524..af0755fcd4 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -53,7 +53,6 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -1107,45 +1106,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
     verifyAll();
   }
 
-  @Test
-  public void testIsClosedShardEmpty()
-  {
-    AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
-    KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
-                                                             recordsPerFetch,
-                                                             0,
-                                                             2,
-                                                             false,
-                                                             100,
-                                                             5000,
-                                                             5000,
-                                                             5,
-                                                             true,
-                                                             false
-    );
-    Record record = new Record();
-
-    final String shardWithoutRecordsAndNullNextIterator = "0";
-    setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNullNextIterator, new ArrayList<>(), null);
-
-    final String shardWithRecordsAndNullNextIterator = "1";
-    setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNullNextIterator, Collections.singletonList(record), null);
-
-    final String shardWithoutRecordsAndNonNullNextIterator = "2";
-    setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNonNullNextIterator, new ArrayList<>(), "nextIterator");
-
-    final String shardWithRecordsAndNonNullNextIterator = "3";
-    setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNonNullNextIterator, Collections.singletonList(record), "nextIterator");
-
-    EasyMock.replay(mockKinesis);
-
-    // A closed shard is empty only when the records are empty and the next iterator is null
-    Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNullNextIterator));
-    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNullNextIterator));
-    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNonNullNextIterator));
-    Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNonNullNextIterator));
-  }
-
   @Test
   public void testIsOffsetAvailable()
   {
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index be836567eb..7496b69773 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -19,8 +19,6 @@
 
 package org.apache.druid.indexing.kinesis.supervisor;
 
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
@@ -107,7 +105,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -208,7 +205,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        null,
         null
     );
     rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
@@ -3970,7 +3966,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        null,
         null
     );
 
@@ -4924,55 +4919,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
     Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets());
   }
 
-  @Test
-  public void testGetIgnorablePartitionIds()
-  {
-    supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
-    supervisor.setupRecordSupplier();
-    supervisor.tryInit();
-    String stream = supervisor.getKinesisSupervisorSpec().getSource();
-    SequenceNumberRange openShardRange = new SequenceNumberRange().withEndingSequenceNumber(null);
-    SequenceNumberRange closedShardRange = new SequenceNumberRange().withEndingSequenceNumber("non-null");
-
-    Shard openShard = new Shard().withShardId("openShard")
-                                 .withSequenceNumberRange(openShardRange);
-    Shard emptyClosedShard = new Shard().withShardId("emptyClosedShard")
-                                        .withSequenceNumberRange(closedShardRange);
-    Shard nonEmptyClosedShard = new Shard().withShardId("nonEmptyClosedShard")
-                                           .withSequenceNumberRange(closedShardRange);
-
-    EasyMock.expect(supervisorRecordSupplier.getShards(stream))
-            .andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once()
-            .andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once()
-            .andReturn(ImmutableSet.of(openShard, emptyClosedShard)).once()
-            .andReturn(ImmutableSet.of(openShard)).once()
-            .andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once();
-
-    // The following calls happen twice, once during the first call since there was no cache,
-    // and once during the last since the cache was cleared prior to it
-    EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, emptyClosedShard.getShardId()))
-            .andReturn(true).times(2);
-    EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, nonEmptyClosedShard.getShardId()))
-            .andReturn(false).times(2);
-
-    EasyMock.replay(supervisorRecordSupplier);
-
-    // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
-    // {empty-closed, nonEmpty-closed} added to cache
-    Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
-    // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
-    Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
-    // ActiveShards = {open, empty-closed}, IgnorableShards = {empty-closed}
-    // {nonEmpty-closed} removed from cache
-    Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
-    // ActiveShards = {open}, IgnorableShards = {}
-    // {empty-closed} removed from cache
-    Assert.assertEquals(new HashSet<>(), supervisor.computeIgnorablePartitionIds());
-    // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
-    // {empty-closed, nonEmpty-closed} re-added to cache
-    Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
-  }
-
   private TestableKinesisSupervisor getTestableSupervisor(
       int replicas,
       int taskCount,
@@ -5082,7 +5028,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         null,
         null,
         null,
-        null,
         null
     );
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
index df2b5c943b..b9db4202a8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
@@ -124,11 +124,11 @@ public interface RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType
   SequenceOffsetType getPosition(StreamPartition<PartitionIdType> partition);
 
   /**
-   * returns the set of all available partitions under the given stream
+   * returns the set of partitions under the given stream
    *
    * @param stream name of stream
    *
-   * @return set of partition ids belonging to the stream
+   * @return set of partitions
    */
   Set<PartitionIdType> getPartitionIds(String stream);
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 586c0cb1e7..90258e386f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -2315,30 +2315,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     return false;
   }
 
-  protected boolean shouldSkipIgnorablePartitions()
-  {
-    return false;
-  }
-
-  /**
-   * Use this method if skipIgnorablePartitions is true in the spec
-   *
-   * These partitions can be safely ignored for both ingestion task assignment and autoscaler limits
-   *
-   * @return set of ids of ignorable partitions
-   */
-  protected Set<PartitionIdType> computeIgnorablePartitionIds()
-  {
-    return ImmutableSet.of();
-  }
-
   public int getPartitionCount()
   {
-    int partitionCount = recordSupplier.getPartitionIds(ioConfig.getStream()).size();
-    if (shouldSkipIgnorablePartitions()) {
-      partitionCount -= computeIgnorablePartitionIds().size();
-    }
-    return partitionCount;
+    return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
   }
 
   private boolean updatePartitionDataFromStream()
@@ -2348,9 +2327,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     recordSupplierLock.lock();
     try {
       partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream());
-      if (shouldSkipIgnorablePartitions()) {
-        partitionIdsFromSupplier.removeAll(computeIgnorablePartitionIds());
-      }
     }
     catch (Exception e) {
       stateManager.recordThrowableEvent(e);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org