You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by le...@apache.org on 2019/03/19 21:23:06 UTC

[incubator-druid] branch master updated: Avoid many unnecessary materializations of collections of 'all segments in cluster' cardinality (#7185)

This is an automated email from the ASF dual-hosted git repository.

leventov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new dfd27e0  Avoid many unnecessary materializations of collections of 'all segments in cluster' cardinality (#7185)
dfd27e0 is described below

commit dfd27e00c0559e89a77ad8abbae03eeb382c2d09
Author: Roman Leventov <le...@gmail.com>
AuthorDate: Tue Mar 19 18:22:56 2019 -0300

    Avoid many unnecessary materializations of collections of 'all segments in cluster' cardinality (#7185)
    
    * Avoid many  unnecessary materializations of collections of 'all segments in cluster' cardinality
    
    * Fix DruidCoordinatorTest; Renamed DruidCoordinator.getReplicationStatus() to computeUnderReplicationCountsPerDataSourcePerTier()
    
    * More Javadocs, typos, refactor DruidCoordinatorRuntimeParams.createAvailableSegmentsSet()
    
    * Style
    
    * typo
    
    * Disable StaticPseudoFunctionalStyleMethod inspection because of too much false positives
    
    * Fixes
---
 .idea/inspectionProfiles/Druid.xml                 |  28 ++++
 .../util/common/concurrent/ScheduledExecutors.java |   6 +-
 .../org/apache/druid/utils/CollectionUtils.java    |  72 +++++++++
 .../ambari/metrics/AmbariMetricsEmitterModule.java |  19 +--
 .../druid/indexing/worker/WorkerTaskMonitor.java   |   7 +-
 .../org/apache/druid/client/BrokerServerView.java  |   2 +-
 .../apache/druid/client/CoordinatorServerView.java |   2 +-
 .../java/org/apache/druid/client/DruidServer.java  |  20 ++-
 .../druid/client/HttpServerInventoryView.java      |  17 ++-
 .../apache/druid/client/ImmutableDruidServer.java  |  37 ++---
 .../org/apache/druid/curator/CuratorUtils.java     |   6 +
 .../druid/metadata/MetadataSegmentManager.java     |   9 ++
 .../druid/metadata/SQLMetadataSegmentManager.java  |   6 +
 .../druid/server/coordinator/BalancerStrategy.java |   4 +-
 .../server/coordinator/CostBalancerStrategy.java   |   6 +-
 .../server/coordinator/CuratorLoadQueuePeon.java   |   6 +-
 .../DiskNormalizedCostBalancerStrategy.java        |   4 +-
 .../druid/server/coordinator/DruidCoordinator.java |  79 +++++-----
 .../coordinator/DruidCoordinatorRuntimeParams.java |  58 ++++++--
 .../server/coordinator/HttpLoadQueuePeon.java      |   6 +-
 .../coordinator/ReservoirSegmentSampler.java       |   2 +-
 .../server/coordinator/SegmentReplicantLookup.java |   2 +-
 .../helper/DruidCoordinatorBalancer.java           |  15 +-
 .../helper/DruidCoordinatorCleanupUnneeded.java    |  63 ++++----
 .../coordinator/helper/DruidCoordinatorLogger.java |  10 +-
 .../helper/DruidCoordinatorRuleRunner.java         |  68 ++++-----
 .../helper/DruidCoordinatorSegmentInfoLoader.java  |  38 ++++-
 .../druid/server/coordinator/rules/Rule.java       |  12 +-
 .../druid/server/http/CoordinatorResource.java     |   2 +-
 .../apache/druid/server/http/ServersResource.java  |  10 +-
 .../apache/druid/server/http/TiersResource.java    |   2 +-
 .../druid/client/HttpServerInventoryViewTest.java  |   2 +-
 .../client/BatchServerInventoryViewTest.java       |  18 +--
 .../coordinator/CostBalancerStrategyTest.java      |   2 +-
 .../coordinator/CuratorDruidCoordinatorTest.java   |  14 +-
 .../DiskNormalizedCostBalancerStrategyTest.java    |   2 +-
 .../DruidCoordinatorBalancerProfiler.java          |  12 +-
 .../coordinator/DruidCoordinatorBalancerTest.java  |   4 +-
 .../DruidCoordinatorRuleRunnerTest.java            |  30 ++--
 .../server/coordinator/DruidCoordinatorTest.java   | 162 ++++++++-------------
 .../coordinator/ReservoirSegmentSamplerTest.java   |   8 +-
 .../cost/CachingCostBalancerStrategyTest.java      |   2 +-
 .../DruidCoordinatorCleanupOvershadowedTest.java   |  11 +-
 .../rules/BroadcastDistributionRuleTest.java       |   8 +-
 .../server/coordinator/rules/LoadRuleTest.java     |  24 +--
 .../druid/sql/calcite/schema/SystemSchema.java     |   2 +-
 .../druid/sql/calcite/schema/DruidSchemaTest.java  |  22 ++-
 47 files changed, 549 insertions(+), 392 deletions(-)

diff --git a/.idea/inspectionProfiles/Druid.xml b/.idea/inspectionProfiles/Druid.xml
index b2dcda3..77078ae 100644
--- a/.idea/inspectionProfiles/Druid.xml
+++ b/.idea/inspectionProfiles/Druid.xml
@@ -269,6 +269,30 @@
         <constraint name="b" within="" contains="" />
         <constraint name="c" within="" contains="" />
       </searchConfiguration>
+      <searchConfiguration name="No need to wrap a collection as unmodifiable before addAll()" text="$c$.addAll(Collections.$unmodifiableMethod$($c2$))" recursive="true" caseInsensitive="true" type="JAVA">
+        <constraint name="__context__" target="true" within="" contains="" />
+        <constraint name="c" within="" contains="" />
+        <constraint name="unmodifiableMethod" regexp="unmodifiable.*" within="" contains="" />
+        <constraint name="c2" within="" contains="" />
+      </searchConfiguration>
+      <searchConfiguration name="No need to copy a collection as immutable before addAll()" text="$m$.addAll($ImmutableCollection$.copyOf($x$))" recursive="true" caseInsensitive="true" type="JAVA">
+        <constraint name="__context__" target="true" within="" contains="" />
+        <constraint name="m" within="" contains="" />
+        <constraint name="x" maxCount="2147483647" within="" contains="" />
+        <constraint name="ImmutableCollection" regexp="Immutable.*" within="" contains="" />
+      </searchConfiguration>
+      <searchConfiguration name="No need to wrap a map as unmodifiable before putAll()" text="$m$.putAll(Collections.$unmodifiableMapMethod$($m2$))" recursive="true" caseInsensitive="true" type="JAVA">
+        <constraint name="__context__" target="true" within="" contains="" />
+        <constraint name="m" within="" contains="" />
+        <constraint name="m2" within="" contains="" />
+        <constraint name="unmodifiableMapMethod" regexp="unmodifiable.*" within="" contains="" />
+      </searchConfiguration>
+      <searchConfiguration name="No need to copy a map to immutable before putAll()" text="$m$.putAll($ImmutableMap$.copyOf($x$))" recursive="true" caseInsensitive="true" type="JAVA">
+        <constraint name="__context__" target="true" within="" contains="" />
+        <constraint name="m" within="" contains="" />
+        <constraint name="x" maxCount="2147483647" within="" contains="" />
+        <constraint name="ImmutableMap" regexp="Immutable.*" within="" contains="" />
+      </searchConfiguration>
     </inspection_tool>
     <inspection_tool class="SimplifyStreamApiCallChains" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
@@ -278,6 +302,10 @@
     </inspection_tool>
     <inspection_tool class="StaticCallOnSubclass" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="StaticFieldReferenceOnSubclass" enabled="true" level="ERROR" enabled_by_default="true" />
+    <inspection_tool class="StaticPseudoFunctionalStyleMethod" enabled="true" level="INFORMATION" enabled_by_default="true">
+      <!-- The current rate of false-positives produced by this inspection is very high,
+           see https://youtrack.jetbrains.com/issue/IDEA-153047#focus=streamItem-27-3326648.0-0 -->
+    </inspection_tool>
     <inspection_tool class="StringConcatenationInFormatCall" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="StringConcatenationInMessageFormatCall" enabled="true" level="ERROR" enabled_by_default="true" />
     <inspection_tool class="StringConcatenationMissingWhitespace" enabled="true" level="WARNING" enabled_by_default="true" />
diff --git a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
index a25d73e..2850c50 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
@@ -88,9 +88,9 @@ public class ScheduledExecutors
           public void run()
           {
             try {
-              log.debug("Running %s (delay %s)", callable, delay);
+              log.trace("Running %s (delay %s)", callable, delay);
               if (callable.call() == Signal.REPEAT) {
-                log.debug("Rescheduling %s (delay %s)", callable, delay);
+                log.trace("Rescheduling %s (delay %s)", callable, delay);
                 exec.schedule(this, delay.getMillis(), TimeUnit.MILLISECONDS);
               } else {
                 log.debug("Stopped rescheduling %s (delay %s)", callable, delay);
@@ -154,7 +154,7 @@ public class ScheduledExecutors
             }
 
             try {
-              log.debug("Running %s (period %s)", callable, rate);
+              log.trace("Running %s (period %s)", callable, rate);
               prevSignal = callable.call();
             }
             catch (Throwable e) {
diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
new file mode 100644
index 0000000..6b4d3cc
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.utils;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Spliterator;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+public final class CollectionUtils
+{
+  /**
+   * Returns a lazy collection from a stream supplier and a size. {@link Collection#iterator()} of the returned
+   * collection delegates to {@link Stream#iterator()} on the stream returned from the supplier.
+   */
+  public static <E> Collection<E> createLazyCollectionFromStream(Supplier<Stream<E>> sequentialStreamSupplier, int size)
+  {
+    return new AbstractCollection<E>()
+    {
+      @Override
+      public Iterator<E> iterator()
+      {
+        return sequentialStreamSupplier.get().iterator();
+      }
+
+      @Override
+      public Spliterator<E> spliterator()
+      {
+        return sequentialStreamSupplier.get().spliterator();
+      }
+
+      @Override
+      public Stream<E> stream()
+      {
+        return sequentialStreamSupplier.get();
+      }
+
+      @Override
+      public Stream<E> parallelStream()
+      {
+        return sequentialStreamSupplier.get().parallel();
+      }
+
+      @Override
+      public int size()
+      {
+        return size;
+      }
+    };
+  }
+
+  private CollectionUtils() {}
+}
diff --git a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitterModule.java b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitterModule.java
index 4468f63..bdb3bf1 100644
--- a/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitterModule.java
+++ b/extensions-contrib/ambari-metrics-emitter/src/main/java/org/apache/druid/emitter/ambari/metrics/AmbariMetricsEmitterModule.java
@@ -20,8 +20,6 @@
 package org.apache.druid.emitter.ambari.metrics;
 
 import com.fasterxml.jackson.databind.Module;
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
 import com.google.inject.Binder;
 import com.google.inject.Injector;
 import com.google.inject.Key;
@@ -35,6 +33,7 @@ import org.apache.druid.java.util.emitter.core.Emitter;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class AmbariMetricsEmitterModule implements DruidModule
 {
@@ -57,17 +56,11 @@ public class AmbariMetricsEmitterModule implements DruidModule
   @Named(EMITTER_TYPE)
   public Emitter getEmitter(AmbariMetricsEmitterConfig emitterConfig, final Injector injector)
   {
-    List<Emitter> emitters = Lists.transform(
-        emitterConfig.getAlertEmitters(),
-        new Function<String, Emitter>()
-        {
-          @Override
-          public Emitter apply(String s)
-          {
-            return injector.getInstance(Key.get(Emitter.class, Names.named(s)));
-          }
-        }
-    );
+    List<Emitter> emitters = emitterConfig
+        .getAlertEmitters()
+        .stream()
+        .map((String name) -> injector.getInstance(Key.get(Emitter.class, Names.named(name))))
+        .collect(Collectors.toList());
     return new AmbariMetricsEmitter(emitterConfig, emitters);
   }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
index 9aa4e5f..d577233 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskMonitor.java
@@ -27,6 +27,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
 import org.apache.druid.client.indexing.IndexingService;
+import org.apache.druid.curator.CuratorUtils;
 import org.apache.druid.discovery.DruidLeaderClient;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
@@ -156,12 +157,12 @@ public class WorkerTaskMonitor extends WorkerTaskManager
         new PathChildrenCacheListener()
         {
           @Override
-          public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
+          public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
               throws Exception
           {
-            if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+            if (CuratorUtils.isChildAdded(event)) {
               final Task task = jsonMapper.readValue(
-                  cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()),
+                  cf.getData().forPath(event.getData().getPath()),
                   Task.class
               );
 
diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
index 4d0616c..7df034a 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java
@@ -219,7 +219,7 @@ public class BrokerServerView implements TimelineServerView
 
   private QueryableDruidServer removeServer(DruidServer server)
   {
-    for (DataSegment segment : server.getSegments()) {
+    for (DataSegment segment : server.iterateAllSegments()) {
       serverRemovedSegment(server.getMetadata(), segment);
     }
     return clients.remove(server.getName());
diff --git a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
index 91ebe38..1f1d801 100644
--- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
+++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
@@ -123,7 +123,7 @@ public class CoordinatorServerView implements InventoryView
 
   private void removeServer(DruidServer server)
   {
-    for (DataSegment segment : server.getSegments()) {
+    for (DataSegment segment : server.iterateAllSegments()) {
       serverRemovedSegment(server.getMetadata(), segment);
     }
   }
diff --git a/server/src/main/java/org/apache/druid/client/DruidServer.java b/server/src/main/java/org/apache/druid/client/DruidServer.java
index ca825f6..ac73d63 100644
--- a/server/src/main/java/org/apache/druid/client/DruidServer.java
+++ b/server/src/main/java/org/apache/druid/client/DruidServer.java
@@ -153,15 +153,27 @@ public class DruidServer implements Comparable<DruidServer>
     return metadata.getHostAndTlsPort() != null ? "https" : "http";
   }
 
-  public Iterable<DataSegment> getSegments()
+  /**
+   * Returns an iterable to go over all segments in all data sources, stored on this DruidServer. The order in which
+   * segments are iterated is unspecified.
+   *
+   * Since this DruidServer can be mutated concurrently, the set of segments observed during an iteration may _not_ be
+   * a momentary snapshot of the segments on the server, in other words, it may be that there was no moment when the
+   * DruidServer stored exactly the returned set of segments.
+   *
+   * Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try (to some
+   * reasonable extent) to organize the code so that it iterates the returned iterable only once rather than several
+   * times.
+   */
+  public Iterable<DataSegment> iterateAllSegments()
   {
     return () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
   }
 
   /**
    * Returns the current number of segments, stored in this DruidServer object. This number if weakly consistent with
-   * the number of segments if {@link #getSegments} is iterated about the same time, because segments might be added or
-   * removed in parallel.
+   * the number of segments if {@link #iterateAllSegments} is iterated about the same time, because segments might be
+   * added or removed in parallel.
    */
   public int getTotalSegments()
   {
@@ -200,7 +212,7 @@ public class DruidServer implements Comparable<DruidServer>
 
   public DruidServer addDataSegments(DruidServer server)
   {
-    server.getSegments().forEach(this::addDataSegment);
+    server.iterateAllSegments().forEach(this::addDataSegment);
     return this;
   }
 
diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
index 1279b3a..0515414 100644
--- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
+++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java
@@ -70,16 +70,17 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
- * This class uses internal-discovery i.e. {@link DruidNodeDiscoveryProvider} to discover various queryable nodes in the cluster
- * such as historicals and realtime peon processes.
- * For each queryable server, it uses HTTP GET /druid-internal/v1/segments (see docs in SegmentListerResource.getSegments(..),
- * to keep sync'd state of segments served by those servers.
+ * This class uses internal-discovery i.e. {@link DruidNodeDiscoveryProvider} to discover various queryable nodes in the
+ * cluster such as historicals and realtime peon processes.
+ *
+ * For each queryable server, it uses HTTP GET /druid-internal/v1/segments (see docs for {@link
+ * org.apache.druid.server.http.SegmentListerResource#getSegments}), to keep sync'd state of segments served by those
+ * servers.
  */
 public class HttpServerInventoryView implements ServerInventoryView, FilteredServerInventoryView
 {
-  public static final TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>> SEGMENT_LIST_RESP_TYPE_REF = new TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>>()
-  {
-  };
+  public static final TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>> SEGMENT_LIST_RESP_TYPE_REF =
+      new TypeReference<ChangeRequestsSnapshot<DataSegmentChangeRequest>>() {};
 
   private final EmittingLogger log = new EmittingLogger(HttpServerInventoryView.class);
   private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
@@ -545,7 +546,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
         public void fullSync(List<DataSegmentChangeRequest> changes)
         {
           Map<SegmentId, DataSegment> toRemove = Maps.newHashMapWithExpectedSize(druidServer.getTotalSegments());
-          druidServer.getSegments().forEach(segment -> toRemove.put(segment.getId(), segment));
+          druidServer.iterateAllSegments().forEach(segment -> toRemove.put(segment.getId(), segment));
 
           for (DataSegmentChangeRequest request : changes) {
             if (request instanceof SegmentChangeRequestLoad) {
diff --git a/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java b/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
index b6d893f..d01ad96 100644
--- a/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
+++ b/server/src/main/java/org/apache/druid/client/ImmutableDruidServer.java
@@ -26,10 +26,10 @@ import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.utils.CollectionUtils;
 
-import java.util.AbstractCollection;
+import javax.annotation.Nullable;
 import java.util.Collection;
-import java.util.Iterator;
 
 /**
  * This class should not be subclassed, it isn't declared final only to make it possible to mock the class with EasyMock
@@ -107,6 +107,7 @@ public class ImmutableDruidServer
     return metadata.getPriority();
   }
 
+  @Nullable
   public DataSegment getSegment(SegmentId segmentId)
   {
     ImmutableDruidDataSource dataSource = dataSources.get(segmentId.getDataSource());
@@ -126,22 +127,22 @@ public class ImmutableDruidServer
     return dataSources.get(name);
   }
 
-  public Collection<DataSegment> getSegments()
-  {
-    return new AbstractCollection<DataSegment>()
-    {
-      @Override
-      public Iterator<DataSegment> iterator()
-      {
-        return dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
-      }
-
-      @Override
-      public int size()
-      {
-        return totalSegments;
-      }
-    };
+  /**
+   * Returns a lazy collection with all segments in all data sources, stored on this ImmutableDruidServer. The order
+   * of segments in this collection is unspecified.
+   *
+   * Calling {@link Collection#size()} on the returned collection is cheap, O(1).
+   *
+   * Note: iteration over the returned collection may not be as trivially cheap as, for example, iteration over an
+   * ArrayList. Try (to some reasonable extent) to organize the code so that it iterates the returned collection only
+   * once rather than several times.
+   */
+  public Collection<DataSegment> getLazyAllSegments()
+  {
+    return CollectionUtils.createLazyCollectionFromStream(
+        () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()),
+        totalSegments
+    );
   }
 
   public String getURL()
diff --git a/server/src/main/java/org/apache/druid/curator/CuratorUtils.java b/server/src/main/java/org/apache/druid/curator/CuratorUtils.java
index b89a645..91e26af 100644
--- a/server/src/main/java/org/apache/druid/curator/CuratorUtils.java
+++ b/server/src/main/java/org/apache/druid/curator/CuratorUtils.java
@@ -20,6 +20,7 @@
 package org.apache.druid.curator;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.zookeeper.CreateMode;
@@ -127,4 +128,9 @@ public class CuratorUtils
       );
     }
   }
+
+  public static boolean isChildAdded(PathChildrenCacheEvent event)
+  {
+    return event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED);
+  }
 }
diff --git a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
index e33e354..436ad12 100644
--- a/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/MetadataSegmentManager.java
@@ -21,6 +21,7 @@ package org.apache.druid.metadata;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.druid.client.ImmutableDruidDataSource;
+import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
 import org.joda.time.Interval;
 
@@ -59,6 +60,14 @@ public interface MetadataSegmentManager
 
   Collection<ImmutableDruidDataSource> getDataSources();
 
+  /**
+   * Returns an iterable to go over all segments in all data sources. The order in which segments are iterated is
+   * unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
+   * (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
+   * several times.
+   */
+  Iterable<DataSegment> iterateAllSegments();
+
   Collection<String> getAllDataSourceNames();
 
   /**
diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
index ec73dc5..dff9c9d 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataSegmentManager.java
@@ -436,6 +436,12 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
   }
 
   @Override
+  public Iterable<DataSegment> iterateAllSegments()
+  {
+    return () -> dataSources.values().stream().flatMap(dataSource -> dataSource.getSegments().stream()).iterator();
+  }
+
+  @Override
   public Collection<String> getAllDataSourceNames()
   {
     return connector.getDBI().withHandle(
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
index a75642f..552227e 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
@@ -55,8 +55,10 @@ public interface BalancerStrategy
   /**
    * Pick the best segment to move from one of the supplied set of servers according to the balancing strategy.
    * @param serverHolders set of historicals to consider for moving segments
-   * @return {@link BalancerSegmentHolder} containing segment to move and server it current resides on
+   * @return {@link BalancerSegmentHolder} containing segment to move and server it currently resides on, or null if
+   *         there are no segments to pick from (i. e. all provided serverHolders are empty).
    */
+  @Nullable
   BalancerSegmentHolder pickSegmentToMove(List<ServerHolder> serverHolders);
 
   /**
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
index 5bf4f38..d2d3029 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java
@@ -258,7 +258,7 @@ public class CostBalancerStrategy implements BalancerStrategy
   {
     double cost = 0;
     for (ServerHolder server : serverHolders) {
-      Iterable<DataSegment> segments = server.getServer().getSegments();
+      Iterable<DataSegment> segments = server.getServer().getLazyAllSegments();
       for (DataSegment s : segments) {
         cost += computeJointSegmentsCost(s, segments);
       }
@@ -280,7 +280,7 @@ public class CostBalancerStrategy implements BalancerStrategy
   {
     double cost = 0;
     for (ServerHolder server : serverHolders) {
-      for (DataSegment segment : server.getServer().getSegments()) {
+      for (DataSegment segment : server.getServer().getLazyAllSegments()) {
         cost += computeJointSegmentsCost(segment, segment);
       }
     }
@@ -334,7 +334,7 @@ public class CostBalancerStrategy implements BalancerStrategy
     // the sum of the costs of other (exclusive of the proposalSegment) segments on the server
     cost += computeJointSegmentsCost(
         proposalSegment,
-        Iterables.filter(server.getServer().getSegments(), segment -> !proposalSegment.equals(segment))
+        Iterables.filter(server.getServer().getLazyAllSegments(), segment -> !proposalSegment.equals(segment))
     );
 
     // plus the costs of segments that will be loaded
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
index 2c9a334..a6b10bc 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java
@@ -79,13 +79,13 @@ public class CuratorLoadQueuePeon extends LoadQueuePeon
   private final AtomicInteger failedAssignCount = new AtomicInteger(0);
 
   private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
-      DruidCoordinator.SEGMENT_COMPARATOR
+      DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
   );
   private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
-      DruidCoordinator.SEGMENT_COMPARATOR
+      DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
   );
   private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
-      DruidCoordinator.SEGMENT_COMPARATOR
+      DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
   );
 
   private final Object lock = new Object();
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java
index 2e065be..dff2871 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategy.java
@@ -47,8 +47,8 @@ public class DiskNormalizedCostBalancerStrategy extends CostBalancerStrategy
     }
 
     int nSegments = 1;
-    if (server.getServer().getSegments().size() > 0) {
-      nSegments = server.getServer().getSegments().size();
+    if (server.getServer().getLazyAllSegments().size() > 0) {
+      nSegments = server.getServer().getLazyAllSegments().size();
     }
 
     double normalizedCost = cost / nSegments;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 98c97a0..3b5faef 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -81,7 +81,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -93,10 +92,29 @@ import java.util.stream.Collectors;
 @ManageLifecycle
 public class DruidCoordinator
 {
-  public static Comparator<DataSegment> SEGMENT_COMPARATOR = Ordering.from(Comparators.intervalsByEndThenStart())
-                                                                     .onResultOf(DataSegment::getInterval)
-                                                                     .compound(Ordering.<DataSegment>natural())
-                                                                     .reverse();
+  /**
+   * This comparator orders "freshest" segments first, i. e. segments with most recent intervals.
+   *
+   * It is used in historical nodes' {@link LoadQueuePeon}s to make historicals load more recent segment first.
+   *
+   * It is also used in {@link DruidCoordinatorRuntimeParams} for {@link
+   * DruidCoordinatorRuntimeParams#getAvailableSegments()} - a collection of segments to be considered during some
+   * coordinator run for different {@link DruidCoordinatorHelper}s. The order matters only for {@link
+   * DruidCoordinatorRuleRunner}, which tries to apply the rules while iterating the segments in the order imposed by
+   * this comparator. In {@link LoadRule} the throttling limit may be hit (via {@link ReplicationThrottler}; see
+   * {@link CoordinatorDynamicConfig#getReplicationThrottleLimit()}). So before we potentially hit this limit, we want
+   * to schedule loading the more recent segments (among all of those that need to be loaded).
+   *
+   * In both {@link LoadQueuePeon}s and {@link DruidCoordinatorRuleRunner}, we want to load more recent segments first
+   * because presumably they are queried more often and contain are more important data for users, so if the Druid
+   * cluster has availability problems and struggling to make all segments available immediately, at least we try to
+   * make more "important" (more recent) segments available as soon as possible.
+   */
+  static final Comparator<DataSegment> SEGMENT_COMPARATOR_RECENT_FIRST = Ordering
+      .from(Comparators.intervalsByEndThenStart())
+      .onResultOf(DataSegment::getInterval)
+      .compound(Ordering.<DataSegment>natural())
+      .reverse();
 
   private static final EmittingLogger log = new EmittingLogger(DruidCoordinator.class);
 
@@ -224,17 +242,18 @@ public class DruidCoordinator
     return loadManagementPeons;
   }
 
-  public Map<String, ? extends Object2LongMap<String>> getReplicationStatus()
+  /** @return tier -> { dataSource -> underReplicationCount } map */
+  public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
   {
-    final Map<String, Object2LongOpenHashMap<String>> retVal = new HashMap<>();
+    final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
 
     if (segmentReplicantLookup == null) {
-      return retVal;
+      return underReplicationCountsPerDataSourcePerTier;
     }
 
     final DateTime now = DateTimes.nowUtc();
 
-    for (final DataSegment segment : getAvailableDataSegments()) {
+    for (final DataSegment segment : iterateAvailableDataSegments()) {
       final List<Rule> rules = metadataRuleManager.getRulesWithDefault(segment.getDataSource());
 
       for (final Rule rule : rules) {
@@ -246,15 +265,16 @@ public class DruidCoordinator
             .getTieredReplicants()
             .forEach((final String tier, final Integer ruleReplicants) -> {
               int currentReplicants = segmentReplicantLookup.getLoadedReplicants(segment.getId(), tier);
-              retVal
-                  .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>())
+              Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
+                  .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
+              ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
                   .addTo(segment.getDataSource(), Math.max(ruleReplicants - currentReplicants, 0));
             });
         break; // only the first matching rule applies
       }
     }
 
-    return retVal;
+    return underReplicationCountsPerDataSourcePerTier;
   }
 
   public Object2LongMap<String> getSegmentAvailability()
@@ -265,7 +285,7 @@ public class DruidCoordinator
       return retVal;
     }
 
-    for (DataSegment segment : getAvailableDataSegments()) {
+    for (DataSegment segment : iterateAvailableDataSegments()) {
       if (segmentReplicantLookup.getLoadedReplicants(segment.getId()) == 0) {
         retVal.addTo(segment.getDataSource(), 1);
       } else {
@@ -428,30 +448,15 @@ public class DruidCoordinator
     }
   }
 
-  public Set<DataSegment> getOrderedAvailableDataSegments()
-  {
-    Set<DataSegment> availableSegments = new TreeSet<>(SEGMENT_COMPARATOR);
-
-    Iterable<DataSegment> dataSegments = getAvailableDataSegments();
-
-    for (DataSegment dataSegment : dataSegments) {
-      if (dataSegment.getSize() < 0) {
-        log.makeAlert("No size on Segment, wtf?")
-           .addData("segment", dataSegment)
-           .emit();
-      }
-      availableSegments.add(dataSegment);
-    }
-
-    return availableSegments;
-  }
-
-  private List<DataSegment> getAvailableDataSegments()
+  /**
+   * Returns an iterable to go over all available segments in all data sources. The order in which segments are iterated
+   * is unspecified. Note: the iteration may not be as trivially cheap as, for example, iteration over an ArrayList. Try
+   * (to some reasonable extent) to organize the code so that it iterates the returned iterable only once rather than
+   * several times.
+   */
+  public Iterable<DataSegment> iterateAvailableDataSegments()
   {
-    return metadataSegmentManager.getDataSources()
-                                 .stream()
-                                 .flatMap(source -> source.getSegments().stream())
-                                 .collect(Collectors.toList());
+    return metadataSegmentManager.iterateAllSegments();
   }
 
   @LifecycleStart
@@ -735,7 +740,7 @@ public class DruidCoordinator
                              .build();
               },
               new DruidCoordinatorRuleRunner(DruidCoordinator.this),
-              new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
+              new DruidCoordinatorCleanupUnneeded(),
               new DruidCoordinatorCleanupOvershadowed(DruidCoordinator.this),
               new DruidCoordinatorBalancer(DruidCoordinator.this),
               new DruidCoordinatorLogger(DruidCoordinator.this)
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
index 0839038..655d6bd 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinatorRuntimeParams.java
@@ -20,6 +20,7 @@
 package org.apache.druid.server.coordinator;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -28,24 +29,35 @@ import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.joda.time.DateTime;
 
+import javax.annotation.Nullable;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeSet;
 
 /**
  */
 public class DruidCoordinatorRuntimeParams
 {
+  /**
+   * Creates a TreeSet sorted in {@link DruidCoordinator#SEGMENT_COMPARATOR_RECENT_FIRST} order and populates it with
+   * the segments from the given iterable. The given iterable is iterated exactly once. No special action is taken if
+   * duplicate segments are encountered in the iterable.
+   */
+  public static TreeSet<DataSegment> createAvailableSegmentsSet(Iterable<DataSegment> availableSegments)
+  {
+    TreeSet<DataSegment> segmentsSet = new TreeSet<>(DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST);
+    availableSegments.forEach(segmentsSet::add);
+    return segmentsSet;
+  }
+
   private final long startTime;
   private final DruidCluster druidCluster;
   private final MetadataRuleManager databaseRuleManager;
   private final SegmentReplicantLookup segmentReplicantLookup;
   private final Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
-  private final Set<DataSegment> availableSegments;
+  private final @Nullable TreeSet<DataSegment> availableSegments;
   private final Map<String, LoadQueuePeon> loadManagementPeons;
   private final ReplicationThrottler replicationManager;
   private final ServiceEmitter emitter;
@@ -61,7 +73,7 @@ public class DruidCoordinatorRuntimeParams
       MetadataRuleManager databaseRuleManager,
       SegmentReplicantLookup segmentReplicantLookup,
       Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
-      Set<DataSegment> availableSegments,
+      @Nullable TreeSet<DataSegment> availableSegments,
       Map<String, LoadQueuePeon> loadManagementPeons,
       ReplicationThrottler replicationManager,
       ServiceEmitter emitter,
@@ -113,8 +125,9 @@ public class DruidCoordinatorRuntimeParams
     return dataSources;
   }
 
-  public Set<DataSegment> getAvailableSegments()
+  public TreeSet<DataSegment> getAvailableSegments()
   {
+    Preconditions.checkState(availableSegments != null, "availableSegments must be set");
     return availableSegments;
   }
 
@@ -196,7 +209,7 @@ public class DruidCoordinatorRuntimeParams
         databaseRuleManager,
         segmentReplicantLookup,
         dataSources,
-        new TreeSet<>(DruidCoordinator.SEGMENT_COMPARATOR),
+        null, // availableSegments
         loadManagementPeons,
         replicationManager,
         emitter,
@@ -215,7 +228,7 @@ public class DruidCoordinatorRuntimeParams
     private MetadataRuleManager databaseRuleManager;
     private SegmentReplicantLookup segmentReplicantLookup;
     private Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources;
-    private final Set<DataSegment> availableSegments;
+    private @Nullable TreeSet<DataSegment> availableSegments;
     private final Map<String, LoadQueuePeon> loadManagementPeons;
     private ReplicationThrottler replicationManager;
     private ServiceEmitter emitter;
@@ -232,7 +245,7 @@ public class DruidCoordinatorRuntimeParams
       this.databaseRuleManager = null;
       this.segmentReplicantLookup = null;
       this.dataSources = new HashMap<>();
-      this.availableSegments = new TreeSet<>(DruidCoordinator.SEGMENT_COMPARATOR);
+      this.availableSegments = null;
       this.loadManagementPeons = new HashMap<>();
       this.replicationManager = null;
       this.emitter = null;
@@ -248,7 +261,7 @@ public class DruidCoordinatorRuntimeParams
         MetadataRuleManager databaseRuleManager,
         SegmentReplicantLookup segmentReplicantLookup,
         Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources,
-        Set<DataSegment> availableSegments,
+        @Nullable TreeSet<DataSegment> availableSegments,
         Map<String, LoadQueuePeon> loadManagementPeons,
         ReplicationThrottler replicationManager,
         ServiceEmitter emitter,
@@ -346,22 +359,37 @@ public class DruidCoordinatorRuntimeParams
       return this;
     }
 
+    /** This method must be used in test code only. */
     @VisibleForTesting
-    public Builder withAvailableSegments(DataSegment... availableSegments)
+    public Builder withAvailableSegmentsInTest(DataSegment... availableSegments)
     {
-      this.availableSegments.addAll(Arrays.asList(availableSegments));
-      return this;
+      return withAvailableSegmentsInTest(Arrays.asList(availableSegments));
     }
 
-    public Builder withAvailableSegments(Collection<DataSegment> availableSegments)
+    /** This method must be used in test code only. */
+    @VisibleForTesting
+    public Builder withAvailableSegmentsInTest(Collection<DataSegment> availableSegments)
     {
-      this.availableSegments.addAll(Collections.unmodifiableCollection(availableSegments));
+      return setAvailableSegments(createAvailableSegmentsSet(availableSegments));
+    }
+
+    /**
+     * Note: unlike {@link #withAvailableSegmentsInTest(Collection)}, this method doesn't make a defensive copy of the
+     * provided set. The set passed into this method must not be modified afterwards.
+     */
+    public Builder setAvailableSegments(TreeSet<DataSegment> availableSegments)
+    {
+      //noinspection ObjectEquality
+      if (availableSegments.comparator() != DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST) {
+        throw new IllegalArgumentException("Expected DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST");
+      }
+      this.availableSegments = availableSegments;
       return this;
     }
 
     public Builder withLoadManagementPeons(Map<String, LoadQueuePeon> loadManagementPeonsCollection)
     {
-      loadManagementPeons.putAll(Collections.unmodifiableMap(loadManagementPeonsCollection));
+      loadManagementPeons.putAll(loadManagementPeonsCollection);
       return this;
     }
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
index 703f52e..cbb0c26 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java
@@ -81,13 +81,13 @@ public class HttpLoadQueuePeon extends LoadQueuePeon
   private final AtomicInteger failedAssignCount = new AtomicInteger(0);
 
   private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad = new ConcurrentSkipListMap<>(
-      DruidCoordinator.SEGMENT_COMPARATOR
+      DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
   );
   private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop = new ConcurrentSkipListMap<>(
-      DruidCoordinator.SEGMENT_COMPARATOR
+      DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
   );
   private final ConcurrentSkipListSet<DataSegment> segmentsMarkedToDrop = new ConcurrentSkipListSet<>(
-      DruidCoordinator.SEGMENT_COMPARATOR
+      DruidCoordinator.SEGMENT_COMPARATOR_RECENT_FIRST
   );
 
   private final ScheduledExecutorService processingExecutor;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
index 0402c9e..e770ef7 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/ReservoirSegmentSampler.java
@@ -34,7 +34,7 @@ final class ReservoirSegmentSampler
     int numSoFar = 0;
 
     for (ServerHolder server : serverHolders) {
-      for (DataSegment segment : server.getServer().getSegments()) {
+      for (DataSegment segment : server.getServer().getLazyAllSegments()) {
         int randNum = ThreadLocalRandom.current().nextInt(numSoFar + 1);
         // w.p. 1 / (numSoFar+1), swap out the server and segment
         if (randNum == numSoFar) {
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
index 33e6309..ce08bfe 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicantLookup.java
@@ -43,7 +43,7 @@ public class SegmentReplicantLookup
       for (ServerHolder serverHolder : serversByType) {
         ImmutableDruidServer server = serverHolder.getServer();
 
-        for (DataSegment segment : server.getSegments()) {
+        for (DataSegment segment : server.getLazyAllSegments()) {
           Integer numReplicants = segmentsInCluster.get(segment.getId(), server.getTier());
           if (numReplicants == null) {
             numReplicants = 0;
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
index cf8d725..b4b900b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorBalancer.java
@@ -134,7 +134,7 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
 
     int numSegments = 0;
     for (ServerHolder sourceHolder : servers) {
-      numSegments += sourceHolder.getServer().getSegments().size();
+      numSegments += sourceHolder.getServer().getLazyAllSegments().size();
     }
 
     if (numSegments == 0) {
@@ -191,7 +191,18 @@ public class DruidCoordinatorBalancer implements DruidCoordinatorHelper
     //noinspection ForLoopThatDoesntUseLoopVariable
     for (int iter = 0; (moved + unmoved) < maxSegmentsToMove; ++iter) {
       final BalancerSegmentHolder segmentToMoveHolder = strategy.pickSegmentToMove(toMoveFrom);
-      if (segmentToMoveHolder != null && params.getAvailableSegments().contains(segmentToMoveHolder.getSegment())) {
+      if (segmentToMoveHolder == null) {
+        log.info("All servers to move segments from are empty, ending run.");
+        break;
+      }
+      // DruidCoordinatorRuntimeParams.getAvailableSegments originate from MetadataSegmentManager, i. e. that's a
+      // "desired" or "theoretical" set of segments. segmentToMoveHolder.getSegment originates from ServerInventoryView,
+      // i. e. that may be any segment that happens to be loaded on some server, even if it "shouldn't" from the
+      // "theoretical" point of view (Coordinator closes such discrepancies eventually via
+      // DruidCoordinatorCleanupUnneeded). Therefore the picked segmentToMoveHolder's segment may not need to be
+      // balanced.
+      boolean needToBalancePickedSegment = params.getAvailableSegments().contains(segmentToMoveHolder.getSegment());
+      if (needToBalancePickedSegment) {
         final DataSegment segmentToMove = segmentToMoveHolder.getSegment();
         final ImmutableDruidServer fromServer = segmentToMoveHolder.getFromServer();
         // we want to leave the server the segment is currently on in the list...
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
index 7c41f99..a7a1bcc 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupUnneeded.java
@@ -24,7 +24,6 @@ import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.server.coordinator.CoordinatorStats;
 import org.apache.druid.server.coordinator.DruidCluster;
-import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.server.coordinator.LoadQueuePeon;
 import org.apache.druid.server.coordinator.ServerHolder;
@@ -39,15 +38,6 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
 {
   private static final Logger log = new Logger(DruidCoordinatorCleanupUnneeded.class);
 
-  private final DruidCoordinator coordinator;
-
-  public DruidCoordinatorCleanupUnneeded(
-      DruidCoordinator coordinator
-  )
-  {
-    this.coordinator = coordinator;
-  }
-
   @Override
   public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
   {
@@ -55,47 +45,46 @@ public class DruidCoordinatorCleanupUnneeded implements DruidCoordinatorHelper
     Set<DataSegment> availableSegments = params.getAvailableSegments();
     DruidCluster cluster = params.getDruidCluster();
 
+    if (availableSegments.isEmpty()) {
+      log.info(
+          "Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent " +
+          "a race condition in which the coordinator would drop all segments if it started running cleanup before " +
+          "it finished polling the metadata storage for available segments for the first time."
+      );
+      return params.buildFromExisting().withCoordinatorStats(stats).build();
+    }
+
     // Drop segments that no longer exist in the available segments configuration, *if* it has been populated. (It might
     // not have been loaded yet since it's filled asynchronously. But it's also filled atomically, so if there are any
     // segments at all, we should have all of them.)
     // Note that if metadata store has no segments, then availableSegments will stay empty and nothing will be dropped.
     // This is done to prevent a race condition in which the coordinator would drop all segments if it started running
     // cleanup before it finished polling the metadata storage for available segments for the first time.
-    if (!availableSegments.isEmpty()) {
-      for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
-        for (ServerHolder serverHolder : serverHolders) {
-          ImmutableDruidServer server = serverHolder.getServer();
+    for (SortedSet<ServerHolder> serverHolders : cluster.getSortedHistoricalsByTier()) {
+      for (ServerHolder serverHolder : serverHolders) {
+        ImmutableDruidServer server = serverHolder.getServer();
 
-          for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
-            for (DataSegment segment : dataSource.getSegments()) {
-              if (!availableSegments.contains(segment)) {
-                LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
+        for (ImmutableDruidDataSource dataSource : server.getDataSources()) {
+          for (DataSegment segment : dataSource.getSegments()) {
+            if (!availableSegments.contains(segment)) {
+              LoadQueuePeon queuePeon = params.getLoadManagementPeons().get(server.getName());
 
-                if (!queuePeon.getSegmentsToDrop().contains(segment)) {
-                  queuePeon.dropSegment(segment, () -> {});
-                  stats.addToTieredStat("unneededCount", server.getTier(), 1);
-                  log.info(
-                      "Dropping uneeded segment [%s] from server [%s] in tier [%s]",
-                      segment.getId(),
-                      server.getName(),
-                      server.getTier()
-                  );
-                }
+              if (!queuePeon.getSegmentsToDrop().contains(segment)) {
+                queuePeon.dropSegment(segment, () -> {});
+                stats.addToTieredStat("unneededCount", server.getTier(), 1);
+                log.info(
+                    "Dropping uneeded segment [%s] from server [%s] in tier [%s]",
+                    segment.getId(),
+                    server.getName(),
+                    server.getTier()
+                );
               }
             }
           }
         }
       }
-    } else {
-      log.info(
-          "Found 0 availableSegments, skipping the cleanup of segments from historicals. This is done to prevent a race condition in which the coordinator would drop all segments if it started running cleanup before it finished polling the metadata storage for available segments for the first time."
-      );
     }
 
-    return params.buildFromExisting()
-                 .withCoordinatorStats(stats)
-                 .build();
+    return params.buildFromExisting().withCoordinatorStats(stats).build();
   }
-
-
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorLogger.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorLogger.java
index 1e7ef94..a271109 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorLogger.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorLogger.java
@@ -227,17 +227,17 @@ public class DruidCoordinatorLogger implements DruidCoordinatorHelper
         }
     );
 
-    coordinator.getReplicationStatus().forEach(
-        (final String tier, final Object2LongMap<String> status) -> {
-          for (final Object2LongMap.Entry<String> entry : status.object2LongEntrySet()) {
+    coordinator.computeUnderReplicationCountsPerDataSourcePerTier().forEach(
+        (final String tier, final Object2LongMap<String> underReplicationCountsPerDataSource) -> {
+          for (final Object2LongMap.Entry<String> entry : underReplicationCountsPerDataSource.object2LongEntrySet()) {
             final String dataSource = entry.getKey();
-            final long count = entry.getLongValue();
+            final long underReplicationCount = entry.getLongValue();
 
             emitter.emit(
                 new ServiceMetricEvent.Builder()
                     .setDimension(DruidMetrics.TIER, tier)
                     .setDimension(DruidMetrics.DATASOURCE, dataSource).build(
-                    "segment/underReplicated/count", count
+                    "segment/underReplicated/count", underReplicationCount
                 )
             );
           }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
index 524797d..7d78301 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorRuleRunner.java
@@ -89,43 +89,16 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
     // find available segments which are not overshadowed by other segments in DB
     // only those would need to be loaded/dropped
     // anything overshadowed by served segments is dropped automatically by DruidCoordinatorCleanupOvershadowed
-    Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
-    for (DataSegment segment : params.getAvailableSegments()) {
-      VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(segment.getDataSource());
-      if (timeline == null) {
-        timeline = new VersionedIntervalTimeline<>(Ordering.natural());
-        timelines.put(segment.getDataSource(), timeline);
-      }
-
-      timeline.add(
-          segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment)
-      );
-    }
-
-    Set<DataSegment> overshadowed = new HashSet<>();
-    for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) {
-      for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
-        for (DataSegment dataSegment : holder.getObject().payloads()) {
-          overshadowed.add(dataSegment);
-        }
-      }
-    }
-
-    Set<DataSegment> nonOvershadowed = new HashSet<>();
-    for (DataSegment dataSegment : params.getAvailableSegments()) {
-      if (!overshadowed.contains(dataSegment)) {
-        nonOvershadowed.add(dataSegment);
-      }
-    }
+    Set<DataSegment> overshadowed = determineOvershadowedSegments(params);
 
     for (String tier : cluster.getTierNames()) {
       replicatorThrottler.updateReplicationState(tier);
     }
 
-    DruidCoordinatorRuntimeParams paramsWithReplicationManager = params.buildFromExistingWithoutAvailableSegments()
-                                                                       .withReplicationManager(replicatorThrottler)
-                                                                       .withAvailableSegments(nonOvershadowed)
-                                                                       .build();
+    DruidCoordinatorRuntimeParams paramsWithReplicationManager = params
+        .buildFromExistingWithoutAvailableSegments()
+        .withReplicationManager(replicatorThrottler)
+        .build();
 
     // Run through all matched rules for available segments
     DateTime now = DateTimes.nowUtc();
@@ -133,7 +106,11 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
 
     final List<SegmentId> segmentsWithMissingRules = Lists.newArrayListWithCapacity(MAX_MISSING_RULES);
     int missingRules = 0;
-    for (DataSegment segment : paramsWithReplicationManager.getAvailableSegments()) {
+    for (DataSegment segment : params.getAvailableSegments()) {
+      if (overshadowed.contains(segment)) {
+        // Skipping overshadowed segments
+        continue;
+      }
       List<Rule> rules = databaseRuleManager.getRulesWithDefault(segment.getDataSource());
       boolean foundMatchingRule = false;
       for (Rule rule : rules) {
@@ -159,9 +136,26 @@ public class DruidCoordinatorRuleRunner implements DruidCoordinatorHelper
          .emit();
     }
 
-    return paramsWithReplicationManager.buildFromExistingWithoutAvailableSegments()
-                                       .withCoordinatorStats(stats)
-                                       .withAvailableSegments(params.getAvailableSegments())
-                                       .build();
+    return params.buildFromExisting().withCoordinatorStats(stats).build();
+  }
+
+  private Set<DataSegment> determineOvershadowedSegments(DruidCoordinatorRuntimeParams params)
+  {
+    Map<String, VersionedIntervalTimeline<String, DataSegment>> timelines = new HashMap<>();
+    for (DataSegment segment : params.getAvailableSegments()) {
+      timelines
+          .computeIfAbsent(segment.getDataSource(), dataSource -> new VersionedIntervalTimeline<>(Ordering.natural()))
+          .add(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
+    }
+
+    Set<DataSegment> overshadowed = new HashSet<>();
+    for (VersionedIntervalTimeline<String, DataSegment> timeline : timelines.values()) {
+      for (TimelineObjectHolder<String, DataSegment> holder : timeline.findOvershadowed()) {
+        for (DataSegment dataSegment : holder.getObject().payloads()) {
+          overshadowed.add(dataSegment);
+        }
+      }
+    }
+    return overshadowed;
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java
index 788b0d5..801bd3b 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorSegmentInfoLoader.java
@@ -19,18 +19,19 @@
 
 package org.apache.druid.server.coordinator.helper;
 
-import org.apache.druid.java.util.common.logger.Logger;
+import com.google.common.collect.Iterables;
+import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
 import org.apache.druid.timeline.DataSegment;
 
-import java.util.Set;
+import java.util.TreeSet;
 
 public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
 {
-  private final DruidCoordinator coordinator;
+  private static final EmittingLogger log = new EmittingLogger(DruidCoordinatorSegmentInfoLoader.class);
 
-  private static final Logger log = new Logger(DruidCoordinatorSegmentInfoLoader.class);
+  private final DruidCoordinator coordinator;
 
   public DruidCoordinatorSegmentInfoLoader(DruidCoordinator coordinator)
   {
@@ -42,8 +43,31 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
   {
     log.info("Starting coordination. Getting available segments.");
 
-    // Display info about all available segments
-    final Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();
+    // The following transform() call doesn't actually transform the iterable. It only checks the sizes of the segments
+    // and emits alerts if segments with negative sizes are encountered. In other words, semantically it's similar to
+    // Stream.peek(). It works as long as DruidCoordinatorRuntimeParams.createAvailableSegmentsSet() (which is called
+    // below) guarantees to go over the passed iterable exactly once.
+    //
+    // An iterable returned from iterateAvailableDataSegments() is not simply iterated (with size checks) before passing
+    // into DruidCoordinatorRuntimeParams.createAvailableSegmentsSet() because iterateAvailableDataSegments()'s
+    // documentation says to strive to avoid iterating the result more than once.
+    //
+    //noinspection StaticPseudoFunctionalStyleMethod: https://youtrack.jetbrains.com/issue/IDEA-153047
+    Iterable<DataSegment> availableSegmentsWithSizeChecking = Iterables.transform(
+        coordinator.iterateAvailableDataSegments(),
+        segment -> {
+          if (segment.getSize() < 0) {
+            log.makeAlert("No size on a segment")
+               .addData("segment", segment)
+               .emit();
+          }
+          return segment;
+        }
+    );
+    final TreeSet<DataSegment> availableSegments =
+        DruidCoordinatorRuntimeParams.createAvailableSegmentsSet(availableSegmentsWithSizeChecking);
+
+    // Log info about all available segments
     if (log.isDebugEnabled()) {
       log.debug("Available DataSegments");
       for (DataSegment dataSegment : availableSegments) {
@@ -54,7 +78,7 @@ public class DruidCoordinatorSegmentInfoLoader implements DruidCoordinatorHelper
     log.info("Found [%,d] available segments.", availableSegments.size());
 
     return params.buildFromExisting()
-                 .withAvailableSegments(availableSegments)
+                 .setAvailableSegments(availableSegments)
                  .build();
   }
 }
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
index f2c52a1..48aff51 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
@@ -43,7 +43,6 @@ import org.joda.time.Interval;
     @JsonSubTypes.Type(name = IntervalBroadcastDistributionRule.TYPE, value = IntervalBroadcastDistributionRule.class),
     @JsonSubTypes.Type(name = PeriodBroadcastDistributionRule.TYPE, value = PeriodBroadcastDistributionRule.class)
 })
-
 public interface Rule
 {
   String getType();
@@ -52,5 +51,16 @@ public interface Rule
 
   boolean appliesTo(Interval interval, DateTime referenceTimestamp);
 
+  /**
+   * {@link DruidCoordinatorRuntimeParams#getAvailableSegments()} must not be called in Rule's code, because the
+   * available segments are not specified for the {@link DruidCoordinatorRuntimeParams} passed into Rule's code. This is
+   * because {@link DruidCoordinatorRuntimeParams} entangles two slightly different (nonexistent yet) abstractions:
+   * "DruidCoordinatorHelperParams" and "RuleParams" which contain params that only {@link
+   * org.apache.druid.server.coordinator.helper.DruidCoordinatorHelper}s and Rules need, respectively.
+   * For example, {@link org.apache.druid.server.coordinator.ReplicationThrottler} needs to belong only to "RuleParams",
+   * but not "DruidCoordinatorHelperParams". The opposite for "AvailableSegments".
+   *
+   * See https://github.com/apache/incubator-druid/issues/7228
+   */
   CoordinatorStats run(DruidCoordinator coordinator, DruidCoordinatorRuntimeParams params, DataSegment segment);
 }
diff --git a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java
index 6578d74..779e909 100644
--- a/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java
@@ -95,7 +95,7 @@ public class CoordinatorResource
     }
 
     if (full != null) {
-      return Response.ok(coordinator.getReplicationStatus()).build();
+      return Response.ok(coordinator.computeUnderReplicationCountsPerDataSourcePerTier()).build();
     }
     return Response.ok(coordinator.getLoadStatus()).build();
   }
diff --git a/server/src/main/java/org/apache/druid/server/http/ServersResource.java b/server/src/main/java/org/apache/druid/server/http/ServersResource.java
index 437eb5a..9b31a67 100644
--- a/server/src/main/java/org/apache/druid/server/http/ServersResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/ServersResource.java
@@ -80,8 +80,8 @@ public class ServersResource
    * *and* values, but is done so for compatibility with the existing HTTP JSON API.
    *
    * Returns a lazy map suitable for serialization (i. e. entrySet iteration) only, relying on the fact that the
-   * segments returned from {@link DruidServer#getSegments()} are unique. This is not a part of the {@link DruidServer}
-   * API to not let abuse this map (like trying to get() from it).
+   * segments returned from {@link DruidServer#iterateAllSegments()} are unique. This is not a part of the {@link
+   * DruidServer} API to not let abuse this map (like trying to get() from it).
    */
   private static Map<SegmentId, DataSegment> createLazySegmentsMap(DruidServer server)
   {
@@ -96,7 +96,7 @@ public class ServersResource
           public Iterator<Entry<SegmentId, DataSegment>> iterator()
           {
             return Iterators.transform(
-                server.getSegments().iterator(),
+                server.iterateAllSegments().iterator(),
                 segment -> new AbstractMap.SimpleImmutableEntry<>(segment.getId(), segment)
             );
           }
@@ -173,11 +173,11 @@ public class ServersResource
     }
 
     if (full != null) {
-      return builder.entity(Iterables.toString(server.getSegments())).build();
+      return builder.entity(Iterables.toString(server.iterateAllSegments())).build();
     }
 
     return builder
-        .entity(Iterables.toString(Iterables.transform(server.getSegments(), DataSegment::getId)))
+        .entity(Iterables.toString(Iterables.transform(server.iterateAllSegments(), DataSegment::getId)))
         .build();
   }
 
diff --git a/server/src/main/java/org/apache/druid/server/http/TiersResource.java b/server/src/main/java/org/apache/druid/server/http/TiersResource.java
index 4debc83..51a0fda 100644
--- a/server/src/main/java/org/apache/druid/server/http/TiersResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/TiersResource.java
@@ -104,7 +104,7 @@ public class TiersResource
       Map<String, Map<Interval, Map<IntervalProperties, Object>>> tierToStatsPerInterval = new HashMap<>();
       for (DruidServer druidServer : serverInventoryView.getInventory()) {
         if (druidServer.getTier().equalsIgnoreCase(tierName)) {
-          for (DataSegment dataSegment : druidServer.getSegments()) {
+          for (DataSegment dataSegment : druidServer.iterateAllSegments()) {
             Map<IntervalProperties, Object> properties = tierToStatsPerInterval
                 .computeIfAbsent(dataSegment.getDataSource(), dsName -> new HashMap<>())
                 .computeIfAbsent(dataSegment.getInterval(), interval -> new EnumMap<>(IntervalProperties.class));
diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
index d4f4890..5e07b7f 100644
--- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java
@@ -259,7 +259,7 @@ public class HttpServerInventoryViewTest
     DruidServer druidServer = httpServerInventoryView.getInventoryValue("host:8080");
     Assert.assertEquals(
         ImmutableMap.of(segment3.getId(), segment3, segment4.getId(), segment4),
-        Maps.uniqueIndex(druidServer.getSegments(), DataSegment::getId)
+        Maps.uniqueIndex(druidServer.iterateAllSegments(), DataSegment::getId)
     );
 
     druidNodeDiscovery.listener.nodesRemoved(ImmutableList.of(druidNode));
diff --git a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
index 1586b60..d2de54f 100644
--- a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
+++ b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java
@@ -237,7 +237,7 @@ public class BatchServerInventoryViewTest
     waitForSync(batchServerInventoryView, testSegments);
 
     DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
-    Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
+    Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
 
     Assert.assertEquals(testSegments, segments);
 
@@ -251,7 +251,7 @@ public class BatchServerInventoryViewTest
 
     waitForSync(batchServerInventoryView, testSegments);
 
-    Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
+    Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
 
     segmentAnnouncer.unannounceSegment(segment1);
     segmentAnnouncer.unannounceSegment(segment2);
@@ -260,7 +260,7 @@ public class BatchServerInventoryViewTest
 
     waitForSync(batchServerInventoryView, testSegments);
 
-    Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
+    Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
   }
 
   @Test
@@ -271,7 +271,7 @@ public class BatchServerInventoryViewTest
     waitForSync(filteredBatchServerInventoryView, testSegments);
 
     DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
-    Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
+    Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
 
     Assert.assertEquals(testSegments, segments);
     int prevUpdateCount = inventoryUpdateCounter.get();
@@ -297,7 +297,7 @@ public class BatchServerInventoryViewTest
     waitForSync(filteredBatchServerInventoryView, testSegments);
 
     DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
-    Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
+    Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
 
     Assert.assertEquals(testSegments, segments);
 
@@ -393,7 +393,7 @@ public class BatchServerInventoryViewTest
     final Timing forWaitingTiming = timing.forWaiting();
     Stopwatch stopwatch = Stopwatch.createStarted();
     while (Iterables.isEmpty(batchServerInventoryView.getInventory())
-           || Iterables.size(Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments()) !=
+           || Iterables.size(Iterables.get(batchServerInventoryView.getInventory(), 0).iterateAllSegments()) !=
               testSegments.size()) {
       Thread.sleep(100);
       if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > forWaitingTiming.milliseconds()) {
@@ -430,7 +430,7 @@ public class BatchServerInventoryViewTest
     waitForSync(batchServerInventoryView, testSegments);
 
     DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
-    final Set<DataSegment> segments = Sets.newHashSet(server.getSegments());
+    final Set<DataSegment> segments = Sets.newHashSet(server.iterateAllSegments());
 
     Assert.assertEquals(testSegments, segments);
 
@@ -498,7 +498,7 @@ public class BatchServerInventoryViewTest
     Assert.assertEquals(INITIAL_SEGMENTS * 2, testSegments.size());
     waitForSync(batchServerInventoryView, testSegments);
 
-    Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
+    Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
 
     for (int i = 0; i < INITIAL_SEGMENTS; ++i) {
       final DataSegment segment = makeSegment(100 + i);
@@ -508,6 +508,6 @@ public class BatchServerInventoryViewTest
 
     waitForSync(batchServerInventoryView, testSegments);
 
-    Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments()));
+    Assert.assertEquals(testSegments, Sets.newHashSet(server.iterateAllSegments()));
   }
 }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
index 8a5cf43..e6aa17e 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CostBalancerStrategyTest.java
@@ -94,7 +94,7 @@ public class CostBalancerStrategyTest
       segments.put(segment.getId(), segment);
       EasyMock.expect(druidServer.getSegment(segment.getId())).andReturn(segment).anyTimes();
     }
-    EasyMock.expect(druidServer.getSegments()).andReturn(segments.values()).anyTimes();
+    EasyMock.expect(druidServer.getLazyAllSegments()).andReturn(segments.values()).anyTimes();
 
     EasyMock.replay(druidServer);
     serverHolderList.add(new ServerHolder(druidServer, fromPeon));
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
index 94fecdb..25dce8c 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CuratorDruidCoordinatorTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
 import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
 import org.apache.curator.utils.ZKPaths;
@@ -35,6 +36,7 @@ import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.curator.CuratorUtils;
 import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
 import org.apache.druid.discovery.DruidLeaderSelector;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -324,10 +326,10 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
     // for the destination and unannouncing from source server when noticing a drop request
 
     sourceLoadQueueChildrenCache.getListenable().addListener(
-        (curatorFramework, pathChildrenCacheEvent) -> {
-          if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
+        (CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> {
+          if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
             srcCountdown.countDown();
-          } else if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+          } else if (CuratorUtils.isChildAdded(event)) {
             //Simulate source server dropping segment
             unannounceSegmentFromBatchForServer(source, segmentToMove, sourceSegKeys.get(2), zkPathsConfig);
           }
@@ -335,10 +337,10 @@ public class CuratorDruidCoordinatorTest extends CuratorTestBase
     );
 
     destinationLoadQueueChildrenCache.getListenable().addListener(
-        (curatorFramework, pathChildrenCacheEvent) -> {
-          if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
+        (CuratorFramework curatorFramework, PathChildrenCacheEvent event) -> {
+          if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
             destCountdown.countDown();
-          } else if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+          } else if (CuratorUtils.isChildAdded(event)) {
             //Simulate destination server loading segment
             announceBatchSegmentsForServer(dest, ImmutableSet.of(segmentToMove), zkPathsConfig, jsonMapper);
           }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java
index fc2a17c..0bbe46d 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DiskNormalizedCostBalancerStrategyTest.java
@@ -88,7 +88,7 @@ public class DiskNormalizedCostBalancerStrategyTest
       segments.add(segment);
       EasyMock.expect(druidServer.getSegment(segment.getId())).andReturn(segment).anyTimes();
     }
-    EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
+    EasyMock.expect(druidServer.getLazyAllSegments()).andReturn(segments).anyTimes();
 
     EasyMock.replay(druidServer);
     serverHolderList.add(new ServerHolder(druidServer, fromPeon));
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java
index 719a2b4..ed4f600 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerProfiler.java
@@ -116,9 +116,9 @@ public class DruidCoordinatorBalancerProfiler
       EasyMock.expect(server.getName()).andReturn(Integer.toString(i)).atLeastOnce();
       EasyMock.expect(server.getHost()).andReturn(Integer.toString(i)).anyTimes();
       if (i == 0) {
-        EasyMock.expect(server.getSegments()).andReturn(segments).anyTimes();
+        EasyMock.expect(server.getLazyAllSegments()).andReturn(segments).anyTimes();
       } else {
-        EasyMock.expect(server.getSegments()).andReturn(Collections.emptyList()).anyTimes();
+        EasyMock.expect(server.getLazyAllSegments()).andReturn(Collections.emptyList()).anyTimes();
       }
       EasyMock.expect(server.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
       EasyMock.replay(server);
@@ -148,7 +148,7 @@ public class DruidCoordinatorBalancerProfiler
                                 .withLoadManagementPeons(
                                     peonMap
                                 )
-                                .withAvailableSegments(segments)
+                                .withAvailableSegmentsInTest(segments)
                                 .withDynamicConfigs(
                                     CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
                                         MAX_SEGMENTS_TO_MOVE
@@ -197,7 +197,7 @@ public class DruidCoordinatorBalancerProfiler
     EasyMock.expect(druidServer1.getName()).andReturn("from").atLeastOnce();
     EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
     EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer1.getSegments()).andReturn(segments).anyTimes();
+    EasyMock.expect(druidServer1.getLazyAllSegments()).andReturn(segments).anyTimes();
     EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
     EasyMock.replay(druidServer1);
 
@@ -205,7 +205,7 @@ public class DruidCoordinatorBalancerProfiler
     EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
     EasyMock.expect(druidServer2.getCurrSize()).andReturn(0L).atLeastOnce();
     EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer2.getSegments()).andReturn(Collections.emptyList()).anyTimes();
+    EasyMock.expect(druidServer2.getLazyAllSegments()).andReturn(Collections.emptyList()).anyTimes();
     EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
     EasyMock.replay(druidServer2);
 
@@ -246,7 +246,7 @@ public class DruidCoordinatorBalancerProfiler
                                         toPeon
                                     )
                                 )
-                                .withAvailableSegments(segments)
+                                .withAvailableSegmentsInTest(segments)
                                 .withDynamicConfigs(
                                     CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
                                         MAX_SEGMENTS_TO_MOVE
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java
index dbd3048..0a30898 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorBalancerTest.java
@@ -535,7 +535,7 @@ public class DruidCoordinatorBalancerTest
                 .boxed()
                 .collect(Collectors.toMap(i -> String.valueOf(i + 1), peons::get))
         )
-        .withAvailableSegments(segments)
+        .withAvailableSegmentsInTest(segments)
         .withDynamicConfigs(
             CoordinatorDynamicConfig.builder().withMaxSegmentsToMove(
                 MAX_SEGMENTS_TO_MOVE
@@ -558,7 +558,7 @@ public class DruidCoordinatorBalancerTest
     EasyMock.expect(druidServer.getTier()).andReturn(tier).anyTimes();
     EasyMock.expect(druidServer.getCurrSize()).andReturn(currentSize).atLeastOnce();
     EasyMock.expect(druidServer.getMaxSize()).andReturn(maxSize).atLeastOnce();
-    EasyMock.expect(druidServer.getSegments()).andReturn(segments).anyTimes();
+    EasyMock.expect(druidServer.getLazyAllSegments()).andReturn(segments).anyTimes();
     EasyMock.expect(druidServer.getHost()).andReturn(name).anyTimes();
     EasyMock.expect(druidServer.getType()).andReturn(ServerType.HISTORICAL).anyTimes();
     if (!segments.isEmpty()) {
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
index 80dfd82..51a7930 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java
@@ -198,7 +198,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params =
         new DruidCoordinatorRuntimeParams.Builder()
             .withDruidCluster(druidCluster)
-            .withAvailableSegments(availableSegments)
+            .withAvailableSegmentsInTest(availableSegments)
             .withDatabaseRuleManager(databaseRuleManager)
             .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
             .withBalancerStrategy(balancerStrategy)
@@ -304,7 +304,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params =
         new DruidCoordinatorRuntimeParams.Builder()
             .withDruidCluster(druidCluster)
-            .withAvailableSegments(availableSegments)
+            .withAvailableSegmentsInTest(availableSegments)
             .withDatabaseRuleManager(databaseRuleManager)
             .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
             .withBalancerStrategy(balancerStrategy)
@@ -403,7 +403,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params =
         new DruidCoordinatorRuntimeParams.Builder()
             .withDruidCluster(druidCluster)
-            .withAvailableSegments(availableSegments)
+            .withAvailableSegmentsInTest(availableSegments)
             .withDatabaseRuleManager(databaseRuleManager)
             .withSegmentReplicantLookup(segmentReplicantLookup)
             .withBalancerStrategy(balancerStrategy)
@@ -478,7 +478,7 @@ public class DruidCoordinatorRuleRunnerTest
         new DruidCoordinatorRuntimeParams.Builder()
             .withEmitter(emitter)
             .withDruidCluster(druidCluster)
-            .withAvailableSegments(availableSegments)
+            .withAvailableSegmentsInTest(availableSegments)
             .withDatabaseRuleManager(databaseRuleManager)
             .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
             .withBalancerStrategy(balancerStrategy)
@@ -540,7 +540,7 @@ public class DruidCoordinatorRuleRunnerTest
         new DruidCoordinatorRuntimeParams.Builder()
             .withEmitter(emitter)
             .withDruidCluster(druidCluster)
-            .withAvailableSegments(availableSegments)
+            .withAvailableSegmentsInTest(availableSegments)
             .withDatabaseRuleManager(databaseRuleManager)
             .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
             .build();
@@ -609,7 +609,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
         .withDruidCluster(druidCluster)
         .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
-        .withAvailableSegments(availableSegments)
+        .withAvailableSegmentsInTest(availableSegments)
         .withDatabaseRuleManager(databaseRuleManager)
         .withSegmentReplicantLookup(segmentReplicantLookup)
         .withBalancerStrategy(balancerStrategy)
@@ -695,7 +695,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
         .withDruidCluster(druidCluster)
         .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
-        .withAvailableSegments(availableSegments)
+        .withAvailableSegmentsInTest(availableSegments)
         .withDatabaseRuleManager(databaseRuleManager)
         .withSegmentReplicantLookup(segmentReplicantLookup)
         .withBalancerStrategy(balancerStrategy)
@@ -786,7 +786,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
         .withDruidCluster(druidCluster)
         .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
-        .withAvailableSegments(availableSegments)
+        .withAvailableSegmentsInTest(availableSegments)
         .withDatabaseRuleManager(databaseRuleManager)
         .withSegmentReplicantLookup(segmentReplicantLookup)
         .withBalancerStrategy(balancerStrategy)
@@ -865,7 +865,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
         .withDruidCluster(druidCluster)
         .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
-        .withAvailableSegments(availableSegments)
+        .withAvailableSegmentsInTest(availableSegments)
         .withDatabaseRuleManager(databaseRuleManager)
         .withSegmentReplicantLookup(segmentReplicantLookup)
         .withBalancerStrategy(balancerStrategy)
@@ -963,7 +963,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
         .withDruidCluster(druidCluster)
         .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
-        .withAvailableSegments(availableSegments)
+        .withAvailableSegmentsInTest(availableSegments)
         .withDatabaseRuleManager(databaseRuleManager)
         .withSegmentReplicantLookup(segmentReplicantLookup)
         .withBalancerStrategy(balancerStrategy)
@@ -1047,7 +1047,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params =
         new DruidCoordinatorRuntimeParams.Builder()
             .withDruidCluster(druidCluster)
-            .withAvailableSegments(availableSegments)
+            .withAvailableSegmentsInTest(availableSegments)
             .withDatabaseRuleManager(databaseRuleManager)
             .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
             .withBalancerStrategy(balancerStrategy)
@@ -1077,7 +1077,7 @@ public class DruidCoordinatorRuleRunnerTest
         new DruidCoordinatorRuntimeParams.Builder()
             .withDruidCluster(druidCluster)
             .withEmitter(emitter)
-            .withAvailableSegments(Collections.singletonList(overFlowSegment))
+            .withAvailableSegmentsInTest(Collections.singletonList(overFlowSegment))
             .withDatabaseRuleManager(databaseRuleManager)
             .withBalancerStrategy(balancerStrategy)
             .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
@@ -1180,7 +1180,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params =
         new DruidCoordinatorRuntimeParams.Builder()
             .withDruidCluster(druidCluster)
-            .withAvailableSegments(availableSegments)
+            .withAvailableSegmentsInTest(availableSegments)
             .withDatabaseRuleManager(databaseRuleManager)
             .withBalancerStrategy(balancerStrategy)
             .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
@@ -1286,7 +1286,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params = new DruidCoordinatorRuntimeParams.Builder()
         .withDruidCluster(druidCluster)
         .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMillisToWaitBeforeDeleting(0L).build())
-        .withAvailableSegments(longerAvailableSegments)
+        .withAvailableSegmentsInTest(longerAvailableSegments)
         .withDatabaseRuleManager(databaseRuleManager)
         .withSegmentReplicantLookup(segmentReplicantLookup)
         .withBalancerStrategy(balancerStrategy)
@@ -1369,7 +1369,7 @@ public class DruidCoordinatorRuleRunnerTest
     DruidCoordinatorRuntimeParams params =
         new DruidCoordinatorRuntimeParams.Builder()
             .withDruidCluster(druidCluster)
-            .withAvailableSegments(availableSegments)
+            .withAvailableSegmentsInTest(availableSegments)
             .withDatabaseRuleManager(databaseRuleManager)
             .withSegmentReplicantLookup(SegmentReplicantLookup.make(new DruidCluster()))
             .withBalancerStrategy(balancerStrategy)
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
index f24f066..1f4f7e2 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorTest.java
@@ -36,6 +36,7 @@ import org.apache.druid.client.ImmutableDruidServer;
 import org.apache.druid.client.SingleServerInventoryView;
 import org.apache.druid.common.config.JacksonConfigManager;
 import org.apache.druid.curator.CuratorTestBase;
+import org.apache.druid.curator.CuratorUtils;
 import org.apache.druid.curator.discovery.NoopServiceAnnouncer;
 import org.apache.druid.discovery.DruidLeaderSelector;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -59,18 +60,15 @@ import org.apache.druid.timeline.SegmentId;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.joda.time.Duration;
-import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayList;
+import javax.annotation.Nullable;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
@@ -82,13 +80,15 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class DruidCoordinatorTest extends CuratorTestBase
 {
+  private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
+  private static final long COORDINATOR_START_DELAY = 1;
+  private static final long COORDINATOR_PERIOD = 100;
+
   private DruidCoordinator coordinator;
   private MetadataSegmentManager databaseSegmentManager;
   private SingleServerInventoryView serverInventoryView;
   private ScheduledExecutorFactory scheduledExecutorFactory;
   private DruidServer druidServer;
-  private DruidServer druidServer2;
-  private DataSegment segment;
   private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
   private LoadQueuePeon loadQueuePeon;
   private MetadataRuleManager metadataRuleManager;
@@ -97,12 +97,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
   private PathChildrenCache pathChildrenCache;
   private DruidCoordinatorConfig druidCoordinatorConfig;
   private ObjectMapper objectMapper;
-  private JacksonConfigManager configManager;
   private DruidNode druidNode;
   private LatchableServiceEmitter serviceEmitter = new LatchableServiceEmitter();
-  private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
-  private static final long COORDINATOR_START_DELAY = 1;
-  private static final long COORDINATOR_PERIOD = 100;
 
   @Before
   public void setUp() throws Exception
@@ -111,7 +107,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
     serverInventoryView = EasyMock.createMock(SingleServerInventoryView.class);
     databaseSegmentManager = EasyMock.createNiceMock(MetadataSegmentManager.class);
     metadataRuleManager = EasyMock.createNiceMock(MetadataRuleManager.class);
-    configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
+    JacksonConfigManager configManager = EasyMock.createNiceMock(JacksonConfigManager.class);
     EasyMock.expect(
         configManager.watch(
             EasyMock.eq(CoordinatorDynamicConfig.CONFIG_KEY),
@@ -228,7 +224,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
   @Test
   public void testMoveSegment()
   {
-    segment = EasyMock.createNiceMock(DataSegment.class);
+    final DataSegment segment = EasyMock.createNiceMock(DataSegment.class);
     EasyMock.expect(segment.getId()).andReturn(SegmentId.dummy("dummySegment"));
     EasyMock.expect(segment.getDataSource()).andReturn("dummyDataSource");
     EasyMock.replay(segment);
@@ -269,7 +265,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
     ).atLeastOnce();
     EasyMock.replay(druidServer);
 
-    druidServer2 = EasyMock.createMock(DruidServer.class);
+    DruidServer druidServer2 = EasyMock.createMock(DruidServer.class);
 
     EasyMock.expect(druidServer2.toImmutableDruidServer()).andReturn(
         new ImmutableDruidServer(
@@ -324,7 +320,7 @@ public class DruidCoordinatorTest extends CuratorTestBase
     EasyMock.replay(metadataRuleManager);
 
     // Setup MetadataSegmentManager
-    DruidDataSource[] druidDataSources = {
+    DruidDataSource[] dataSources = {
         new DruidDataSource(dataSource, Collections.emptyMap())
     };
     final DataSegment dataSegment = new DataSegment(
@@ -338,13 +334,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
         0x9,
         0
     );
-    druidDataSources[0].addSegment(dataSegment);
+    dataSources[0].addSegment(dataSegment);
 
-    EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
-    EasyMock.expect(databaseSegmentManager.getDataSources()).andReturn(
-        ImmutableList.of(druidDataSources[0].toImmutableDruidDataSource())
-    ).atLeastOnce();
-    EasyMock.replay(databaseSegmentManager);
+    setupMetadataSegmentManagerMock(dataSources[0]);
     ImmutableDruidDataSource immutableDruidDataSource = EasyMock.createNiceMock(ImmutableDruidDataSource.class);
     EasyMock.expect(immutableDruidDataSource.getSegments())
             .andReturn(ImmutableSet.of(dataSegment)).atLeastOnce();
@@ -372,9 +364,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
         new PathChildrenCacheListener()
         {
           @Override
-          public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
+          public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent event)
           {
-            if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
+            if (CuratorUtils.isChildAdded(event)) {
               if (assignSegmentLatch.getCount() > 0) {
                 //Coordinator should try to assign segment to druidServer historical
                 //Simulate historical loading segment
@@ -402,17 +394,19 @@ public class DruidCoordinatorTest extends CuratorTestBase
     Assert.assertEquals(1, segmentAvailability.size());
     Assert.assertEquals(0L, segmentAvailability.get(dataSource));
 
-    Map<String, ? extends Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
-    Assert.assertNotNull(replicationStatus);
-    Assert.assertEquals(1, replicationStatus.entrySet().size());
+    Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
+        coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
+    Assert.assertNotNull(underReplicationCountsPerDataSourcePerTier);
+    Assert.assertEquals(1, underReplicationCountsPerDataSourcePerTier.size());
 
-    Object2LongMap<String> dataSourceMap = replicationStatus.get(tier);
-    Assert.assertNotNull(dataSourceMap);
-    Assert.assertEquals(1, dataSourceMap.size());
-    Assert.assertNotNull(dataSourceMap.get(dataSource));
+    Object2LongMap<String> underRepliicationCountsPerDataSource = underReplicationCountsPerDataSourcePerTier.get(tier);
+    Assert.assertNotNull(underRepliicationCountsPerDataSource);
+    Assert.assertEquals(1, underRepliicationCountsPerDataSource.size());
+    //noinspection deprecation
+    Assert.assertNotNull(underRepliicationCountsPerDataSource.get(dataSource));
     // Simulated the adding of segment to druidServer during SegmentChangeRequestLoad event
     // The load rules asks for 2 replicas, therefore 1 replica should still be pending
-    Assert.assertEquals(1L, dataSourceMap.getLong(dataSource));
+    Assert.assertEquals(1L, underRepliicationCountsPerDataSource.getLong(dataSource));
 
     coordinator.stop();
     leaderUnannouncerLatch.await();
@@ -467,18 +461,17 @@ public class DruidCoordinatorTest extends CuratorTestBase
     DruidDataSource[] druidDataSources = {new DruidDataSource(dataSource, Collections.emptyMap())};
     dataSegments.values().forEach(druidDataSources[0]::addSegment);
 
+    setupMetadataSegmentManagerMock(druidDataSources[0]);
+
     EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
             .andReturn(ImmutableList.of(hotTier, coldTier)).atLeastOnce();
-    EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
-    EasyMock.expect(databaseSegmentManager.getDataSources()).andReturn(
-        ImmutableList.of(druidDataSources[0].toImmutableDruidDataSource())
-    ).atLeastOnce();
+
     EasyMock.expect(serverInventoryView.getInventory())
             .andReturn(ImmutableList.of(hotServer, coldServer))
             .atLeastOnce();
     EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
 
-    EasyMock.replay(metadataRuleManager, databaseSegmentManager, serverInventoryView);
+    EasyMock.replay(metadataRuleManager, serverInventoryView);
 
     coordinator.start();
     leaderAnnouncerLatch.await(); // Wait for this coordinator to become leader
@@ -486,15 +479,8 @@ public class DruidCoordinatorTest extends CuratorTestBase
     final CountDownLatch assignSegmentLatchHot = new CountDownLatch(2);
     pathChildrenCache.getListenable().addListener(
         (client, event) -> {
-          if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
-            DataSegment segment = dataSegments
-                .entrySet()
-                .stream()
-                .filter(x -> event.getData().getPath().contains(x.getKey()))
-                .map(Map.Entry::getValue)
-                .findFirst()
-                .orElse(null);
-
+          if (CuratorUtils.isChildAdded(event)) {
+            DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
             if (segment != null) {
               hotServer.addDataSegment(segment);
               curator.delete().guaranteed().forPath(event.getData().getPath());
@@ -507,15 +493,9 @@ public class DruidCoordinatorTest extends CuratorTestBase
 
     final CountDownLatch assignSegmentLatchCold = new CountDownLatch(1);
     pathChildrenCacheCold.getListenable().addListener(
-        (client, event) -> {
-          if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
-            DataSegment segment = dataSegments
-                .entrySet()
-                .stream()
-                .filter(x -> event.getData().getPath().contains(x.getKey()))
-                .map(Map.Entry::getValue)
-                .findFirst()
-                .orElse(null);
+        (CuratorFramework client, PathChildrenCacheEvent event) -> {
+          if (CuratorUtils.isChildAdded(event)) {
+            DataSegment segment = findSegmentRelatedToCuratorEvent(dataSegments, event);
 
             if (segment != null) {
               coldServer.addDataSegment(segment);
@@ -536,10 +516,11 @@ public class DruidCoordinatorTest extends CuratorTestBase
 
     Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
 
-    Map<String, ? extends Object2LongMap<String>> replicationStatus = coordinator.getReplicationStatus();
-    Assert.assertEquals(2, replicationStatus.entrySet().size());
-    Assert.assertEquals(0L, replicationStatus.get(hotTierName).getLong(dataSource));
-    Assert.assertEquals(0L, replicationStatus.get(coldTierName).getLong(dataSource));
+    Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier =
+        coordinator.computeUnderReplicationCountsPerDataSourcePerTier();
+    Assert.assertEquals(2, underReplicationCountsPerDataSourcePerTier.size());
+    Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(hotTierName).getLong(dataSource));
+    Assert.assertEquals(0L, underReplicationCountsPerDataSourcePerTier.get(coldTierName).getLong(dataSource));
 
     coordinator.stop();
     leaderUnannouncerLatch.await();
@@ -549,52 +530,37 @@ public class DruidCoordinatorTest extends CuratorTestBase
     EasyMock.verify(metadataRuleManager);
   }
 
-  @Test
-  public void testOrderedAvailableDataSegments()
+  private void setupMetadataSegmentManagerMock(DruidDataSource dataSource)
   {
-    DruidDataSource dataSource = new DruidDataSource("test", new HashMap());
-    DataSegment[] segments = new DataSegment[]{
-        getSegment("test", Intervals.of("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
-        getSegment("test", Intervals.of("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
-        getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z")),
-        getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z"))
-    };
-    for (DataSegment segment : segments) {
-      dataSource.addSegment(segment);
-    }
-
-    EasyMock.expect(databaseSegmentManager.getDataSources()).andReturn(
-        ImmutableList.of(dataSource.toImmutableDruidDataSource())
-    ).atLeastOnce();
+    EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
+    EasyMock
+        .expect(databaseSegmentManager.iterateAllSegments())
+        .andReturn(dataSource.getSegments())
+        .anyTimes();
+    EasyMock
+        .expect(databaseSegmentManager.getDataSources())
+        .andReturn(Collections.singleton(dataSource.toImmutableDruidDataSource()))
+        .anyTimes();
+    EasyMock
+        .expect(databaseSegmentManager.getAllDataSourceNames())
+        .andReturn(Collections.singleton(dataSource.getName()))
+        .anyTimes();
     EasyMock.replay(databaseSegmentManager);
-    Set<DataSegment> availableSegments = coordinator.getOrderedAvailableDataSegments();
-    DataSegment[] expected = new DataSegment[]{
-        getSegment("test", Intervals.of("2016-01-11T01:00:00Z/2016-01-11T02:00:00Z")),
-        getSegment("test", Intervals.of("2016-01-10T03:00:00Z/2016-01-10T04:00:00Z")),
-        getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T12:00:00Z")),
-        getSegment("test", Intervals.of("2016-01-09T10:00:00Z/2016-01-09T11:00:00Z"))
-    };
-    Assert.assertEquals(expected.length, availableSegments.size());
-    Assert.assertEquals(expected, availableSegments.toArray());
-    EasyMock.verify(databaseSegmentManager);
   }
 
-
-  private DataSegment getSegment(String dataSource, Interval interval)
+  @Nullable
+  private static DataSegment findSegmentRelatedToCuratorEvent(
+      Map<String, DataSegment> dataSegments,
+      PathChildrenCacheEvent event
+  )
   {
-    // Not using EasyMock as it hampers the performance of multithreads.
-    DataSegment segment = new DataSegment(
-        dataSource,
-        interval,
-        "dummy_version",
-        new ConcurrentHashMap<>(),
-        new ArrayList<>(),
-        new ArrayList<>(),
-        null,
-        0,
-        0L
-    );
-    return segment;
+    return dataSegments
+        .entrySet()
+        .stream()
+        .filter(x -> event.getData().getPath().contains(x.getKey()))
+        .map(Map.Entry::getValue)
+        .findFirst()
+        .orElse(null);
   }
 
   private static class TestDruidLeaderSelector implements DruidLeaderSelector
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
index 2ef3cdd..001dc2a 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/ReservoirSegmentSamplerTest.java
@@ -138,7 +138,7 @@ public class ReservoirSegmentSamplerTest
     EasyMock.expect(druidServer1.getName()).andReturn("1").atLeastOnce();
     EasyMock.expect(druidServer1.getCurrSize()).andReturn(30L).atLeastOnce();
     EasyMock.expect(druidServer1.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer1.getSegments()).andReturn(segments1).anyTimes();
+    EasyMock.expect(druidServer1.getLazyAllSegments()).andReturn(segments1).anyTimes();
     EasyMock.expect(druidServer1.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
     EasyMock.replay(druidServer1);
 
@@ -146,7 +146,7 @@ public class ReservoirSegmentSamplerTest
     EasyMock.expect(druidServer2.getTier()).andReturn("normal").anyTimes();
     EasyMock.expect(druidServer2.getCurrSize()).andReturn(30L).atLeastOnce();
     EasyMock.expect(druidServer2.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer2.getSegments()).andReturn(segments2).anyTimes();
+    EasyMock.expect(druidServer2.getLazyAllSegments()).andReturn(segments2).anyTimes();
     EasyMock.expect(druidServer2.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
     EasyMock.replay(druidServer2);
 
@@ -154,7 +154,7 @@ public class ReservoirSegmentSamplerTest
     EasyMock.expect(druidServer3.getTier()).andReturn("normal").anyTimes();
     EasyMock.expect(druidServer3.getCurrSize()).andReturn(30L).atLeastOnce();
     EasyMock.expect(druidServer3.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer3.getSegments()).andReturn(segments3).anyTimes();
+    EasyMock.expect(druidServer3.getLazyAllSegments()).andReturn(segments3).anyTimes();
     EasyMock.expect(druidServer3.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
     EasyMock.replay(druidServer3);
 
@@ -162,7 +162,7 @@ public class ReservoirSegmentSamplerTest
     EasyMock.expect(druidServer4.getTier()).andReturn("normal").anyTimes();
     EasyMock.expect(druidServer4.getCurrSize()).andReturn(30L).atLeastOnce();
     EasyMock.expect(druidServer4.getMaxSize()).andReturn(100L).atLeastOnce();
-    EasyMock.expect(druidServer4.getSegments()).andReturn(segments4).anyTimes();
+    EasyMock.expect(druidServer4.getLazyAllSegments()).andReturn(segments4).anyTimes();
     EasyMock.expect(druidServer4.getSegment(EasyMock.anyObject())).andReturn(null).anyTimes();
     EasyMock.replay(druidServer4);
 
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java b/server/src/test/java/org/apache/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java
index fe7b321..b8699fb 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/cost/CachingCostBalancerStrategyTest.java
@@ -115,7 +115,7 @@ public class CachingCostBalancerStrategyTest
   {
     ClusterCostCache.Builder builder = ClusterCostCache.builder();
     serverHolders.forEach(
-        s -> s.getServer().getSegments().forEach(segment -> builder.addSegment(s.getServer().getName(), segment))
+        s -> s.getServer().getLazyAllSegments().forEach(segment -> builder.addSegment(s.getServer().getName(), segment))
     );
     return new CachingCostBalancerStrategy(builder.build(), listeningExecutorService);
   }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java
index 996f3f9..0ae0ee6 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorCleanupOvershadowedTest.java
@@ -117,11 +117,12 @@ public class DruidCoordinatorCleanupOvershadowedTest
             ).collect(Collectors.toCollection(() -> new TreeSet<>(Collections.reverseOrder())))
         ));
 
-    DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams.newBuilder()
-                                                                        .withAvailableSegments(availableSegments)
-                                                                        .withCoordinatorStats(new CoordinatorStats())
-                                                                        .withDruidCluster(druidCluster)
-                                                                        .build();
+    DruidCoordinatorRuntimeParams params = DruidCoordinatorRuntimeParams
+        .newBuilder()
+        .withAvailableSegmentsInTest(availableSegments)
+        .withCoordinatorStats(new CoordinatorStats())
+        .withDruidCluster(druidCluster)
+        .build();
     druidCoordinatorCleanupOvershadowed.run(params);
     EasyMock.verify(coordinator, druidDataSource, druidServer);
   }
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
index 83398d7..adb3447 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/BroadcastDistributionRuleTest.java
@@ -286,7 +286,7 @@ public class BroadcastDistributionRuleTest
                                      .withDruidCluster(druidCluster)
                                      .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(
+                                     .withAvailableSegmentsInTest(
                                          smallSegment,
                                          largeSegments.get(0),
                                          largeSegments.get(1),
@@ -337,7 +337,7 @@ public class BroadcastDistributionRuleTest
                                      .withDruidCluster(secondCluster)
                                      .withSegmentReplicantLookup(SegmentReplicantLookup.make(secondCluster))
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(
+                                     .withAvailableSegmentsInTest(
                                          smallSegment,
                                          largeSegments.get(0),
                                          largeSegments.get(1)
@@ -366,7 +366,7 @@ public class BroadcastDistributionRuleTest
                                      .withDruidCluster(druidCluster)
                                      .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(
+                                     .withAvailableSegmentsInTest(
                                          smallSegment,
                                          largeSegments.get(0),
                                          largeSegments.get(1),
@@ -404,7 +404,7 @@ public class BroadcastDistributionRuleTest
                                      .withDruidCluster(druidCluster)
                                      .withSegmentReplicantLookup(SegmentReplicantLookup.make(druidCluster))
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(
+                                     .withAvailableSegmentsInTest(
                                          smallSegment,
                                          largeSegments.get(0),
                                          largeSegments.get(1),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
index a8793b2..73d47cc 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/LoadRuleTest.java
@@ -182,7 +182,7 @@ public class LoadRuleTest
                                      .withReplicationManager(throttler)
                                      .withBalancerStrategy(mockBalancerStrategy)
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(segment).build(),
+                                     .withAvailableSegmentsInTest(segment).build(),
         segment
     );
 
@@ -253,7 +253,7 @@ public class LoadRuleTest
                                      .withReplicationManager(throttler)
                                      .withBalancerStrategy(mockBalancerStrategy)
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(segment).build(),
+                                     .withAvailableSegmentsInTest(segment).build(),
         segment
     );
 
@@ -303,7 +303,7 @@ public class LoadRuleTest
                                      .withReplicationManager(throttler)
                                      .withBalancerStrategy(mockBalancerStrategy)
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(segment).build(),
+                                     .withAvailableSegmentsInTest(segment).build(),
         segment
     );
 
@@ -393,7 +393,7 @@ public class LoadRuleTest
                                      .withReplicationManager(throttler)
                                      .withBalancerStrategy(mockBalancerStrategy)
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(segment).build(),
+                                     .withAvailableSegmentsInTest(segment).build(),
         segment
     );
 
@@ -482,7 +482,7 @@ public class LoadRuleTest
                                      .withReplicationManager(throttler)
                                      .withBalancerStrategy(mockBalancerStrategy)
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(segment).build(),
+                                     .withAvailableSegmentsInTest(segment).build(),
         segment
     );
 
@@ -541,7 +541,7 @@ public class LoadRuleTest
                                      .withReplicationManager(throttler)
                                      .withBalancerStrategy(mockBalancerStrategy)
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(segment).build(),
+                                     .withAvailableSegmentsInTest(segment).build(),
         segment
     );
 
@@ -614,7 +614,7 @@ public class LoadRuleTest
                                      .withReplicationManager(throttler)
                                      .withBalancerStrategy(mockBalancerStrategy)
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(segment).build(),
+                                     .withAvailableSegmentsInTest(segment).build(),
         segment
     );
 
@@ -671,7 +671,7 @@ public class LoadRuleTest
             .withReplicationManager(throttler)
             .withBalancerStrategy(mockBalancerStrategy)
             .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-            .withAvailableSegments(dataSegment1, dataSegment2, dataSegment3)
+            .withAvailableSegmentsInTest(dataSegment1, dataSegment2, dataSegment3)
             .withDynamicConfigs(CoordinatorDynamicConfig.builder().withMaxSegmentsInNodeLoadingQueue(2).build())
             .build();
 
@@ -728,7 +728,7 @@ public class LoadRuleTest
                                      .withReplicationManager(throttler)
                                      .withBalancerStrategy(mockBalancerStrategy)
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(segment).build(),
+                                     .withAvailableSegmentsInTest(segment).build(),
         segment
     );
 
@@ -785,7 +785,7 @@ public class LoadRuleTest
                                      .withReplicationManager(throttler)
                                      .withBalancerStrategy(mockBalancerStrategy)
                                      .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-                                     .withAvailableSegments(segment).build(),
+                                     .withAvailableSegmentsInTest(segment).build(),
         segment
     );
 
@@ -838,7 +838,7 @@ public class LoadRuleTest
         .withReplicationManager(throttler)
         .withBalancerStrategy(mockBalancerStrategy)
         .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-        .withAvailableSegments(segment1, segment2)
+        .withAvailableSegmentsInTest(segment1, segment2)
         .build();
     CoordinatorStats stats = rule.run(
         null,
@@ -904,7 +904,7 @@ public class LoadRuleTest
         .withReplicationManager(throttler)
         .withBalancerStrategy(mockBalancerStrategy)
         .withBalancerReferenceTimestamp(DateTimes.of("2013-01-01"))
-        .withAvailableSegments(segment1)
+        .withAvailableSegmentsInTest(segment1)
         .build();
     CoordinatorStats stats = rule.run(
         null,
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
index e895113..6316b40 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java
@@ -479,7 +479,7 @@ public class SystemSchema extends AbstractSchema
       final List<ImmutableDruidServer> druidServers = serverView.getDruidServers();
       final int serverSegmentsTableSize = SERVER_SEGMENTS_SIGNATURE.getRowOrder().size();
       for (ImmutableDruidServer druidServer : druidServers) {
-        for (DataSegment segment : druidServer.getSegments()) {
+        for (DataSegment segment : druidServer.getLazyAllSegments()) {
           Object[] row = new Object[serverSegmentsTableSize];
           row[0] = druidServer.getHost();
           row[1] = segment.getId();
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
index e707c40..ba44081 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java
@@ -268,19 +268,15 @@ public class DruidSchemaTest extends CalciteTestBase
     SegmentMetadataHolder updatedHolder = SegmentMetadataHolder.from(existingHolder).withNumRows(5).build();
     schema.setSegmentMetadataHolder(existingSegment, updatedHolder);
     // find a druidServer holding existingSegment
-    final Pair<ImmutableDruidServer, DataSegment> pair = druidServers.stream()
-                                                                     .flatMap(druidServer -> druidServer.getSegments()
-                                                                                                        .stream()
-                                                                                                        .filter(segment -> segment
-                                                                                                            .equals(
-                                                                                                                existingSegment))
-                                                                                                        .map(segment -> Pair
-                                                                                                            .of(
-                                                                                                                druidServer,
-                                                                                                                segment
-                                                                                                            )))
-                                                                     .findAny()
-                                                                     .orElse(null);
+    final Pair<ImmutableDruidServer, DataSegment> pair = druidServers
+        .stream()
+        .flatMap(druidServer -> druidServer
+            .getLazyAllSegments().stream()
+            .filter(segment -> segment.equals(existingSegment))
+            .map(segment -> Pair.of(druidServer, segment))
+        )
+        .findAny()
+        .orElse(null);
     Assert.assertNotNull(pair);
     final ImmutableDruidServer server = pair.lhs;
     Assert.assertNotNull(server);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org