You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "klsince (via GitHub)" <gi...@apache.org> on 2023/11/08 23:22:32 UTC

[PR] [Draft] Allow optional segments [pinot]

klsince opened a new pull request, #11978:
URL: https://github.com/apache/pinot/pull/11978

   This PR tries to address issue: https://github.com/apache/pinot/issues/11965, basically by extending the NewSegment concept added for instanceSelector early on. The NewSegment concept was added to handle the case when new consuming segments are added in IdealState but not online in ExternalView yet, as servers may take time to load new consuming segments and report the status back to Helix. Instead of reporting such new segments as unavailable segments, broker simply skips them.
   
   But simply skipping them at broker side could cause the wrong query results as detailed in the new issue. e.g. for upsert table, the server could start to ingest new records into the new segments and invalidate the records in existing segments, even before the broker could mark those new consuming segments as online, thus queries could not see the new records in new consuming segments during a short window. This issue could cause inconsistent query results, particularly for upsert table.
   
   So in this PR, we track those new segments that not online in ExternalView, and pass them to servers, instead of skipping them at broker side. The servers can skip them if they are not available, otherwise, process they for correct query results, particular for upsert table.
   
   TODO: only support for SingleConnectionBrokerRequestHandler in this PR. Probably add support for GrpcServerRequest (based on protobuf) and multi-stage engine in a separate PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1395012284


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -594,6 +594,10 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
       long routingStartTimeNs = System.nanoTime();
       Map<ServerInstance, List<String>> offlineRoutingTable = null;
       Map<ServerInstance, List<String>> realtimeRoutingTable = null;
+      Map<ServerInstance, List<String>> optionalOfflineRoutingTable = null;
+      Map<ServerInstance, List<String>> optionalRealtimeRoutingTable = null;
+      Set<ServerInstance> offlineServerInstances = null;
+      Set<ServerInstance> realtimeServerInstances = null;

Review Comment:
   It is much simpler if we change routing table to `Map<ServerInstance, Pair<List<String>, List<String>>` where the first list is mandatory segments and the second list is option segments. We may also change the `RoutingTable._serverInstanceToSegmentsMap` type



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1406646223


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +611,16 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getOptionalSegmentToInstanceMap());
+    return new RoutingTable(merge(serverInstanceToSegmentsMap, serverInstanceToOptionalSegmentsMap),
+        selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());

Review Comment:
   Actually, to simplify this merging logic, I'll use empty list in Pair<List, List> even if there is no optional segments, but when to creating the InstanceRequest, I'll set optionalSegment to null if it's empty, as I found there are places in multi-staged query that don't set this field when creating InstanceRequest, so don't set this field (i.e. leave it as null) to be more backward compatible



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1396329598


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java:
##########
@@ -594,6 +594,10 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
       long routingStartTimeNs = System.nanoTime();
       Map<ServerInstance, List<String>> offlineRoutingTable = null;
       Map<ServerInstance, List<String>> realtimeRoutingTable = null;
+      Map<ServerInstance, List<String>> optionalOfflineRoutingTable = null;
+      Map<ServerInstance, List<String>> optionalRealtimeRoutingTable = null;
+      Set<ServerInstance> offlineServerInstances = null;
+      Set<ServerInstance> realtimeServerInstances = null;

Review Comment:
   sgtm



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1399779572


##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java:
##########
@@ -111,9 +112,9 @@ public TimeBoundaryInfo getTimeBoundary(
       @ApiResponse(code = 404, message = "Routing not found"),
       @ApiResponse(code = 500, message = "Internal server error")
   })
-  public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
+  public Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> getRoutingTable(

Review Comment:
   got it. I added new APIs `/v2/...` to include optional segments in return 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1406646223


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +611,16 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getOptionalSegmentToInstanceMap());
+    return new RoutingTable(merge(serverInstanceToSegmentsMap, serverInstanceToOptionalSegmentsMap),
+        selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());

Review Comment:
   Actually, to simplify this merging logic, I'll use empty list in Pair<List, List> even if there is no optional segments, but when to creating the InstanceRequest, I'll set optionalSegment to null if it's empty, as I found there are places in multi-staged query that don't set this field when creating InstanceRequest. So leaving this field as null if empty is a bit more backward compatible



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1406646223


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +611,16 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getOptionalSegmentToInstanceMap());
+    return new RoutingTable(merge(serverInstanceToSegmentsMap, serverInstanceToOptionalSegmentsMap),
+        selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());

Review Comment:
   To simplify this merging logic, I'll use empty list in Pair<List, List> even if there is no optional segments. But when creating the InstanceRequest, I'll set optionalSegment to null if it's empty, as I found there are places in multi-staged query that don't set this field when creating InstanceRequest. So leaving this field as null if it's empty is a bit more backward compatible, than setting it to an empty list.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1406615196


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +611,16 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getOptionalSegmentToInstanceMap());
+    return new RoutingTable(merge(serverInstanceToSegmentsMap, serverInstanceToOptionalSegmentsMap),
+        selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());

Review Comment:
   sgtm
   
   well, to be effecient, I'd still need to build a second Map<String, List<String>> to track the list of optional segments on each server, so that I don't have to loop over the map from optional segment to server repetitively.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1399673377


##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java:
##########
@@ -113,7 +115,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
     String rawTableName = TableNameBuilder.extractRawTableName(serverBrokerRequest.getQuerySource().getTableName());
     long scatterGatherStartTimeNs = System.nanoTime();
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable,
+        _queryRouter.submitQueryWithOptionalSegments(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable,

Review Comment:
   sgtm, just that I see submitQuery() got called in `spark-common` module, so will update over there too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1408554417


##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java:
##########
@@ -152,7 +189,40 @@ public Map<ServerInstance, List<String>> getRoutingTableForQuery(
       @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query,
       @Context HttpHeaders httpHeaders) {
     BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
+    checkAccessControl(brokerRequest, httpHeaders);
+    RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId());
+    if (routingTable != null) {
+      return removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap());
+    } else {
+      throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND);
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/debug/routingTableWithOptionalSegments/sql")
+  @ManualAuthorization
+  @ApiOperation(value = "Get the routing table for a query, including optional segments")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Routing table"),
+      @ApiResponse(code = 404, message = "Routing not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public Map<ServerInstance, Pair<List<String>, List<String>>> getRoutingTableForQueryWithOptionalSegments(
+      @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query,
+      @Context HttpHeaders httpHeaders) {
+    Map<ServerInstance, Pair<List<String>, List<String>>> result;

Review Comment:
   sharp eye



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1406594529


##########
pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java:
##########
@@ -195,16 +197,18 @@ void markQueryDone(long requestId) {
     _asyncQueryResponseMap.remove(requestId);
   }
 
-  private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest, List<String> segments) {
+  private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest,
+      Pair<List<String>, List<String>> segments) {
     InstanceRequest instanceRequest = new InstanceRequest();
     instanceRequest.setRequestId(requestId);
     instanceRequest.setQuery(brokerRequest);
     Map<String, String> queryOptions = brokerRequest.getPinotQuery().getQueryOptions();
     if (queryOptions != null) {
       instanceRequest.setEnableTrace(Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)));
     }
-    instanceRequest.setSearchSegments(segments);
+    instanceRequest.setSearchSegments(segments.getLeft());
     instanceRequest.setBrokerId(_brokerId);
+    instanceRequest.setOptionalSegments(segments.getRight());

Review Comment:
   I'll make contract as just using `null` if no optional segments



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1406646223


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +611,16 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getOptionalSegmentToInstanceMap());
+    return new RoutingTable(merge(serverInstanceToSegmentsMap, serverInstanceToOptionalSegmentsMap),
+        selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());

Review Comment:
   Actually, to simplify this merging logic, I'll define the contract of the Pair<List, List> as using empty list if there is no optional segments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] [Draft] Allow optional segments [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #11978:
URL: https://github.com/apache/pinot/pull/11978#issuecomment-1802940006

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#11978](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (3af5143) into [master](https://app.codecov.io/gh/apache/pinot/commit/972b555cc5609a88002ac2c4501ece247b51cebe?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (972b555) will **decrease** coverage by `61.45%`.
   > The diff coverage is `0.00%`.
   
   > :exclamation: Current head 3af5143 differs from pull request most recent head 508cd71. Consider uploading reports for the commit 508cd71 to get more accurate results
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #11978       +/-   ##
   =============================================
   - Coverage     61.44%    0.00%   -61.45%     
   + Complexity     1141        6     -1135     
   =============================================
     Files          2385     2309       -76     
     Lines        129189   125481     -3708     
     Branches      19998    19444      -554     
   =============================================
   - Hits          79381        6    -79375     
   - Misses        44061   125475    +81414     
   + Partials       5747        0     -5747     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `<0.01% <0.00%> (ø)` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `<0.01% <0.00%> (ø)` | |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `<0.01% <0.00%> (ø)` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `<0.01% <0.00%> (-61.31%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `<0.01% <0.00%> (-61.43%)` | :arrow_down: |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `<0.01% <0.00%> (-61.27%)` | :arrow_down: |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `<0.01% <0.00%> (-61.45%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/11978/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...requesthandler/MultiStageBrokerRequestHandler.java](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcmVxdWVzdGhhbmRsZXIvTXVsdGlTdGFnZUJyb2tlclJlcXVlc3RIYW5kbGVyLmphdmE=) | `0.00% <ø> (-17.69%)` | :arrow_down: |
   | [...thandler/SingleConnectionBrokerRequestHandler.java](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcmVxdWVzdGhhbmRsZXIvU2luZ2xlQ29ubmVjdGlvbkJyb2tlclJlcXVlc3RIYW5kbGVyLmphdmE=) | `0.00% <ø> (-17.73%)` | :arrow_down: |
   | [...t/ConcurrentMapPartitionUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQ29uY3VycmVudE1hcFBhcnRpdGlvblVwc2VydE1ldGFkYXRhTWFuYWdlci5qYXZh) | `0.00% <ø> (-85.42%)` | :arrow_down: |
   | [...roker/requesthandler/GrpcBrokerRequestHandler.java](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcmVxdWVzdGhhbmRsZXIvR3JwY0Jyb2tlclJlcXVlc3RIYW5kbGVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...roker/requesthandler/BaseBrokerRequestHandler.java](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcmVxdWVzdGhhbmRsZXIvQmFzZUJyb2tlclJlcXVlc3RIYW5kbGVyLmphdmE=) | `0.00% <0.00%> (-45.78%)` | :arrow_down: |
   | [...ceselector/StrictReplicaGroupInstanceSelector.java](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL1N0cmljdFJlcGxpY2FHcm91cEluc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `0.00% <0.00%> (-98.42%)` | :arrow_down: |
   | [...e/pinot/core/query/request/ServerQueryRequest.java](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9yZXF1ZXN0L1NlcnZlclF1ZXJ5UmVxdWVzdC5qYXZh) | `0.00% <0.00%> (-51.07%)` | :arrow_down: |
   | [...stanceselector/MultiStageReplicaGroupSelector.java](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL011bHRpU3RhZ2VSZXBsaWNhR3JvdXBTZWxlY3Rvci5qYXZh) | `0.00% <0.00%> (-66.67%)` | :arrow_down: |
   | [...core/query/executor/ServerQueryExecutorV1Impl.java](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9xdWVyeS9leGVjdXRvci9TZXJ2ZXJRdWVyeUV4ZWN1dG9yVjFJbXBsLmphdmE=) | `0.00% <0.00%> (-63.44%)` | :arrow_down: |
   | [...va/org/apache/pinot/core/routing/RoutingTable.java](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9yb3V0aW5nL1JvdXRpbmdUYWJsZS5qYXZh) | `0.00% <0.00%> (-87.50%)` | :arrow_down: |
   | ... and [6 more](https://app.codecov.io/gh/apache/pinot/pull/11978?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | |
   
   ... and [1975 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/11978/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   :mega: Codecov offers a browser extension for seamless coverage viewing on GitHub. Try it in [Chrome](https://chrome.google.com/webstore/detail/codecov/gedikamndpbemklijjkncpnolildpbgo) or [Firefox](https://addons.mozilla.org/en-US/firefox/addon/codecov/) today!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1406615196


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +611,16 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getOptionalSegmentToInstanceMap());
+    return new RoutingTable(merge(serverInstanceToSegmentsMap, serverInstanceToOptionalSegmentsMap),
+        selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());

Review Comment:
   sgtm



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince merged PR #11978:
URL: https://github.com/apache/pinot/pull/11978


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1406615196


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +611,16 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getOptionalSegmentToInstanceMap());
+    return new RoutingTable(merge(serverInstanceToSegmentsMap, serverInstanceToOptionalSegmentsMap),
+        selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());

Review Comment:
   sgtm



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1397913935


##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java:
##########
@@ -111,9 +112,9 @@ public TimeBoundaryInfo getTimeBoundary(
       @ApiResponse(code = 404, message = "Routing not found"),
       @ApiResponse(code = 500, message = "Internal server error")
   })
-  public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
+  public Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> getRoutingTable(

Review Comment:
   Let's not change these APIs. I believe they are used by Trino to get the routing tables, and this change will break them. We may add a TODO and introduce new APIs for the optional segments



##########
pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java:
##########
@@ -18,24 +18,46 @@
  */
 package org.apache.pinot.core.routing;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.transport.ServerInstance;
 
 
 public class RoutingTable {
-  private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+  // Optional segments are those not online as in ExternalView but might have been online on servers already, e.g.
+  // the newly created consuming segments. Such segments were simply skipped by brokers at query routing time, but that
+  // had caused wrong query results, particularly for upsert tables. Instead, we should pass such segments to servers
+  // and let them decide how to handle them, e.g. skip them upon issues or include them for better query results.
+  private final Map<ServerInstance, Pair<List<String>/*non-optional segments*/, List<String>/*optional segments*/>>
+      _serverInstanceToSegmentsMap;
   private final List<String> _unavailableSegments;
   private final int _numPrunedSegments;
 
   public RoutingTable(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap, List<String> unavailableSegments,
       int numPrunedSegments) {
-    _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
+    this(serverInstanceToSegmentsMap, Collections.emptyMap(), unavailableSegments, numPrunedSegments);
+  }
+
+  public RoutingTable(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap,

Review Comment:
   Suggest keeping this special handling at the caller side. `RoutingTable` should be a thin wrapper class without any complex logic



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +610,16 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getOptionalSegmentToInstanceMap());

Review Comment:
   Merge it into one map here, and directly set it into the `RoutingTable`



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java:
##########
@@ -113,7 +115,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
     String rawTableName = TableNameBuilder.extractRawTableName(serverBrokerRequest.getQuerySource().getTableName());
     long scatterGatherStartTimeNs = System.nanoTime();
     AsyncQueryResponse asyncQueryResponse =
-        _queryRouter.submitQuery(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable,
+        _queryRouter.submitQueryWithOptionalSegments(requestId, rawTableName, offlineBrokerRequest, offlineRoutingTable,

Review Comment:
   Suggest directly modifying the `submitQuery()` without introducing a new method. QueryRouting is specifically built for this request handler



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java:
##########
@@ -430,8 +436,10 @@ public Set<String> getServingInstances() {
 
   /**
    * Selects the server instances for the given segments based on the request id and segment states. Returns a map
-   * from segment to selected server instance hosting the segment.
+   * from segment to selected server instance hosting the segment. The optional segments are used to get the new
+   * segments that is not online yet. Instead of simply skipping them by broker at routing time, we can let servers
+   * decide how to handle them.
    */
   abstract Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,

Review Comment:
   Return a `Pair` instead of passing in a `Map`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1406600662


##########
pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java:
##########
@@ -18,27 +18,39 @@
  */
 package org.apache.pinot.core.routing;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.transport.ServerInstance;
 
 
 public class RoutingTable {
-  private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+  // Optional segments are those not online as in ExternalView but might have been online on servers already, e.g.
+  // the newly created consuming segments. Such segments were simply skipped by brokers at query routing time, but that
+  // had caused wrong query results, particularly for upsert tables. Instead, we should pass such segments to servers
+  // and let them decide how to handle them, e.g. skip them upon issues or include them for better query results.
+  private final Map<ServerInstance, Pair<List<String>, List<String>/*optional segments*/>> _serverInstanceToSegmentsMap;
   private final List<String> _unavailableSegments;
   private final int _numPrunedSegments;
 
-  public RoutingTable(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap, List<String> unavailableSegments,
-      int numPrunedSegments) {
+  public RoutingTable(Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap,

Review Comment:
   server side logic handles null, and I'll document that the contract is to use null for empty optional segment list



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1406646223


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +611,16 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getOptionalSegmentToInstanceMap());
+    return new RoutingTable(merge(serverInstanceToSegmentsMap, serverInstanceToOptionalSegmentsMap),
+        selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());

Review Comment:
   Actually, to simplify this merging logic, I'll use empty list in Pair<List, List> even if there is no optional segments, but when to creating the InstanceRequest, I'll set optionalSegment to null if it's empty, as I found there are places in multi-staged query that don't set this field when creating InstanceRequest. So leaving this field as null if empty is a bit more backward compatible.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1401383312


##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java:
##########
@@ -114,28 +116,55 @@ public TimeBoundaryInfo getTimeBoundary(
   public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName) {
     Map<String, Map<ServerInstance, List<String>>> result = new TreeMap<>();
+    getRoutingTable(tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType,
+        routingTable.getServerInstanceToSegmentsMap(false)));
+    if (!result.isEmpty()) {
+      return result;
+    } else {
+      throw new WebApplicationException("Cannot find routing for table: " + tableName, Response.Status.NOT_FOUND);
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/v2/debug/routingTable/{tableName}")

Review Comment:
   Not a big fan of `v2`
   ```suggestion
     @Path("/debug/routingTableWithOptionalSegments/{tableName}")
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java:
##########
@@ -152,7 +181,40 @@ public Map<ServerInstance, List<String>> getRoutingTableForQuery(
       @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query,
       @Context HttpHeaders httpHeaders) {
     BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
+    checkAccessControl(brokerRequest, httpHeaders);
+    RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId());
+    if (routingTable != null) {
+      return routingTable.getServerInstanceToSegmentsMap(false);
+    } else {
+      throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND);
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/v2/debug/routingTable/sql")

Review Comment:
   Same here
   ```suggestion
     @Path("/debug/routingTableWithOptionalSegments/sql")
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java:
##########
@@ -18,27 +18,39 @@
  */
 package org.apache.pinot.core.routing;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.transport.ServerInstance;
 
 
 public class RoutingTable {
-  private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+  // Optional segments are those not online as in ExternalView but might have been online on servers already, e.g.
+  // the newly created consuming segments. Such segments were simply skipped by brokers at query routing time, but that
+  // had caused wrong query results, particularly for upsert tables. Instead, we should pass such segments to servers
+  // and let them decide how to handle them, e.g. skip them upon issues or include them for better query results.
+  private final Map<ServerInstance, Pair<List<String>, List<String>/*optional segments*/>> _serverInstanceToSegmentsMap;
   private final List<String> _unavailableSegments;
   private final int _numPrunedSegments;
 
-  public RoutingTable(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap, List<String> unavailableSegments,
-      int numPrunedSegments) {
+  public RoutingTable(Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap,
+      List<String> unavailableSegments, int numPrunedSegments) {
     _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
     _unavailableSegments = unavailableSegments;
     _numPrunedSegments = numPrunedSegments;
   }
 
-  public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap() {
+  public Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceToSegmentsMap() {
     return _serverInstanceToSegmentsMap;
   }
 
+  public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap(boolean optionalSegments) {

Review Comment:
   Let's not handle this within `RoutingTable`. We can handle it in `PinotBrokerDebug`



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java:
##########
@@ -54,9 +55,10 @@ public BalancedInstanceSelector(String tableNameWithType, ZkHelixPropertyStore<Z
   }
 
   @Override
-  Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,
-      Map<String, String> queryOptions) {
+  Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int requestId,
+      SegmentStates segmentStates, Map<String, String> queryOptions) {
     Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+    Map<String, String> optionalSegmentToInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));

Review Comment:
   We don't need to initialize a big map here because most segments shouldn't be optional. Default capacity should be fine. Same for other selectors



##########
pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java:
##########
@@ -195,16 +197,18 @@ void markQueryDone(long requestId) {
     _asyncQueryResponseMap.remove(requestId);
   }
 
-  private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest, List<String> segments) {
+  private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest,
+      Pair<List<String>, List<String>> segments) {
     InstanceRequest instanceRequest = new InstanceRequest();
     instanceRequest.setRequestId(requestId);
     instanceRequest.setQuery(brokerRequest);
     Map<String, String> queryOptions = brokerRequest.getPinotQuery().getQueryOptions();
     if (queryOptions != null) {
       instanceRequest.setEnableTrace(Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)));
     }
-    instanceRequest.setSearchSegments(segments);
+    instanceRequest.setSearchSegments(segments.getLeft());
     instanceRequest.setBrokerId(_brokerId);
+    instanceRequest.setOptionalSegments(segments.getRight());

Review Comment:
   What is the contract here when there is no optional segments? Should it be an empty list or `null`? Let's fix the contract and make sure it is properly applied



##########
pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java:
##########
@@ -18,27 +18,39 @@
  */
 package org.apache.pinot.core.routing;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.transport.ServerInstance;
 
 
 public class RoutingTable {
-  private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+  // Optional segments are those not online as in ExternalView but might have been online on servers already, e.g.
+  // the newly created consuming segments. Such segments were simply skipped by brokers at query routing time, but that
+  // had caused wrong query results, particularly for upsert tables. Instead, we should pass such segments to servers
+  // and let them decide how to handle them, e.g. skip them upon issues or include them for better query results.
+  private final Map<ServerInstance, Pair<List<String>, List<String>/*optional segments*/>> _serverInstanceToSegmentsMap;
   private final List<String> _unavailableSegments;
   private final int _numPrunedSegments;
 
-  public RoutingTable(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap, List<String> unavailableSegments,
-      int numPrunedSegments) {
+  public RoutingTable(Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap,

Review Comment:
   By reading the code, seems the optional segments are either non empty list or `null`. Let's document this contract.
   Does the current code handle the case of empty list properly?



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +611,16 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getSegmentToInstanceMap());
+    Map<ServerInstance, List<String>> serverInstanceToOptionalSegmentsMap =
+        getServerInstanceToSegmentsMap(tableNameWithType, selectionResult.getOptionalSegmentToInstanceMap());
+    return new RoutingTable(merge(serverInstanceToSegmentsMap, serverInstanceToOptionalSegmentsMap),
+        selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());

Review Comment:
   Can we merge these 3 methods into one (i.e. process mandatory and optional together)?
   IMO that has 2 benefits:
   - Logic more clear
   - Reduce overhead, only need to maintain one map instead of 3



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1408529178


##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java:
##########
@@ -152,7 +189,40 @@ public Map<ServerInstance, List<String>> getRoutingTableForQuery(
       @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query,
       @Context HttpHeaders httpHeaders) {
     BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
+    checkAccessControl(brokerRequest, httpHeaders);
+    RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId());
+    if (routingTable != null) {
+      return removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap());
+    } else {
+      throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND);
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/debug/routingTableWithOptionalSegments/sql")
+  @ManualAuthorization
+  @ApiOperation(value = "Get the routing table for a query, including optional segments")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Routing table"),
+      @ApiResponse(code = 404, message = "Routing not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public Map<ServerInstance, Pair<List<String>, List<String>>> getRoutingTableForQueryWithOptionalSegments(
+      @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query,
+      @Context HttpHeaders httpHeaders) {
+    Map<ServerInstance, Pair<List<String>, List<String>>> result;

Review Comment:
   (minor) Remove



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -610,7 +611,42 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
+    return new RoutingTable(getServerInstanceToSegmentsMap(tableNameWithType, selectionResult),
+        selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());
+  }
+
+  private Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceToSegmentsMap(String tableNameWithType,
+      InstanceSelector.SelectionResult selectionResult) {
+    Map<ServerInstance, Pair<List<String>, List<String>>> merged = new HashMap<>();
+    for (Map.Entry<String, String> entry : selectionResult.getSegmentToInstanceMap().entrySet()) {
+      ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue());
+      if (serverInstance != null) {
+        Pair<List<String>, List<String>> pair =
+            merged.computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), new ArrayList<>()));
+        pair.getLeft().add(entry.getKey());
+      } else {
+        // Should not happen in normal case unless encountered unexpected exception when updating routing entries
+        _brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.SERVER_MISSING_FOR_ROUTING, 1L);
+      }
+    }
+    for (Map.Entry<String, String> entry : selectionResult.getOptionalSegmentToInstanceMap().entrySet()) {
+      ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue());
+      if (serverInstance != null) {
+        Pair<List<String>, List<String>> pair = merged.get(serverInstance);
+        // Skip servers that don't have non-optional segments, so that servers always get some non-optional segments
+        // to process, to be backward compatible.
+        // TODO: allow servers only with optional segments
+        if (pair != null) {
+          pair.getRight().add(entry.getKey());
+        }
+      }
+      // TODO: Report missing server metrics when we allow servers only with optional segments.
+    }
+    return merged;
+  }
+
+  private Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap(String tableNameWithType,

Review Comment:
   This can also be removed



##########
pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java:
##########
@@ -195,16 +198,23 @@ void markQueryDone(long requestId) {
     _asyncQueryResponseMap.remove(requestId);
   }
 
-  private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest, List<String> segments) {
+  private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest,
+      Pair<List<String>, List<String>> segments) {
     InstanceRequest instanceRequest = new InstanceRequest();
     instanceRequest.setRequestId(requestId);
     instanceRequest.setQuery(brokerRequest);
     Map<String, String> queryOptions = brokerRequest.getPinotQuery().getQueryOptions();
     if (queryOptions != null) {
       instanceRequest.setEnableTrace(Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)));
     }
-    instanceRequest.setSearchSegments(segments);
+    instanceRequest.setSearchSegments(segments.getLeft());
     instanceRequest.setBrokerId(_brokerId);
+    if (CollectionUtils.isNotEmpty(segments.getRight())) {
+      // Don't set this field, i.e. leave it as null, if there is no optional segment at all, to be more backward
+      // compatible, as there are places like in multi-staged query engine where this field is not set today when

Review Comment:
   ```suggestion
         // compatible, as there are places like in multi-stage query engine where this field is not set today when
   ```



##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -621,8 +657,22 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
         _brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.SERVER_MISSING_FOR_ROUTING, 1L);
       }
     }
-    return new RoutingTable(serverInstanceToSegmentsMap, selectionResult.getUnavailableSegments(),
-        selectionResult.getNumPrunedSegments());
+    return serverInstanceToSegmentsMap;
+  }
+
+  private static Map<ServerInstance, Pair<List<String>, List<String>>> merge(

Review Comment:
   Remove it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Allow optional segments that can be skipped by servers without failing the query [pinot]

Posted by "klsince (via GitHub)" <gi...@apache.org>.
klsince commented on code in PR #11978:
URL: https://github.com/apache/pinot/pull/11978#discussion_r1408548323


##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -621,8 +657,22 @@ public RoutingTable getRoutingTable(BrokerRequest brokerRequest, long requestId)
         _brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.SERVER_MISSING_FOR_ROUTING, 1L);
       }
     }
-    return new RoutingTable(serverInstanceToSegmentsMap, selectionResult.getUnavailableSegments(),
-        selectionResult.getNumPrunedSegments());
+    return serverInstanceToSegmentsMap;
+  }
+
+  private static Map<ServerInstance, Pair<List<String>, List<String>>> merge(

Review Comment:
   :face_palm:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org