Posted to commits@druid.apache.org by GitBox <gi...@apache.org> on 2020/06/02 04:13:08 UTC

[GitHub] [druid] maytasm opened a new pull request #9965: API to verify a datasource has the latest ingested data

maytasm opened a new pull request #9965:
URL: https://github.com/apache/druid/pull/9965


   API to verify a datasource has the latest ingested data
   
   ### Description
   
   This PR addresses https://github.com/apache/druid/issues/5721
   
   The existing loadstatus API reads segments from the Coordinator's SqlSegmentsMetadataManager, which caches segments in memory and updates them periodically. Because the API compares segment metadata from this cache with the segments published by historicals, there can be a race condition. In particular, when a new ingestion happens after the initial load of the datasource, the cache still contains only the metadata of the old segments. The API would then compare the list of old segments with what is published by historicals and report that everything is available when the new segments are not actually available yet.
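
The race described above can be illustrated with a small, self-contained sketch. This is not Druid code: the class name, the method, and the segment IDs are hypothetical, and plain sets stand in for the metadata store, the Coordinator's cached snapshot, and the segments announced by historicals.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of the stale-cache race; not Druid code. */
final class StaleCacheRaceSketch
{
  /** Returns true when every segment in the caller's view is served by a historical. */
  static boolean allLoaded(Set<String> knownSegments, Set<String> servedSegments)
  {
    return servedSegments.containsAll(knownSegments);
  }

  public static void main(String[] args)
  {
    Set<String> metadataStore = new HashSet<>(Set.of("seg-old"));
    // Snapshot taken by the last periodic poll.
    Set<String> coordinatorCache = new HashSet<>(metadataStore);
    Set<String> served = new HashSet<>(Set.of("seg-old"));

    // A new ingestion publishes a segment; the cache has not been refreshed yet
    // and no historical has loaded the new segment.
    metadataStore.add("seg-new");

    System.out.println("cached view says loaded: " + allLoaded(coordinatorCache, served));
    System.out.println("fresh view says loaded:  " + allLoaded(metadataStore, served));
  }
}
```

Under this model the cached view reports everything as loaded while the fresh view does not, which is the false positive the new API is meant to avoid.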
   
   This new API fixes this problem. It supports the following:
   - The new API takes a datasource. It returns false if any used segment (of the past 2 weeks) of the given datasource is not available to be queried (i.e. not loaded onto a historical yet), and returns true otherwise. The 2-week interval is not finalized yet; we can decide later what a good default is.
   
   - The (same) new API takes a datasource and a time interval (start + end). It returns false if any used segment (between the given start and end times) of the given datasource is not available to be queried (i.e. not loaded onto a historical yet), and returns true otherwise.
   
   Note that the above are both the same API; the time interval is an optional parameter. The time interval refers to the timestamps of the data in the segments (it has nothing to do with when the segments were ingested). It can be the same time interval that the user wants to query. Basically, if the user wants to query from x to y, they can call this new API with the datasource and the time interval x to y. A true result then ensures that all segments of the datasource with timestamps from x to y are ready to be queried (loaded onto historicals).
   
   Important differences between this API and the existing Coordinator loadstatus API:
   - Takes a datasource (required) so it can check faster (iterating over fewer segments)
   - Takes an interval (optional) so it can check faster (iterating over fewer segments)
   - **IMPORTANT**. Takes a boolean firstCheck. If this is true, the API force-polls the metadata source to get the latest published segment information.
   
   The workflow will be:
   
   1) Submit the ingestion task.
   
   2) Poll the task API until the task has succeeded.
   
   3) Poll the new API with datasource, interval, and firstCheck=true once. If it returns false, go to step 4; otherwise the data is available and the user can query.
   
   4) Poll the new API with datasource, interval, and firstCheck=false until it returns true. Once it returns true, the data is available and the user can query.
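
Steps 3 and 4 of the workflow above could look roughly like the following client-side loop. This is only a sketch under stated assumptions: `LoadStatusCheck` is a hypothetical callback standing in for the HTTP call to the new API, and the attempt limit and sleep interval are illustrative choices, not part of this PR.

```java
/** Hypothetical stand-in for one call to the new API; returns true when all segments are loaded. */
@FunctionalInterface
interface LoadStatusCheck
{
  boolean isLoaded(boolean firstCheck);
}

/** Sketch of the client-side polling loop; not Druid code. */
final class LoadStatusPollingSketch
{
  /**
   * Calls the check once with firstCheck=true (step 3), then with firstCheck=false (step 4)
   * until it reports true. Returns the attempt number that succeeded, or -1 if the check
   * never succeeded within maxAttempts or the thread was interrupted.
   */
  static int pollUntilLoaded(LoadStatusCheck check, int maxAttempts, long sleepMillis)
  {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      boolean firstCheck = (attempt == 1);
      if (check.isLoaded(firstCheck)) {
        return attempt;
      }
      try {
        Thread.sleep(sleepMillis);
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return -1;
      }
    }
    return -1;
  }
}
```

Only the first attempt asks for a forced metadata poll, so repeated polling reads from the Coordinator's snapshot rather than hitting the metadata store each time.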
   
   This PR has:
   - [x] been self-reviewed.
      - [ ] using the [concurrency checklist](https://github.com/apache/druid/blob/master/dev/code-review/concurrency.md) (Remove this item if the PR doesn't have any relation to concurrency.)
   - [ ] added documentation for new or modified features or behaviors.
   - [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
   - [ ] added or updated version, license, or notice information in [licenses.yaml](https://github.com/apache/druid/blob/master/licenses.yaml)
   - [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
   - [ ] added unit tests or modified existing tests to cover new code paths.
   - [ ] added integration tests.
   - [ ] been tested in a test Druid cluster.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r437818716



##########
File path: server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
##########
@@ -403,11 +425,16 @@ private void awaitOrPerformDatabasePoll()
   }
 
   /**
-   * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is
-   * made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise,
-   * meaning that a new on-demand database poll should be initiated.
+   * This method returns true without waiting for a database poll if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has completed its first poll, or an {@link OnDemandDatabasePoll} that was
+   * made no longer than {@link #periodicPollDelay} before the current time.
+   * This method waits until completion if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has not completed its first poll, or an {@link OnDemandDatabasePoll} that is
+   * already in the process of polling the database.
+   * This means that any method using this check can read from a snapshot that is
+   * up to {@link SqlSegmentsMetadataManager#periodicPollDelay} old.
    */
-  private boolean awaitLatestDatabasePoll()
+  private boolean useLatestSnapshotIfWithinDelay()

Review comment:
       The name awaitLatestDatabasePoll alone is misleading, as the method does not always wait for the latest database poll.
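
The "no waiting" condition in the Javadoc above can be read as the following decision, shown here as a simplified sketch. It is based only on the Javadoc text, not on the actual SqlSegmentsMetadataManager implementation; the enum, the method, and its parameters are hypothetical, and the cases where the real method waits for an in-progress poll are not modeled.

```java
/** Simplified, hypothetical model of the snapshot-reuse check; not Druid code. */
final class SnapshotReuseSketch
{
  enum PollKind { PERIODIC, ON_DEMAND }

  /**
   * Returns true when the existing snapshot may be read without waiting:
   * either a periodic poll that has already completed its first poll, or an
   * on-demand poll started no longer than periodicPollDelayMillis ago.
   */
  static boolean canUseSnapshotWithoutWaiting(
      PollKind latestPollKind,
      boolean firstPeriodicPollCompleted,
      long onDemandPollStartedMillis,
      long nowMillis,
      long periodicPollDelayMillis
  )
  {
    if (latestPollKind == PollKind.PERIODIC) {
      return firstPeriodicPollCompleted;
    }
    return nowMillis - onDemandPollStartedMillis <= periodicPollDelayMillis;
  }
}
```

Either way, a caller relying on this check can observe a snapshot that is up to one poll delay old, which is why the new name mentions the delay rather than "await".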






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r437817506



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +393,43 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("firstCheck") @Nullable final Boolean firstCheck
+  )
+  {
+    if (serverInventoryView == null || serverInventoryView.getSegmentLoadInfos() == null) {
+      return Response.ok(ImmutableMap.of("loaded", false)).build();
+    }
+    // Force poll
+    Interval theInterval = interval == null ? Intervals.ETERNITY : Intervals.of(interval);
+    boolean requiresMetadataStorePoll = firstCheck == null ? true :firstCheck;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);

Review comment:
       serverInventoryView and segmentLoadInfos are never null






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439700231



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify whether segments created by a recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task.
+2. Repeatedly poll the Overlord's task API (`/druid/indexer/v1/task/{taskId}/status`) until the task has completed and succeeded.
+3. Poll the Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. 

Review comment:
       Done
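
For illustration, a client might assemble the request path for the Segment Loading for Datasource API along these lines. This is a sketch, not part of the PR: the helper class and the datasource name `wikipedia` are hypothetical, the datasource name is assumed to need no URL escaping, and writing the interval with `_` instead of `/` is an assumption based on the handler in this thread calling `interval.replace('_', '/')`.

```java
/** Hypothetical helper that builds the loadstatus request path; not Druid code. */
final class LoadStatusUrlSketch
{
  /**
   * @param dataSourceName       datasource to check (assumed to need no URL escaping)
   * @param isoInterval          optional ISO-8601 interval such as "2020-06-01/2020-06-02", or null
   * @param forceMetadataRefresh whether the Coordinator should poll the metadata store first
   */
  static String loadStatusPath(String dataSourceName, String isoInterval, boolean forceMetadataRefresh)
  {
    StringBuilder path = new StringBuilder("/druid/coordinator/v1/datasources/")
        .append(dataSourceName)
        .append("/loadstatus?forceMetadataRefresh=")
        .append(forceMetadataRefresh);
    if (isoInterval != null) {
      // The handler converts '_' back to '/', so send the interval with '_' as the separator.
      path.append("&interval=").append(isoInterval.replace('/', '_'));
    }
    return path.toString();
  }

  public static void main(String[] args)
  {
    System.out.println(loadStatusPath("wikipedia", "2020-06-01/2020-06-02", true));
  }
}
```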






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r441224367



##########
File path: docs/ingestion/faq.md
##########
@@ -66,6 +66,18 @@ Other common reasons that hand-off fails are as follows:
 
 Make sure to include the `druid-hdfs-storage` and all the hadoop configuration, dependencies (that can be obtained by running command `hadoop classpath` on a machine where hadoop has been setup) in the classpath. And, provide necessary HDFS settings as described in [deep storage](../dependencies/deep-storage.md) .
 
+## How do I know when I can query Druid after submitting an ingestion task?

Review comment:
       Yes. Updated.






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r440541949



##########
File path: server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -256,15 +256,24 @@ public boolean isLeader()
    * @return tier -> { dataSource -> underReplicationCount } map
    */
   public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
+  {
+    final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments();
+    return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments);
+  }
+
+  /**
+   * @return tier -> { dataSource -> underReplicationCount } map
+   */
+  public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegments(

Review comment:
       Keeping the `segmentReplicantLookup` as possibly stale for now. This still ensures that we never return true (available) when the actual state is false (not available), since we force refresh the metadata. However, for the full format response we can return false (not available) when the actual state is true (available) for up to one coordinator run period longer. This problem only affects the full format. We can loop back to this if we find that having the option to force refresh the `segmentReplicantLookup` is useful. In that case, we can use the existing query param, forceMetadataRefresh, to force refresh `segmentReplicantLookup` too.






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439707431



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate resposne for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {
+        for (final Rule rule : rules) {
+          if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) {

Review comment:
       This is a bug in the original Coordinator loadstatus API, so I'll fix it in a follow-up PR.






[GitHub] [druid] clintropolis commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439179110



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;

Review comment:
       I really don't think this should default to `true`: it is a heavy operation, and the docs recommend calling the API with the default option only once and then polling with the non-default option to determine when all your segments are available.

##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate resposne for full mode

Review comment:
       resposne -> response

##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate resposne for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {
+        for (final Rule rule : rules) {
+          if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) {
+            continue;
+          }
+          ((LoadRule) rule)
+              .getTieredReplicants()
+              .forEach((final String tier, final Integer ruleReplicants) -> {
+                Integer currentReplicantsRetVal = segmentsInCluster.get(segment.getId(), tier);
+                int currentReplicants = currentReplicantsRetVal == null ? 0 : currentReplicantsRetVal;
+                Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
+                    .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
+                ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
+                    .addTo(dataSourceName, Math.max(ruleReplicants - currentReplicants, 0));
+              });
+          break; // only the first matching rule applies
+        }
+      }
+      return Response.ok(underReplicationCountsPerDataSourcePerTier).build();
+    } else {
+      // Calculate resposne for default mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUsedSegments = 0;

Review comment:
       super nit: it would probably be more consistent to match the terminology that appears in the system segments table, which uses 'published' and 'available' for used and loaded:
   *  `numUsedSegments` -> `numPublishedSegments`
   *  `numUnloadedSegments` -> `numUnavailableSegments`

##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {

Review comment:
       There is quite a lot of logic in this API entry point method. It would be worth breaking each of these blocks out into a method dedicated to each response, so it's a bit easier to follow.
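
One possible shape for that refactoring is sketched below. It is not the code that was merged: the class, enum, and method names are hypothetical, and segment IDs are modeled as plain strings instead of `SegmentId`. It mirrors the `simple` / `full` / default branching in the diff and pulls the simple-mode count into its own method.

```java
import java.util.Collection;
import java.util.Set;

/** Hypothetical sketch of splitting the handler into per-mode helpers; not Druid code. */
final class LoadStatusModesSketch
{
  enum Mode { SIMPLE, FULL, DEFAULT }

  /** Mirrors the if / else-if / else chain in the handler: "simple" wins over "full". */
  static Mode resolveMode(String simple, String full)
  {
    if (simple != null) {
      return Mode.SIMPLE;
    }
    if (full != null) {
      return Mode.FULL;
    }
    return Mode.DEFAULT;
  }

  /** Counts published segments that no server currently announces as available. */
  static int countUnavailableSegments(Collection<String> publishedSegmentIds, Set<String> availableSegmentIds)
  {
    int numUnavailableSegments = 0;
    for (String segmentId : publishedSegmentIds) {
      if (!availableSegmentIds.contains(segmentId)) {
        numUnavailableSegments++;
      }
    }
    return numUnavailableSegments;
  }
}
```

With helpers like these, the entry point would only parse the parameters, fetch the segments, and dispatch on the resolved mode.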

##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate resposne for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {
+        for (final Rule rule : rules) {
+          if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) {
+            continue;
+          }
+          ((LoadRule) rule)
+              .getTieredReplicants()
+              .forEach((final String tier, final Integer ruleReplicants) -> {
+                Integer currentReplicantsRetVal = segmentsInCluster.get(segment.getId(), tier);
+                int currentReplicants = currentReplicantsRetVal == null ? 0 : currentReplicantsRetVal;
+                Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
+                    .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
+                ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
+                    .addTo(dataSourceName, Math.max(ruleReplicants - currentReplicants, 0));
+              });
+          break; // only the first matching rule applies
+        }
+      }
+      return Response.ok(underReplicationCountsPerDataSourcePerTier).build();
+    } else {
+      // Calculate resposne for default mode

Review comment:
       resposne -> response

##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode

Review comment:
       resposne -> response

##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate resposne for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {

Review comment:
       Hmm, is there a way to rearrange this without iterating over the entire set of segments twice? If not, it might be worth pushing this into `DruidCoordinator`, at least when force refresh is true, since it potentially already has `segmentReplicantLookup` built, or exposing that lookup to this resource in some manner.
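
One possible rearrangement is sketched below, not taken from the PR: count the loaded replicas of each segment and apply the replication requirement in the same loop iteration, so the segments are walked once and no intermediate segment-by-tier table is needed. The `Server` class, the plain `String` segment IDs, and the single `requiredReplicasPerTier` map (standing in for the first matching load rule) are simplifying assumptions for illustration; they are not Druid types.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SinglePassUnderReplication {
  /** Hypothetical stand-in for a server in the inventory view: a tier plus the segment IDs it serves. */
  static final class Server {
    final String tier;
    final Set<String> segmentIds;

    Server(String tier, String... segmentIds) {
      this.tier = tier;
      this.segmentIds = new HashSet<>(Arrays.asList(segmentIds));
    }
  }

  /**
   * Returns, per tier, the total number of missing replicas across all given segments.
   * Each segment is visited exactly once: its loaded replicas are counted and then
   * compared against the required replicas before moving on to the next segment.
   */
  static Map<String, Long> underReplicatedPerTier(
      List<String> segmentIds,
      List<Server> servers,
      Map<String, Integer> requiredReplicasPerTier
  ) {
    Map<String, Long> underReplicated = new HashMap<>();
    for (String segmentId : segmentIds) {
      // Count how many servers in each tier currently serve this segment.
      Map<String, Integer> loadedPerTier = new HashMap<>();
      for (Server server : servers) {
        if (server.segmentIds.contains(segmentId)) {
          loadedPerTier.merge(server.tier, 1, Integer::sum);
        }
      }
      // Apply the requirement right away instead of in a second pass over the segments.
      for (Map.Entry<String, Integer> rule : requiredReplicasPerTier.entrySet()) {
        int loaded = loadedPerTier.getOrDefault(rule.getKey(), 0);
        long missing = Math.max(rule.getValue() - loaded, 0);
        underReplicated.merge(rule.getKey(), missing, Long::sum);
      }
    }
    return underReplicated;
  }
}
```

This only folds the two segment loops into one; the per-segment scan over servers remains, which is where a prebuilt lookup such as `segmentReplicantLookup` would help further.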

##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate resposne for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {
+        for (final Rule rule : rules) {
+          if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) {

Review comment:
       Yeah, I think you need to handle `BroadcastDistributionRule` here too if you want to be totally complete. However, the `CoordinatorResource` loadstatus API call has this problem too, so it would probably be OK to fix both in a follow-up PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439707525



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate resposne for full mode

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r440538226



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,44 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading by Datasource
+
+You can verify if segments created by a recent ingestion task are loaded onto historicals and available for querying using the following APIs.
+An example workflow for this is:

Review comment:
       I think this example is useful, especially when a user is trying to verify whether segments are loaded and available right after an ingestion task finishes. The workflow also points out the difference from the Coordinator loadstatus API (and how the two have different use cases). I think https://druid.apache.org/docs/latest/ingestion/faq.html might be a better place for this.






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r437818847



##########
File path: server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
##########
@@ -113,6 +114,18 @@ int markAsUsedNonOvershadowedSegments(String dataSource, Set<String> segmentIds)
    */
   Iterable<DataSegment> iterateAllUsedSegments();
 
+  /**
+   * Returns an iterable to go over all used and non-overshadowed segments of given data sources over given interval.
+   * The order in which segments are iterated is unspecified.
+   * If {@param requiresLatest} is true then a force metadatastore poll will be triggered. This can cause a longer
+   * response time but will ensure that the latest segment information (at the time this method is called) is returned.
+   * If {@param requiresLatest} is false then segment information from stale snapshot of up to the last periodic poll
+   * period {@link SqlSegmentsMetadataManager#periodicPollDelay} will be used.
+   */
+  Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource,

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439700084



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439700125



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r437818519



##########
File path: server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
##########
@@ -403,11 +425,16 @@ private void awaitOrPerformDatabasePoll()
   }
 
   /**
-   * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is
-   * made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise,
-   * meaning that a new on-demand database poll should be initiated.
+   * This method returns true without waiting for database poll if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has completed it's first poll, or an {@link OnDemandDatabasePoll} that is
+   * made not longer than {@link #periodicPollDelay} from current time.
+   * This method does wait untill completion for if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has not completed it's first poll, or an {@link OnDemandDatabasePoll} that is
+   * alrady in the process of polling the database.
+   * This means that any method using this check can read from snapshot that is
+   * up to {@link SqlSegmentsMetadataManager#periodicPollDelay} old.
    */
-  private boolean awaitLatestDatabasePoll()
+  private boolean useLatestSnapshotIfWithinDelay()

Review comment:
       I really don't like the old name. Most of the time this method does not wait, even though it is called await. For example, if the latest poll is a PeriodicDatabasePoll, it never waits and just returns the last poll. Even if there is an ongoing PeriodicDatabasePoll, it does not wait and returns the last poll. If the latest poll is an OnDemandDatabasePoll, it only waits if that poll is older than pollPeriod. This means that most of the time this method does not wait and simply returns the last poll.
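
To make the cases easier to compare, here is a minimal sketch of the decision as one reading of the Javadoc in the diff above. It is not the actual `SqlSegmentsMetadataManager` code: the enums, the `decide` method, and its parameters are hypothetical names, and real polls are tracked with futures rather than booleans.

```java
final class SnapshotFreshness {
  enum PollKind { PERIODIC, ON_DEMAND }

  enum Decision { USE_SNAPSHOT, WAIT_FOR_POLL, START_NEW_POLL }

  /**
   * @param kind                kind of the latest database poll
   * @param pollCompleted       for PERIODIC: whether the first poll has completed;
   *                            for ON_DEMAND: whether that poll has finished
   * @param ageMs               time since an on-demand poll was initiated (ignored for PERIODIC)
   * @param periodicPollDelayMs configured delay between periodic polls
   */
  static Decision decide(PollKind kind, boolean pollCompleted, long ageMs, long periodicPollDelayMs) {
    if (kind == PollKind.PERIODIC) {
      // A periodic poll only blocks the caller until its very first poll is done.
      return pollCompleted ? Decision.USE_SNAPSHOT : Decision.WAIT_FOR_POLL;
    }
    if (ageMs > periodicPollDelayMs) {
      // The on-demand poll is too old to trust, so a fresh one is needed.
      return Decision.START_NEW_POLL;
    }
    // A recent on-demand poll: reuse its result, waiting only if it is still running.
    return pollCompleted ? Decision.USE_SNAPSHOT : Decision.WAIT_FOR_POLL;
  }
}
```

Under this reading, only two of the five combinations involve waiting on an existing poll, which is the motivation given for the rename.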






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439136963



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate resposne for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {
+        for (final Rule rule : rules) {

Review comment:
       Probably will have to do something different for the BroadcastDistributionRule






[GitHub] [druid] jihoonson commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r441157983



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +397,131 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("forceMetadataRefresh") final Boolean forceMetadataRefresh,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    if (forceMetadataRefresh == null) {
+      return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("Invalid request. forceMetadataRefresh must be specified")
+          .build();
+    }
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;

Review comment:
       Could be defined as a static final.
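
A minimal sketch of what that could look like; the class and constant names are hypothetical, and only the 14-day value comes from the diff. Using `TimeUnit.DAYS.toMillis` also keeps the arithmetic in `long`: the expression `14 * 24 * 60 * 60 * 1000` is evaluated as `int`, which happens to fit for 14 days but would overflow for a default of 25 days or more.

```java
import java.util.concurrent.TimeUnit;

final class LoadStatusDefaults {
  // Hypothetical constant name; the 14-day default is the value used in the diff above.
  static final long DEFAULT_LOADSTATUS_INTERVAL_OFFSET_MS = TimeUnit.DAYS.toMillis(14);

  /** Returns {startMs, endMs} for the default window that ends at nowMs. */
  static long[] defaultWindow(long nowMs) {
    return new long[]{nowMs - DEFAULT_LOADSTATUS_INTERVAL_OFFSET_MS, nowMs};
  }
}
```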






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439136134



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate resposne for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {
+        for (final Rule rule : rules) {

Review comment:
       Note to self: Check how this works with the broadcast rule




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439707410



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate response for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate response for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {
+        for (final Rule rule : rules) {
+          if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) {
+            continue;
+          }
+          ((LoadRule) rule)
+              .getTieredReplicants()
+              .forEach((final String tier, final Integer ruleReplicants) -> {
+                Integer currentReplicantsRetVal = segmentsInCluster.get(segment.getId(), tier);
+                int currentReplicants = currentReplicantsRetVal == null ? 0 : currentReplicantsRetVal;
+                Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
+                    .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
+                ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
+                    .addTo(dataSourceName, Math.max(ruleReplicants - currentReplicants, 0));
+              });
+          break; // only the first matching rule applies
+        }
+      }
+      return Response.ok(underReplicationCountsPerDataSourcePerTier).build();
+    } else {
+      // Calculate response for default mode

Review comment:
       Done
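
For anyone reading the simple and full branches in the hunk above, here is a minimal sketch of the two counts using plain collections instead of Druid types. It is an illustration only, not the PR's code: `LoadStatusModesSketch`, `countUnloaded`, `countUnderReplicated`, and the `String`/`Map` inputs are hypothetical stand-ins for `DataSegment`, the `LoadRule` tiered replicants, and the `segmentsInCluster` table. It also assumes a single load rule applies to every segment, whereas the diff picks the first matching rule per segment.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class LoadStatusModesSketch
{
  /**
   * Simple mode (sketch): a segment counts once if no server serves it,
   * regardless of how many replicas the rules ask for.
   */
  static int countUnloaded(List<String> usedSegmentIds, Set<String> servedSegmentIds)
  {
    int numUnloadedSegments = 0;
    for (String segmentId : usedSegmentIds) {
      if (!servedSegmentIds.contains(segmentId)) {
        numUnloadedSegments++;
      }
    }
    return numUnloadedSegments;
  }

  /**
   * Full mode (sketch): for each tier, sums max(required - loaded, 0) over all segments.
   * requiredPerTier maps tier -> replicas requested by the (assumed single) load rule.
   * loadedPerSegment maps segmentId -> (tier -> replicas currently served).
   */
  static Map<String, Long> countUnderReplicated(
      List<String> usedSegmentIds,
      Map<String, Integer> requiredPerTier,
      Map<String, Map<String, Integer>> loadedPerSegment
  )
  {
    Map<String, Long> underReplicatedPerTier = new HashMap<>();
    for (String segmentId : usedSegmentIds) {
      Map<String, Integer> loaded = loadedPerSegment.getOrDefault(segmentId, Collections.emptyMap());
      requiredPerTier.forEach((tier, required) -> {
        int current = loaded.getOrDefault(tier, 0);
        // Over-replicated segments contribute 0 rather than a negative number.
        underReplicatedPerTier.merge(tier, (long) Math.max(required - current, 0), Long::sum);
      });
    }
    return underReplicatedPerTier;
  }
}
```

The difference between the two is the point: simple mode answers "is each segment queryable somewhere?", while full mode answers "how many replicas are still missing per tier?".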






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439700716



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task
+2. Repeatedly poll Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded.
+3. Poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. 
+If there are segments not yet loaded, continue to step 4, otherwise you can now query the data.
+4. Repeatedly poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. 
+Continue polling until all segments are loaded. Once all segments are loaded you can now query the data.
+
+##### GET
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). Setting `forceMetadataRefresh=true`
+will force the coordinator to poll latest segment metadata from the metadata store. `forceMetadataRefresh` will be set to true if not given.
+If no used segments found for the given inputs, this API returns 100% as the value.

Review comment:
       Done
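
The example workflow in the doc hunk above (steps 3 and 4) could be sketched as a small client loop. This is only a sketch under assumptions: `LoadStatusPollerSketch`, `pollUntilLoaded`, `maxPolls`, and `sleepMillis` are hypothetical names, and `fetchPercentLoaded` stands in for an HTTP GET against `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus` that has already parsed the returned percentage. The HTTP call and response parsing are deliberately left out.

```java
import java.util.function.Function;

final class LoadStatusPollerSketch
{
  /**
   * Polls until the reported load percentage reaches 100 and returns the number of calls made.
   * The Boolean passed to fetchPercentLoaded is the forceMetadataRefresh value for that call.
   */
  static int pollUntilLoaded(Function<Boolean, Double> fetchPercentLoaded, int maxPolls, long sleepMillis)
  {
    // Step 3: a single call with forceMetadataRefresh=true so the Coordinator re-reads the metadata store.
    double percentLoaded = fetchPercentLoaded.apply(true);
    int polls = 1;

    // Step 4: subsequent calls use forceMetadataRefresh=false and rely on the refreshed metadata.
    while (percentLoaded < 100.0 && polls < maxPolls) {
      try {
        Thread.sleep(sleepMillis);
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for segments to load", e);
      }
      percentLoaded = fetchPercentLoaded.apply(false);
      polls++;
    }

    if (percentLoaded < 100.0) {
      throw new IllegalStateException("Segments still not fully loaded after " + polls + " polls");
    }
    return polls;
  }
}
```

Passing the fetch as a function keeps the loop testable without a running Coordinator. The cap on polls is an addition of this sketch, not something the doc prescribes.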






[GitHub] [druid] maytasm commented on pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on pull request #9965:
URL: https://github.com/apache/druid/pull/9965#issuecomment-643567018


   > The doc looks basically good. I've added some suggestions, but all minor rewording issues.
   > 
   > One thought, did you consider putting this content lower on the page, with the other datasource APIs? Then you'd get the interval explanation in that section with it. It now comes after the cluster-wide segment loading API section, which has that advantage, but you could always link from there to the new datasource segment loading section. In fact, maybe the existing https://druid.apache.org/docs/latest/operations/api-reference.html#segment-loading section should be renamed to "Segment loading by cluster".
   
   I think it is better to keep this API with the Segment Loading section, because:
   1) The person using this API is probably more interested in whether segments have been loaded than in the metadata of the datasource.
   2) The response of this API contains no metadata about the datasource at all. It only reports the progress of segment loading.
   3) The interval explanation does not apply to this API. The interval explanation in the datasource APIs section is about an interval in the URI path. Here, the interval is used as a query param.
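
To make point 3 concrete: the diff in this PR passes the query param through `interval.replace('_', '/')` before handing it to `Intervals.of`. A minimal sketch of just that normalization step follows; `IntervalParamSketch` and `normalize` are hypothetical names, and the result is left as a string rather than parsed, so no Druid or Joda-Time classes are needed.

```java
final class IntervalParamSketch
{
  /**
   * Mirrors the replace('_', '/') call in the diff: an underscore-separated interval
   * becomes a slash-separated ISO 8601 interval, and a slash-separated one is unchanged.
   */
  static String normalize(String intervalParam)
  {
    return intervalParam.replace('_', '/');
  }
}
```

With this step in place, either separator in the `interval` query param would produce the same interval string.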




[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439136134



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate response for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate response for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {
+        for (final Rule rule : rules) {

Review comment:
       Note to self: Check how this works with the broadcast rule

##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate response for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate response for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {
+        for (final Rule rule : rules) {

Review comment:
       Probably will have to do something different for the BroadcastDistributionRule






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439701586



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;

Review comment:
       The default is true to guard against an operator forgetting to set it to true on the first call. You get the correct result if you make every call with true. If you make every call with false, you may not get the correct result (and may end up querying before the segments are loaded!). Hence, defaulting to true is the safer option, imo.
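
A small sketch of how the two defaults in the hunk above resolve, for reference. `LoadStatusDefaultsSketch` and its method names are hypothetical. The use of `TimeUnit.DAYS.toMillis` is a swapped-in alternative to the diff's `14 * 24 * 60 * 60 * 1000`, not what the PR does: the `int` product in the diff is fine for 14 days (1,209,600,000 fits in an `int`), but the same expression would overflow from 25 days upward, which `long` arithmetic avoids.

```java
import java.util.concurrent.TimeUnit;

final class LoadStatusDefaultsSketch
{
  /** A missing (null) forceMetadataRefresh is treated as true, matching the diff's ternary. */
  static boolean resolveForceMetadataRefresh(Boolean forceMetadataRefresh)
  {
    return forceMetadataRefresh == null || forceMetadataRefresh;
  }

  /** Length of the default two-week lookback, computed in long arithmetic. */
  static long defaultIntervalOffsetMillis()
  {
    return TimeUnit.DAYS.toMillis(14);
  }
}
```

So a caller who omits the parameter entirely gets the safe behavior, and only an explicit `forceMetadataRefresh=false` skips the metadata store poll.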






[GitHub] [druid] sthetland commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
sthetland commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r438362630



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource

Review comment:
       ```suggestion
   #### Segment Loading by Datasource
   ```

##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task
+2. Repeatedly poll Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded.
+3. Poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. 
+If there are segments not yet loaded, continue to step 4, otherwise you can now query the data.
+4. Repeatedly poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. 
+Continue polling until all segments are loaded. Once all segments are loaded you can now query the data.
+
+##### GET
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). Setting `forceMetadataRefresh=true`

Review comment:
       Maybe indicate that true is the default here? and remove the sentence that comes later ("`forceMetadataRefresh` will be set to true if not given.")
   
   ```suggestion
   over the given interval (or last 2 weeks if interval is not given). Setting `forceMetadataRefresh` to true (the default)
   ```

##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.

Review comment:
       ```suggestion
   You can verify if segments created by a recent ingestion task are loaded onto historicals and available for querying using the following APIs.
   ```

##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task

Review comment:
       ```suggestion
   1. Submit your ingestion task.
   ```

##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task
+2. Repeatedly poll Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded.
+3. Poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. 
+If there are segments not yet loaded, continue to step 4, otherwise you can now query the data.
+4. Repeatedly poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. 
+Continue polling until all segments are loaded. Once all segments are loaded you can now query the data.
+
+##### GET
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). Setting `forceMetadataRefresh=true`
+will force the coordinator to poll latest segment metadata from the metadata store. `forceMetadataRefresh` will be set to true if not given.
+If no used segments found for the given inputs, this API returns 100% as the value.
+
+ * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). This does not include replication. Setting `forceMetadataRefresh=true` 

Review comment:
       "This does not include replication" Meaning, replicated segments?  

##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task
+2. Repeatedly poll Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded.
+3. Poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. 

Review comment:
       maybe call it loadstatus? 
   ```suggestion
   3. Poll the datasource loadstatus API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. 
   ```

##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task
+2. Repeatedly poll Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded.
+3. Poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. 
+If there are segments not yet loaded, continue to step 4, otherwise you can now query the data.
+4. Repeatedly poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. 
+Continue polling until all segments are loaded. Once all segments are loaded you can now query the data.
+
+##### GET
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). Setting `forceMetadataRefresh=true`
+will force the coordinator to poll latest segment metadata from the metadata store. `forceMetadataRefresh` will be set to true if not given.
+If no used segments found for the given inputs, this API returns 100% as the value.

Review comment:
       Maybe add the positive case as well, if this is the case? 
   ```suggestion
   If all segments have been loaded or no used segments are found for the given inputs, this API returns 100% as the value.
   ```

##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task
+2. Repeatedly poll Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded.

Review comment:
       ```suggestion
   2. Repeatedly poll the Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed.
   ```






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439700651



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task
+2. Repeatedly poll Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded.
+3. Poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. 
+If there are segments not yet loaded, continue to step 4, otherwise you can now query the data.
+4. Repeatedly poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. 
+Continue polling until all segments are loaded. Once all segments are loaded you can now query the data.
+
+##### GET
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). Setting `forceMetadataRefresh=true`

Review comment:
       Done






[GitHub] [druid] jihoonson commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
jihoonson commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r441125726



##########
File path: docs/ingestion/faq.md
##########
@@ -66,6 +66,18 @@ Other common reasons that hand-off fails are as follows:
 
 Make sure to include the `druid-hdfs-storage` and all the hadoop configuration, dependencies (that can be obtained by running command `hadoop classpath` on a machine where hadoop has been setup) in the classpath. And, provide necessary HDFS settings as described in [deep storage](../dependencies/deep-storage.md) .
 
+## How do I know when I can make query to Druid after submitting ingestion task?
+
+You can verify if segments created by a recent ingestion task are loaded onto historicals and available for querying using the following workflow.
+1. Submit your ingestion task.
+2. Repeatedly poll the [Overlord's tasks API](../operations/api-reference.md#tasks) ( `/druid/indexer/v1/task/{taskId}/status`) until your task is shown to be successfully completed.
+3. Poll the [Segment Loading by Datasource API](../operations/api-reference.md#segment-loading-by-datasource) (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with 
+`forceMetadataRefresh=true` and `interval=<INTERVAL_OF_INGESTED_DATA>` once.

Review comment:
       I think it would be nice to warn one more time here what will happen with `forceMetadataRefresh=true`. It could also be mentioned that this API will refresh not only the specified datasource, but all datasources.
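
The workflow under discussion could be sketched roughly as follows. This is an illustration under stated assumptions, not code from the PR: `LoadStatusPoller`, `awaitFullyLoaded`, and the `fetchLoadedPercent` callback (a stand-in for an HTTP GET against the loadstatus endpoint) are all hypothetical names.

```java
import java.util.function.Function;

final class LoadStatusPoller {
    /**
     * fetchLoadedPercent is a hypothetical stand-in for the HTTP call; its argument is the
     * forceMetadataRefresh flag and its result is the reported loaded percentage.
     * Returns true once 100% is reported, false if maxPolls is exhausted or the thread is interrupted.
     */
    static boolean awaitFullyLoaded(Function<Boolean, Double> fetchLoadedPercent, int maxPolls, long sleepMillis) {
        // First call only: force a metadata refresh. Per the review comment this refreshes
        // all datasources, not just the requested one, so it is done exactly once.
        double percent = fetchLoadedPercent.apply(true);
        int polls = 1;
        while (percent < 100.0 && polls < maxPolls) {
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            // Subsequent calls rely on the coordinator's cached metadata.
            percent = fetchLoadedPercent.apply(false);
            polls++;
        }
        return percent >= 100.0;
    }
}
```

Capping the number of polls is a design choice of this sketch; the thread itself does not prescribe a timeout.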

##########
File path: docs/ingestion/faq.md
##########
@@ -66,6 +66,18 @@ Other common reasons that hand-off fails are as follows:
 
 Make sure to include the `druid-hdfs-storage` and all the hadoop configuration, dependencies (that can be obtained by running command `hadoop classpath` on a machine where hadoop has been setup) in the classpath. And, provide necessary HDFS settings as described in [deep storage](../dependencies/deep-storage.md) .
 
+## How do I know when I can make query to Druid after submitting ingestion task?

Review comment:
       I think this applies only to batch ingestion. In streaming ingestion, each row becomes queryable once it's consumed by a realtime task.

##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,35 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading by Datasource
+
+##### GET
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given 
+datasource over the given interval (or last 2 weeks if interval is not given). `forceMetadataRefresh` is required to be set. 
+Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store. 

Review comment:
       Same for other APIs.

##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,35 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading by Datasource
+
+##### GET
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given 
+datasource over the given interval (or last 2 weeks if interval is not given). `forceMetadataRefresh` is required to be set. 
+Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store. 

Review comment:
       It would be nice to mention that it will refresh all datasources here too.
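
For reference, a request URL for this endpoint might be assembled as in the sketch below. `LoadStatusUrl` and `build` are hypothetical names, the coordinator base URL is an assumption supplied by the caller, and the datasource name is assumed to be URL-safe. Sending `_` in place of `/` in the interval relies on the `interval.replace('_', '/')` call visible in the `DataSourcesResource` diff quoted in this thread.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class LoadStatusUrl {
    // Hypothetical helper: builds the loadstatus URL; interval may be null to use the server default.
    static URI build(String coordinatorBase, String dataSource, boolean forceMetadataRefresh, String interval) {
        StringBuilder sb = new StringBuilder(coordinatorBase)
            .append("/druid/coordinator/v1/datasources/")
            .append(dataSource) // assumed URL-safe in this sketch
            .append("/loadstatus?forceMetadataRefresh=")
            .append(forceMetadataRefresh);
        if (interval != null) {
            // Use '_' as the separator, then percent-encode anything else that needs it.
            String safeInterval = interval.replace('/', '_');
            sb.append("&interval=").append(URLEncoder.encode(safeInterval, StandardCharsets.UTF_8));
        }
        return URI.create(sb.toString());
    }
}
```

`forceMetadataRefresh` is always written out because the doc text in this revision states it is required.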

##########
File path: server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
##########
@@ -403,11 +427,17 @@ private void awaitOrPerformDatabasePoll()
   }
 
   /**
-   * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is
-   * made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise,
-   * meaning that a new on-demand database poll should be initiated.
+   * This method returns true without waiting for database poll if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has completed it's first poll, or an {@link OnDemandDatabasePoll} that is
+   * made not longer than {@link #periodicPollDelay} from current time.
+   * This method does wait untill completion for if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has not completed it's first poll, or an {@link OnDemandDatabasePoll} that is
+   * alrady in the process of polling the database.

Review comment:
       typo: alrady -> already






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r441227082



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +397,131 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("forceMetadataRefresh") final Boolean forceMetadataRefresh,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    if (forceMetadataRefresh == null) {
+      return Response
+          .status(Response.Status.BAD_REQUEST)
+          .entity("Invalid request. forceMetadataRefresh must be specified")
+          .build();
+    }
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;

Review comment:
       Done
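
The interval handling in the hunk above (a default window of the last 14 days, and `_` accepted as a separator) can be illustrated with the sketch below. It uses `java.time` rather than the Joda-based `Intervals` helpers in the PR, and `IntervalDefaults` and `resolveInterval` are hypothetical names; the current time is passed in so the result is deterministic.

```java
import java.time.Duration;
import java.time.Instant;

final class IntervalDefaults {
    // Hypothetical helper: returns an ISO-8601 "start/end" string for the interval to check.
    static String resolveInterval(String interval, Instant now) {
        if (interval == null) {
            // No interval given: fall back to the two weeks ending at 'now'.
            Instant start = now.minus(Duration.ofDays(14));
            return start + "/" + now;
        }
        // '_' is accepted in place of '/' so the interval is easier to put in a URL.
        return interval.replace('_', '/');
    }
}
```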






[GitHub] [druid] maytasm merged pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm merged pull request #9965:
URL: https://github.com/apache/druid/pull/9965


   




[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r437817099



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +393,43 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("firstCheck") @Nullable final Boolean firstCheck

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439707520



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate resposne for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {
+        for (final Rule rule : rules) {
+          if (!(rule instanceof LoadRule && rule.appliesTo(segment, now))) {
+            continue;
+          }
+          ((LoadRule) rule)
+              .getTieredReplicants()
+              .forEach((final String tier, final Integer ruleReplicants) -> {
+                Integer currentReplicantsRetVal = segmentsInCluster.get(segment.getId(), tier);
+                int currentReplicants = currentReplicantsRetVal == null ? 0 : currentReplicantsRetVal;
+                Object2LongMap<String> underReplicationPerDataSource = underReplicationCountsPerDataSourcePerTier
+                    .computeIfAbsent(tier, ignored -> new Object2LongOpenHashMap<>());
+                ((Object2LongOpenHashMap<String>) underReplicationPerDataSource)
+                    .addTo(dataSourceName, Math.max(ruleReplicants - currentReplicants, 0));
+              });
+          break; // only the first matching rule applies
+        }
+      }
+      return Response.ok(underReplicationCountsPerDataSourcePerTier).build();
+    } else {
+      // Calculate resposne for default mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUsedSegments = 0;

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439700161



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task
+2. Repeatedly poll Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded.

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r441226736



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,35 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading by Datasource
+
+##### GET
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given 
+datasource over the given interval (or last 2 weeks if interval is not given). `forceMetadataRefresh` is required to be set. 
+Setting `forceMetadataRefresh` to true will force the coordinator to poll latest segment metadata from the metadata store. 

Review comment:
       Done

##########
File path: server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
##########
@@ -403,11 +427,17 @@ private void awaitOrPerformDatabasePoll()
   }
 
   /**
-   * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is
-   * made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise,
-   * meaning that a new on-demand database poll should be initiated.
+   * This method returns true without waiting for database poll if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has completed it's first poll, or an {@link OnDemandDatabasePoll} that is
+   * made not longer than {@link #periodicPollDelay} from current time.
+   * This method does wait untill completion for if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has not completed it's first poll, or an {@link OnDemandDatabasePoll} that is
+   * alrady in the process of polling the database.

Review comment:
       Done






[GitHub] [druid] maytasm commented on pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on pull request #9965:
URL: https://github.com/apache/druid/pull/9965#issuecomment-643183427


   Note to self: `iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval` probably deserves a disclaimer in its Javadoc similar to the one on `iterateAllUsedSegments`, noting that iteration is potentially not cheap and should be done with care.




[GitHub] [druid] clintropolis commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r440497521



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,44 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading by Datasource
+
+You can verify if segments created by a recent ingestion task are loaded onto historicals and available for querying using the following APIs.
+An example workflow for this is:

Review comment:
       This example workflow seems useful, but also out of place given the rest of this document, which strictly documents API requests and responses instead of illustrating an example application of how to use the API. I'm not sure where else it should live, or if it should just be removed, or if maybe it is fine, but I think it is worth discussing.

##########
File path: docs/operations/api-reference.md
##########
@@ -96,11 +96,11 @@ Returns the percentage of segments actually loaded in the cluster versus segment
 
  * `/druid/coordinator/v1/loadstatus?simple`
 
-Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replication.
+Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replica of segments.
 
 * `/druid/coordinator/v1/loadstatus?full`
 
-Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes replication.
+Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This include replica of segments.

Review comment:
       "This include replica of segments." doesn't quite seem right, maybe "This includes segment replication counts."

##########
File path: server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java
##########
@@ -256,15 +256,24 @@ public boolean isLeader()
    * @return tier -> { dataSource -> underReplicationCount } map
    */
   public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTier()
+  {
+    final Iterable<DataSegment> dataSegments = segmentsMetadataManager.iterateAllUsedSegments();
+    return computeUnderReplicationCountsPerDataSourcePerTierForSegments(dataSegments);
+  }
+
+  /**
+   * @return tier -> { dataSource -> underReplicationCount } map
+   */
+  public Map<String, Object2LongMap<String>> computeUnderReplicationCountsPerDataSourcePerTierForSegments(

Review comment:
       I think it's OK that `segmentReplicantLookup` could be stale, given that it is only updated on coordinator runs: with refreshed segment metadata (when forcing refresh), stale data would at worst under-count replication levels rather than falsely report that everything is available.
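
To make the under-counting argument concrete, here is a small sketch of the per-tier arithmetic for a single segment, mirroring the `Math.max(ruleReplicants - currentReplicants, 0)` expression in the `DataSourcesResource` diff quoted elsewhere in this thread. `UnderReplication` and `underReplicatedPerTier` are hypothetical names, and plain maps stand in for Druid's rule and lookup types.

```java
import java.util.HashMap;
import java.util.Map;

final class UnderReplication {
    // Hypothetical helper: for one segment, how many replicas each tier is still missing.
    static Map<String, Long> underReplicatedPerTier(Map<String, Integer> requiredPerTier,
                                                    Map<String, Integer> currentPerTier) {
        Map<String, Long> missing = new HashMap<>();
        requiredPerTier.forEach((tier, required) -> {
            // A tier absent from the (possibly stale) view counts as zero replicas,
            // so staleness inflates the result instead of hiding missing replicas.
            int current = currentPerTier.getOrDefault(tier, 0);
            missing.merge(tier, (long) Math.max(required - current, 0), Long::sum);
        });
        return missing;
    }
}
```

With a stale view that has not yet seen a freshly loaded replica, the reported number is too high and the caller simply keeps polling, which is the safe direction.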

##########
File path: docs/operations/api-reference.md
##########
@@ -96,11 +96,11 @@ Returns the percentage of segments actually loaded in the cluster versus segment
 
  * `/druid/coordinator/v1/loadstatus?simple`
 
-Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replication.
+Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replica of segments.

Review comment:
       "This does not include replica of segments." -> "This does not include segment replication counts." 






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439700145



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r440537206



##########
File path: docs/operations/api-reference.md
##########
@@ -96,11 +96,11 @@ Returns the percentage of segments actually loaded in the cluster versus segment
 
  * `/druid/coordinator/v1/loadstatus?simple`
 
-Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replication.
+Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replica of segments.

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439700894



##########
File path: docs/operations/api-reference.md
##########
@@ -114,6 +114,41 @@ Returns the number of segments to load and drop, as well as the total segment lo
 
 Returns the serialized JSON of segments to load and drop for each Historical process.
 
+
+#### Segment Loading for Datasource
+
+These APIs can be used to verify if segments created by recent ingestion task are loaded onto historicals and available for query.
+An example workflow for this is:
+1. Submit your ingestion task
+2. Repeatedly poll Overlord's task API ( `/druid/indexer/v1/task/{taskId}/status`) until task is completed and succeeded.
+3. Poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=true` once. 
+If there are segments not yet loaded, continue to step 4, otherwise you can now query the data.
+4. Repeatedly poll Segment Loading for Datasource API (`/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus`) with `forceMetadataRefresh=false`. 
+Continue polling until all segments are loaded. Once all segments are loaded you can now query the data.
+
+##### GET
+
+* `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the percentage of segments actually loaded in the cluster versus segments that should be loaded in the cluster for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). Setting `forceMetadataRefresh=true`
+will force the coordinator to poll latest segment metadata from the metadata store. `forceMetadataRefresh` will be set to true if not given.
+If no used segments found for the given inputs, this API returns 100% as the value.
+
+ * `/druid/coordinator/v1/datasources/{dataSourceName}/loadstatus?simple&forceMetadataRefresh={boolean}&interval={myInterval}`
+
+Returns the number of segments left to load until segments that should be loaded in the cluster are available for the given datasource 
+over the given interval (or last 2 weeks if interval is not given). This does not include replication. Setting `forceMetadataRefresh=true` 

Review comment:
       Yes.
   Maybe "This does not include replica of segments"
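To make the "does not include replica" wording concrete, here is a minimal sketch of how the `simple` count appears to work, based on the `simple` branch of `getDatasourceLoadstatus` in this PR: a used segment counts as loaded as soon as any server serves it, no matter how many replicas the load rules ask for. The class name, method name, and the use of plain strings for segment IDs are illustrative assumptions, not Druid code.

```java
import java.util.List;
import java.util.Set;

// Illustrative only: segment IDs are plain strings here, not Druid's SegmentId.
final class SimpleLoadStatusSketch {
  /**
   * Counts used segments that no server is serving yet.
   * A segment with at least one loaded copy is treated as available,
   * so missing replicas never increase this number.
   */
  static int countUnloaded(List<String> usedSegmentIds, Set<String> loadedSegmentIds) {
    int unloaded = 0;
    for (String id : usedSegmentIds) {
      if (!loadedSegmentIds.contains(id)) {
        unloaded++;
      }
    }
    return unloaded;
  }
}
```

With three used segments of which only one is loaded somewhere, the count is 2, even if the loaded segment still lacks its second replica.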





----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org


[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439701625



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;

Review comment:
       Setting to false is an optional optimization that can be done on the calls after the first call.
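A rough client-side sketch of that flow, for illustration: the first call forces a metadata refresh and later calls do not. The endpoint path and the `forceMetadataRefresh` parameter come from the docs in this PR; `LoadStatusPoller`, `waitUntilLoaded`, and the `fetchPercentLoaded` callback are hypothetical names, and the callback stands in for whatever HTTP client and JSON parsing the caller actually uses.

```java
import java.util.function.Function;

// Hypothetical helper, not part of Druid.
final class LoadStatusPoller {
  /** Builds the request path documented in this PR (no URL encoding is applied here). */
  static String loadStatusPath(String dataSource, boolean forceMetadataRefresh) {
    return "/druid/coordinator/v1/datasources/" + dataSource
        + "/loadstatus?forceMetadataRefresh=" + forceMetadataRefresh;
  }

  /**
   * Polls until the datasource reports 100% loaded.
   * fetchPercentLoaded maps a request path to the percentage returned by the API (assumed callback).
   * Returns the number of calls made, or -1 if maxAttempts is exhausted first.
   */
  static int waitUntilLoaded(String dataSource, Function<String, Double> fetchPercentLoaded, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      // Only the first call pays for a forced metadata store poll.
      boolean force = attempt == 1;
      double percent = fetchPercentLoaded.apply(loadStatusPath(dataSource, force));
      if (percent >= 100.0) {
        return attempt;
      }
      // A real client would sleep between attempts; omitted to keep the sketch short.
    }
    return -1;
  }
}
```

Passing the fetcher in as a function keeps the sketch independent of any particular HTTP library and makes the first-call versus later-call difference easy to check.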






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439707193



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;

Review comment:
       Making this field mandatory instead. A 400 Bad Request will be returned if `forceMetadataRefresh` is not given. This makes sure users read and understand the docs before using this API. To use the API properly, you have to change the flag between the first call and subsequent calls, so there is no "default": each value is needed for a different step in the flow.
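As a minimal sketch of that check, assuming the query parameter arrives as a nullable `Boolean` the way it does in the resource method quoted above: a missing flag maps to 400, and either explicit value is accepted. The class and method names are hypothetical, and the actual resource would build a JAX-RS `Response` rather than return a bare status code.

```java
// Hypothetical helper illustrating the mandatory-flag rule; not the PR's actual code.
final class ForceMetadataRefreshCheck {
  static final int OK = 200;
  static final int BAD_REQUEST = 400;

  /** Returns 400 when the flag is absent, 200 when the caller chose true or false explicitly. */
  static int statusFor(Boolean forceMetadataRefresh) {
    return forceMetadataRefresh == null ? BAD_REQUEST : OK;
  }
}
```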

##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439707213



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode

Review comment:
       Done






[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r439707391



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +396,123 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("forceMetadataRefresh") @Nullable final Boolean forceMetadataRefresh,
+      @QueryParam("simple") @Nullable final String simple,
+      @QueryParam("full") @Nullable final String full
+  )
+  {
+    final Interval theInterval;
+    if (interval == null) {
+      long defaultIntervalOffset = 14 * 24 * 60 * 60 * 1000;
+      long currentTimeInMs = System.currentTimeMillis();
+      theInterval = Intervals.utc(currentTimeInMs - defaultIntervalOffset, currentTimeInMs);
+    } else {
+      theInterval = Intervals.of(interval.replace('_', '/'));
+    }
+
+    boolean requiresMetadataStorePoll = forceMetadataRefresh == null ? true : forceMetadataRefresh;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);
+    }
+
+    if (simple != null) {
+      // Calculate resposne for simple mode
+      Map<SegmentId, SegmentLoadInfo> segmentLoadInfos = serverInventoryView.getSegmentLoadInfos();
+      int numUnloadedSegments = 0;
+      for (DataSegment segment : segments.get()) {
+        if (!segmentLoadInfos.containsKey(segment.getId())) {
+          numUnloadedSegments++;
+        }
+      }
+      return Response.ok(
+          ImmutableMap.of(
+              dataSourceName,
+              numUnloadedSegments
+          )
+      ).build();
+    } else if (full != null) {
+      // Calculate resposne for full mode
+      final Map<String, Object2LongMap<String>> underReplicationCountsPerDataSourcePerTier = new HashMap<>();
+      final List<Rule> rules = metadataRuleManager.getRulesWithDefault(dataSourceName);
+      final Table<SegmentId, String, Integer> segmentsInCluster = HashBasedTable.create();
+      final DateTime now = DateTimes.nowUtc();
+
+      for (DataSegment segment : segments.get()) {
+        for (DruidServer druidServer : serverInventoryView.getInventory()) {
+          String tier = druidServer.getTier();
+          SegmentId segmentId = segment.getId();
+          DruidDataSource druidDataSource = druidServer.getDataSource(dataSourceName);
+          if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) {
+            Integer numReplicants = segmentsInCluster.get(segmentId, tier);
+            if (numReplicants == null) {
+              numReplicants = 0;
+            }
+            segmentsInCluster.put(segmentId, tier, numReplicants + 1);
+          }
+        }
+      }
+      for (DataSegment segment : segments.get()) {

Review comment:
       Removed this code from DataSourcesResource. It now reuses the code that calculates underReplicationCountsPerDataSourcePerTier in DruidCoordinator by calling DruidCoordinator, which in turn reuses segmentReplicantLookup there. This keeps the behavior consistent between the full format of the new API and the existing coordinator loadstatus API. For example, if there is a bug in the full-format coordinator loadstatus API where it ignores broadcast rules, we only have to remember to fix it in one place ;)
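For readers unfamiliar with the per-tier count being reused, the idea can be sketched roughly as follows: for each segment and each tier, add up how many replicas are still missing relative to what the load rule requires. This is a simplified illustration under stated assumptions, not the DruidCoordinator implementation: it assumes one replica requirement per tier that applies to every segment, and it uses plain maps and strings in place of Druid's rule, segment, and lookup types. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration; real load rules are matched per segment and per interval.
final class UnderReplicationSketch {
  /**
   * requiredPerTier: tier name -> replicas the (assumed single) load rule asks for.
   * loadedPerSegmentPerTier: segment ID -> (tier name -> replicas currently loaded).
   * Returns tier name -> total number of replicas still missing across all segments.
   */
  static Map<String, Long> underReplicatedPerTier(
      Map<String, Integer> requiredPerTier,
      Map<String, Map<String, Integer>> loadedPerSegmentPerTier
  ) {
    Map<String, Long> result = new HashMap<>();
    for (Map<String, Integer> loadedPerTier : loadedPerSegmentPerTier.values()) {
      for (Map.Entry<String, Integer> required : requiredPerTier.entrySet()) {
        int loaded = loadedPerTier.getOrDefault(required.getKey(), 0);
        // Extra replicas beyond the requirement never offset another segment's shortfall.
        long missing = Math.max(required.getValue() - loaded, 0);
        result.merge(required.getKey(), missing, Long::sum);
      }
    }
    return result;
  }
}
```

Unlike the `simple` count, a segment that is queryable but short of its required replicas still contributes to these totals.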






[GitHub] [druid] clintropolis commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
clintropolis commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r434946314



##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +393,43 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("firstCheck") @Nullable final Boolean firstCheck
+  )
+  {
+    if (serverInventoryView == null || serverInventoryView.getSegmentLoadInfos() == null) {
+      return Response.ok(ImmutableMap.of("loaded", false)).build();
+    }
+    // Force poll
+    Interval theInterval = interval == null ? Intervals.ETERNITY : Intervals.of(interval);
+    boolean requiresMetadataStorePoll = firstCheck == null ? true :firstCheck;
+
+    Optional<Iterable<DataSegment>> segments = segmentsMetadataManager.iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
+        dataSourceName,
+        theInterval,
+        requiresMetadataStorePoll
+    );
+
+    if (!segments.isPresent()) {
+      return logAndCreateDataSourceNotFoundResponse(dataSourceName);

Review comment:
       It seems a little off that `serverInventoryView` being missing or uninitialized returns `{"loaded":false}`, while a datasource not existing returns an empty response. I think this response is fine, but maybe the `serverInventoryView == null` case should be an error response indicating that the information is unavailable.

##########
File path: server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
##########
@@ -391,6 +393,43 @@ public Response getServedSegmentsInInterval(
     return getServedSegmentsInInterval(dataSourceName, full != null, theInterval::contains);
   }
 
+  @GET
+  @Path("/{dataSourceName}/loadstatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ResourceFilters(DatasourceResourceFilter.class)
+  public Response getDatasourceLoadstatus(
+      @PathParam("dataSourceName") String dataSourceName,
+      @QueryParam("interval") @Nullable final String interval,
+      @QueryParam("firstCheck") @Nullable final Boolean firstCheck

Review comment:
       I think this parameter should be named something like `forceMetadataPoll` or `forceMetadataRefresh`.

##########
File path: server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java
##########
@@ -403,11 +425,16 @@ private void awaitOrPerformDatabasePoll()
   }
 
   /**
-   * If the latest {@link DatabasePoll} is a {@link PeriodicDatabasePoll}, or an {@link OnDemandDatabasePoll} that is
-   * made not longer than {@link #periodicPollDelay} from now, awaits for it and returns true; returns false otherwise,
-   * meaning that a new on-demand database poll should be initiated.
+   * This method returns true without waiting for database poll if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has completed it's first poll, or an {@link OnDemandDatabasePoll} that is
+   * made not longer than {@link #periodicPollDelay} from current time.
+   * This method does wait untill completion for if the latest {@link DatabasePoll} is a
+   * {@link PeriodicDatabasePoll} that has not completed it's first poll, or an {@link OnDemandDatabasePoll} that is
+   * alrady in the process of polling the database.
+   * This means that any method using this check can read from snapshot that is
+   * up to {@link SqlSegmentsMetadataManager#periodicPollDelay} old.
    */
-  private boolean awaitLatestDatabasePoll()
+  private boolean useLatestSnapshotIfWithinDelay()

Review comment:
       nit: i think the old name was better, and still works with the new method being named `forceOrWaitOngoingDatabasePoll`

##########
File path: server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java
##########
@@ -113,6 +114,18 @@ int markAsUsedNonOvershadowedSegments(String dataSource, Set<String> segmentIds)
    */
   Iterable<DataSegment> iterateAllUsedSegments();
 
+  /**
+   * Returns an iterable to go over all used and non-overshadowed segments of given data sources over given interval.
+   * The order in which segments are iterated is unspecified.
+   * If {@param requiresLatest} is true then a force metadatastore poll will be triggered. This can cause a longer
+   * response time but will ensure that the latest segment information (at the time this method is called) is returned.
+   * If {@param requiresLatest} is false then segment information from stale snapshot of up to the last periodic poll
+   * period {@link SqlSegmentsMetadataManager#periodicPollDelay} will be used.
+   */
+  Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(String datasource,

Review comment:
       nit: formatting seems strange here, i suggest: 
   ```
     Optional<Iterable<DataSegment>> iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval(
         String datasource,
         Interval interval,
         boolean requiresLatest
     );
   ```






[GitHub] [druid] maytasm removed a comment on pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm removed a comment on pull request #9965:
URL: https://github.com/apache/druid/pull/9965#issuecomment-643183427


   Note to self: iterateAllUsedNonOvershadowedSegmentsForDatasourceInterval probably deserves a javadoc disclaimer similar to the one on iterateAllUsedSegments, noting that it is potentially not cheap to iterate, so do so with care.




[GitHub] [druid] maytasm commented on a change in pull request #9965: API to verify a datasource has the latest ingested data

Posted by GitBox <gi...@apache.org>.
maytasm commented on a change in pull request #9965:
URL: https://github.com/apache/druid/pull/9965#discussion_r440536991



##########
File path: docs/operations/api-reference.md
##########
@@ -96,11 +96,11 @@ Returns the percentage of segments actually loaded in the cluster versus segment
 
  * `/druid/coordinator/v1/loadstatus?simple`
 
-Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replication.
+Returns the number of segments left to load until segments that should be loaded in the cluster are available for queries. This does not include replica of segments.
 
 * `/druid/coordinator/v1/loadstatus?full`
 
-Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This includes replication.
+Returns the number of segments left to load in each tier until segments that should be loaded in the cluster are all available. This include replica of segments.

Review comment:
       Done



