Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/09/09 15:44:41 UTC

[GitHub] [pinot] saurabhd336 opened a new pull request, #9356: Allow setting custom time boundary for hybrid table queries

saurabhd336 opened a new pull request, #9356:
URL: https://github.com/apache/pinot/pull/9356

   Instructions:
   1. The PR has to be tagged with at least one of the following labels (*):
      1. `feature`
      2. `bugfix`
      3. `performance`
      4. `ui`
      5. `backward-incompat`
      6. `release-notes` (**)
   2. Remove these instructions before publishing the PR.
    
   (*) Other labels to consider:
   - `testing`
   - `dependencies`
   - `docker`
   - `kubernetes`
   - `observability`
   - `security`
   - `code-style`
   - `extension-point`
   - `refactor`
   - `cleanup`
   
   (**) Use `release-notes` label for scenarios like:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   
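For context on the PR title: when a hybrid table is queried, the broker splits the query into an offline half and a realtime half at the time boundary. The sketch below assumes the split rule used by the Pinot broker: the offline side is filtered with timeColumn <= boundary and the realtime side with timeColumn > boundary. The class and method names are hypothetical and are not Pinot APIs. Per the diffs later in this thread, the PR lets that boundary be pinned through the offline table's ideal state instead of always being derived on the broker.

```java
// Hypothetical helper (not a Pinot API) showing how a hybrid-table query could be
// split at the time boundary: offline rows satisfy timeColumn <= boundary,
// realtime rows satisfy timeColumn > boundary (assumed split rule).
final class HybridTimeBoundarySketch {
  private HybridTimeBoundarySketch() {
  }

  /** Extra filter attached to the offline half of the query. */
  static String offlineFilter(String timeColumn, String boundary) {
    return timeColumn + " <= " + boundary;
  }

  /** Extra filter attached to the realtime half of the query. */
  static String realtimeFilter(String timeColumn, String boundary) {
    return timeColumn + " > " + boundary;
  }

  /** True if a row with this time value is answered from the offline table. */
  static boolean servedByOffline(long rowTime, long boundary) {
    return rowTime <= boundary;
  }
}
```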


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r981049829


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,106 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/enforceQueryTimeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + offlineTableName + "/validate";

Review Comment:
   Could you elaborate on this one? Do you mean the validate API should not return the maxTimestamps?
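Suppose the validate endpoint does return each server's max end timestamp. In that case, the controller-side reduction in later revisions of this PR (quoted further down in this thread) amounts to taking the maximum across servers and rejecting the request if any server reports an invalid state. The following is a minimal sketch of that reduction. The hypothetical ServerValidation and TimeBoundaryReducer classes stand in for TableSegmentValidationInfo, and the REST plumbing is left out.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical stand-in for TableSegmentValidationInfo: whether the server's
// segments match the ideal state, and the max end timestamp it observed.
final class ServerValidation {
  final boolean valid;
  final long maxTimestamp;

  ServerValidation(boolean valid, long maxTimestamp) {
    this.valid = valid;
    this.maxTimestamp = maxTimestamp;
  }
}

final class TimeBoundaryReducer {
  private TimeBoundaryReducer() {
  }

  /**
   * Returns the max timestamp across all server responses, or empty if there were
   * no responses, any server reported an invalid state, or no server saw a
   * non-negative timestamp.
   */
  static OptionalLong reduce(List<ServerValidation> responses) {
    long max = -1; // same starting value as the diff
    for (ServerValidation response : responses) {
      if (!response.valid) {
        return OptionalLong.empty();
      }
      max = Math.max(max, response.maxTimestamp);
    }
    return max < 0 ? OptionalLong.empty() : OptionalLong.of(max);
  }
}
```

One design choice in the sketch: a result of -1, meaning every server returned -1, is treated as "no boundary". As quoted, the later diff starts from -1 and only checks for null, so it would appear to store -1 in that case.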



[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r981044531


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -105,6 +106,14 @@ public TimeBoundaryManager(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecor
   @SuppressWarnings("unused")
   public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
     // Bulk load time info for all online segments
+    String enforcedTimeBoundary = idealState.getRecord().getSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY);

Review Comment:
   Ack
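The line under review reads the enforced boundary from a simple field of the offline table's ideal state. The following is a minimal sketch of reading that field defensively. A plain Map<String, String> stands in for the ZNRecord simple fields. The key value comes from the later revision of CommonConstants.IdealState in this PR. Treating the stored value as epoch milliseconds is an assumption.

```java
import java.util.Map;

final class EnforcedBoundaryParser {
  // Key value from the later revision of CommonConstants.IdealState in this PR.
  static final String QUERY_TIME_BOUNDARY = "HYBRID_TABLE_TIME_BOUNDARY";

  private EnforcedBoundaryParser() {
  }

  /** Returns the enforced boundary (assumed epoch ms), or null if unset or not a valid long. */
  static Long parse(Map<String, String> simpleFields) {
    String raw = simpleFields.get(QUERY_TIME_BOUNDARY);
    if (raw == null || raw.isEmpty()) {
      return null;
    }
    try {
      return Long.parseLong(raw.trim());
    } catch (NumberFormatException e) {
      // Fall back to the derived boundary rather than failing routing.
      return null;
    }
  }
}
```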



[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r985385484


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, long maxEndTimeMs,
+      boolean idealStateReffered) {

Review Comment:
   Ack



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, long maxEndTimeMs,

Review Comment:
   Ack



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, long maxEndTimeMs,
+      boolean idealStateReffered) {
+    TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
+    Long finalTimeBoundaryMs = null;
+    boolean isEnforced = false;
+    boolean validTimeBoundaryFound = false;
+
+    if (enforcedTimeBoundary != null) {

Review Comment:
   I couldn't find a way to simplify this without using two different TimeBoundaryInfo instances, one for the explicitly set time boundary and one for the derived one. I have added comments for now.
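One possible way to keep the precedence rule in a single place is a pure function that returns either the enforced boundary or the derived one. This avoids the need for two TimeBoundaryInfo instances. The following is a minimal sketch. It assumes the enforced value is used as-is, with no offset applied, while the derived value keeps the pre-PR rule of maxEndTimeMs - timeOffsetMs. Boundary and BoundarySelector are hypothetical, and the sketch leaves out formatting through the time format spec.

```java
final class BoundarySelector {
  private BoundarySelector() {
  }

  /** Hypothetical stand-in for TimeBoundaryInfo, including the _enforced flag from the diff. */
  static final class Boundary {
    final String timeColumn;
    final long timeValueMs;
    final boolean enforced;

    Boundary(String timeColumn, long timeValueMs, boolean enforced) {
      this.timeColumn = timeColumn;
      this.timeValueMs = timeValueMs;
      this.enforced = enforced;
    }
  }

  /**
   * Single place for the precedence rule: an enforced boundary wins; otherwise derive
   * one from the max segment end time minus the offset (the pre-PR rule). Returns
   * null when neither source yields a boundary.
   */
  static Boundary select(String timeColumn, Long enforcedMs, long maxEndTimeMs, long timeOffsetMs) {
    if (enforcedMs != null) {
      return new Boundary(timeColumn, enforcedMs, true);
    }
    if (maxEndTimeMs > 0) {
      return new Boundary(timeColumn, maxEndTimeMs - timeOffsetMs, false);
    }
    return null;
  }
}
```

With this approach, the synchronized updateTimeBoundaryInfo only needs to compare the selected value with the current one and swap the reference when it changes.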



[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r981047279


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,106 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/enforceQueryTimeBoundary")

Review Comment:
   Ack



[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r976307842


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -731,4 +730,8 @@ public static class Range {
       public static final String UPPER_UNBOUNDED = DELIMITER + UNBOUNDED + UPPER_EXCLUSIVE;
     }
   }
+
+  public static class IdealState {
+    public static final String QUERY_TIME_BOUNDARY = "query.time.boundary";

Review Comment:
   Ack



[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r985368922


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -822,4 +821,8 @@ public static class Range {
       public static final String UPPER_UNBOUNDED = DELIMITER + UNBOUNDED + UPPER_EXCLUSIVE;
     }
   }
+
+  public static class IdealState {
+    public static final String QUERY_TIME_BOUNDARY = "HYBRID_TABLE_TIME_BOUNDARY";

Review Comment:
   Ack



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -20,7 +20,6 @@
 
 import java.io.File;
 
-

Review Comment:
   Ack



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +526,59 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
     }
     return segmentConsumerInfoList;
   }
+
+  @GET
+  @Path("tables/{tableNameWithType}/allSegmentsLoaded")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Validates if the ideal state matches with the segmentstate on this server", notes =
+      "Validates if the ideal state matches with the segmentstate on this server")

Review Comment:
   Ack



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +526,59 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
     }
     return segmentConsumerInfoList;
   }
+
+  @GET
+  @Path("tables/{tableNameWithType}/allSegmentsLoaded")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Validates if the ideal state matches with the segmentstate on this server", notes =
+      "Validates if the ideal state matches with the segmentstate on this server")
+  public TableSegmentValidationInfo validateTableSegmentState(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableNameWithType")
+          String tableNameWithType) {
+    // Get table current ideal state
+    IdealState tableIdealState = HelixHelper.getTableIdealState(_serverInstance.getHelixManager(), tableNameWithType);
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+
+    // Validate segments in idealstate which belong to this server
+    long maxEndTime = -1;
+    Map<String, Map<String, String>> instanceStatesMap = tableIdealState.getRecord().getMapFields();
+    for (Map.Entry<String, Map<String, String>> kv : instanceStatesMap.entrySet()) {
+      String segmentName = kv.getKey();
+      if (kv.getValue().containsKey(_instanceId)) {
+        // Segment hosted by this server. Validate segment state
+        SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+        try {
+          String segmentState = kv.getValue().get(_instanceId);
+
+          switch (segmentState) {
+            case CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING:
+              // Only validate presence of segment
+              if (segmentDataManager == null) {
+                return new TableSegmentValidationInfo(false, -1);
+              }
+              break;
+            case CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE:
+              // Validate segment CRC
+              SegmentZKMetadata zkMetadata =
+                  ZKMetadataProvider.getSegmentZKMetadata(_serverInstance.getHelixManager().getHelixPropertyStore(),
+                      tableNameWithType, segmentName);

Review Comment:
   Ack
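The server-side check in this hunk works in three steps. It walks the ideal state, keeps only the segments assigned to this instance, and validates each one by its state. The following is a minimal sketch of that loop. Plain maps stand in for IdealState, TableDataManager and segment ZK metadata, and all parameter names are hypothetical. The quoted diff stops before the max end time is computed. Taking that value from the ZK end time of ONLINE segments is therefore an assumption.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

final class SegmentStateValidator {
  private SegmentStateValidator() {
  }

  /**
   * Checks the segments that the ideal state assigns to {@code instanceId}.
   *
   * @param idealState     segment -> (instance -> state), like IdealState map fields
   * @param loadedSegments segments currently loaded on this server
   * @param loadedCrc      segment -> CRC of the loaded copy (ONLINE segments)
   * @param zkCrc          segment -> CRC recorded in segment ZK metadata
   * @param zkEndTimeMs    segment -> end time recorded in segment ZK metadata
   * @return max end time of valid ONLINE segments (-1 if none), or empty on mismatch
   */
  static OptionalLong validate(Map<String, Map<String, String>> idealState, String instanceId,
      Set<String> loadedSegments, Map<String, Long> loadedCrc, Map<String, Long> zkCrc,
      Map<String, Long> zkEndTimeMs) {
    long maxEndTime = -1;
    for (Map.Entry<String, Map<String, String>> entry : idealState.entrySet()) {
      String segment = entry.getKey();
      String state = entry.getValue().get(instanceId);
      if (state == null) {
        continue; // segment is not assigned to this server
      }
      switch (state) {
        case "CONSUMING":
          // Only require that the segment is present on this server.
          if (!loadedSegments.contains(segment)) {
            return OptionalLong.empty();
          }
          break;
        case "ONLINE":
          // Require the loaded copy's CRC to match the CRC in ZK metadata.
          Long localCrc = loadedCrc.get(segment);
          if (!loadedSegments.contains(segment) || localCrc == null || !localCrc.equals(zkCrc.get(segment))) {
            return OptionalLong.empty();
          }
          Long endTimeMs = zkEndTimeMs.get(segment);
          if (endTimeMs != null) {
            maxEndTime = Math.max(maxEndTime, endTimeMs);
          }
          break;
        default:
          // Other states (e.g. OFFLINE) are not validated in this sketch.
          break;
      }
    }
    return OptionalLong.of(maxEndTime);
  }
}
```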



##########
pinot-core/src/main/java/org/apache/pinot/core/routing/TimeBoundaryInfo.java:
##########
@@ -21,10 +21,12 @@
 public class TimeBoundaryInfo {
   private final String _timeColumn;
   private final String _timeValue;
+  private final boolean _enforced;

Review Comment:
   Ack



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;
+    // Validate all responses
+    for (String response : serviceResponse._httpResponses.values()) {
+      TableSegmentValidationInfo tableSegmentValidationInfo =
+          JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+      if (!tableSegmentValidationInfo.isValid()) {
+        throw new ControllerApplicationException(LOGGER, "Table segment validation failed",
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      timeBoundary = Math.max((timeBoundary == null) ? -1 : timeBoundary, tableSegmentValidationInfo.getMaxTimestamp());
+    }
+
+    if (timeBoundary == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    final String timeBoundaryFinal = String.valueOf(timeBoundary);
+
+    // Set the timeBoundary in tableIdealState
+    IdealState idealState =
+        HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> {
+          assert is != null;
+          is.getRecord().setSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY, timeBoundaryFinal);
+          return is;
+        }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
+
+    if (idealState == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not update time boundary",
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+
+    return new SuccessResponse("Time boundary updated successfully to " + timeBoundaryFinal);
+  }
+
+  @DELETE
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Delete hybrid table query time boundary", notes = "Delete hybrid table query time boundary")
+  public SuccessResponse deleteEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(

Review Comment:
   Ack. Added a check that the offline table's ideal state is not null.
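The first half of the POST handler fans out one request to each server that hosts offline segments. The following is a minimal sketch of how the handler builds those URLs. It assumes that TableNameBuilder.OFFLINE.tableNameWithType simply appends the _OFFLINE suffix to a raw table name. ValidationUrls is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

final class ValidationUrls {
  private ValidationUrls() {
  }

  /** Assumed equivalent of TableNameBuilder.OFFLINE.tableNameWithType for a raw table name. */
  static String offlineTableName(String rawTableName) {
    return rawTableName + "_OFFLINE";
  }

  /** Builds one allSegmentsLoaded URL per server admin endpoint, as the diff does. */
  static List<String> build(Collection<String> adminEndpoints, String rawTableName) {
    String offline = offlineTableName(rawTableName);
    List<String> urls = new ArrayList<>();
    for (String endpoint : adminEndpoints) {
      urls.add(endpoint + "/tables/" + offline + "/allSegmentsLoaded");
    }
    return urls;
  }
}
```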



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
[... unchanged context, identical to the first PinotTableRestletResource.java hunk in this message, omitted ...]
+  @DELETE
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Delete hybrid table query time boundary", notes = "Delete hybrid table query time boundary")
+  public SuccessResponse deleteEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Delete the timeBoundary in tableIdealState
+    IdealState idealState =
+        HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> {
+          assert is != null;

Review Comment:
   Ack. Added a check that the ideal state is not null.
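An explicit check matters here because Java assertions are disabled unless the JVM runs with -ea. This means assert is != null is normally a no-op in production. The following is a minimal sketch of updaters in the style of the lambdas passed to HelixHelper.updateIdealState. A Map<String, String> stands in for the ideal state's simple fields. IdealStateUpdaters is hypothetical, and the sketch uses plain Java instead of Guava's Preconditions to stay self-contained.

```java
import java.util.Map;
import java.util.function.UnaryOperator;

final class IdealStateUpdaters {
  // Key value from the later revision of CommonConstants.IdealState in this PR.
  static final String QUERY_TIME_BOUNDARY = "HYBRID_TABLE_TIME_BOUNDARY";

  private IdealStateUpdaters() {
  }

  /** Updater that sets the boundary; fails loudly instead of relying on assert. */
  static UnaryOperator<Map<String, String>> setBoundary(String value) {
    return fields -> {
      if (fields == null) {
        throw new IllegalStateException("Ideal state not found");
      }
      fields.put(QUERY_TIME_BOUNDARY, value);
      return fields;
    };
  }

  /** Updater that removes the boundary, with the same explicit null check. */
  static UnaryOperator<Map<String, String>> removeBoundary() {
    return fields -> {
      if (fields == null) {
        throw new IllegalStateException("Ideal state not found");
      }
      fields.remove(QUERY_TIME_BOUNDARY);
      return fields;
    };
  }
}
```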



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
[... unchanged context, identical to the first PinotTableRestletResource.java hunk in this message, omitted ...]
+    // Set the timeBoundary in tableIdealState
+    IdealState idealState =
+        HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> {
+          assert is != null;

Review Comment:
   Ack. Added a precondition check that the ideal state is not null.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
[... unchanged context, identical to the first PinotTableRestletResource.java hunk in this message, omitted ...]
+    Long timeBoundary = null;
+    // Validate all responses
+    for (String response : serviceResponse._httpResponses.values()) {
+      TableSegmentValidationInfo tableSegmentValidationInfo =
+          JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+      if (!tableSegmentValidationInfo.isValid()) {
+        throw new ControllerApplicationException(LOGGER, "Table segment validation failed",
+            Response.Status.INTERNAL_SERVER_ERROR);

Review Comment:
   Set to PRECONDITION_FAILED (412)



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;

Review Comment:
   Ack



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);

Review Comment:
   Ack



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")

Review Comment:
   Ack



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, long maxEndTimeMs,
+      boolean idealStateReffered) {
+    TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
+    Long finalTimeBoundaryMs = null;
+    boolean isEnforced = false;
+    boolean validTimeBoundaryFound = false;
+
+    if (enforcedTimeBoundary != null) {
+      finalTimeBoundaryMs = enforcedTimeBoundary;
+      isEnforced = true;
+      validTimeBoundaryFound = true;
+      LOGGER.info("Enforced table time boundary in use: {} for table: {}", enforcedTimeBoundary, _offlineTableName);
+    } else if (idealStateReffered || !currentTimeBoundaryInfo.isEnforced()) {
+      if (maxEndTimeMs > 0) {
+        finalTimeBoundaryMs = maxEndTimeMs - _timeOffsetMs;
+        validTimeBoundaryFound = true;
+      } else {
+        LOGGER.warn("Failed to find segment with valid end time for table: {}, no time boundary generated",
+            _offlineTableName);
+      }
+    } else {
+      validTimeBoundaryFound = true;
+      LOGGER.info("Skipping time boundary update since enforced time boundary exists");
+    }
+
+    if (validTimeBoundaryFound) {
+      if (finalTimeBoundaryMs != null) {
+        String timeBoundary = _timeFormatSpec.fromMillisToFormat(finalTimeBoundaryMs);
+        if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {

Review Comment:
   Ack



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, long maxEndTimeMs,
+      boolean idealStateReffered) {
+    TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
+    Long finalTimeBoundaryMs = null;
+    boolean isEnforced = false;
+    boolean validTimeBoundaryFound = false;
+
+    if (enforcedTimeBoundary != null) {
+      finalTimeBoundaryMs = enforcedTimeBoundary;
+      isEnforced = true;
+      validTimeBoundaryFound = true;
+      LOGGER.info("Enforced table time boundary in use: {} for table: {}", enforcedTimeBoundary, _offlineTableName);
+    } else if (idealStateReffered || !currentTimeBoundaryInfo.isEnforced()) {
+      if (maxEndTimeMs > 0) {
+        finalTimeBoundaryMs = maxEndTimeMs - _timeOffsetMs;
+        validTimeBoundaryFound = true;
+      } else {
+        LOGGER.warn("Failed to find segment with valid end time for table: {}, no time boundary generated",
+            _offlineTableName);
+      }
+    } else {
+      validTimeBoundaryFound = true;
+      LOGGER.info("Skipping time boundary update since enforced time boundary exists");
+    }
+
+    if (validTimeBoundaryFound) {
+      if (finalTimeBoundaryMs != null) {

Review Comment:
   Good catch! Ack





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r984958874


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -20,7 +20,6 @@
 
 import java.io.File;
 
-

Review Comment:
   (format) shouldn't remove



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")

Review Comment:
   ```suggestion
     @Path("tables/{tableName}/timeBoundary")
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;
+    // Validate all responses
+    for (String response : serviceResponse._httpResponses.values()) {
+      TableSegmentValidationInfo tableSegmentValidationInfo =
+          JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+      if (!tableSegmentValidationInfo.isValid()) {
+        throw new ControllerApplicationException(LOGGER, "Table segment validation failed",
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      timeBoundary = Math.max((timeBoundary == null) ? -1 : timeBoundary, tableSegmentValidationInfo.getMaxTimestamp());
+    }
+
+    if (timeBoundary == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    final String timeBoundaryFinal = String.valueOf(timeBoundary);
+
+    // Set the timeBoundary in tableIdealState
+    IdealState idealState =
+        HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> {
+          assert is != null;
+          is.getRecord().setSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY, timeBoundaryFinal);
+          return is;
+        }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
+
+    if (idealState == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not update time boundary",
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+
+    return new SuccessResponse("Time boundary updated successfully to " + timeBoundaryFinal);
+  }
+
+  @DELETE
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Delete hybrid table query time boundary", notes = "Delete hybrid table query time boundary")
+  public SuccessResponse deleteEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(

Review Comment:
   We probably don't want to check the realtime table. I can see a corner case where a user wants to remove the realtime table and keep only the offline table.
   
   Instead, we can check that the offline table's ideal state exists and has the time boundary set.
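   A minimal sketch of this check, under stated assumptions: it looks only at the OFFLINE table's ideal state and ignores the realtime table. A `Map<String, String>` stands in for the ideal state's simple fields, and `null` means the ideal state does not exist. The class name and the returned HTTP-style codes (404/400/200) are hypothetical choices. The key string comes from the constant proposed elsewhere in this PR.

```java
import java.util.Map;

// Hypothetical sketch: validates a DELETE .../timeBoundary request by checking only the
// OFFLINE table's ideal state. The Map stands in for the ideal state's simple fields;
// null means the ideal state does not exist.
final class TimeBoundaryDeleteCheck {
  // Key string as proposed in this PR's CommonConstants change.
  static final String HYBRID_TABLE_TIME_BOUNDARY = "HYBRID_TABLE_TIME_BOUNDARY";

  /** Returns an HTTP-style status code; the specific codes are illustrative assumptions. */
  static int validate(Map<String, String> offlineIdealStateSimpleFields) {
    if (offlineIdealStateSimpleFields == null) {
      return 404; // offline table (or its ideal state) not found
    }
    if (!offlineIdealStateSimpleFields.containsKey(HYBRID_TABLE_TIME_BOUNDARY)) {
      return 400; // nothing to delete: no enforced time boundary is set
    }
    return 200; // the realtime table is deliberately not required to exist
  }
}
```

   With this check, deleting the enforced boundary still works after the realtime table has been removed. That is the corner case raised above.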



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +526,59 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
     }
     return segmentConsumerInfoList;
   }
+
+  @GET
+  @Path("tables/{tableNameWithType}/allSegmentsLoaded")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Validates if the ideal state matches with the segmentstate on this server", notes =
+      "Validates if the ideal state matches with the segmentstate on this server")
+  public TableSegmentValidationInfo validateTableSegmentState(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableNameWithType")
+          String tableNameWithType) {
+    // Get table current ideal state
+    IdealState tableIdealState = HelixHelper.getTableIdealState(_serverInstance.getHelixManager(), tableNameWithType);
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+
+    // Validate segments in idealstate which belong to this server
+    long maxEndTime = -1;
+    Map<String, Map<String, String>> instanceStatesMap = tableIdealState.getRecord().getMapFields();
+    for (Map.Entry<String, Map<String, String>> kv : instanceStatesMap.entrySet()) {
+      String segmentName = kv.getKey();
+      if (kv.getValue().containsKey(_instanceId)) {
+        // Segment hosted by this server. Validate segment state
+        SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+        try {
+          String segmentState = kv.getValue().get(_instanceId);
+
+          switch (segmentState) {
+            case CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING:
+              // Only validate presence of segment
+              if (segmentDataManager == null) {
+                return new TableSegmentValidationInfo(false, -1);
+              }
+              break;
+            case CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE:
+              // Validate segment CRC
+              SegmentZKMetadata zkMetadata =
+                  ZKMetadataProvider.getSegmentZKMetadata(_serverInstance.getHelixManager().getHelixPropertyStore(),
+                      tableNameWithType, segmentName);

Review Comment:
   Consider throwing a proper exception when `zkMetadata` is `null`; otherwise this will hit an NPE.
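   A hedged sketch of the suggested guard: it fails fast with a descriptive message instead of letting a missing metadata record surface as an NPE. A `Map<String, Long>` (segment name to CRC) is a hypothetical stand-in for the ZK property-store lookup. `IllegalStateException` is one possible exception type; the endpoint could equally map this case to an HTTP error or to an invalid `TableSegmentValidationInfo`.

```java
import java.util.Map;

// Hypothetical sketch: look up a segment's CRC and fail with a descriptive exception when
// its ZK metadata is missing. The Map stands in for the ZK property store (segment -> CRC).
final class SegmentCrcCheck {
  static long requireCrc(Map<String, Long> zkCrcBySegment, String tableNameWithType, String segmentName) {
    Long crc = zkCrcBySegment.get(segmentName);
    if (crc == null) {
      // Explicit failure instead of an NPE when the metadata is later dereferenced
      throw new IllegalStateException(
          "Segment ZK metadata not found for segment: " + segmentName + " of table: " + tableNameWithType);
    }
    return crc;
  }
}
```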



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);

Review Comment:
   Let's create a helper method for validating the segment state. We may add a separate API for it, which can be useful for debugging.
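   One possible shape for such a helper is sketched below. It is hypothetical: `ServerSegmentValidation` stands in for the per-server `TableSegmentValidationInfo` response, and the method name is illustrative. Tracking a primitive `long` initialized to -1 also avoids the `Math.max((timeBoundary == null) ? -1 : timeBoundary, ...)` pattern in the quoted diff. Treating "no server reported a timestamp" as an error is an assumption.

```java
import java.util.List;

// Hypothetical stand-in for the per-server TableSegmentValidationInfo response.
final class ServerSegmentValidation {
  final boolean _valid;
  final long _maxTimestamp;

  ServerSegmentValidation(boolean valid, long maxTimestamp) {
    _valid = valid;
    _maxTimestamp = maxTimestamp;
  }
}

// Sketch of a reusable helper: fails if any server reports a segment-state mismatch,
// otherwise returns the max end timestamp across all servers.
final class SegmentStateValidator {
  static long validateAndGetMaxTimestamp(List<ServerSegmentValidation> responses) {
    long maxTimestamp = -1; // -1 means "no valid timestamp seen yet"
    for (ServerSegmentValidation response : responses) {
      if (!response._valid) {
        throw new IllegalStateException("Segment state validation failed on at least one server");
      }
      maxTimestamp = Math.max(maxTimestamp, response._maxTimestamp);
    }
    if (maxTimestamp < 0) {
      throw new IllegalStateException("No server reported a valid max timestamp");
    }
    return maxTimestamp;
  }
}
```

   Both the POST endpoint and a separate debugging endpoint could call the same helper and map the exception to their own HTTP status.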



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;
+    // Validate all responses
+    for (String response : serviceResponse._httpResponses.values()) {
+      TableSegmentValidationInfo tableSegmentValidationInfo =
+          JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+      if (!tableSegmentValidationInfo.isValid()) {
+        throw new ControllerApplicationException(LOGGER, "Table segment validation failed",
+            Response.Status.INTERNAL_SERVER_ERROR);

Review Comment:
   We should pick a different status



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, long maxEndTimeMs,
+      boolean idealStateReffered) {
+    TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
+    Long finalTimeBoundaryMs = null;
+    boolean isEnforced = false;
+    boolean validTimeBoundaryFound = false;
+
+    if (enforcedTimeBoundary != null) {

Review Comment:
   This part is quite complicated. It would be good if we could simplify the logic, or add comments explaining the logic under each if branch.
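   The branch logic in the quoted `updateTimeBoundaryInfo` could be reduced to a pure decision function with one comment per branch, as sketched below. This is a hypothetical restructuring, not the PR's code. It uses -1 to mean "no enforced boundary", and the `KEEP_CURRENT` marker is an invented name. The caller passes `currentIsEnforced` as a plain boolean (for example `current != null && current.isEnforced()`). This also avoids dereferencing a null current `TimeBoundaryInfo`, which the `else if` branch of the quoted diff appears able to do on the first update.

```java
// Hypothetical restructuring of updateTimeBoundaryInfo's branches into a pure function.
// Returns the boundary (ms) to apply, NO_BOUNDARY, or KEEP_CURRENT.
final class TimeBoundaryDecision {
  static final long NO_BOUNDARY = -1;  // no valid boundary could be derived
  static final long KEEP_CURRENT = -2; // leave the existing (enforced) boundary untouched

  static long decide(long enforcedTimeBoundaryMs, long maxEndTimeMs, long timeOffsetMs,
      boolean idealStateReferred, boolean currentIsEnforced) {
    // 1. An enforced boundary read from the ideal state always wins (-1 means "not set").
    if (enforcedTimeBoundaryMs >= 0) {
      return enforcedTimeBoundaryMs;
    }
    // 2. The ideal state was not re-read (e.g. only segment metadata changed) and an
    //    enforced boundary is active: it remains authoritative, so do not recompute.
    if (!idealStateReferred && currentIsEnforced) {
      return KEEP_CURRENT;
    }
    // 3. Otherwise derive the boundary from the offline segments' max end time.
    return maxEndTimeMs > 0 ? maxEndTimeMs - timeOffsetMs : NO_BOUNDARY;
  }
}
```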



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, long maxEndTimeMs,

Review Comment:
   (minor) Consider following the same convention of using -1 to represent the invalid value



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, long maxEndTimeMs,
+      boolean idealStateReffered) {
+    TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
+    Long finalTimeBoundaryMs = null;
+    boolean isEnforced = false;
+    boolean validTimeBoundaryFound = false;
+
+    if (enforcedTimeBoundary != null) {
+      finalTimeBoundaryMs = enforcedTimeBoundary;
+      isEnforced = true;
+      validTimeBoundaryFound = true;
+      LOGGER.info("Enforced table time boundary in use: {} for table: {}", enforcedTimeBoundary, _offlineTableName);
+    } else if (idealStateReffered || !currentTimeBoundaryInfo.isEnforced()) {
+      if (maxEndTimeMs > 0) {
+        finalTimeBoundaryMs = maxEndTimeMs - _timeOffsetMs;
+        validTimeBoundaryFound = true;
+      } else {
+        LOGGER.warn("Failed to find segment with valid end time for table: {}, no time boundary generated",
+            _offlineTableName);
+      }
+    } else {
+      validTimeBoundaryFound = true;
+      LOGGER.info("Skipping time boundary update since enforced time boundary exists");
+    }
+
+    if (validTimeBoundaryFound) {
+      if (finalTimeBoundaryMs != null) {
+        String timeBoundary = _timeFormatSpec.fromMillisToFormat(finalTimeBoundaryMs);
+        if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {

Review Comment:
   Here we also want to check if the explicitly set flag is the same
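   A hedged sketch of the change-detection check with the flag included. `BoundaryInfo` is a hypothetical minimal stand-in for `TimeBoundaryInfo`. Comparing only the time value seems to miss one case: an enforced boundary equal to the currently derived value would not replace the stored info. The stored info would then still report "not enforced", so a later segment-driven update could overwrite the enforced boundary.

```java
import java.util.Objects;

// Hypothetical minimal stand-in for TimeBoundaryInfo that carries the "enforced" flag.
final class BoundaryInfo {
  final String _timeValue;
  final boolean _enforced;

  BoundaryInfo(String timeValue, boolean enforced) {
    _timeValue = timeValue;
    _enforced = enforced;
  }

  // Replace the current info if there is none, or if either the value or the flag changed.
  static boolean needsUpdate(BoundaryInfo current, String newTimeValue, boolean newEnforced) {
    return current == null
        || !Objects.equals(current._timeValue, newTimeValue)
        || current._enforced != newEnforced;
  }
}
```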



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +526,59 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
     }
     return segmentConsumerInfoList;
   }
+
+  @GET
+  @Path("tables/{tableNameWithType}/allSegmentsLoaded")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Validates if the ideal state matches with the segmentstate on this server", notes =
+      "Validates if the ideal state matches with the segmentstate on this server")

Review Comment:
   (minor)
   ```suggestion
     @ApiOperation(value = "Validates if the ideal state matches with the segment state on this server", notes =
         "Validates if the ideal state matches with the segment state on this server")
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;
+    // Validate all responses
+    for (String response : serviceResponse._httpResponses.values()) {
+      TableSegmentValidationInfo tableSegmentValidationInfo =
+          JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+      if (!tableSegmentValidationInfo.isValid()) {
+        throw new ControllerApplicationException(LOGGER, "Table segment validation failed",
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      timeBoundary = Math.max((timeBoundary == null) ? -1 : timeBoundary, tableSegmentValidationInfo.getMaxTimestamp());
+    }
+
+    if (timeBoundary == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    final String timeBoundaryFinal = String.valueOf(timeBoundary);
+
+    // Set the timeBoundary in tableIdealState
+    IdealState idealState =
+        HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> {
+          assert is != null;
+          is.getRecord().setSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY, timeBoundaryFinal);
+          return is;
+        }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
+
+    if (idealState == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not update time boundary",
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+
+    return new SuccessResponse("Time boundary updated successfully to " + timeBoundaryFinal);
+  }
+
+  @DELETE
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Delete hybrid table query time boundary", notes = "Delete hybrid table query time boundary")
+  public SuccessResponse deleteEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Delete the timeBoundary in tableIdealState
+    IdealState idealState =
+        HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> {
+          assert is != null;

Review Comment:
   We cannot make this assert because the input `is` can be `null`, and we should throw an exception in that case.
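   The updater with the `assert` replaced by an explicit null check might look like the sketch below. Java `assert` statements are skipped unless the JVM runs with `-ea`, so in production a `null` ideal state would otherwise surface later as an NPE. This is a hypothetical illustration: a `Map<String, String>` stands in for the ideal state's simple fields, and returning a modified copy is a simplification. The real updater would presumably mutate and return the `IdealState`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical sketch of the ideal-state updater used by the DELETE endpoint, with the
// `assert` replaced by an explicit check that always runs (asserts need -ea to be enabled).
final class RemoveTimeBoundaryUpdater {
  static UnaryOperator<Map<String, String>> forTable(String offlineTableName) {
    return simpleFields -> {
      if (simpleFields == null) {
        throw new IllegalStateException("Ideal state not found for table: " + offlineTableName);
      }
      Map<String, String> updated = new HashMap<>(simpleFields);
      updated.remove("HYBRID_TABLE_TIME_BOUNDARY"); // drop the enforced boundary
      return updated;
    };
  }
}
```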



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -822,4 +821,8 @@ public static class Range {
       public static final String UPPER_UNBOUNDED = DELIMITER + UNBOUNDED + UPPER_EXCLUSIVE;
     }
   }
+
+  public static class IdealState {
+    public static final String QUERY_TIME_BOUNDARY = "HYBRID_TABLE_TIME_BOUNDARY";

Review Comment:
   ```suggestion
       public static final String HYBRID_TABLE_TIME_BOUNDARY = "HYBRID_TABLE_TIME_BOUNDARY";
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;
+    // Validate all responses
+    for (String response : serviceResponse._httpResponses.values()) {
+      TableSegmentValidationInfo tableSegmentValidationInfo =
+          JsonUtils.stringToObject(response, TableSegmentValidationInfo.class);
+      if (!tableSegmentValidationInfo.isValid()) {
+        throw new ControllerApplicationException(LOGGER, "Table segment validation failed",
+            Response.Status.INTERNAL_SERVER_ERROR);
+      }
+      timeBoundary = Math.max((timeBoundary == null) ? -1 : timeBoundary, tableSegmentValidationInfo.getMaxTimestamp());
+    }
+
+    if (timeBoundary == null) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    final String timeBoundaryFinal = String.valueOf(timeBoundary);
+
+    // Set the timeBoundary in tableIdealState
+    IdealState idealState =
+        HelixHelper.updateIdealState(_pinotHelixResourceManager.getHelixZkManager(), offlineTableName, is -> {
+          assert is != null;

Review Comment:
   We cannot make this assert because the input `is` can be `null`; we should throw an exception in that case.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,108 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/timeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the hybrid table (without type suffix)", required = true) @PathParam("tableName")
+          String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + offlineTableName + "/allSegmentsLoaded";
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    if (serviceResponse._failedResponseCount > 0) {
+      throw new ControllerApplicationException(LOGGER, "Could not validate table segment status",
+          Response.Status.SERVICE_UNAVAILABLE);
+    }
+
+    Long timeBoundary = null;

Review Comment:
   ```suggestion
       long timeBoundaryMs = -1;
   ```
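A sketch of the reduction using the suggested primitive accumulator. `ServerValidation` is a hypothetical stand-in for a parsed `TableSegmentValidationInfo`, and only the validity flag and max timestamp are modeled. With a `-1` sentinel, a single `< 0` check after the loop rejects both an empty response set and responses that carry no usable timestamp. The boxed version above would let the second case through and write `-1` as the boundary.

```java
import java.util.List;

public final class TimeBoundaryReductionSketch {
  // Hypothetical stand-in for a parsed per-server validation response.
  static final class ServerValidation {
    final boolean valid;
    final long maxTimestampMs;

    ServerValidation(boolean valid, long maxTimestampMs) {
      this.valid = valid;
      this.maxTimestampMs = maxTimestampMs;
    }
  }

  // Returns the max end timestamp across all servers. Throws if any server reports
  // an invalid state, or if no server returned a usable (non-negative) timestamp.
  static long computeTimeBoundaryMs(List<ServerValidation> responses) {
    long timeBoundaryMs = -1;
    for (ServerValidation response : responses) {
      if (!response.valid) {
        throw new IllegalStateException("Table segment validation failed");
      }
      timeBoundaryMs = Math.max(timeBoundaryMs, response.maxTimestampMs);
    }
    if (timeBoundaryMs < 0) {
      throw new IllegalStateException("Could not determine a time boundary from server responses");
    }
    return timeBoundaryMs;
  }

  public static void main(String[] args) {
    long boundary = computeTimeBoundaryMs(List.of(
        new ServerValidation(true, 1_000L), new ServerValidation(true, 5_000L)));
    System.out.println(boundary);
  }
}
```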



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, long maxEndTimeMs,
+      boolean idealStateReffered) {
+    TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
+    Long finalTimeBoundaryMs = null;
+    boolean isEnforced = false;
+    boolean validTimeBoundaryFound = false;
+
+    if (enforcedTimeBoundary != null) {
+      finalTimeBoundaryMs = enforcedTimeBoundary;
+      isEnforced = true;
+      validTimeBoundaryFound = true;
+      LOGGER.info("Enforced table time boundary in use: {} for table: {}", enforcedTimeBoundary, _offlineTableName);
+    } else if (idealStateReffered || !currentTimeBoundaryInfo.isEnforced()) {
+      if (maxEndTimeMs > 0) {
+        finalTimeBoundaryMs = maxEndTimeMs - _timeOffsetMs;
+        validTimeBoundaryFound = true;
+      } else {
+        LOGGER.warn("Failed to find segment with valid end time for table: {}, no time boundary generated",
+            _offlineTableName);
+      }
+    } else {
+      validTimeBoundaryFound = true;
+      LOGGER.info("Skipping time boundary update since enforced time boundary exists");
+    }
+
+    if (validTimeBoundaryFound) {
+      if (finalTimeBoundaryMs != null) {

Review Comment:
   We always want to update the metrics. Currently, when a user refreshes a segment, the metric is not updated.
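A sketch of updating the metric unconditionally. It hypothetically assumes the metric is a gauge holding the gap between the max segment end time and the time boundary in use; the class and field names are illustrative only. The boundary value is replaced only when it changes, but the gauge is refreshed on every call. That way a segment refresh that moves the max end time is reflected even when the boundary itself stays put.

```java
import java.util.concurrent.atomic.AtomicLong;

public final class TimeBoundaryMetricSketch {
  // Hypothetical gauge; a real broker would publish this through its metrics registry.
  final AtomicLong timeBoundaryLagMs = new AtomicLong();
  private volatile String timeBoundary;

  // Swaps the boundary only when it changes, but always refreshes the gauge.
  void update(long timeBoundaryMs, long maxEndTimeMs) {
    String newTimeBoundary = Long.toString(timeBoundaryMs);
    if (!newTimeBoundary.equals(timeBoundary)) {
      timeBoundary = newTimeBoundary;
    }
    timeBoundaryLagMs.set(maxEndTimeMs - timeBoundaryMs);
  }

  String getTimeBoundary() {
    return timeBoundary;
  }

  public static void main(String[] args) {
    TimeBoundaryMetricSketch manager = new TimeBoundaryMetricSketch();
    manager.update(1_000L, 1_000L);
    // Boundary unchanged (e.g. explicitly set), but a refreshed segment pushed the end time forward.
    manager.update(1_000L, 4_000L);
    System.out.println(manager.getTimeBoundary() + " lag=" + manager.timeBoundaryLagMs.get());
  }
}
```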



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, long maxEndTimeMs,

Review Comment:
   (minor) We don't need to synchronize this method because it is only called from within the synchronized methods.
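A sketch of the pattern the comment describes, with hypothetical names. The public entry points are `synchronized` on `this`, and the private helper is reachable only from them, so it already runs while holding that monitor. Also marking the helper `synchronized` would be harmless, since Java monitors are reentrant, but redundant.

```java
import java.util.HashMap;
import java.util.Map;

public final class TimeBoundaryLockingSketch {
  private final Map<String, Long> endTimeMsMap = new HashMap<>();
  private volatile long timeBoundaryMs = -1;

  // Public entry points acquire the monitor on `this`...
  public synchronized void onAssignmentChange(Map<String, Long> segmentEndTimesMs) {
    endTimeMsMap.clear();
    endTimeMsMap.putAll(segmentEndTimesMs);
    updateTimeBoundaryInfo();
  }

  public synchronized void refreshSegment(String segment, long endTimeMs) {
    endTimeMsMap.put(segment, endTimeMs);
    updateTimeBoundaryInfo();
  }

  // ...so this private helper, called only from them, already runs under the lock.
  private void updateTimeBoundaryInfo() {
    long maxEndTimeMs = -1;
    for (long endTimeMs : endTimeMsMap.values()) {
      maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs);
    }
    timeBoundaryMs = maxEndTimeMs;
  }

  public long getTimeBoundaryMs() {
    return timeBoundaryMs;
  }

  public static void main(String[] args) {
    TimeBoundaryLockingSketch manager = new TimeBoundaryLockingSketch();
    manager.onAssignmentChange(Map.of("seg0", 100L, "seg1", 200L));
    manager.refreshSegment("seg0", 300L);
    System.out.println(manager.getTimeBoundaryMs());
  }
}
```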



##########
pinot-core/src/main/java/org/apache/pinot/core/routing/TimeBoundaryInfo.java:
##########
@@ -21,10 +21,12 @@
 public class TimeBoundaryInfo {
   private final String _timeColumn;
   private final String _timeValue;
+  private final boolean _enforced;

Review Comment:
   (optional) `_explicitlySet` might be more clear



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -142,17 +153,42 @@ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Null
     return endTimeMs;
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
-    if (maxEndTimeMs > 0) {
-      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
-      TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
-        LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
+  private synchronized void updateTimeBoundaryInfo(Long enforcedTimeBoundary, long maxEndTimeMs,
+      boolean idealStateReffered) {

Review Comment:
   (minor)
   ```suggestion
         boolean idealStateReferred) {
   ```




[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r984236830


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -105,6 +106,14 @@ public TimeBoundaryManager(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecor
   @SuppressWarnings("unused")
   public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
     // Bulk load time info for all online segments
+    String enforcedTimeBoundary = idealState.getRecord().getSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY);

Review Comment:
   This is needed to correctly update the metric that tracks the difference between the current time boundary and the max end time across all segments.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -167,6 +181,14 @@ private void updateTimeBoundaryInfo(long maxEndTimeMs) {
   @SuppressWarnings("unused")
   public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView,
       Set<String> onlineSegments) {
+    String enforcedTimeBoundary = idealState.getRecord().getSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY);

Review Comment:
   This is needed to correctly update the metric that tracks the difference between the current time boundary and the max end time across all segments.




[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r981066458


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -63,6 +63,7 @@ public class TimeBoundaryManager {
   private final Map<String, Long> _endTimeMsMap = new HashMap<>();
 
   private volatile TimeBoundaryInfo _timeBoundaryInfo;
+  private volatile TimeBoundaryInfo _enforcedTimeBoundaryInfo;

Review Comment:
   I found some tricky edge cases with that approach that could lead to a wrong time boundary being used, especially when segments are refreshed concurrently with the enforced time boundary being set or unset. Keeping them separate felt like the safest option.
   We can discuss this offline.
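A minimal sketch of one way to read the two-field design, under assumed hook names. `onSegmentsChanged` and `onIdealStateChanged` are hypothetical and not the PR's methods. The derived boundary keeps tracking segment changes while an enforced value is active, so unsetting the enforced value falls back to a current derived boundary rather than a stale one.

```java
public final class EnforcedTimeBoundarySketch {
  // Boundary derived from segment end times; kept current even while an enforced one exists.
  private volatile String derivedTimeBoundary;
  // Boundary explicitly set in the ideal state, or null when none is set.
  private volatile String enforcedTimeBoundary;

  // Hypothetical hook for segment assignment changes and segment refreshes.
  synchronized void onSegmentsChanged(String newDerivedTimeBoundary) {
    derivedTimeBoundary = newDerivedTimeBoundary;
  }

  // Hypothetical hook for ideal state changes; null means the boundary was unset.
  synchronized void onIdealStateChanged(String newEnforcedTimeBoundary) {
    enforcedTimeBoundary = newEnforcedTimeBoundary;
  }

  // Read path: the enforced value wins; otherwise fall back to the derived one.
  String getTimeBoundary() {
    String enforced = enforcedTimeBoundary;
    return enforced != null ? enforced : derivedTimeBoundary;
  }

  public static void main(String[] args) {
    EnforcedTimeBoundarySketch manager = new EnforcedTimeBoundarySketch();
    manager.onSegmentsChanged("100");
    manager.onIdealStateChanged("50");
    manager.onSegmentsChanged("200"); // refresh while enforced: derived value still tracked
    manager.onIdealStateChanged(null); // unset: falls back to the up-to-date derived value
    System.out.println(manager.getTimeBoundary());
  }
}
```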




[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r981056267


##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +529,47 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
     }
     return segmentConsumerInfoList;
   }
+
+  @GET
+  @Path("tables/{tableNameWithType}/validate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Validates if the ideal state matches with the segmentstate on this server", notes =
+      "Validates if the ideal state matches with the segmentstate on this server")
+  public TableSegmentValidationInfo validateTableIdealState(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableNameWithType")
+          String tableNameWithType) {
+    // Get table current ideal state
+    IdealState tableIdealState = HelixHelper.getTableIdealState(_helixManager, tableNameWithType);
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+
+    // Validate segments in idealstate which belong to this server
+    long maxEndTime = -1;
+    Map<String, Map<String, String>> instanceStatesMap = tableIdealState.getRecord().getMapFields();
+    for (Map.Entry<String, Map<String, String>> kv : instanceStatesMap.entrySet()) {
+      String segmentName = kv.getKey();
+      if (kv.getValue().containsKey(_instanceId)) {

Review Comment:
   Ack




[GitHub] [pinot] codecov-commenter commented on pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9356:
URL: https://github.com/apache/pinot/pull/9356#issuecomment-1250638391

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9356?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9356](https://codecov.io/gh/apache/pinot/pull/9356?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6a45644) into [master](https://codecov.io/gh/apache/pinot/commit/2d6665b8e5fa0842ef67b3d9896c5e04ecad78e9?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (2d6665b) will **decrease** coverage by `45.23%`.
   > The diff coverage is `8.33%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #9356       +/-   ##
   =============================================
   - Coverage     69.73%   24.50%   -45.24%     
   + Complexity     4787       53     -4734     
   =============================================
     Files          1890     1879       -11     
     Lines        100756   100496      -260     
     Branches      15350    15328       -22     
   =============================================
   - Hits          70266    24625    -45641     
   - Misses        25507    73381    +47874     
   + Partials       4983     2490     -2493     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `24.50% <8.33%> (-0.21%)` | :arrow_down: |
   | unittests1 | `?` | |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9356?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../restlet/resources/TableSegmentValidationInfo.java](https://codecov.io/gh/apache/pinot/pull/9356/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vcmVzdGxldC9yZXNvdXJjZXMvVGFibGVTZWdtZW50VmFsaWRhdGlvbkluZm8uamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...oller/api/resources/PinotTableRestletResource.java](https://codecov.io/gh/apache/pinot/pull/9356/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1Bpbm90VGFibGVSZXN0bGV0UmVzb3VyY2UuamF2YQ==) | `23.34% <0.00%> (-40.08%)` | :arrow_down: |
   | [...che/pinot/server/api/resources/TablesResource.java](https://codecov.io/gh/apache/pinot/pull/9356/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zZXJ2ZXIvYXBpL3Jlc291cmNlcy9UYWJsZXNSZXNvdXJjZS5qYXZh) | `39.81% <0.00%> (-4.17%)` | :arrow_down: |
   | [...va/org/apache/pinot/spi/utils/CommonConstants.java](https://codecov.io/gh/apache/pinot/pull/9356/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQ29tbW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <0.00%> (-27.70%)` | :arrow_down: |
   | [...oker/routing/timeboundary/TimeBoundaryManager.java](https://codecov.io/gh/apache/pinot/pull/9356/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy90aW1lYm91bmRhcnkvVGltZUJvdW5kYXJ5TWFuYWdlci5qYXZh) | `60.82% <29.41%> (-30.54%)` | :arrow_down: |
   | [...inot/server/starter/helix/AdminApiApplication.java](https://codecov.io/gh/apache/pinot/pull/9356/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zZXJ2ZXIvc3RhcnRlci9oZWxpeC9BZG1pbkFwaUFwcGxpY2F0aW9uLmphdmE=) | `82.81% <100.00%> (+0.27%)` | :arrow_up: |
   | [.../pinot/server/starter/helix/BaseServerStarter.java](https://codecov.io/gh/apache/pinot/pull/9356/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc2VydmVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zZXJ2ZXIvc3RhcnRlci9oZWxpeC9CYXNlU2VydmVyU3RhcnRlci5qYXZh) | `49.62% <100.00%> (-6.74%)` | :arrow_down: |
   | [...in/java/org/apache/pinot/spi/utils/BytesUtils.java](https://codecov.io/gh/apache/pinot/pull/9356/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQnl0ZXNVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/trace/BaseRecording.java](https://codecov.io/gh/apache/pinot/pull/9356/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdHJhY2UvQmFzZVJlY29yZGluZy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/trace/NoOpRecording.java](https://codecov.io/gh/apache/pinot/pull/9356/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdHJhY2UvTm9PcFJlY29yZGluZy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1383 more](https://codecov.io/gh/apache/pinot/pull/9356/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   



[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r975920438


##########
pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java:
##########
@@ -112,7 +111,7 @@ private void testDailyPushTable(String rawTableName, TableConfig tableConfig, Ti
     Map<String, String> offlineInstanceStateMap = Collections.singletonMap("server", OFFLINE);
     Set<String> onlineSegments = new HashSet<>();
     // NOTE: Ideal state is not used in the current implementation.
-    IdealState idealState = mock(IdealState.class);
+    IdealState idealState = new IdealState("");

Review Comment:
   Do we need to change this?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -105,6 +106,14 @@ public TimeBoundaryManager(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecor
   @SuppressWarnings("unused")
   public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
     // Bulk load time info for all online segments
+    String enforcedTimeBoundary = idealState.getRecord().getSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY);

Review Comment:
   When the time boundary is explicitly set, we want to skip fetching the segment ZK metadata because it won't be used.
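A sketch of the reviewer's suggestion for `init()`. It assumes the ideal-state value is an epoch-millis string, since the controller code above writes it with `String.valueOf`. `fetchEndTimesMs` is a hypothetical stand-in for the bulk read of segment ZK metadata and is invoked only when no boundary is set. The fallback mirrors the original `maxEndTimeMs - _timeOffsetMs` computation.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public final class TimeBoundaryInitSketch {
  static final String HYBRID_TABLE_TIME_BOUNDARY = "HYBRID_TABLE_TIME_BOUNDARY";

  // Returns the time boundary in ms, or -1 if none could be derived.
  static long initTimeBoundaryMs(Map<String, String> idealStateSimpleFields, List<String> onlineSegments,
      Function<List<String>, Map<String, Long>> fetchEndTimesMs, long timeOffsetMs) {
    String enforced = idealStateSimpleFields.get(HYBRID_TABLE_TIME_BOUNDARY);
    if (enforced != null) {
      // Explicitly set: skip the (expensive) segment metadata read entirely.
      return Long.parseLong(enforced);
    }
    long maxEndTimeMs = -1;
    for (long endTimeMs : fetchEndTimesMs.apply(onlineSegments).values()) {
      maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs);
    }
    return maxEndTimeMs > 0 ? maxEndTimeMs - timeOffsetMs : -1;
  }

  public static void main(String[] args) {
    long derived = initTimeBoundaryMs(Map.of(), List.of("seg0", "seg1"),
        segments -> Map.of("seg0", 1_000L, "seg1", 3_000L), 100L);
    long enforced = initTimeBoundaryMs(Map.of(HYBRID_TABLE_TIME_BOUNDARY, "500"), List.of("seg0"),
        segments -> { throw new IllegalStateException("should not fetch"); }, 100L);
    System.out.println(derived + " " + enforced);
  }
}
```

The author's reply elsewhere in this thread notes that the end times are still needed for the time boundary metric, which this sketch does not cover.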



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +529,47 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
     }
     return segmentConsumerInfoList;
   }
+
+  @GET
+  @Path("tables/{tableNameWithType}/validate")

Review Comment:
   Suggest a more explicit API path, such as `/allSegmentsLoaded`.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -731,4 +730,8 @@ public static class Range {
       public static final String UPPER_UNBOUNDED = DELIMITER + UNBOUNDED + UPPER_EXCLUSIVE;
     }
   }
+
+  public static class IdealState {
+    public static final String QUERY_TIME_BOUNDARY = "query.time.boundary";

Review Comment:
   (optional) I feel `TIME_BOUNDARY` is more concise. To be more specific, `HYBRID_TABLE_TIME_BOUNDARY` might also work.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -63,6 +63,7 @@ public class TimeBoundaryManager {
   private final Map<String, Long> _endTimeMsMap = new HashMap<>();
 
   private volatile TimeBoundaryInfo _timeBoundaryInfo;
+  private volatile TimeBoundaryInfo _enforcedTimeBoundaryInfo;

Review Comment:
   (minor) We shouldn't need to keep them separate. We can directly update `_timeBoundaryInfo` when the time boundary is set in the ideal state (IS).



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,106 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/enforceQueryTimeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName)
+      throws Exception {
+    // Validate its a hybrid table
+    if (!_pinotHelixResourceManager.hasRealtimeTable(tableName) || !_pinotHelixResourceManager.hasOfflineTable(
+        tableName)) {
+      throw new ControllerApplicationException(LOGGER, "Table isn't a hybrid table", Response.Status.BAD_REQUEST);
+    }
+
+    String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+
+    // Call all servers to validate offline table state
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(offlineTableName);
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/tables/" + offlineTableName + "/validate";

Review Comment:
   Let's make the validation a separate API, because it can also be useful for other purposes.
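Once the check lives in its own server endpoint, any controller-side caller can fan out to it. Below is a minimal sketch of building the per-server URLs. It uses the `/tables/{tableNameWithType}/allSegmentsLoaded` path adopted in the later revision quoted above. The helper name, server names, and endpoints are made up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public final class AllSegmentsLoadedUrlsSketch {
  // Builds one status URL per server admin endpoint.
  static List<String> buildStatusUrls(Map<String, String> serverToAdminEndpoint, String tableNameWithType) {
    List<String> urls = new ArrayList<>();
    // Iterate in server-name order so the output is deterministic.
    for (String endpoint : new TreeMap<>(serverToAdminEndpoint).values()) {
      urls.add(endpoint + "/tables/" + tableNameWithType + "/allSegmentsLoaded");
    }
    return urls;
  }

  public static void main(String[] args) {
    Map<String, String> endpoints = Map.of(
        "Server_a", "http://server-a:8080",
        "Server_b", "http://server-b:8080");
    buildStatusUrls(endpoints, "myTable_OFFLINE").forEach(System.out::println);
  }
}
```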



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +529,47 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
     }
     return segmentConsumerInfoList;
   }
+
+  @GET
+  @Path("tables/{tableNameWithType}/validate")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Validates if the ideal state matches with the segmentstate on this server", notes =
+      "Validates if the ideal state matches with the segmentstate on this server")
+  public TableSegmentValidationInfo validateTableIdealState(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableNameWithType")
+          String tableNameWithType) {
+    // Get table current ideal state
+    IdealState tableIdealState = HelixHelper.getTableIdealState(_helixManager, tableNameWithType);
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+
+    // Validate segments in idealstate which belong to this server
+    long maxEndTime = -1;
+    Map<String, Map<String, String>> instanceStatesMap = tableIdealState.getRecord().getMapFields();
+    for (Map.Entry<String, Map<String, String>> kv : instanceStatesMap.entrySet()) {
+      String segmentName = kv.getKey();
+      if (kv.getValue().containsKey(_instanceId)) {

Review Comment:
   We want to check this server's state for the segment in the IS: for `ONLINE`, check the CRC; for `CONSUMING`, check whether the segment exists; skip other states.
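A sketch of the state-based check. It assumes the expected CRC for an `ONLINE` segment comes from its ZK metadata, and that the server can report which segments it has loaded and their CRCs. All parameter names and the plain-map representation are hypothetical. The `idealStateMapFields` shape (segment name to instance-state map) follows `getRecord().getMapFields()` in the code above.

```java
import java.util.Map;
import java.util.Set;

public final class SegmentStateValidationSketch {
  // Returns true when every segment assigned to `instanceId` in the ideal state is served as expected.
  static boolean allSegmentsLoaded(String instanceId, Map<String, Map<String, String>> idealStateMapFields,
      Map<String, String> expectedCrcs, Set<String> loadedSegments, Map<String, String> loadedCrcs) {
    for (Map.Entry<String, Map<String, String>> entry : idealStateMapFields.entrySet()) {
      String segmentName = entry.getKey();
      String state = entry.getValue().get(instanceId);
      if ("ONLINE".equals(state)) {
        // ONLINE: the loaded copy must match the CRC recorded in the segment metadata.
        String expectedCrc = expectedCrcs.get(segmentName);
        if (expectedCrc == null || !expectedCrc.equals(loadedCrcs.get(segmentName))) {
          return false;
        }
      } else if ("CONSUMING".equals(state)) {
        // CONSUMING: the segment only needs to exist on this server.
        if (!loadedSegments.contains(segmentName)) {
          return false;
        }
      }
      // Any other state, or a segment not assigned to this instance, is skipped.
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> idealState = Map.of(
        "seg0", Map.of("Server_1", "ONLINE"),
        "seg1", Map.of("Server_1", "CONSUMING"));
    System.out.println(allSegmentsLoaded("Server_1", idealState,
        Map.of("seg0", "123"), Set.of("seg0", "seg1"), Map.of("seg0", "123")));
  }
}
```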



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/AdminApiApplication.java:
##########
@@ -58,7 +59,7 @@ public class AdminApiApplication extends ResourceConfig {
   private HttpServer _httpServer;
 
   public AdminApiApplication(ServerInstance instance, AccessControlFactory accessControlFactory,
-      PinotConfiguration serverConf) {
+      PinotConfiguration serverConf, HelixManager helixManager) {

Review Comment:
   (minor) We can pass the `HelixManager` through the `ServerInstance`; we already pass it to the `ServerInstance` constructor.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,106 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/enforceQueryTimeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName)

Review Comment:
   The input has to be the raw table name, or table validation will fail. Consider revising the parameter description to make this explicit.
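A check along these lines would reject typed names up front with a descriptive error, which is the failure mode the reviewer points out. `_OFFLINE` and `_REALTIME` are Pinot's table-type suffixes; the `RawTableNames` helper itself is hypothetical, and the real resource would more likely reuse Pinot's existing table-name utilities.

```java
// Hypothetical helper; Pinot's own table-name utilities would normally be reused here.
final class RawTableNames {
  // Suffixes Pinot appends to raw names to form table names with type.
  private static final String OFFLINE_SUFFIX = "_OFFLINE";
  private static final String REALTIME_SUFFIX = "_REALTIME";

  private RawTableNames() {
  }

  static boolean isRawTableName(String tableName) {
    return !tableName.endsWith(OFFLINE_SUFFIX) && !tableName.endsWith(REALTIME_SUFFIX);
  }

  // Fails fast with a message that states what input was expected.
  static String requireRawTableName(String tableName) {
    if (!isRawTableName(tableName)) {
      throw new IllegalArgumentException(
          "Expected a raw table name without a type suffix, got: " + tableName);
    }
    return tableName;
  }
}
```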



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,106 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/enforceQueryTimeBoundary")

Review Comment:
   Suggest simplifying the path. Same for the DELETE API.
   ```suggestion
     @Path("table/{tableName}/timeBoundary")
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -167,6 +181,14 @@ private void updateTimeBoundaryInfo(long maxEndTimeMs) {
   @SuppressWarnings("unused")
   public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView,
       Set<String> onlineSegments) {
+    String enforcedTimeBoundary = idealState.getRecord().getSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY);

Review Comment:
   Same here. When the time boundary is explicitly set, we want to skip the `_endTimeMsMap` updates. Same for `refreshSegment()`.
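The reviewer's proposal can be modelled as an early return in both update paths. This is a simplified, hypothetical model rather than Pinot's `TimeBoundaryManager`: it takes the enforced boundary as an already-read string (as `getSimpleField` would return it), and it uses the plain maximum segment end time as the computed boundary, whereas the real manager's derivation may adjust that value.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model; not Pinot's TimeBoundaryManager.
final class TimeBoundarySketch {
  // segment name -> end time in ms; only maintained while no boundary is enforced
  private final Map<String, Long> _endTimeMsMap = new HashMap<>();
  private Long _enforcedTimeBoundaryMs;
  private long _timeBoundaryMs = -1;

  synchronized void onAssignmentChange(String enforcedTimeBoundary, Map<String, Long> segmentEndTimesMs) {
    if (enforcedTimeBoundary != null) {
      // An explicitly set boundary wins: skip the _endTimeMsMap updates entirely.
      _enforcedTimeBoundaryMs = Long.parseLong(enforcedTimeBoundary);
      _timeBoundaryMs = _enforcedTimeBoundaryMs;
      return;
    }
    _enforcedTimeBoundaryMs = null;
    _endTimeMsMap.clear();
    _endTimeMsMap.putAll(segmentEndTimesMs);
    recomputeFromEndTimes();
  }

  synchronized void refreshSegment(String segmentName, long endTimeMs) {
    if (_enforcedTimeBoundaryMs != null) {
      return; // same rule as onAssignmentChange()
    }
    _endTimeMsMap.put(segmentName, endTimeMs);
    recomputeFromEndTimes();
  }

  synchronized long getTimeBoundaryMs() {
    return _timeBoundaryMs;
  }

  synchronized int trackedSegmentCount() {
    return _endTimeMsMap.size();
  }

  // Simplification: use the maximum end time directly as the boundary.
  private void recomputeFromEndTimes() {
    long maxEndTimeMs = -1;
    for (long endTimeMs : _endTimeMsMap.values()) {
      maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs);
    }
    _timeBoundaryMs = maxEndTimeMs;
  }
}
```

The trade-off of this design is that, while a boundary is enforced, the model no longer knows the latest segment end time.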





[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r981051282


##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -514,4 +529,47 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
     }
     return segmentConsumerInfoList;
   }
+
+  @GET
+  @Path("tables/{tableNameWithType}/validate")

Review Comment:
   Ack





[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r984236830


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -105,6 +106,14 @@ public TimeBoundaryManager(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecor
   @SuppressWarnings("unused")
   public void init(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) {
     // Bulk load time info for all online segments
+    String enforcedTimeBoundary = idealState.getRecord().getSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY);

Review Comment:
   It is needed to correctly update the metric that shows the difference between the current time boundary and max(end time across all segments).
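The author's point is that segment end times still have to be tracked when a boundary is enforced, because the metric compares the boundary in use against the latest segment end time. A minimal sketch of that computation; `TimeBoundaryLag.lagMs` is a hypothetical helper, and the actual metric name and how it is emitted are not shown in this thread.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical helper; the real metric name and emission path are not part of this thread.
final class TimeBoundaryLag {
  private TimeBoundaryLag() {
  }

  // Returns max(segment end times) - time boundary, or 0 if no end times are known.
  static long lagMs(long timeBoundaryMs, Collection<Long> segmentEndTimesMs) {
    if (segmentEndTimesMs.isEmpty()) {
      return 0L;
    }
    long maxEndTimeMs = Collections.max(segmentEndTimesMs);
    return maxEndTimeMs - timeBoundaryMs;
  }
}
```

With an enforced boundary of 150 and segment end times 100, 200 and 300, the lag is 150 ms; without the end times, this value could not be computed.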



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java:
##########
@@ -167,6 +181,14 @@ private void updateTimeBoundaryInfo(long maxEndTimeMs) {
   @SuppressWarnings("unused")
   public synchronized void onAssignmentChange(IdealState idealState, ExternalView externalView,
       Set<String> onlineSegments) {
+    String enforcedTimeBoundary = idealState.getRecord().getSimpleField(CommonConstants.IdealState.QUERY_TIME_BOUNDARY);

Review Comment:
   It is needed to correctly update the metric that shows the difference between the current time boundary and max(end time across all segments).





[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r981049195


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -843,6 +849,106 @@ public Map<String, Map<String, String>> getControllerJobs(
     return result;
   }
 
+  @POST
+  @Path("table/{tableName}/enforceQueryTimeBoundary")
+  @ApiOperation(value = "Set hybrid table query time boundary based on offline segments' metadata",
+      notes = "Set hybrid table query time boundary based on offline segments' metadata")
+  public SuccessResponse setEnforcedQueryTimeBoundary(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName)

Review Comment:
   Ack





[GitHub] [pinot] saurabhd336 commented on a diff in pull request #9356: Allow setting custom time boundary for hybrid table queries

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #9356:
URL: https://github.com/apache/pinot/pull/9356#discussion_r981060963


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/AdminApiApplication.java:
##########
@@ -58,7 +59,7 @@ public class AdminApiApplication extends ResourceConfig {
   private HttpServer _httpServer;
 
   public AdminApiApplication(ServerInstance instance, AccessControlFactory accessControlFactory,
-      PinotConfiguration serverConf) {
+      PinotConfiguration serverConf, HelixManager helixManager) {

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org