You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by am...@apache.org on 2022/10/28 10:49:16 UTC
[druid] branch master updated: Remove skip ignorable shards (#13221)
This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9cbda66d96 Remove skip ignorable shards (#13221)
9cbda66d96 is described below
commit 9cbda66d96797a69b6fcdd7c0cc9cf20a7a5d5b7
Author: AmatyaAvadhanula <am...@imply.io>
AuthorDate: Fri Oct 28 16:19:01 2022 +0530
Remove skip ignorable shards (#13221)
* Revert "Improve kinesis task assignment after resharding (#12235)"
This reverts commit 1ec57cb935bd0b04d3123dfbb26a962a984422c7.
---
.../indexing/kinesis/KinesisRecordSupplier.java | 29 ++--------
.../kinesis/supervisor/KinesisSupervisor.java | 65 ----------------------
.../supervisor/KinesisSupervisorTuningConfig.java | 13 +----
.../kinesis/KinesisIndexTaskTuningConfigTest.java | 1 -
.../kinesis/KinesisRecordSupplierTest.java | 40 -------------
.../kinesis/supervisor/KinesisSupervisorTest.java | 55 ------------------
.../seekablestream/common/RecordSupplier.java | 4 +-
.../supervisor/SeekableStreamSupervisor.java | 26 +--------
8 files changed, 10 insertions(+), 223 deletions(-)
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index f4a1f074fa..f0645f8f82 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -76,6 +76,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -742,7 +743,7 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
});
}
- public Set<Shard> getShards(String stream)
+ private Set<Shard> getShards(String stream)
{
if (useListShards) {
return getShardsUsingListShards(stream);
@@ -782,7 +783,8 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
* This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
*
* @param stream name of stream
- * @return Immutable set of shards
+ *
+ * @return Set of shards
*/
private Set<Shard> getShardsUsingListShards(String stream)
{
@@ -803,11 +805,11 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
public Set<String> getPartitionIds(String stream)
{
return wrapExceptions(() -> {
- ImmutableSet.Builder<String> partitionIds = ImmutableSet.builder();
+ Set<String> partitionIds = new TreeSet<>();
for (Shard shard : getShards(stream)) {
partitionIds.add(shard.getShardId());
}
- return partitionIds.build();
+ return partitionIds;
});
}
@@ -870,25 +872,6 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
.anyMatch(fetch -> (fetch != null && !fetch.isDone()));
}
- /**
- * Fetches records from the specified shard to determine if it is empty.
- * @param stream to which shard belongs
- * @param shardId of the closed shard
- * @return true if the closed shard is empty, false otherwise.
- */
- public boolean isClosedShardEmpty(String stream, String shardId)
- {
- String shardIterator = kinesis.getShardIterator(stream,
- shardId,
- ShardIteratorType.TRIM_HORIZON.toString())
- .getShardIterator();
- GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator)
- .withLimit(1);
- GetRecordsResult shardData = kinesis.getRecords(request);
-
- return shardData.getRecords().isEmpty() && shardData.getNextShardIterator() == null;
- }
-
/**
* Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call
* {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index e506893034..010b671ff1 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -19,12 +19,10 @@
package org.apache.druid.indexing.kinesis.supervisor;
-import com.amazonaws.services.kinesis.model.Shard;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.ByteEntity;
@@ -66,7 +64,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.stream.Collectors;
/**
@@ -91,11 +88,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
private final AWSCredentialsConfig awsCredentialsConfig;
private volatile Map<String, Long> currentPartitionTimeLag;
- // Maintain sets of currently closed shards to find ignorable (closed and empty) shards
- // Poll closed shards once and store the result to avoid redundant costly calls to kinesis
- private final Set<String> emptyClosedShardIds = new TreeSet<>();
- private final Set<String> nonEmptyClosedShardIds = new TreeSet<>();
-
public KinesisSupervisor(
final TaskStorage taskStorage,
final TaskMaster taskMaster,
@@ -425,52 +417,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
return true;
}
- @Override
- protected boolean shouldSkipIgnorablePartitions()
- {
- return spec.getSpec().getTuningConfig().isSkipIgnorableShards();
- }
-
- /**
- * A kinesis shard is considered to be an ignorable partition if it is both closed and empty
- * @return set of shards ignorable by kinesis ingestion
- */
- @Override
- protected Set<String> computeIgnorablePartitionIds()
- {
- updateClosedShardCache();
- return ImmutableSet.copyOf(emptyClosedShardIds);
- }
-
- private synchronized void updateClosedShardCache()
- {
- final KinesisRecordSupplier kinesisRecordSupplier = (KinesisRecordSupplier) recordSupplier;
- final String stream = spec.getSource();
- final Set<Shard> allActiveShards = kinesisRecordSupplier.getShards(stream);
- final Set<String> activeClosedShards = allActiveShards.stream()
- .filter(shard -> isShardClosed(shard))
- .map(Shard::getShardId)
- .collect(Collectors.toSet());
-
- // clear stale shards
- emptyClosedShardIds.retainAll(activeClosedShards);
- nonEmptyClosedShardIds.retainAll(activeClosedShards);
-
- for (String closedShardId : activeClosedShards) {
- // Try to utilize cache
- if (emptyClosedShardIds.contains(closedShardId) || nonEmptyClosedShardIds.contains(closedShardId)) {
- continue;
- }
-
- // Check if it is closed using kinesis and add to cache
- if (kinesisRecordSupplier.isClosedShardEmpty(stream, closedShardId)) {
- emptyClosedShardIds.add(closedShardId);
- } else {
- nonEmptyClosedShardIds.add(closedShardId);
- }
- }
- }
-
@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> expiredPartitionIds
@@ -536,15 +482,4 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
return new KinesisDataSourceMetadata(newSequences);
}
-
- /**
- * A shard is considered closed iff it has an ending sequence number.
- *
- * @param shard to be checked
- * @return if shard is closed
- */
- private boolean isShardClosed(Shard shard)
- {
- return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
- }
}
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
index ba8f4311f0..3df3bed162 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
@@ -41,7 +41,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
private final Duration repartitionTransitionDuration;
private final Duration offsetFetchPeriod;
private final boolean useListShards;
- private final boolean skipIgnorableShards;
public static KinesisSupervisorTuningConfig defaultConfig()
{
@@ -77,7 +76,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
null,
null,
null,
- null,
null
);
}
@@ -114,8 +112,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
- @JsonProperty("useListShards") Boolean useListShards,
- @JsonProperty("skipIgnorableShards") Boolean skipIgnorableShards
+ @JsonProperty("useListShards") Boolean useListShards
)
{
super(
@@ -163,7 +160,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
DEFAULT_OFFSET_FETCH_PERIOD
);
this.useListShards = (useListShards != null ? useListShards : false);
- this.skipIgnorableShards = (skipIgnorableShards != null ? skipIgnorableShards : false);
}
@Override
@@ -220,12 +216,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
return useListShards;
}
- @JsonProperty
- public boolean isSkipIgnorableShards()
- {
- return skipIgnorableShards;
- }
-
@Override
public String toString()
{
@@ -259,7 +249,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", repartitionTransitionDuration=" + getRepartitionTransitionDuration() +
", useListShards=" + isUseListShards() +
- ", skipIgnorableShards=" + isSkipIgnorableShards() +
'}';
}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 61db57449d..acb599856a 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -310,7 +310,6 @@ public class KinesisIndexTaskTuningConfigTest
null,
null,
null,
- null,
null
);
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
index 07ac822524..af0755fcd4 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java
@@ -53,7 +53,6 @@ import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -1107,45 +1106,6 @@ public class KinesisRecordSupplierTest extends EasyMockSupport
verifyAll();
}
- @Test
- public void testIsClosedShardEmpty()
- {
- AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
- KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
- recordsPerFetch,
- 0,
- 2,
- false,
- 100,
- 5000,
- 5000,
- 5,
- true,
- false
- );
- Record record = new Record();
-
- final String shardWithoutRecordsAndNullNextIterator = "0";
- setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNullNextIterator, new ArrayList<>(), null);
-
- final String shardWithRecordsAndNullNextIterator = "1";
- setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNullNextIterator, Collections.singletonList(record), null);
-
- final String shardWithoutRecordsAndNonNullNextIterator = "2";
- setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNonNullNextIterator, new ArrayList<>(), "nextIterator");
-
- final String shardWithRecordsAndNonNullNextIterator = "3";
- setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNonNullNextIterator, Collections.singletonList(record), "nextIterator");
-
- EasyMock.replay(mockKinesis);
-
- // A closed shard is empty only when the records are empty and the next iterator is null
- Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNullNextIterator));
- Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNullNextIterator));
- Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNonNullNextIterator));
- Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNonNullNextIterator));
- }
-
@Test
public void testIsOffsetAvailable()
{
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index be836567eb..7496b69773 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -19,8 +19,6 @@
package org.apache.druid.indexing.kinesis.supervisor;
-import com.amazonaws.services.kinesis.model.SequenceNumberRange;
-import com.amazonaws.services.kinesis.model.Shard;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -107,7 +105,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -208,7 +205,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- null,
null
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
@@ -3970,7 +3966,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- null,
null
);
@@ -4924,55 +4919,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets());
}
- @Test
- public void testGetIgnorablePartitionIds()
- {
- supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
- supervisor.setupRecordSupplier();
- supervisor.tryInit();
- String stream = supervisor.getKinesisSupervisorSpec().getSource();
- SequenceNumberRange openShardRange = new SequenceNumberRange().withEndingSequenceNumber(null);
- SequenceNumberRange closedShardRange = new SequenceNumberRange().withEndingSequenceNumber("non-null");
-
- Shard openShard = new Shard().withShardId("openShard")
- .withSequenceNumberRange(openShardRange);
- Shard emptyClosedShard = new Shard().withShardId("emptyClosedShard")
- .withSequenceNumberRange(closedShardRange);
- Shard nonEmptyClosedShard = new Shard().withShardId("nonEmptyClosedShard")
- .withSequenceNumberRange(closedShardRange);
-
- EasyMock.expect(supervisorRecordSupplier.getShards(stream))
- .andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once()
- .andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once()
- .andReturn(ImmutableSet.of(openShard, emptyClosedShard)).once()
- .andReturn(ImmutableSet.of(openShard)).once()
- .andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once();
-
- // The following calls happen twice, once during the first call since there was no cache,
- // and once during the last since the cache was cleared prior to it
- EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, emptyClosedShard.getShardId()))
- .andReturn(true).times(2);
- EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, nonEmptyClosedShard.getShardId()))
- .andReturn(false).times(2);
-
- EasyMock.replay(supervisorRecordSupplier);
-
- // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
- // {empty-closed, nonEmpty-closed} added to cache
- Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
- // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
- Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
- // ActiveShards = {open, empty-closed}, IgnorableShards = {empty-closed}
- // {nonEmpty-closed} removed from cache
- Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
- // ActiveShards = {open}, IgnorableShards = {}
- // {empty-closed} removed from cache
- Assert.assertEquals(new HashSet<>(), supervisor.computeIgnorablePartitionIds());
- // ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
- // {empty-closed, nonEmpty-closed} re-added to cache
- Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
- }
-
private TestableKinesisSupervisor getTestableSupervisor(
int replicas,
int taskCount,
@@ -5082,7 +5028,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
- null,
null
);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
index df2b5c943b..b9db4202a8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java
@@ -124,11 +124,11 @@ public interface RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType
SequenceOffsetType getPosition(StreamPartition<PartitionIdType> partition);
/**
- * returns the set of all available partitions under the given stream
+ * returns the set of partitions under the given stream
*
* @param stream name of stream
*
- * @return set of partition ids belonging to the stream
+ * @return set of partitions
*/
Set<PartitionIdType> getPartitionIds(String stream);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 586c0cb1e7..90258e386f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -2315,30 +2315,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return false;
}
- protected boolean shouldSkipIgnorablePartitions()
- {
- return false;
- }
-
- /**
- * Use this method if skipIgnorablePartitions is true in the spec
- *
- * These partitions can be safely ignored for both ingestion task assignment and autoscaler limits
- *
- * @return set of ids of ignorable partitions
- */
- protected Set<PartitionIdType> computeIgnorablePartitionIds()
- {
- return ImmutableSet.of();
- }
-
public int getPartitionCount()
{
- int partitionCount = recordSupplier.getPartitionIds(ioConfig.getStream()).size();
- if (shouldSkipIgnorablePartitions()) {
- partitionCount -= computeIgnorablePartitionIds().size();
- }
- return partitionCount;
+ return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
}
private boolean updatePartitionDataFromStream()
@@ -2348,9 +2327,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
recordSupplierLock.lock();
try {
partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream());
- if (shouldSkipIgnorablePartitions()) {
- partitionIdsFromSupplier.removeAll(computeIgnorablePartitionIds());
- }
}
catch (Exception e) {
stateManager.recordThrowableEvent(e);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org