Posted to commits@druid.apache.org by ma...@apache.org on 2020/06/17 06:48:55 UTC

[druid] branch master updated: API to verify a datasource has the latest ingested data (#9965)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a26206  API to verify a datasource has the latest ingested data (#9965)
1a26206 is described below

commit 1a2620606d8187da7725088e9c52ce41b8a692b0
Author: Maytas Monsereenusorn <52...@users.noreply.github.com>
AuthorDate: Tue Jun 16 20:48:30 2020 -1000

    API to verify a datasource has the latest ingested data (#9965)
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * fix checkstyle
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * API to verify a datasource has the latest ingested data
    
    * fix spelling
    
    * address comments
    
    * fix checkstyle
    
    * update docs
    
    * fix tests
    
    * fix doc
    
    * address comments
    
    * fix typo
    
    * fix spelling
    
    * address comments
    
    * address comments
    
    * fix typo in docs
---
 docs/ingestion/faq.md                              |  14 +
 docs/operations/api-reference.md                   |  43 ++-
 .../apache/druid/client/CoordinatorServerView.java |   4 +
 .../druid/metadata/SegmentsMetadataManager.java    |  17 ++
 .../druid/metadata/SqlSegmentsMetadataManager.java | 142 ++++++++--
 .../druid/server/coordinator/DruidCoordinator.java |  19 +-
 .../druid/server/http/DataSourcesResource.java     | 133 ++++++++-
 .../metadata/SqlSegmentsMetadataManagerTest.java   | 159 ++++++++++-
 .../druid/server/http/DataSourcesResourceTest.java | 315 ++++++++++++++++++---
 9 files changed, 773 insertions(+), 73 deletions(-)

diff --git a/docs/ingestion/faq.md b/docs/ingestion/faq.md
index 1e6ffe1..308407f 100644
--- a/docs/ingestion/faq.md
+++ b/docs/ingestion/faq.md
@@ -66,6 +66,20 @@ Other common reasons that hand-off fails are as follows:
 
 Make sure to include the `druid-hdfs-storage` and all the hadoop configuration, dependencies (that can be obtained by running command `hadoop classpath` on a machine where hadoop has been setup) in the classpath. And, provide necessary HDFS settings as described in [deep storage](../dependencies/deep-storage.md) .
 
+## How do I know when I can query Druid after submitting a batch ingestion task?
+
+You can verify if segments created by a recent ingestion task are loaded onto historicals and available for querying using the following workflow.
+1. Submit your ingestion task.
+2. Repeatedly poll the [Overlord's tasks API](../operations/api-reference.md#tasks) (`/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed.
+3. Poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with 
+`forceMetadataRefresh=true` and `interval=<INTERVAL_OF_INGESTED_DATA>` once. 
+(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache of all datasources. This can be a heavy operation in terms of the load on the metadata store, but it is necessary to make sure that we verify all the latest segments' load status.)
+If there are segments not yet loaded, continue to step 4; otherwise, you can now query the data.
+4. Repeatedly poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with 
+`forceMetadataRefresh=false` and `interval=<INTERVAL_OF_INGESTED_DATA>`. 
+Continue polling until all segments are loaded. Once all segments are loaded, you can query the data.
+Note that this workflow only guarantees that the segments are available at the time of the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) call. Segments can still become unavailable afterward because of Historical process failures or for other reasons.
+
 ## I don't see my Druid segments on my Historical processes
 
 You can check the Coordinator console located at `<COORDINATOR_IP>:<PORT>`. Make sure that your segments have actually loaded on [Historical processes](../design/historical.md). If your segments are not present, check the Coordinator logs for messages about capacity of replication errors. One reason that segments are not downloaded is because Historical processes have maxSizes that are too small, making them incapable of downloading more data. You can change that with (for example):
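
The FAQ workflow added above (one forced refresh, then cheap cached polls) can be sketched roughly as follows. This is a minimal illustration, not code from the commit: `LoadStatusPoller`, `LoadStatusClient`, and `waitUntilLoaded` are hypothetical names, and the client is assumed to wrap the `loadstatus?simple` call and return the number of segments still left to load.

```java
public class LoadStatusPoller {
    /**
     * Hypothetical abstraction over the Segment Loading by Datasource API in "simple" mode.
     * Returns the number of segments still left to load for the datasource and interval.
     */
    public interface LoadStatusClient {
        long unavailableSegments(boolean forceMetadataRefresh);
    }

    /**
     * Returns true once no segments are left to load, or false if the poll budget
     * is exhausted or the thread is interrupted.
     */
    public static boolean waitUntilLoaded(LoadStatusClient client, int maxPolls, long sleepMillis) {
        // Step 3: a single call with forceMetadataRefresh=true (expensive for the metadata store).
        if (client.unavailableSegments(true) == 0) {
            return true;
        }
        // Step 4: repeated calls with forceMetadataRefresh=false (served from the cached metadata).
        for (int i = 0; i < maxPolls; i++) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            if (client.unavailableSegments(false) == 0) {
                return true;
            }
        }
        return false;
    }
}
```

As the FAQ text notes, a `true` result only says the segments were available at the time of the last call.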
diff --git a/docs/operations/api-reference.md b/docs/operations/api-reference.md
index a66dd64..a3610a8 100644
--- a/docs/operations/api-reference.md
+++ b/docs/operations/api-reference.md
@@ -96,11 +96,11 @@ Returns the percentage of segments actually loaded in the cluster versus segment
 
  * `/druid/coordinator/v1/loadstatus?simple`
 
-Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replication.
+Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include segment replication counts.
 
 * `/druid/coordinator/v1/loadstatus?full`
 
-Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes replication.
+Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes segment replication counts.
 
 * `/druid/coordinator/v1/loadqueue`
 
@@ -114,6 +114,45 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading by Datasource
+
+Note that all _interval_ query parameters are ISO 8601 strings (e.g., 2016-06-27/2016-06-28).
+Also note that these APIs only guarantee that the segments are available at the time of the call.
+Segments can still become unavailable afterward because of Historical process failures or for other reasons.
+
+##### GET
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given 
+datasource over the given interval (or last 2 weeks if interval is not given). `forceMetadataRefresh` is required to be set. 
+Setting `forceMetadataRefresh` to true will force the coordinator to poll the latest segment metadata from the metadata store.
+(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache of all datasources. This can be a heavy operation in terms
+of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status.)
+Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh.
+If no used segments are found for the given inputs, this API returns `204 No Content`.
+
+ * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). This does not include segment replication counts. `forceMetadataRefresh` is required to be set. 
+Setting `forceMetadataRefresh` to true will force the coordinator to poll the latest segment metadata from the metadata store.
+(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache of all datasources. This can be a heavy operation in terms
+of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status.)
+Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh.
+If no used segments are found for the given inputs, this API returns `204 No Content`.
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?full&forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). This includes segment replication counts. `forceMetadataRefresh` is required to be set. 
+Setting `forceMetadataRefresh` to true will force the coordinator to poll the latest segment metadata from the metadata store.
+(Note: `forceMetadataRefresh=true` refreshes the Coordinator's metadata cache of all datasources. This can be a heavy operation in terms
+of the load on the metadata store but can be necessary to make sure that we verify all the latest segments' load status.)
+Setting `forceMetadataRefresh` to false will use the metadata cached on the coordinator from the last force/periodic refresh.
+If no used segments are found for the given inputs, this API returns `204 No Content`.
+
 #### Metadata store information
 
 ##### GET
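
For illustration, the request URLs documented above could be assembled along these lines. This is a sketch under assumptions: `LoadStatusUrl` and `build` are hypothetical names, the base URL is whatever address your Coordinator listens on, and the interval is passed with `_` in place of `/`, which the resource method in this commit converts back via `interval.replace('_', '/')`. No URL encoding is applied, so intervals containing characters such as `+` would need extra handling.

```java
public class LoadStatusUrl {
    /**
     * Builds a loadstatus URL for one datasource.
     *
     * @param coordinatorBaseUrl   for example "http://localhost:8081" (an assumed address)
     * @param mode                 "simple", "full", or null for the percentage form
     * @param interval             ISO 8601 interval such as "2016-06-27/2016-06-28", or null
     */
    public static String build(
            String coordinatorBaseUrl,
            String dataSource,
            boolean forceMetadataRefresh,
            String interval,
            String mode) {
        StringBuilder url = new StringBuilder(coordinatorBaseUrl)
                .append("/druid/coordinator/v1/datasources/")
                .append(dataSource)
                .append("/loadstatus?");
        if (mode != null) {
            // "simple" and "full" are value-less query flags.
            url.append(mode).append('&');
        }
        // forceMetadataRefresh is required by the API, so it is always emitted.
        url.append("forceMetadataRefresh=").append(forceMetadataRefresh);
        if (interval != null) {
            url.append("&interval=").append(interval.replace('/', '_'));
        }
        return url.toString();
    }
}
```

Omitting `interval` leaves the server-side default (the last two weeks) in effect.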
diff --git a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
index 2517a8f..538cc2f 100644
--- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
+++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java
@@ -200,6 +200,10 @@ public class CoordinatorServerView implements InventoryView
     }
   }
 
+  public Map<SegmentId, SegmentLoadInfo> getSegmentLoadInfos()
+  {
+    return segmentLoadInfos;
+  }
 
   @Override
   public DruidServer getInventoryValue(String serverKey)
diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
index 4f97b15..889141a 100644
--- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
@@ -20,6 +20,7 @@
 package org.apache.druid.metadata;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.timeline.DataSegment;
@@ -114,6 +115,22 @@ public interface SegmentsMetadataManager
   Iterable<DataSegment> iterateAllUsedSegments();
 
   /**
+   * Returns an iterable to go over all used and non-overshadowed segments of given data sources over given interval.
+   * The order in which segments are iterated is unspecified. Note: the iteration may not be as trivially cheap as,
+   * for example, iteration over an ArrayList. Try (to some reasonable extent) to organize the code so that it
+   * iterates the returned iterable only once rather than several times.
+   * If {@code requiresLatest} is true then a forced metadata store poll will be triggered. This can cause a longer
+   * response time but will ensure that the latest segment information (at the time this method is called) is returned.
+   * If {@code requiresLatest} is false then segment information from a stale snapshot of up to the last periodic poll
+   * period {@link SqlSegmentsMetadataManager#periodicPollDelay} will be used.
+   */
+  Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+      String datasource,
+      Interval interval,
+      boolean requiresLatest
+  );
+
+  /**
    * Retrieves all data source names for which there are segment in the database, regardless of whether those segments
    * are used or not. If there are no segments in the database, returns an empty set.
    *
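
A caller of the new interface method has three cases to tell apart: the datasource has no timeline at all (absent `Optional`), the timeline has no used segments in the interval (empty iterable), or segments were found. The sketch below shows that classification only. It uses `java.util.Optional` rather than the Guava `Optional` the commit imports, and `SegmentLookupOutcome`, `Outcome`, and `classify` are hypothetical names.

```java
import java.util.Optional;

public class SegmentLookupOutcome {
    public enum Outcome { DATASOURCE_UNKNOWN, NO_USED_SEGMENTS, SEGMENTS_FOUND }

    /** Classifies the result of a per-datasource, per-interval segment lookup. */
    public static Outcome classify(Optional<? extends Iterable<?>> segments) {
        if (!segments.isPresent()) {
            // No timeline exists for the datasource in the current snapshot.
            return Outcome.DATASOURCE_UNKNOWN;
        }
        if (!segments.get().iterator().hasNext()) {
            // The datasource exists but has no used segments in the interval.
            return Outcome.NO_USED_SEGMENTS;
        }
        return Outcome.SEGMENTS_FOUND;
    }
}
```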
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
index 92a8748..60f7f4b 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
@@ -20,6 +20,8 @@
 package org.apache.druid.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
@@ -44,6 +46,7 @@ import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.Partitions;
 import org.apache.druid.timeline.SegmentId;
 import org.apache.druid.timeline.VersionedIntervalTimeline;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@@ -95,7 +98,8 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
   {}
 
   /** Represents periodic {@link #poll}s happening from {@link #exec}. */
-  private static class PeriodicDatabasePoll implements DatabasePoll
+  @VisibleForTesting
+  static class PeriodicDatabasePoll implements DatabasePoll
   {
     /**
      * This future allows to wait until {@link #dataSourcesSnapshot} is initialized in the first {@link #poll()}
@@ -104,13 +108,15 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
      * leadership changes.
      */
     final CompletableFuture<Void> firstPollCompletionFuture = new CompletableFuture<>();
+    long lastPollStartTimestampInMs = -1;
   }
 
   /**
    * Represents on-demand {@link #poll} initiated at periods of time when SqlSegmentsMetadataManager doesn't poll the database
    * periodically.
    */
-  private static class OnDemandDatabasePoll implements DatabasePoll
+  @VisibleForTesting
+  static class OnDemandDatabasePoll implements DatabasePoll
   {
     final long initiationTimeNanos = System.nanoTime();
     final CompletableFuture<Void> pollCompletionFuture = new CompletableFuture<>();
@@ -127,7 +133,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
    * called at the same time if two different threads are calling them. This might be possible if Coordinator gets and
    * drops leadership repeatedly in quick succession.
    *
-   * This lock is also used to synchronize {@link #awaitOrPerformDatabasePoll} for times when SqlSegmentsMetadataManager
+   * This lock is also used to synchronize {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll} for times when SqlSegmentsMetadataManager
    * is not polling the database periodically (in other words, when the Coordinator is not the leader).
    */
   private final ReentrantReadWriteLock startStopPollLock = new ReentrantReadWriteLock();
@@ -155,7 +161,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
    * easy to forget to do.
    *
    * This field may be updated from {@link #exec}, or from whatever thread calling {@link #doOnDemandPoll} via {@link
-   * #awaitOrPerformDatabasePoll()} via one of the public methods of SqlSegmentsMetadataManager.
+   * #useLatestIfWithinDelayOrPerformNewDatabasePoll()} via one of the public methods of SqlSegmentsMetadataManager.
    */
   private volatile @MonotonicNonNull DataSourcesSnapshot dataSourcesSnapshot = null;
 
@@ -170,7 +176,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
    * Note that if there is a happens-before relationship between a call to {@link #startPollingDatabasePeriodically()}
    * (on Coordinators' leadership change) and one of the methods accessing the {@link #dataSourcesSnapshot}'s state in
    * this class the latter is guaranteed to await for the initiated periodic poll. This is because when the latter
-   * method calls to {@link #awaitLatestDatabasePoll()} via {@link #awaitOrPerformDatabasePoll}, they will
+   * method calls to {@link #useLatestSnapshotIfWithinDelay()} via {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll}, they will
    * see the latest {@link PeriodicDatabasePoll} value (stored in this field, latestDatabasePoll, in {@link
    * #startPollingDatabasePeriodically()}) and to await on its {@link PeriodicDatabasePoll#firstPollCompletionFuture}.
    *
@@ -185,7 +191,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
    * SegmentsMetadataManager} and guarantee that it always returns consistent and relatively up-to-date data from methods
    * like {@link #getImmutableDataSourceWithUsedSegments}, while avoiding excessive repetitive polls. The last part
    * is achieved via "hooking on" other polls by awaiting on {@link PeriodicDatabasePoll#firstPollCompletionFuture} or
-   * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #awaitOrPerformDatabasePoll} method
+   * {@link OnDemandDatabasePoll#pollCompletionFuture}, see {@link #useLatestIfWithinDelayOrPerformNewDatabasePoll} method
    * implementation for details.
    *
    * Note: the overall implementation of periodic/on-demand polls is not completely optimal: for example, when the
@@ -194,7 +200,7 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
    * during Coordinator leadership switches is not a priority.
    *
    * This field is {@code volatile} because it's checked and updated in a double-checked locking manner in {@link
-   * #awaitOrPerformDatabasePoll()}.
+   * #useLatestIfWithinDelayOrPerformNewDatabasePoll()}.
    */
   private volatile @Nullable DatabasePoll latestDatabasePoll = null;
 
@@ -311,6 +317,22 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
   private Runnable createPollTaskForStartOrder(long startOrder, PeriodicDatabasePoll periodicDatabasePoll)
   {
     return () -> {
+      // If the latest poll was an OnDemandDatabasePoll that started less than periodicPollDelay ago,
+      // we wait for (periodicPollDelay - (currentTime - latestOnDemandDatabasePollStartTime)) and then check again.
+      try {
+        long periodicPollDelayNanos = TimeUnit.MILLISECONDS.toNanos(periodicPollDelay.getMillis());
+        while (latestDatabasePoll != null
+               && latestDatabasePoll instanceof OnDemandDatabasePoll
+               && ((OnDemandDatabasePoll) latestDatabasePoll).nanosElapsedFromInitiation() < periodicPollDelayNanos) {
+          long sleepNano = periodicPollDelayNanos
+                           - ((OnDemandDatabasePoll) latestDatabasePoll).nanosElapsedFromInitiation();
+          TimeUnit.NANOSECONDS.sleep(sleepNano);
+        }
+      }
+      catch (Exception e) {
+        log.debug(e, "Exception found while waiting for next periodic poll");
+      }
+
       // poll() is synchronized together with startPollingDatabasePeriodically(), stopPollingDatabasePeriodically() and
       // isPollingDatabasePeriodically() to ensure that when stopPollingDatabasePeriodically() exits, poll() won't
       // actually run anymore after that (it could only enter the synchronized section and exit immediately because the
@@ -320,8 +342,10 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
       lock.lock();
       try {
         if (startOrder == currentStartPollingOrder) {
+          periodicDatabasePoll.lastPollStartTimestampInMs = System.currentTimeMillis();
           poll();
           periodicDatabasePoll.firstPollCompletionFuture.complete(null);
+          latestDatabasePoll = periodicDatabasePoll;
         } else {
           log.debug("startOrder = currentStartPollingOrder = %d, skipping poll()", startOrder);
         }
@@ -381,16 +405,16 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
     }
   }
 
-  private void awaitOrPerformDatabasePoll()
+  private void useLatestIfWithinDelayOrPerformNewDatabasePoll()
   {
-    // Double-checked locking with awaitLatestDatabasePoll() call playing the role of the "check".
-    if (awaitLatestDatabasePoll()) {
+    // Double-checked locking with useLatestSnapshotIfWithinDelay() call playing the role of the "check".
+    if (useLatestSnapshotIfWithinDelay()) {
       return;
     }
     ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
     lock.lock();
     try {
-      if (awaitLatestDatabasePoll()) {
+      if (useLatestSnapshotIfWithinDelay()) {
         return;
       }
       OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll();
@@ -403,11 +427,17 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
   }
 
   /**
-   * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is
-   * made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise,
-   * meaning that a new on-demand database poll should be initiated.
+   * This method returns true without waiting for a database poll if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has completed its first poll, or an {@link OnDemandDatabasePoll} that was
+   * made no longer than {@link #periodicPollDelay} before the current time.
+   * This method waits until completion if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has not completed its first poll, or an {@link OnDemandDatabasePoll} that is
+   * already in the process of polling the database.
+   * This means that any method using this check can read from a snapshot that is
+   * up to {@link SqlSegmentsMetadataManager#periodicPollDelay} old.
    */
-  private boolean awaitLatestDatabasePoll()
+  @VisibleForTesting
+  boolean useLatestSnapshotIfWithinDelay()
   {
     DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
     if (latestDatabasePoll instanceof PeriodicDatabasePoll) {
@@ -430,6 +460,49 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
     return false;
   }
 
+  /**
+   * This method always forces a database poll if there is no ongoing database poll. It then
+   * waits for the new poll or the ongoing poll to complete before returning.
+   * This means that any method using this check can be sure that the latest poll for the snapshot was completed after
+   * this method was called.
+   */
+  @VisibleForTesting
+  void forceOrWaitOngoingDatabasePoll()
+  {
+    long checkStartTime = System.currentTimeMillis();
+    ReentrantReadWriteLock.WriteLock lock = startStopPollLock.writeLock();
+    lock.lock();
+    try {
+      DatabasePoll latestDatabasePoll = this.latestDatabasePoll;
+      try {
+        // Check whether a periodic poll completed while we were waiting for the lock
+        if (latestDatabasePoll instanceof PeriodicDatabasePoll
+            && ((PeriodicDatabasePoll) latestDatabasePoll).lastPollStartTimestampInMs > checkStartTime) {
+          return;
+        }
+        // Check whether an on-demand poll completed while we were waiting for the lock
+        if (latestDatabasePoll instanceof OnDemandDatabasePoll) {
+          long checkStartTimeNanos = TimeUnit.MILLISECONDS.toNanos(checkStartTime);
+          OnDemandDatabasePoll latestOnDemandPoll = (OnDemandDatabasePoll) latestDatabasePoll;
+          if (latestOnDemandPoll.initiationTimeNanos > checkStartTimeNanos) {
+            return;
+          }
+        }
+      }
+      catch (Exception e) {
+        // Latest poll was unsuccessful, try to do a new poll
+        log.debug(e, "Latest poll was unsuccessful. Starting a new poll...");
+      }
+      // Force a database poll
+      OnDemandDatabasePoll onDemandDatabasePoll = new OnDemandDatabasePoll();
+      this.latestDatabasePoll = onDemandDatabasePoll;
+      doOnDemandPoll(onDemandDatabasePoll);
+    }
+    finally {
+      lock.unlock();
+    }
+  }
+
   private void doOnDemandPoll(OnDemandDatabasePoll onDemandPoll)
   {
     try {
@@ -857,19 +930,44 @@ public class SqlSegmentsMetadataManager implements SegmentsMetadataManager
   @Override
   public DataSourcesSnapshot getSnapshotOfDataSourcesWithAllUsedSegments()
   {
-    awaitOrPerformDatabasePoll();
+    useLatestIfWithinDelayOrPerformNewDatabasePoll();
     return dataSourcesSnapshot;
   }
 
+  @VisibleForTesting
+  DataSourcesSnapshot getDataSourcesSnapshot()
+  {
+    return dataSourcesSnapshot;
+  }
+
+  @VisibleForTesting
+  DatabasePoll getLatestDatabasePoll()
+  {
+    return latestDatabasePoll;
+  }
+
+
   @Override
   public Iterable<DataSegment> iterateAllUsedSegments()
   {
-    awaitOrPerformDatabasePoll();
-    return () -> dataSourcesSnapshot
-        .getDataSourcesWithAllUsedSegments()
-        .stream()
-        .flatMap(dataSource -> dataSource.getSegments().stream())
-        .iterator();
+    useLatestIfWithinDelayOrPerformNewDatabasePoll();
+    return dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot();
+  }
+
+  @Override
+  public Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource,
+                                                                                                    Interval interval,
+                                                                                                    boolean requiresLatest)
+  {
+    if (requiresLatest) {
+      forceOrWaitOngoingDatabasePoll();
+    } else {
+      useLatestIfWithinDelayOrPerformNewDatabasePoll();
+    }
+    VersionedIntervalTimeline<String, DataSegment> usedSegmentsTimeline
+        = dataSourcesSnapshot.getUsedSegmentsTimelinesPerDataSource().get(datasource);
+    return Optional.fromNullable(usedSegmentsTimeline)
+                   .transform(timeline -> timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE));
   }
 
   @Override
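
The distinction this file draws between "use the cached snapshot if it is recent enough" and "always poll" can be illustrated with a much-simplified sketch. This is not the commit's implementation: it ignores periodic polls, leadership changes, and the read/write lock, and `RefreshingSnapshot` with its injected loader and clock are hypothetical names chosen for the example.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class RefreshingSnapshot<T> {
    private final Supplier<T> loader;       // stands in for the metadata store poll
    private final LongSupplier clockNanos;  // injected so the behaviour can be tested
    private final long maxAgeNanos;         // plays the role of periodicPollDelay

    private T snapshot;
    private long lastPollNanos;
    private boolean polled = false;

    public RefreshingSnapshot(Supplier<T> loader, LongSupplier clockNanos, long maxAgeNanos) {
        this.loader = loader;
        this.clockNanos = clockNanos;
        this.maxAgeNanos = maxAgeNanos;
    }

    /**
     * With requiresLatest=true a new poll is always performed.
     * With requiresLatest=false the cached snapshot is reused unless it is
     * missing or older than maxAgeNanos.
     */
    public synchronized T get(boolean requiresLatest) {
        long now = clockNanos.getAsLong();
        if (requiresLatest || !polled || now - lastPollNanos > maxAgeNanos) {
            snapshot = loader.get();
            lastPollNanos = now;
            polled = true;
        }
        return snapshot;
    }
}
```

Both timestamps here come from the same clock, which keeps the age comparison meaningful.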
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
index 36a414e..c4de364 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
@@ -257,14 +257,23 @@ public class DruidCoordinator
    */
   public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
   {
+    final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments();
+    return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments);
+  }
+
+  /**
+   * @return tier -> { dataSource -> underReplicationCount } map
+   */
+  public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegments(
+      Iterable<DataSegment> dataSegments
+  )
+  {
     final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
 
     if (segmentReplicantLookup == null) {
       return underReplicationCountsPerDataSourcePerTier;
     }
 
-    final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments();
-
     final DateTime now = DateTimes.nowUtc();
 
     for (final DataSegment segment : dataSegments) {
@@ -320,7 +329,7 @@ public class DruidCoordinator
 
     for (ImmutableDruidDataSource dataSource : dataSources) {
       final Set<DataSegment> segments = Sets.newHashSet(dataSource.getSegments());
-      final int numUsedSegments = segments.size();
+      final int numPublishedSegments = segments.size();
 
       // remove loaded segments
       for (DruidServer druidServer : serverInventoryView.getInventory()) {
@@ -333,10 +342,10 @@ public class DruidCoordinator
           }
         }
       }
-      final int numUnloadedSegments = segments.size();
+      final int numUnavailableSegments = segments.size();
       loadStatus.put(
           dataSource.getName(),
-          100 * ((double) (numUsedSegments - numUnloadedSegments) / (double) numUsedSegments)
+          100 * ((double) (numPublishedSegments - numUnavailableSegments) / (double) numPublishedSegments)
       );
     }
 
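
The load percentage expression in the hunk above reduces to simple set arithmetic. The sketch below mirrors it with plain string IDs; `LoadPercentage` and `percentLoaded` are hypothetical names, and real segments are `DataSegment` objects rather than strings.

```java
import java.util.HashSet;
import java.util.Set;

public class LoadPercentage {
    /**
     * Percentage of published segments that are currently loaded.
     * Returns NaN when there are no published segments (0.0 / 0.0).
     */
    public static double percentLoaded(Set<String> publishedSegmentIds, Set<String> loadedSegmentIds) {
        // Start from all published segments and remove the ones that are loaded.
        Set<String> unavailable = new HashSet<>(publishedSegmentIds);
        unavailable.removeAll(loadedSegmentIds);

        int numPublishedSegments = publishedSegmentIds.size();
        int numUnavailableSegments = unavailable.size();
        return 100 * ((double) (numPublishedSegments - numUnavailableSegments) / (double) numPublishedSegments);
    }
}
```

With four published segments of which three are loaded, the expression evaluates to `100 * (3.0 / 4.0)`, that is, 75.0.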
diff --git a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index b6d310f..f88d1c7 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -22,11 +22,13 @@ package org.apache.druid.server.http;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.inject.Inject;
 import com.sun.jersey.spi.container.ResourceFilters;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.druid.client.CoordinatorServerView;
 import org.apache.druid.client.DruidDataSource;
@@ -49,6 +51,7 @@ import org.apache.druid.metadata.UnknownSegmentIdsException;
 import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.server.coordination.DruidServerMetadata;
+import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.rules.LoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.http.security.DatasourceResourceFilter;
@@ -96,12 +99,14 @@ import java.util.stream.Collectors;
 public class DataSourcesResource
 {
   private static final Logger log = new Logger(DataSourcesResource.class);
+  private static final long DEFAULT_LOADSTATUS_INTERVAL_OFFSET = 14 * 24 * 60 * 60 * 1000;
 
   private final CoordinatorServerView serverInventoryView;
   private final SegmentsMetadataManager segmentsMetadataManager;
   private final MetadataRuleManager metadataRuleManager;
   private final IndexingServiceClient indexingServiceClient;
   private final AuthorizerMapper authorizerMapper;
+  private final DruidCoordinator coordinator;
 
   @Inject
   public DataSourcesResource(
@@ -109,7 +114,8 @@ public class DataSourcesResource
       SegmentsMetadataManager segmentsMetadataManager,
       MetadataRuleManager metadataRuleManager,
       @Nullable IndexingServiceClient indexingServiceClient,
-      AuthorizerMapper authorizerMapper
+      AuthorizerMapper authorizerMapper,
+      DruidCoordinator coordinator
   )
   {
     this.serverInventoryView = serverInventoryView;
@@ -117,6 +123,7 @@ public class DataSourcesResource
     this.metadataRuleManager = metadataRuleManager;
     this.indexingServiceClient = indexingServiceClient;
     this.authorizerMapper = authorizerMapper;
+    this.coordinator = coordinator;
   }
 
   @GET
@@ -391,6 +398,130 @@ public class DataSourcesResource
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("forceMetadataRefresh") final Boolean forceMetadataRefresh,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    if (forceMetadataRefresh == null) {
+      return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("Invalid request. forceMetadataRefresh must be specified")
+          .build();
+    }
+    final Interval theInterval;
+    if (interval == null) {
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - DEFAULT_LOADSTATUS_INTERVAL_OFFSET, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        forceMetadataRefresh
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (Iterables.size(segments.get()) == 0) {
+      return Response
+          .status(Response.Status.NO_CONTENT)
+          .entity("No used segment found for the given datasource and interval")
+          .build();
+    }
+
+    if (simple != null) {
+      // Calculate response for simple mode
+      SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get());
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              segmentsLoadStatistics.getNumUnavailableSegments()
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate response for full mode
+      Map<String, Object2LongMap<String>> segmentLoadMap
+          = coordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments.get());
+      if (segmentLoadMap.isEmpty()) {
+        return Response.serverError()
+                       .entity("Coordinator segment replicant lookup is not initialized yet. Try again later.")
+                       .build();
+      }
+      return Response.ok(segmentLoadMap).build();
+    } else {
+      // Calculate response for default mode
+      SegmentsLoadStatistics segmentsLoadStatistics = computeSegmentLoadStatistics(segments.get());
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              100 * ((double) (segmentsLoadStatistics.getNumLoadedSegments()) / (double) segmentsLoadStatistics.getNumPublishedSegments())
+          )
+      ).build();
+    }
+  }
+
+  private SegmentsLoadStatistics computeSegmentLoadStatistics(Iterable<DataSegment> segments)
+  {
+    Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+    int numPublishedSegments = 0;
+    int numUnavailableSegments = 0;
+    int numLoadedSegments = 0;
+    for (DataSegment segment : segments) {
+      numPublishedSegments++;
+      if (!segmentLoadInfos.containsKey(segment.getId())) {
+        numUnavailableSegments++;
+      } else {
+        numLoadedSegments++;
+      }
+    }
+    return new SegmentsLoadStatistics(numPublishedSegments, numUnavailableSegments, numLoadedSegments);
+  }
+
+  private static class SegmentsLoadStatistics
+  {
+    private int numPublishedSegments;
+    private int numUnavailableSegments;
+    private int numLoadedSegments;
+
+    SegmentsLoadStatistics(
+        int numPublishedSegments,
+        int numUnavailableSegments,
+        int numLoadedSegments
+    )
+    {
+      this.numPublishedSegments = numPublishedSegments;
+      this.numUnavailableSegments = numUnavailableSegments;
+      this.numLoadedSegments = numLoadedSegments;
+    }
+
+    public int getNumPublishedSegments()
+    {
+      return numPublishedSegments;
+    }
+
+    public int getNumUnavailableSegments()
+    {
+      return numUnavailableSegments;
+    }
+
+    public int getNumLoadedSegments()
+    {
+      return numLoadedSegments;
+    }
+  }
+
   /**
    * The property names belong to the public HTTP JSON API.
    */
diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
index be6354a..57df440 100644
--- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java
@@ -20,11 +20,13 @@
 package org.apache.druid.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Optional;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
+import org.apache.druid.client.DataSourcesSnapshot;
 import org.apache.druid.client.ImmutableDruidDataSource;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
@@ -43,6 +45,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 
@@ -117,7 +120,7 @@ public class SqlSegmentsMetadataManagerTest
   {
     TestDerbyConnector connector = derbyConnectorRule.getConnector();
     SegmentsMetadataManagerConfig config = new SegmentsMetadataManagerConfig();
-    config.setPollDuration(Period.seconds(1));
+    config.setPollDuration(Period.seconds(3));
     sqlSegmentsMetadataManager = new SqlSegmentsMetadataManager(
         jsonMapper,
         Suppliers.ofInstance(config),
@@ -148,30 +151,124 @@ public class SqlSegmentsMetadataManagerTest
   }
 
   @Test
-  public void testPoll()
+  public void testPollPeriodically()
   {
+    DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertNull(dataSourcesSnapshot);
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
-    sqlSegmentsMetadataManager.poll();
     Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+    // This call makes sure that the first poll is completed
+    sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay();
+    Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
+    dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
     Assert.assertEquals(
         ImmutableSet.of("wikipedia"),
         sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
     );
     Assert.assertEquals(
         ImmutableList.of("wikipedia"),
-        sqlSegmentsMetadataManager
-            .getImmutableDataSourcesWithAllUsedSegments()
-            .stream()
-            .map(ImmutableDruidDataSource::getName)
-            .collect(Collectors.toList())
+        dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
+                           .stream()
+                           .map(ImmutableDruidDataSource::getName)
+                           .collect(Collectors.toList())
     );
     Assert.assertEquals(
         ImmutableSet.of(segment1, segment2),
-        ImmutableSet.copyOf(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments("wikipedia").getSegments())
+        ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments())
     );
     Assert.assertEquals(
         ImmutableSet.of(segment1, segment2),
-        ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments())
+        ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
+    );
+  }
+
+  @Test
+  public void testPollOnDemand()
+  {
+    DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertNull(dataSourcesSnapshot);
+    // This should return false and not wait/poll anything, as we did not schedule a periodic poll
+    Assert.assertFalse(sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay());
+    Assert.assertNull(dataSourcesSnapshot);
+    // This call will force an on-demand poll
+    sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll();
+    Assert.assertFalse(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+    Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll);
+    dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertEquals(
+        ImmutableSet.of("wikipedia"),
+        sqlSegmentsMetadataManager.retrieveAllDataSourceNames()
+    );
+    Assert.assertEquals(
+        ImmutableList.of("wikipedia"),
+        dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
+                           .stream()
+                           .map(ImmutableDruidDataSource::getName)
+                           .collect(Collectors.toList())
+    );
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.copyOf(dataSourcesSnapshot.getDataSource("wikipedia").getSegments())
+    );
+    Assert.assertEquals(
+        ImmutableSet.of(segment1, segment2),
+        ImmutableSet.copyOf(dataSourcesSnapshot.iterateAllUsedSegmentsInSnapshot())
+    );
+  }
+
+  @Test(timeout = 60_000)
+  public void testPollPeriodicallyAndOnDemandInterleave() throws Exception
+  {
+    DataSourcesSnapshot dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertNull(dataSourcesSnapshot);
+    sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
+    Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+    // This call makes sure that the first poll is completed
+    sqlSegmentsMetadataManager.useLatestSnapshotIfWithinDelay();
+    Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
+    dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertEquals(
+        ImmutableList.of("wikipedia"),
+        dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
+                           .stream()
+                           .map(ImmutableDruidDataSource::getName)
+                           .collect(Collectors.toList())
+    );
+    final String newDataSource2 = "wikipedia2";
+    final DataSegment newSegment2 = createNewSegment1(newDataSource2);
+    publisher.publishSegment(newSegment2);
+
+    // This call will force an on-demand poll
+    sqlSegmentsMetadataManager.forceOrWaitOngoingDatabasePoll();
+    Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+    Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.OnDemandDatabasePoll);
+    // The new datasource should now be in the snapshot since we just forced an on-demand poll.
+    dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertEquals(
+        ImmutableList.of("wikipedia2", "wikipedia"),
+        dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
+                           .stream()
+                           .map(ImmutableDruidDataSource::getName)
+                           .collect(Collectors.toList())
+    );
+
+    final String newDataSource3 = "wikipedia3";
+    final DataSegment newSegment3 = createNewSegment1(newDataSource3);
+    publisher.publishSegment(newSegment3);
+
+    // This time, wait for the periodic poll (no on-demand poll, so we have to wait a bit...)
+    while (sqlSegmentsMetadataManager.getDataSourcesSnapshot().getDataSource(newDataSource3) == null) {
+      Thread.sleep(1000);
+    }
+    Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically());
+    Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll);
+    dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot();
+    Assert.assertEquals(
+        ImmutableList.of("wikipedia2", "wikipedia3", "wikipedia"),
+        dataSourcesSnapshot.getDataSourcesWithAllUsedSegments()
+                           .stream()
+                           .map(ImmutableDruidDataSource::getName)
+                           .collect(Collectors.toList())
     );
   }
 
@@ -749,4 +846,46 @@ public class SqlSegmentsMetadataManagerTest
     sqlSegmentsMetadataManager.startPollingDatabasePeriodically();
     sqlSegmentsMetadataManager.stopPollingDatabasePeriodically();
   }
+
+  @Test
+  public void testIterateAllUsedNonOvershadowedSegmentsForDatasourceInterval() throws Exception
+  {
+    final Interval theInterval = Intervals.of("2012-03-15T00:00:00.000/2012-03-20T00:00:00.000");
+    Optional<Iterable<DataSegment>> segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        "wikipedia", theInterval, true
+    );
+    Assert.assertTrue(segments.isPresent());
+    Set<DataSegment> dataSegmentSet = ImmutableSet.copyOf(segments.get());
+    Assert.assertEquals(1, dataSegmentSet.size());
+    Assert.assertTrue(dataSegmentSet.contains(segment1));
+
+    final DataSegment newSegment2 = createSegment(
+        "wikipedia",
+        "2012-03-16T00:00:00.000/2012-03-17T00:00:00.000",
+        "2017-10-15T20:19:12.565Z",
+        "index/y=2017/m=10/d=15/2017-10-16T20:19:12.565Z/0/index.zip",
+        0
+    );
+    publisher.publishSegment(newSegment2);
+
+    // The new segment is not returned since we call without forcing a poll
+    segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        "wikipedia", theInterval, false
+    );
+    Assert.assertTrue(segments.isPresent());
+    dataSegmentSet = ImmutableSet.copyOf(segments.get());
+    Assert.assertEquals(1, dataSegmentSet.size());
+    Assert.assertTrue(dataSegmentSet.contains(segment1));
+
+    // The new segment is returned since we call with a forced poll
+    segments = sqlSegmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        "wikipedia", theInterval, true
+    );
+    Assert.assertTrue(segments.isPresent());
+    dataSegmentSet = ImmutableSet.copyOf(segments.get());
+    Assert.assertEquals(2, dataSegmentSet.size());
+    Assert.assertTrue(dataSegmentSet.contains(segment1));
+    Assert.assertTrue(dataSegmentSet.contains(newSegment2));
+  }
+
 }
diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
index 10a7f97..39e02ae 100644
--- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
@@ -19,11 +19,14 @@
 
 package org.apache.druid.server.http;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import it.unimi.dsi.fastutil.objects.Object2LongOpenHashMap;
 import org.apache.druid.client.CoordinatorServerView;
 import org.apache.druid.client.DruidDataSource;
 import org.apache.druid.client.DruidServer;
@@ -39,6 +42,7 @@ import org.apache.druid.query.SegmentDescriptor;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
+import org.apache.druid.server.coordinator.DruidCoordinator;
 import org.apache.druid.server.coordinator.rules.IntervalDropRule;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
@@ -176,7 +180,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView, server, request);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+        new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null);
     Response response = dataSourcesResource.getQueryableDataSources("full", null, request);
     Set<ImmutableDruidDataSource> result = (Set<ImmutableDruidDataSource>) response.getEntity();
     Assert.assertEquals(200, response.getStatus());
@@ -250,7 +254,7 @@ public class DataSourcesResourceTest
       }
     };
 
-    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper);
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, authMapper, null);
     Response response = dataSourcesResource.getQueryableDataSources("full", null, request);
     Set<ImmutableDruidDataSource> result = (Set<ImmutableDruidDataSource>) response.getEntity();
 
@@ -289,7 +293,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView, server, request);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
+        new DataSourcesResource(inventoryView, null, null, null, AuthTestUtils.TEST_AUTHORIZER_MAPPER, null);
     Response response = dataSourcesResource.getQueryableDataSources(null, "simple", request);
     Assert.assertEquals(200, response.getStatus());
     List<Map<String, Object>> results = (List<Map<String, Object>>) response.getEntity();
@@ -313,7 +317,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView, server);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, null);
+        new DataSourcesResource(inventoryView, null, null, null, null, null);
     Response response = dataSourcesResource.getDataSource("datasource1", "full");
     ImmutableDruidDataSource result = (ImmutableDruidDataSource) response.getEntity();
     Assert.assertEquals(200, response.getStatus());
@@ -329,7 +333,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView, server);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, null);
+        new DataSourcesResource(inventoryView, null, null, null, null, null);
     Assert.assertEquals(204, dataSourcesResource.getDataSource("none", null).getStatus());
     EasyMock.verify(inventoryView, server);
   }
@@ -347,7 +351,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView, server);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, null);
+        new DataSourcesResource(inventoryView, null, null, null, null, null);
     Response response = dataSourcesResource.getDataSource("datasource1", null);
     Assert.assertEquals(200, response.getStatus());
     Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -380,7 +384,7 @@ public class DataSourcesResourceTest
     EasyMock.expect(inventoryView.getInventory()).andReturn(ImmutableList.of(server, server2, server3)).atLeastOnce();
 
     EasyMock.replay(inventoryView, server, server2, server3);
-    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null);
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null);
     Response response = dataSourcesResource.getDataSource("datasource1", null);
     Assert.assertEquals(200, response.getStatus());
     Map<String, Map<String, Object>> result = (Map<String, Map<String, Object>>) response.getEntity();
@@ -418,7 +422,7 @@ public class DataSourcesResourceTest
 
     EasyMock.replay(inventoryView);
 
-    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null);
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, null, null, null, null, null);
     Response response = dataSourcesResource.getDataSource("datasource1", null);
     Assert.assertEquals(200, response.getStatus());
     Map<String, Map<String, Object>> result1 = (Map<String, Map<String, Object>>) response.getEntity();
@@ -463,7 +467,7 @@ public class DataSourcesResourceTest
     expectedIntervals.add(Intervals.of("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z"));
     expectedIntervals.add(Intervals.of("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z"));
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, null);
+        new DataSourcesResource(inventoryView, null, null, null, null, null);
 
     Response response = dataSourcesResource.getIntervalsWithServedSegmentsOrAllServedSegmentsPerIntervals(
         "invalidDataSource",
@@ -523,7 +527,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(inventoryView);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, null, null);
+        new DataSourcesResource(inventoryView, null, null, null, null, null);
     Response response = dataSourcesResource.getServedSegmentsInInterval(
         "invalidDataSource",
         "2010-01-01/P1D",
@@ -593,7 +597,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(indexingServiceClient, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null);
+        new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null);
     Response response = dataSourcesResource.killUnusedSegmentsInInterval("datasource1", interval);
 
     Assert.assertEquals(200, response.getStatus());
@@ -607,7 +611,7 @@ public class DataSourcesResourceTest
     IndexingServiceClient indexingServiceClient = EasyMock.createStrictMock(IndexingServiceClient.class);
     EasyMock.replay(indexingServiceClient, server);
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null);
+        new DataSourcesResource(inventoryView, null, null, indexingServiceClient, null, null);
     try {
       Response response =
           dataSourcesResource.markAsUnusedAllSegmentsOrKillUnusedSegmentsInInterval("datasource", "true", "???");
@@ -630,7 +634,7 @@ public class DataSourcesResourceTest
     Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null);
     Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z"));
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null);
+        new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null, null);
 
     // test dropped
     EasyMock.expect(databaseRuleManager.getRulesWithDefault("dataSource1"))
@@ -699,7 +703,7 @@ public class DataSourcesResourceTest
     EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(true).once();
     EasyMock.replay(segmentsMetadataManager);
 
-    DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null);
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString());
     Assert.assertEquals(200, response.getStatus());
@@ -713,7 +717,7 @@ public class DataSourcesResourceTest
     EasyMock.expect(segmentsMetadataManager.markSegmentAsUsed(segment.getId().toString())).andReturn(false).once();
     EasyMock.replay(segmentsMetadataManager);
 
-    DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null);
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(null, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markSegmentAsUsed(segment.getDataSource(), segment.getId().toString());
     Assert.assertEquals(200, response.getStatus());
@@ -734,7 +738,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(segmentsMetadataManager, inventoryView, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -757,7 +761,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(segmentsMetadataManager, inventoryView, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -780,7 +784,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(segmentsMetadataManager, inventoryView, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -803,7 +807,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(segmentsMetadataManager, inventoryView, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -821,7 +825,7 @@ public class DataSourcesResourceTest
     EasyMock.replay(segmentsMetadataManager, inventoryView, server);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -835,7 +839,7 @@ public class DataSourcesResourceTest
   public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadNoArguments()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -848,7 +852,7 @@ public class DataSourcesResourceTest
   public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadBothArguments()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -861,7 +865,7 @@ public class DataSourcesResourceTest
   public void testMarkAsUsedNonOvershadowedSegmentsInvalidPayloadEmptyArray()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments(
         "datasource1",
@@ -874,7 +878,7 @@ public class DataSourcesResourceTest
   public void testMarkAsUsedNonOvershadowedSegmentsNoPayload()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments("datasource1", null);
     Assert.assertEquals(400, response.getStatus());
@@ -1026,7 +1030,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity());
@@ -1049,7 +1053,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
@@ -1074,7 +1078,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(500, response.getStatus());
     Assert.assertNotNull(response.getEntity());
@@ -1096,7 +1100,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("numChangedSegments", 1), response.getEntity());
@@ -1119,7 +1123,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(200, response.getStatus());
     Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity());
@@ -1143,7 +1147,7 @@ public class DataSourcesResourceTest
         new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null);
 
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload);
     Assert.assertEquals(500, response.getStatus());
     Assert.assertNotNull(response.getEntity());
@@ -1154,7 +1158,7 @@ public class DataSourcesResourceTest
   public void testMarkSegmentsAsUnusedNullPayload()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", null);
     Assert.assertEquals(400, response.getStatus());
@@ -1169,7 +1173,7 @@ public class DataSourcesResourceTest
   public void testMarkSegmentsAsUnusedInvalidPayload()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
         new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null);
@@ -1183,7 +1187,7 @@ public class DataSourcesResourceTest
   public void testMarkSegmentsAsUnusedInvalidPayloadBothArguments()
   {
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null);
+        new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
 
     final DataSourcesResource.MarkDataSourceSegmentsPayload payload =
         new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-01/P1D"), ImmutableSet.of());
@@ -1193,6 +1197,251 @@ public class DataSourcesResourceTest
     Assert.assertNotNull(response.getEntity());
   }
 
+  @Test
+  public void testGetDatasourceLoadstatusForceMetadataRefreshNull()
+  {
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", null, null, null, null);
+    Assert.assertEquals(400, response.getStatus());
+  }
+
+  @Test
+  public void testGetDatasourceLoadstatusNoSegmentForInterval()
+  {
+    List<DataSegment> segments = ImmutableList.of();
+    // Test when there are no used segments for the requested interval
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq(
+        "datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.replay(segmentsMetadataManager);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(
+        inventoryView,
+        segmentsMetadataManager,
+        null,
+        null,
+        null,
+        null
+    );
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
+    Assert.assertEquals(204, response.getStatus());
+  }
+
+  @Test
+  public void testGetDatasourceLoadstatusDefault()
+  {
+    DataSegment datasource1Segment1 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        10
+    );
+
+    DataSegment datasource1Segment2 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-22/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        20
+    );
+    DataSegment datasource2Segment1 = new DataSegment(
+        "datasource2",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        30
+    );
+    List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
+    Map<SegmentId, SegmentLoadInfo> completedLoadInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1),
+        datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2),
+        datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1)
+    );
+    Map<SegmentId, SegmentLoadInfo> halfLoadedInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1)
+    );
+
+    // Test when datasource fully loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(100.0, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+    EasyMock.reset(segmentsMetadataManager, inventoryView);
+
+    // Test when datasource half loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(50.0, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+  }
+
+  @Test
+  public void testGetDatasourceLoadstatusSimple()
+  {
+    DataSegment datasource1Segment1 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        10
+    );
+
+    DataSegment datasource1Segment2 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-22/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        20
+    );
+    DataSegment datasource2Segment1 = new DataSegment(
+        "datasource2",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        30
+    );
+    List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
+    Map<SegmentId, SegmentLoadInfo> completedLoadInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1),
+        datasource1Segment2.getId(), new SegmentLoadInfo(datasource1Segment2),
+        datasource2Segment1.getId(), new SegmentLoadInfo(datasource2Segment1)
+    );
+    Map<SegmentId, SegmentLoadInfo> halfLoadedInfoMap = ImmutableMap.of(
+        datasource1Segment1.getId(), new SegmentLoadInfo(datasource1Segment1)
+    );
+
+    // Test when datasource fully loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(completedLoadInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(0, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+    EasyMock.reset(segmentsMetadataManager, inventoryView);
+
+    // Test when datasource half loaded
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    EasyMock.expect(inventoryView.getSegmentLoadInfos()).andReturn(halfLoadedInfoMap).once();
+    EasyMock.replay(segmentsMetadataManager, inventoryView);
+
+    dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, null);
+    response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, "simple", null);
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(1, ((Map) response.getEntity()).size());
+    Assert.assertTrue(((Map) response.getEntity()).containsKey("datasource1"));
+    Assert.assertEquals(1, ((Map) response.getEntity()).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager, inventoryView);
+  }
+
+  @Test
+  public void testGetDatasourceLoadstatusFull()
+  {
+    DataSegment datasource1Segment1 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-01/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        10
+    );
+
+    DataSegment datasource1Segment2 = new DataSegment(
+        "datasource1",
+        Intervals.of("2010-01-22/P1D"),
+        "",
+        null,
+        null,
+        null,
+        null,
+        0x9,
+        20
+    );
+    List<DataSegment> segments = ImmutableList.of(datasource1Segment1, datasource1Segment2);
+
+    final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+    Object2LongMap<String> tier1 = new Object2LongOpenHashMap<>();
+    tier1.put("datasource1", 0L);
+    Object2LongMap<String> tier2 = new Object2LongOpenHashMap<>();
+    tier2.put("datasource1", 3L);
+    underReplicationCountsPerDataSourcePerTier.put("tier1", tier1);
+    underReplicationCountsPerDataSourcePerTier.put("tier2", tier2);
+
+    // Test the full format: under-replication counts per tier (tier2 is under-replicated)
+    EasyMock.expect(segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(EasyMock.eq("datasource1"), EasyMock.anyObject(Interval.class), EasyMock.anyBoolean()))
+            .andReturn(Optional.of(segments)).once();
+    DruidCoordinator druidCoordinator = EasyMock.createMock(DruidCoordinator.class);
+    EasyMock.expect(druidCoordinator.computeUnderReplicationCountsPerDataSourcePerTierForSegments(segments))
+            .andReturn(underReplicationCountsPerDataSourcePerTier).once();
+
+    EasyMock.replay(segmentsMetadataManager, druidCoordinator);
+
+    DataSourcesResource dataSourcesResource = new DataSourcesResource(inventoryView, segmentsMetadataManager, null, null, null, druidCoordinator);
+    Response response = dataSourcesResource.getDatasourceLoadstatus("datasource1", true, null, null, "full");
+    Assert.assertEquals(200, response.getStatus());
+    Assert.assertNotNull(response.getEntity());
+    Assert.assertEquals(2, ((Map) response.getEntity()).size());
+    Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier1")).size());
+    Assert.assertEquals(1, ((Map) ((Map) response.getEntity()).get("tier2")).size());
+    Assert.assertEquals(0L, ((Map) ((Map) response.getEntity()).get("tier1")).get("datasource1"));
+    Assert.assertEquals(3L, ((Map) ((Map) response.getEntity()).get("tier2")).get("datasource1"));
+    EasyMock.verify(segmentsMetadataManager);
+  }
+
   private DruidServerMetadata createRealtimeServerMetadata(String name)
   {
     return createServerMetadata(name, ServerType.REALTIME);

