You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ji...@apache.org on 2019/06/07 20:29:06 UTC

[incubator-druid] branch 0.15.0-incubating updated: Optimize overshadowed segments computation (#7595) (#7850)

This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.15.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.15.0-incubating by this push:
     new ae57304  Optimize overshadowed segments computation (#7595) (#7850)
ae57304 is described below

commit ae573044241d8c2de29cc4f471c75713580c14e4
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Fri Jun 7 13:29:00 2019 -0700

    Optimize overshadowed segments computation (#7595) (#7850)
    
    * Move the overshadowed segment computation to SQLMetadataSegmentManager's poll
    
    * rename method in MetadataSegmentManager
    
    * Fix tests
    
    * PR comments
    
    * PR comments
    
    * PR comments
    
    * fix indentation
    
    * fix tests
    
    *  fix test
    
    *  add test for SegmentWithOvershadowedStatus serde format
    
    * PR comments
    
    * PR comments
    
    * fix test
    
    * remove snapshot updates outside poll
    
    * PR comments
    
    * PR comments
    
    * PR comments
    
    *  removed unused import
---
 .../timeline/SegmentWithOvershadowedStatus.java    |   7 +
 .../org/apache/druid/utils/CollectionUtils.java    |  19 ++-
 .../SegmentWithOvershadowedStatusTest.java         | 180 +++++++++++++++++++++
 .../MaterializedViewSupervisor.java                |   2 +-
 .../common/actions/SegmentListActionsTest.java     |   2 +-
 .../results/auth_test_sys_schema_segments.json     |   2 +-
 .../apache/druid/client/DataSourcesSnapshot.java   | 114 +++++++++++++
 .../druid/client/ImmutableDruidDataSource.java     |  40 -----
 .../druid/metadata/MetadataSegmentManager.java     |  27 +++-
 .../druid/metadata/SQLMetadataSegmentManager.java  | 132 +++++++--------
 .../druid/server/coordinator/DruidCoordinator.java |  35 +++-
 .../coordinator/DruidCoordinatorRuntimeParams.java |  22 ++-
 .../helper/DruidCoordinatorRuleRunner.java         |  16 +-
 .../druid/server/http/DataSourcesResource.java     |   2 +-
 .../apache/druid/server/http/MetadataResource.java |  25 ++-
 .../metadata/SQLMetadataSegmentManagerTest.java    |   2 +-
 .../coordinator/CuratorDruidCoordinatorTest.java   |   6 +
 .../DruidCoordinatorRuleRunnerTest.java            |  19 ++-
 .../server/coordinator/DruidCoordinatorTest.java   |  22 +--
 .../druid/sql/calcite/schema/SystemSchema.java     |  13 +-
 20 files changed, 514 insertions(+), 173 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
index e86daea..3f2972f 100644
--- a/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
+++ b/core/src/main/java/org/apache/druid/timeline/SegmentWithOvershadowedStatus.java
@@ -21,6 +21,7 @@ package org.apache.druid.timeline;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonUnwrapped;
 
 /**
  * DataSegment object plus the overshadowed status for the segment. An immutable object.
@@ -31,6 +32,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 public class SegmentWithOvershadowedStatus implements Comparable<SegmentWithOvershadowedStatus>
 {
   private final boolean overshadowed;
+  /**
+   * dataSegment is serialized "unwrapped", i.e. it's properties are included as properties of
+   * enclosing class. If in future, if {@Code SegmentWithOvershadowedStatus} were to extend {@link DataSegment},
+   * there will be no change in the serialized format.
+   */
+  @JsonUnwrapped
   private final DataSegment dataSegment;
 
   @JsonCreator
diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
index 6b4d3cc..af3cf07 100644
--- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
+++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
@@ -19,10 +19,14 @@
 
 package org.apache.druid.utils;
 
+import com.google.common.collect.Maps;
+
 import java.util.AbstractCollection;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Spliterator;
+import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
@@ -68,5 +72,18 @@ public final class CollectionUtils
     };
   }
 
-  private CollectionUtils() {}
+  /**
+   * Returns a transformed map from the given input map where the value is modified based on the given valueMapper
+   * function.
+   */
+  public static <K, V, V2> Map<K, V2> mapValues(Map<K, V> map, Function<V, V2> valueMapper)
+  {
+    final Map<K, V2> result = Maps.newHashMapWithExpectedSize(map.size());
+    map.forEach((k, v) -> result.put(k, valueMapper.apply(v)));
+    return result;
+  }
+
+  private CollectionUtils()
+  {
+  }
 }
diff --git a/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java
new file mode 100644
index 0000000..050f9e0
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/timeline/SegmentWithOvershadowedStatusTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.TestObjectMapper;
+import org.apache.druid.jackson.CommaListJoinDeserializer;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.apache.druid.timeline.partition.ShardSpec;
+import org.joda.time.Interval;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class SegmentWithOvershadowedStatusTest
+{
+  private static final ObjectMapper mapper = new TestObjectMapper();
+  private static final int TEST_VERSION = 0x9;
+
+  @Before
+  public void setUp()
+  {
+    InjectableValues.Std injectableValues = new InjectableValues.Std();
+    injectableValues.addValue(DataSegment.PruneLoadSpecHolder.class, DataSegment.PruneLoadSpecHolder.DEFAULT);
+    mapper.setInjectableValues(injectableValues);
+  }
+
+  @Test
+  public void testUnwrappedSegmentWithOvershadowedStatusDeserialization() throws Exception
+  {
+    final Interval interval = Intervals.of("2011-10-01/2011-10-02");
+    final ImmutableMap<String, Object> loadSpec = ImmutableMap.of("something", "or_other");
+
+    final DataSegment dataSegment = new DataSegment(
+        "something",
+        interval,
+        "1",
+        loadSpec,
+        Arrays.asList("dim1", "dim2"),
+        Arrays.asList("met1", "met2"),
+        NoneShardSpec.instance(),
+        TEST_VERSION,
+        1
+    );
+
+    final SegmentWithOvershadowedStatus segment = new SegmentWithOvershadowedStatus(dataSegment, false);
+
+    final Map<String, Object> objectMap = mapper.readValue(
+        mapper.writeValueAsString(segment),
+        JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
+    );
+
+    Assert.assertEquals(11, objectMap.size());
+    Assert.assertEquals("something", objectMap.get("dataSource"));
+    Assert.assertEquals(interval.toString(), objectMap.get("interval"));
+    Assert.assertEquals("1", objectMap.get("version"));
+    Assert.assertEquals(loadSpec, objectMap.get("loadSpec"));
+    Assert.assertEquals("dim1,dim2", objectMap.get("dimensions"));
+    Assert.assertEquals("met1,met2", objectMap.get("metrics"));
+    Assert.assertEquals(ImmutableMap.of("type", "none"), objectMap.get("shardSpec"));
+    Assert.assertEquals(TEST_VERSION, objectMap.get("binaryVersion"));
+    Assert.assertEquals(1, objectMap.get("size"));
+    Assert.assertEquals(false, objectMap.get("overshadowed"));
+
+    final String json = mapper.writeValueAsString(segment);
+
+    final TestSegmentWithOvershadowedStatus deserializedSegment = mapper.readValue(
+        json,
+        TestSegmentWithOvershadowedStatus.class
+    );
+
+    Assert.assertEquals(segment.getDataSegment().getDataSource(), deserializedSegment.getDataSource());
+    Assert.assertEquals(segment.getDataSegment().getInterval(), deserializedSegment.getInterval());
+    Assert.assertEquals(segment.getDataSegment().getVersion(), deserializedSegment.getVersion());
+    Assert.assertEquals(segment.getDataSegment().getLoadSpec(), deserializedSegment.getLoadSpec());
+    Assert.assertEquals(segment.getDataSegment().getDimensions(), deserializedSegment.getDimensions());
+    Assert.assertEquals(segment.getDataSegment().getMetrics(), deserializedSegment.getMetrics());
+    Assert.assertEquals(segment.getDataSegment().getShardSpec(), deserializedSegment.getShardSpec());
+    Assert.assertEquals(segment.getDataSegment().getSize(), deserializedSegment.getSize());
+    Assert.assertEquals(segment.getDataSegment().getId(), deserializedSegment.getId());
+
+  }
+}
+
+/**
+ * Subclass of DataSegment with overshadowed status
+ */
+class TestSegmentWithOvershadowedStatus extends DataSegment
+{
+  private final boolean overshadowed;
+
+  @JsonCreator
+  public TestSegmentWithOvershadowedStatus(
+      @JsonProperty("dataSource") String dataSource,
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("version") String version,
+      @JsonProperty("loadSpec") @Nullable Map<String, Object> loadSpec,
+      @JsonProperty("dimensions")
+      @JsonDeserialize(using = CommaListJoinDeserializer.class)
+      @Nullable
+          List<String> dimensions,
+      @JsonProperty("metrics")
+      @JsonDeserialize(using = CommaListJoinDeserializer.class)
+      @Nullable
+          List<String> metrics,
+      @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
+      @JsonProperty("binaryVersion") Integer binaryVersion,
+      @JsonProperty("size") long size,
+      @JsonProperty("overshadowed") boolean overshadowed
+  )
+  {
+    super(
+        dataSource,
+        interval,
+        version,
+        loadSpec,
+        dimensions,
+        metrics,
+        shardSpec,
+        binaryVersion,
+        size
+    );
+    this.overshadowed = overshadowed;
+  }
+
+  @JsonProperty
+  public boolean isOvershadowed()
+  {
+    return overshadowed;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TestSegmentWithOvershadowedStatus)) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    final TestSegmentWithOvershadowedStatus that = (TestSegmentWithOvershadowedStatus) o;
+    if (overshadowed != (that.overshadowed)) {
+      return false;
+    }
+    return true;
+  }
+
+}
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 105afdf..efbdcbe 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -365,7 +365,7 @@ public class MaterializedViewSupervisor implements Supervisor
     // drop derivative segments which interval equals the interval in toDeleteBaseSegments 
     for (Interval interval : toDropInterval.keySet()) {
       for (DataSegment segment : derivativeSegments.get(interval)) {
-        segmentManager.removeSegment(segment.getId());
+        segmentManager.removeSegment(segment.getId().toString());
       }
     }
     // data of the latest interval will be built firstly.
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java
index 7d5c64c..fa30dde 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentListActionsTest.java
@@ -73,7 +73,7 @@ public class SegmentListActionsTest
 
     expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));
 
-    expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId()));
+    expectedUnusedSegments.forEach(s -> actionTestKit.getMetadataSegmentManager().removeSegment(s.getId().toString()));
   }
 
   private DataSegment createSegment(Interval interval, String version)
diff --git a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
index f2046de..4437e72 100644
--- a/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
+++ b/integration-tests/src/test/resources/results/auth_test_sys_schema_segments.json
@@ -13,6 +13,6 @@
     "is_available": 1,
     "is_realtime": 0,
     "is_overshadowed": 0,
-    "payload": "{\"dataSegment\":{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted, [...]
+    "payload": "{\"dataSource\":\"auth_test\",\"interval\":\"2012-12-29T00:00:00.000Z/2013-01-10T08:00:00.000Z\",\"version\":\"2013-01-10T08:13:47.830Z_v9\",\"loadSpec\":{\"load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space\":\"\"},\"dimensions\":\"anonymous,area_code,city,continent_code,country_name,dma_code,geo,language,namespace,network,newpage,page,postal_code,region_lookup,robot,unpatrolled,user\",\"metrics\":\"added,count,deleted,delta,delta_hist, [...]
   }
 ]
diff --git a/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
new file mode 100644
index 0000000..8417662
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/client/DataSourcesSnapshot.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.client;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Ordering;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.VersionedIntervalTimeline;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable snapshot of fields from {@link org.apache.druid.metadata.SQLMetadataSegmentManager} (dataSources and
+ * overshadowedSegments). Getters of {@link org.apache.druid.metadata.MetadataSegmentManager} should use this snapshot
+ * to return dataSources and overshadowedSegments.
+ */
+public class DataSourcesSnapshot
+{
+  private final Map<String, ImmutableDruidDataSource> dataSources;
+  private final ImmutableSet<SegmentId> overshadowedSegments;
+
+  public DataSourcesSnapshot(
+      Map<String, ImmutableDruidDataSource> dataSources
+  )
+  {
+    this.dataSources = dataSources;
+    this.overshadowedSegments = ImmutableSet.copyOf(determineOvershadowedSegments());
+  }
+
+  public Collection<ImmutableDruidDataSource> getDataSources()
+  {
+    return dataSources.values();
+  }
+
+  public Map<String, ImmutableDruidDataSource> getDataSourcesMap()
+  {
+    return dataSources;
+  }
+
+  @Nullable
+  public ImmutableDruidDataSource getDataSource(String dataSourceName)
+  {
+    return dataSources.get(dataSourceName);
+  }
+
+  public ImmutableSet<SegmentId> getOvershadowedSegments()
+  {
+    return overshadowedSegments;
+  }
+
+  @Nullable
+  public Iterable<DataSegment> iterateAllSegmentsInSnapshot()
+  {
+    if (dataSources == null) {
+      return null;
+    }
+    return () -> dataSources.values().stream()
+                            .flatMap(dataSource -> dataSource.getSegments().stream())
+                            .iterator();
+  }
+
+  /**
+   * This method builds timelines from all dataSources and finds the overshadowed segments list
+   *
+   * @return overshadowed segment Ids list
+   */
+  private List<SegmentId> determineOvershadowedSegments()
+  {
+    final List<DataSegment> segments = dataSources.values().stream()
+                                                  .flatMap(ds -> ds.getSegments().stream())
+                                                  .collect(Collectors.toList());
+    final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
+    segments.forEach(segment -> timelines
+        .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
+        .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));
+
+    // It's fine to add all overshadowed segments to a single collection because only
+    // a small fraction of the segments in the cluster are expected to be overshadowed,
+    // so building this collection shouldn't generate a lot of garbage.
+    final List<SegmentId> overshadowedSegments = new ArrayList<>();
+    for (DataSegment dataSegment : segments) {
+      final VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
+      if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
+        overshadowedSegments.add(dataSegment.getId());
+      }
+    }
+    return overshadowedSegments;
+  }
+
+}
diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
index 841b716..5953944 100644
--- a/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
+++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidDataSource.java
@@ -25,17 +25,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedMap;
-import com.google.common.collect.Ordering;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
-import org.apache.druid.timeline.VersionedIntervalTimeline;
 
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 
 /**
  * An immutable collection of metadata of segments ({@link DataSegment} objects), belonging to a particular data source.
@@ -114,41 +109,6 @@ public class ImmutableDruidDataSource
     return totalSizeOfSegments;
   }
 
-  /**
-   * This method finds the overshadowed segments from the given segments
-   *
-   * @return set of overshadowed segments
-   */
-  public static Set<DataSegment> determineOvershadowedSegments(Iterable<DataSegment> segments)
-  {
-    final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = buildTimelines(segments);
-
-    final Set<DataSegment> overshadowedSegments = new HashSet<>();
-    for (DataSegment dataSegment : segments) {
-      final VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
-      if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
-        overshadowedSegments.add(dataSegment);
-      }
-    }
-    return overshadowedSegments;
-  }
-
-  /**
-   * Builds a timeline from given segments
-   *
-   * @return map of datasource to VersionedIntervalTimeline of segments
-   */
-  private static Map<String, VersionedIntervalTimeline<String, DataSegment>> buildTimelines(
-      Iterable<DataSegment> segments
-  )
-  {
-    final Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
-    segments.forEach(segment -> timelines
-        .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
-        .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)));
-    return timelines;
-  }
-
   @Override
   public String toString()
   {
diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
index 4fc5177..db1fbef 100644
--- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
@@ -20,6 +20,7 @@
 package org.apache.druid.metadata;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
@@ -28,6 +29,7 @@ import org.joda.time.Interval;
 import javax.annotation.Nullable;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 
 /**
  */
@@ -57,14 +59,9 @@ public interface MetadataSegmentManager
   boolean removeDataSource(String dataSource);
 
   /**
-   * Prefer {@link #removeSegment(SegmentId)} to this method when possible.
-   *
-   * This method is not removed because {@link org.apache.druid.server.http.DataSourcesResource#deleteDatasourceSegment}
-   * uses it and if it migrates to {@link #removeSegment(SegmentId)} the performance will be worse.
+   * Removes the given segmentId from metadata store. Returns true if one or more rows were affected.
    */
-  boolean removeSegment(String dataSource, String segmentId);
-
-  boolean removeSegment(SegmentId segmentId);
+  boolean removeSegment(String segmentId);
 
   long disableSegments(String dataSource, Collection<String> segmentIds);
 
@@ -99,6 +96,22 @@ public interface MetadataSegmentManager
   Collection<String> getAllDataSourceNames();
 
   /**
+   * Returns a set of overshadowed segment Ids
+   *
+   * Will return null if we do not have a valid snapshot of segments yet (perhaps the underlying metadata store has
+   * not yet been polled.)
+   */
+  @Nullable
+  Set<SegmentId> getOvershadowedSegments();
+
+  /**
+   * Returns a snapshot of DruidDataSources and overshadowed segments
+   *
+   */
+  @Nullable
+  DataSourcesSnapshot getDataSourcesSnapshot();
+
+  /**
    * Returns top N unused segment intervals in given interval when ordered by segment start time, end time.
    */
   List<Interval> getUnusedSegmentIntervals(String dataSource, Interval interval, int limit);
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
index 35843c9..bb85142 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
@@ -24,6 +24,7 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.inject.Inject;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.guice.ManageLifecycle;
@@ -40,6 +41,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
+import org.apache.druid.utils.CollectionUtils;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.skife.jdbi.v2.BaseResultSetMapper;
@@ -65,6 +67,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -103,11 +106,12 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   private final SQLMetadataConnector connector;
 
   // Volatile since this reference is reassigned in "poll" and then read from in other threads.
-  // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty map).
+  // Starts null so we can differentiate "never polled" (null) from "polled, but empty" (empty dataSources map and
+  // empty overshadowedSegments set).
   // Note that this is not simply a lazy-initialized variable: it starts off as null, and may transition between
   // null and nonnull multiple times as stop() and start() are called.
   @Nullable
-  private volatile ConcurrentHashMap<String, DruidDataSource> dataSources = null;
+  private volatile DataSourcesSnapshot dataSourcesSnapshot = null;
 
   /**
    * The number of times this SQLMetadataSegmentManager was started.
@@ -206,8 +210,7 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
       if (!isStarted()) {
         return;
       }
-
-      dataSources = null;
+      dataSourcesSnapshot = null;
       currentStartOrder = -1;
       exec.shutdownNow();
       exec = null;
@@ -449,8 +452,6 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
           ).bind("dataSource", dataSource).execute()
       );
 
-      Optional.ofNullable(dataSources).ifPresent(m -> m.remove(dataSource));
-
       if (removed == 0) {
         return false;
       }
@@ -463,58 +464,15 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
     return true;
   }
 
+  /**
+   * This method does not update {@code dataSourcesSnapshot}, see the comments in {@code doPoll()} about
+   * snapshot update. The segment removal will be reflected after next poll cyccle runs.
+   */
   @Override
-  public boolean removeSegment(String dataSourceName, final String segmentId)
-  {
-    try {
-      final boolean removed = removeSegmentFromTable(segmentId);
-
-      // Call iteratePossibleParsingsWithDataSource() outside of dataSources.computeIfPresent() because the former is a
-      // potentially expensive operation, while lambda to be passed into computeIfPresent() should preferably run fast.
-      List<SegmentId> possibleSegmentIds = SegmentId.iteratePossibleParsingsWithDataSource(dataSourceName, segmentId);
-      Optional.ofNullable(dataSources).ifPresent(
-          m ->
-              m.computeIfPresent(
-                  dataSourceName,
-                  (dsName, dataSource) -> {
-                    for (SegmentId possibleSegmentId : possibleSegmentIds) {
-                      if (dataSource.removeSegment(possibleSegmentId) != null) {
-                        break;
-                      }
-                    }
-                    // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
-                    //noinspection ReturnOfNull
-                    return dataSource.isEmpty() ? null : dataSource;
-                  }
-              )
-      );
-
-      return removed;
-    }
-    catch (Exception e) {
-      log.error(e, e.toString());
-      return false;
-    }
-  }
-
-  @Override
-  public boolean removeSegment(SegmentId segmentId)
+  public boolean removeSegment(String segmentId)
   {
     try {
-      final boolean removed = removeSegmentFromTable(segmentId.toString());
-      Optional.ofNullable(dataSources).ifPresent(
-          m ->
-              m.computeIfPresent(
-                  segmentId.getDataSource(),
-                  (dsName, dataSource) -> {
-                    dataSource.removeSegment(segmentId);
-                    // Returning null from the lambda here makes the ConcurrentHashMap to remove the current entry.
-                    //noinspection ReturnOfNull
-                    return dataSource.isEmpty() ? null : dataSource;
-                  }
-              )
-      );
-      return removed;
+      return removeSegmentFromTable(segmentId);
     }
     catch (Exception e) {
       log.error(e, e.toString());
@@ -607,37 +565,47 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   @Nullable
   public ImmutableDruidDataSource getDataSource(String dataSourceName)
   {
-    final DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(dataSourceName)).orElse(null);
-    return dataSource == null ? null : dataSource.toImmutableDruidDataSource();
+    final ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot)
+                                                        .map(m -> m.getDataSourcesMap().get(dataSourceName))
+                                                        .orElse(null);
+    return dataSource == null ? null : dataSource;
   }
 
   @Override
   @Nullable
   public Collection<ImmutableDruidDataSource> getDataSources()
   {
-    return Optional.ofNullable(dataSources)
-                   .map(m ->
-                            m.values()
-                             .stream()
-                             .map(DruidDataSource::toImmutableDruidDataSource)
-                             .collect(Collectors.toList())
-                   )
-                   .orElse(null);
+    return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getDataSources()).orElse(null);
   }
 
   @Override
   @Nullable
   public Iterable<DataSegment> iterateAllSegments()
   {
-    final ConcurrentHashMap<String, DruidDataSource> dataSourcesSnapshot = dataSources;
-    if (dataSourcesSnapshot == null) {
+    final Collection<ImmutableDruidDataSource> dataSources = Optional.ofNullable(dataSourcesSnapshot)
+                                                                     .map(m -> m.getDataSources())
+                                                                     .orElse(null);
+    if (dataSources == null) {
       return null;
     }
 
-    return () -> dataSourcesSnapshot.values()
-                                    .stream()
-                                    .flatMap(dataSource -> dataSource.getSegments().stream())
-                                    .iterator();
+    return () -> dataSources.stream()
+                            .flatMap(dataSource -> dataSource.getSegments().stream())
+                            .iterator();
+  }
+
+  @Override
+  @Nullable
+  public Set<SegmentId> getOvershadowedSegments()
+  {
+    return Optional.ofNullable(dataSourcesSnapshot).map(m -> m.getOvershadowedSegments()).orElse(null);
+  }
+
+  @Nullable
+  @Override
+  public DataSourcesSnapshot getDataSourcesSnapshot()
+  {
+    return dataSourcesSnapshot;
   }
 
   @Override
@@ -742,14 +710,26 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
               .addSegmentIfAbsent(segment);
         });
 
-    // Replace "dataSources" atomically.
-    dataSources = newDataSources;
+    // dataSourcesSnapshot is updated only here, please note that if datasources or segments are enabled or disabled
+    // outside of poll, the dataSourcesSnapshot can become invalid until the next poll cycle.
+    // DataSourcesSnapshot computes the overshadowed segments, which makes it an expensive operation if the
+    // snapshot is invalidated on each segment removal, especially if a user issues a lot of single segment remove
+    // calls in rapid succession. So the snapshot update is not done outside of poll at this time.
+    // Updates outside of poll(), were primarily for the user experience, so users would immediately see the effect of
+    // a segment remove call reflected in MetadataResource API calls. These updates outside of scheduled poll may be
+    // added back in removeDataSource and removeSegment methods after the on-demand polling changes from
+    // https://github.com/apache/incubator-druid/pull/7653 are in.
+    final Map<String, ImmutableDruidDataSource> updatedDataSources = CollectionUtils.mapValues(
+        newDataSources,
+        v -> v.toImmutableDruidDataSource()
+    );
+    dataSourcesSnapshot = new DataSourcesSnapshot(updatedDataSources);
   }
 
   /**
    * For the garbage collector in Java, it's better to keep new objects short-living, but once they are old enough
    * (i. e. promoted to old generation), try to keep them alive. In {@link #poll()}, we fetch and deserialize all
-   * existing segments each time, and then replace them in {@link #dataSources}. This method allows to use already
+   * existing segments each time, and then replace them in {@link #dataSourcesSnapshot}. This method allows to use already
    * existing (old) segments when possible, effectively interning them a-la {@link String#intern} or {@link
    * com.google.common.collect.Interner}, aiming to make the majority of {@link DataSegment} objects garbage soon after
    * they are deserialized and to die in young generation. It allows to avoid fragmentation of the old generation and
@@ -757,7 +737,9 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
    */
   private DataSegment replaceWithExistingSegmentIfPresent(DataSegment segment)
   {
-    DruidDataSource dataSource = Optional.ofNullable(dataSources).map(m -> m.get(segment.getDataSource())).orElse(null);
+    ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot)
+                                                  .map(m -> m.getDataSourcesMap().get(segment.getDataSource()))
+                                                  .orElse(null);
     if (dataSource == null) {
       return segment;
     }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 27bee2f..ba42968 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.coordinator;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.Sets;
@@ -29,6 +30,7 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.ZKPaths;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
@@ -82,6 +84,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -145,6 +148,10 @@ public class DruidCoordinator
 
   private volatile boolean started = false;
   private volatile SegmentReplicantLookup segmentReplicantLookup = null;
+  /**
+   * set in {@link CoordinatorRunnable#run()} at start of every coordinator run
+   */
+  private volatile DataSourcesSnapshot dataSourcesSnapshot = null;
 
   @Inject
   public DruidCoordinator(
@@ -316,7 +323,9 @@ public class DruidCoordinator
   public Map<String, Double> getLoadStatus()
   {
     final Map<String, Double> loadStatus = new HashMap<>();
-    final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();
+    final Collection<ImmutableDruidDataSource> dataSources = Optional.ofNullable(dataSourcesSnapshot)
+                                                                     .map(m -> m.getDataSources())
+                                                                     .orElse(null);
 
     if (dataSources == null) {
       return loadStatus;
@@ -365,7 +374,7 @@ public class DruidCoordinator
   public void removeSegment(DataSegment segment)
   {
     log.info("Removing Segment[%s]", segment.getId());
-    metadataSegmentManager.removeSegment(segment.getId());
+    metadataSegmentManager.removeSegment(segment.getId().toString());
   }
 
   public String getCurrentLeader()
@@ -373,6 +382,12 @@ public class DruidCoordinator
     return coordLeaderSelector.getCurrentLeader();
   }
 
+  @VisibleForTesting
+  void setDataSourcesSnapshotForTest(DataSourcesSnapshot snapshot)
+  {
+    dataSourcesSnapshot = snapshot;
+  }
+
   public void moveSegment(
       ImmutableDruidServer fromServer,
       ImmutableDruidServer toServer,
@@ -393,7 +408,9 @@ public class DruidCoordinator
         throw new IAE("Cannot move [%s] to and from the same server [%s]", segmentId, fromServer.getName());
       }
 
-      ImmutableDruidDataSource dataSource = metadataSegmentManager.getDataSource(segment.getDataSource());
+      ImmutableDruidDataSource dataSource = Optional.ofNullable(dataSourcesSnapshot)
+                                                    .map(m -> m.getDataSource(segment.getDataSource()))
+                                                    .orElse(null);
       if (dataSource == null) {
         throw new IAE("Unable to find dataSource for segment [%s] in metadata", segmentId);
       }
@@ -483,7 +500,10 @@ public class DruidCoordinator
   @Nullable
   public Iterable<DataSegment> iterateAvailableDataSegments()
   {
-    return metadataSegmentManager.iterateAllSegments();
+    final Iterable<DataSegment> dataSources = Optional.ofNullable(dataSourcesSnapshot)
+                                                      .map(m -> m.iterateAllSegmentsInSnapshot())
+                                                      .orElse(null);
+    return dataSources == null ? null : dataSources;
   }
 
   @LifecycleStart
@@ -670,7 +690,11 @@ public class DruidCoordinator
         BalancerStrategy balancerStrategy = factory.createBalancerStrategy(balancerExec);
 
         // Do coordinator stuff.
-        final Collection<ImmutableDruidDataSource> dataSources = metadataSegmentManager.getDataSources();
+        dataSourcesSnapshot = metadataSegmentManager.getDataSourcesSnapshot();
+        final Collection<ImmutableDruidDataSource> dataSources = Optional.ofNullable(dataSourcesSnapshot)
+                                                                         .map(m -> m.getDataSources())
+                                                                         .orElse(null);
+
         if (dataSources == null) {
           log.info("Metadata store not polled yet, skipping this run.");
           return;
@@ -684,6 +708,7 @@ public class DruidCoordinator
                                          .withCompactionConfig(getCompactionConfig())
                                          .withEmitter(emitter)
                                          .withBalancerStrategy(balancerStrategy)
+                                         .withDataSourcesSnapshot(dataSourcesSnapshot)
                                          .build();
         for (DruidCoordinatorHelper helper : helpers) {
           // Don't read state and run state in the same helper otherwise racy conditions may exist
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index 655d6bd..de75bce 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -21,6 +21,7 @@ package org.apache.druid.server.coordinator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -66,6 +67,7 @@ public class DruidCoordinatorRuntimeParams
   private final CoordinatorStats stats;
   private final DateTime balancerReferenceTimestamp;
   private final BalancerStrategy balancerStrategy;
+  private final DataSourcesSnapshot dataSourcesSnapshot;
 
   private DruidCoordinatorRuntimeParams(
       long startTime,
@@ -81,7 +83,8 @@ public class DruidCoordinatorRuntimeParams
       CoordinatorCompactionConfig coordinatorCompactionConfig,
       CoordinatorStats stats,
       DateTime balancerReferenceTimestamp,
-      BalancerStrategy balancerStrategy
+      BalancerStrategy balancerStrategy,
+      DataSourcesSnapshot dataSourcesSnapshot
   )
   {
     this.startTime = startTime;
@@ -98,6 +101,7 @@ public class DruidCoordinatorRuntimeParams
     this.stats = stats;
     this.balancerReferenceTimestamp = balancerReferenceTimestamp;
     this.balancerStrategy = balancerStrategy;
+    this.dataSourcesSnapshot = dataSourcesSnapshot;
   }
 
   public long getStartTime()
@@ -171,6 +175,11 @@ public class DruidCoordinatorRuntimeParams
     return balancerStrategy;
   }
 
+  public DataSourcesSnapshot getDataSourcesSnapshot()
+  {
+    return dataSourcesSnapshot;
+  }
+
   public boolean hasDeletionWaitTimeElapsed()
   {
     return (System.currentTimeMillis() - getStartTime() > coordinatorDynamicConfig.getMillisToWaitBeforeDeleting());
@@ -237,6 +246,7 @@ public class DruidCoordinatorRuntimeParams
     private CoordinatorStats stats;
     private DateTime balancerReferenceTimestamp;
     private BalancerStrategy balancerStrategy;
+    private DataSourcesSnapshot dataSourcesSnapshot;
 
     Builder()
     {
@@ -253,6 +263,7 @@ public class DruidCoordinatorRuntimeParams
       this.coordinatorDynamicConfig = CoordinatorDynamicConfig.builder().build();
       this.coordinatorCompactionConfig = CoordinatorCompactionConfig.empty();
       this.balancerReferenceTimestamp = DateTimes.nowUtc();
+      this.dataSourcesSnapshot = null;
     }
 
     Builder(
@@ -304,7 +315,8 @@ public class DruidCoordinatorRuntimeParams
           coordinatorCompactionConfig,
           stats,
           balancerReferenceTimestamp,
-          balancerStrategy
+          balancerStrategy,
+          dataSourcesSnapshot
       );
     }
 
@@ -434,5 +446,11 @@ public class DruidCoordinatorRuntimeParams
       this.balancerStrategy = balancerStrategy;
       return this;
     }
+
+    public Builder withDataSourcesSnapshot(DataSourcesSnapshot snapshot)
+    {
+      this.dataSourcesSnapshot = snapshot;
+      return this;
+    }
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
index bbceaaf..7570c81 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
@@ -19,8 +19,9 @@
 
 package org.apache.druid.server.coordinator.helper;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.metadata.MetadataRuleManager;
@@ -35,6 +36,7 @@ import org.apache.druid.timeline.SegmentId;
 import org.joda.time.DateTime;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -84,8 +86,14 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
     // find available segments which are not overshadowed by other segments in DB
     // only those would need to be loaded/dropped
     // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed
-    final Set<DataSegment> overshadowed = ImmutableDruidDataSource
-        .determineOvershadowedSegments(params.getAvailableSegments());
+    // If metadata store hasn't been polled yet, use empty overshadowed list
+    final DataSourcesSnapshot dataSourcesSnapshot = params.getDataSourcesSnapshot();
+    Set<SegmentId> overshadowed = ImmutableSet.of();
+    if (dataSourcesSnapshot != null) {
+      overshadowed = Optional
+          .ofNullable(dataSourcesSnapshot.getOvershadowedSegments())
+          .orElse(ImmutableSet.of());
+    }
 
     for (String tier : cluster.getTierNames()) {
       replicatorThrottler.updateReplicationState(tier);
@@ -103,7 +111,7 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
     final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
     int missingRules = 0;
     for (DataSegment segment : params.getAvailableSegments()) {
-      if (overshadowed.contains(segment)) {
+      if (overshadowed.contains(segment.getId())) {
         // Skipping overshadowed segments
         continue;
       }
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 5904d52..3787cc6 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -434,7 +434,7 @@ public class DataSourcesResource
       @PathParam("segmentId") String segmentId
   )
   {
-    if (databaseSegmentManager.removeSegment(dataSourceName, segmentId)) {
+    if (databaseSegmentManager.removeSegment(segmentId)) {
       return Response.ok().build();
     }
     return Response.noContent().build();
diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
index 3c7e8ac..556ed3d 100644
--- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java
@@ -52,7 +52,6 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -163,11 +162,11 @@ public class MetadataResource
     final Stream<DataSegment> metadataSegments = dataSourceStream.flatMap(t -> t.getSegments().stream());
 
     if (includeOvershadowedStatus != null) {
-      final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = findAuthorizedSegmentWithOvershadowedStatus(
-          req,
-          druidDataSources,
-          metadataSegments
-      );
+      final Iterable<SegmentWithOvershadowedStatus> authorizedSegments =
+          findAuthorizedSegmentWithOvershadowedStatus(
+              req,
+              metadataSegments
+          );
       Response.ResponseBuilder builder = Response.status(Response.Status.OK);
       return builder.entity(authorizedSegments).build();
     } else {
@@ -189,22 +188,18 @@ public class MetadataResource
 
   private Iterable<SegmentWithOvershadowedStatus> findAuthorizedSegmentWithOvershadowedStatus(
       HttpServletRequest req,
-      Collection<ImmutableDruidDataSource> druidDataSources,
       Stream<DataSegment> metadataSegments
   )
   {
-    // It's fine to add all overshadowed segments to a single collection because only
-    // a small fraction of the segments in the cluster are expected to be overshadowed,
-    // so building this collection shouldn't generate a lot of garbage.
-    final Set<DataSegment> overshadowedSegments = new HashSet<>();
-    for (ImmutableDruidDataSource dataSource : druidDataSources) {
-      overshadowedSegments.addAll(ImmutableDruidDataSource.determineOvershadowedSegments(dataSource.getSegments()));
-    }
+    // If metadata store hasn't been polled yet, use empty overshadowed list
+    final Set<SegmentId> overshadowedSegments = Optional
+        .ofNullable(metadataSegmentManager.getOvershadowedSegments())
+        .orElse(Collections.emptySet());
 
     final Stream<SegmentWithOvershadowedStatus> segmentsWithOvershadowedStatus = metadataSegments
         .map(segment -> new SegmentWithOvershadowedStatus(
             segment,
-            overshadowedSegments.contains(segment)
+            overshadowedSegments.contains(segment.getId())
         ));
 
     final Function<SegmentWithOvershadowedStatus, Iterable<ResourceAction>> raGenerator = segment -> Collections
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
index e796448..a5d436b 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSegmentManagerTest.java
@@ -294,7 +294,7 @@ public class SQLMetadataSegmentManagerTest
     publisher.publishSegment(newSegment);
 
     Assert.assertNull(manager.getDataSource(newDataSource));
-    Assert.assertTrue(manager.removeSegment(newSegment.getId()));
+    Assert.assertTrue(manager.removeSegment(newSegment.getId().toString()));
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 3b22235..9cd0206 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -32,6 +32,7 @@ import org.apache.curator.utils.ZKPaths;
 import org.apache.druid.client.BatchServerInventoryView;
 import org.apache.druid.client.CoordinatorSegmentWatcherConfig;
 import org.apache.druid.client.CoordinatorServerView;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.common.config.JacksonConfigManager;
@@ -96,6 +97,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
   private ObjectMapper objectMapper;
   private JacksonConfigManager configManager;
   private DruidNode druidNode;
+  private DataSourcesSnapshot dataSourcesSnapshot;
   private static final String SEGPATH = "/druid/segments";
   private static final String SOURCE_LOAD_PATH = "/druid/loadQueue/localhost:1";
   private static final String DESTINATION_LOAD_PATH = "/druid/loadQueue/localhost:2";
@@ -127,6 +129,7 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
     databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
     metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
     configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
+    dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
     EasyMock.expect(
         configManager.watch(
             EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
@@ -368,6 +371,9 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
     EasyMock.expect(databaseSegmentManager.getDataSource(EasyMock.anyString())).andReturn(druidDataSource);
     EasyMock.replay(databaseSegmentManager);
 
+    coordinator.setDataSourcesSnapshotForTest(dataSourcesSnapshot);
+    EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes();
+    EasyMock.replay(dataSourcesSnapshot);
     coordinator.moveSegment(
         source.toImmutableDruidServer(),
         dest.toImmutableDruidServer(),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
index 51a7930..7d38fa0 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
@@ -20,9 +20,11 @@
 package org.apache.druid.server.coordinator;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
@@ -30,6 +32,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
 import org.apache.druid.metadata.MetadataRuleManager;
+import org.apache.druid.metadata.MetadataSegmentManager;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.helper.DruidCoordinatorRuleRunner;
@@ -67,6 +70,8 @@ public class DruidCoordinatorRuleRunnerTest
   private DruidCoordinatorRuleRunner ruleRunner;
   private ServiceEmitter emitter;
   private MetadataRuleManager databaseRuleManager;
+  private MetadataSegmentManager databaseSegmentManager;
+  private DataSourcesSnapshot dataSourcesSnapshot;
 
   @Before
   public void setUp()
@@ -76,6 +81,8 @@ public class DruidCoordinatorRuleRunnerTest
     emitter = EasyMock.createMock(ServiceEmitter.class);
     EmittingLogger.registerEmitter(emitter);
     databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
+    databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
+    dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
 
     DateTime start = DateTimes.of("2012-01-01");
     availableSegments = new ArrayList<>();
@@ -989,7 +996,9 @@ public class DruidCoordinatorRuleRunnerTest
   @Test
   public void testReplicantThrottle()
   {
-    mockCoordinator();
+    EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes();
+    EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();
+    EasyMock.replay(coordinator, databaseSegmentManager, dataSourcesSnapshot);
     mockPeon.loadSegment(EasyMock.anyObject(), EasyMock.anyObject());
     EasyMock.expectLastCall().atLeastOnce();
     mockEmptyPeon();
@@ -1114,6 +1123,8 @@ public class DruidCoordinatorRuleRunnerTest
                                     .build()
         )
         .atLeastOnce();
+    EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of()).anyTimes();
+    EasyMock.replay(dataSourcesSnapshot);
     coordinator.removeSegment(EasyMock.anyObject());
     EasyMock.expectLastCall().anyTimes();
     EasyMock.replay(coordinator);
@@ -1330,8 +1341,9 @@ public class DruidCoordinatorRuleRunnerTest
     );
     availableSegments.add(v1);
     availableSegments.add(v2);
-
-    mockCoordinator();
+    EasyMock.expect(dataSourcesSnapshot.getOvershadowedSegments()).andReturn(ImmutableSet.of(v1.getId())).anyTimes();
+    EasyMock.expect(coordinator.getDynamicConfigs()).andReturn(createCoordinatorDynamicConfig()).anyTimes();
+    EasyMock.replay(coordinator, dataSourcesSnapshot);
     mockPeon.loadSegment(EasyMock.eq(v2), EasyMock.anyObject());
     EasyMock.expectLastCall().once();
     mockEmptyPeon();
@@ -1375,6 +1387,7 @@ public class DruidCoordinatorRuleRunnerTest
             .withBalancerStrategy(balancerStrategy)
             .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
             .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(5).build())
+            .withDataSourcesSnapshot(dataSourcesSnapshot)
             .build();
 
     DruidCoordinatorRuntimeParams afterParams = ruleRunner.run(params);
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index 793bd28..bb44a6c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -29,6 +29,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.curator.utils.ZKPaths;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
@@ -99,6 +100,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
   private ObjectMapper objectMapper;
   private DruidNode druidNode;
   private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
+  private DataSourcesSnapshot dataSourcesSnapshot;
 
   @Before
   public void setUp() throws Exception
@@ -106,8 +108,12 @@ public class DruidCoordinatorTest extends CuratorTestBase
     druidServer = EasyMock.createMock(DruidServer.class);
     serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
     databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
+    dataSourcesSnapshot = EasyMock.createNiceMock(DataSourcesSnapshot.class);
     metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
     JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
+    EasyMock.expect(databaseSegmentManager.getDataSourcesSnapshot()).andReturn(dataSourcesSnapshot).anyTimes();
+    EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
+    EasyMock.replay(databaseSegmentManager);
     EasyMock.expect(
         configManager.watch(
             EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
@@ -248,8 +254,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
     ImmutableDruidDataSource druidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
     EasyMock.expect(druidDataSource.getSegment(EasyMock.anyObject(SegmentId.class))).andReturn(segment);
     EasyMock.replay(druidDataSource);
-    EasyMock.expect(databaseSegmentManager.getDataSource(EasyMock.anyString())).andReturn(druidDataSource);
-    EasyMock.replay(databaseSegmentManager);
+    coordinator.setDataSourcesSnapshotForTest(dataSourcesSnapshot);
+    EasyMock.expect(dataSourcesSnapshot.getDataSource(EasyMock.anyString())).andReturn(druidDataSource).anyTimes();
+    EasyMock.replay(dataSourcesSnapshot);
     scheduledExecutorFactory = EasyMock.createNiceMock(ScheduledExecutorFactory.class);
     EasyMock.replay(scheduledExecutorFactory);
     EasyMock.replay(metadataRuleManager);
@@ -533,20 +540,15 @@ public class DruidCoordinatorTest extends CuratorTestBase
 
   private void setupMetadataSegmentManagerMock(DruidDataSource dataSource)
   {
-    EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
     EasyMock
-        .expect(databaseSegmentManager.iterateAllSegments())
+        .expect(dataSourcesSnapshot.iterateAllSegmentsInSnapshot())
         .andReturn(dataSource.getSegments())
         .anyTimes();
     EasyMock
-        .expect(databaseSegmentManager.getDataSources())
+        .expect(dataSourcesSnapshot.getDataSources())
         .andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource()))
         .anyTimes();
-    EasyMock
-        .expect(databaseSegmentManager.getAllDataSourceNames())
-        .andReturn(Collections.singleton(dataSource.getName()))
-        .anyTimes();
-    EasyMock.replay(databaseSegmentManager);
+    EasyMock.replay(dataSourcesSnapshot);
   }
 
   @Nullable
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index e5cfa91..0289788 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -348,12 +348,13 @@ public class SystemSchema extends AbstractSchema
       final AuthenticationResult authenticationResult =
           (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT);
 
-      final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils.filterAuthorizedResources(
-          authenticationResult,
-          () -> it,
-          SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR,
-          authorizerMapper
-      );
+      final Iterable<SegmentWithOvershadowedStatus> authorizedSegments = AuthorizationUtils
+          .filterAuthorizedResources(
+              authenticationResult,
+              () -> it,
+              SEGMENT_WITH_OVERSHADOWED_STATUS_RA_GENERATOR,
+              authorizerMapper
+          );
       return authorizedSegments.iterator();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org