Posted to commits@pinot.apache.org by "Aravind-Suresh (via GitHub)" <gi...@apache.org> on 2023/07/26 11:07:32 UTC

[GitHub] [pinot] Aravind-Suresh opened a new pull request, #11183: Reload Table API to reload table data

Aravind-Suresh opened a new pull request, #11183:
URL: https://github.com/apache/pinot/pull/11183

   Fixes #10916.
   
   **Approach**: This PR adds a new Reload Table API to reload table configurations. More details can be found in #10916.
   
   **Test plan**: TODO (will update the draft PR in a while).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Aravind-Suresh commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Aravind-Suresh (via GitHub)" <gi...@apache.org>.
Aravind-Suresh commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1281376726


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -696,44 +696,9 @@ public ServerReloadControllerJobStatusResponse getReloadJobStatus(
       totalSegments += entry.getValue().size();
     }
     serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
-    serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size());
-    serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
-
-    for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
-      String responseString = streamResponse.getValue();
-      try {
-        ServerReloadControllerJobStatusResponse response =
-            JsonUtils.stringToObject(responseString, ServerReloadControllerJobStatusResponse.class);
-        serverReloadControllerJobStatusResponse.setSuccessCount(
-            serverReloadControllerJobStatusResponse.getSuccessCount() + response.getSuccessCount());
-      } catch (Exception e) {
-        serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
-            serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1
-        );
-      }
-    }
-
-    // Add ZK fields
-    serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);
-
-    // Add derived fields
-    long submissionTime =
-        Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
-    double timeElapsedInMinutes = ((double) System.currentTimeMillis() - (double) submissionTime) / (1000.0 * 60.0);
-    int remainingSegments = serverReloadControllerJobStatusResponse.getTotalSegmentCount()
-        - serverReloadControllerJobStatusResponse.getSuccessCount();
-
-    double estimatedRemainingTimeInMinutes = -1;
-    if (serverReloadControllerJobStatusResponse.getSuccessCount() > 0) {
-      estimatedRemainingTimeInMinutes =
-          ((double) remainingSegments / (double) serverReloadControllerJobStatusResponse.getSuccessCount())
-              * timeElapsedInMinutes;
-    }
-
-    serverReloadControllerJobStatusResponse.setTimeElapsedInMinutes(timeElapsedInMinutes);
-    serverReloadControllerJobStatusResponse.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
-
-    return serverReloadControllerJobStatusResponse;
+    serverReloadControllerJobStatusResponse.setTotalCount(totalSegments);
+    return ResourceUtils.buildServerReloadControllerJobStatusResponse(controllerJobZKMetadata,

Review Comment:
   I was thinking of deriving the completion percentage as the number of servers that have completed the reload divided by the total number of servers hosting the table. What do you think?
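   The proposed metric can be sketched as a hypothetical helper. The class, the method, and the map shape are assumptions for illustration, not code from this PR. It counts the servers that report the reload as complete and divides by the number of servers hosting the table.

```java
import java.util.Map;

/**
 * Hypothetical helper (not code from this PR): reload progress measured per server
 * instead of per segment.
 */
final class ServerBasedReloadProgress {
  private ServerBasedReloadProgress() {
  }

  /**
   * Returns the percentage of servers that report the table reload as complete.
   *
   * @param reloadCompleteByServer server instance id -> whether that server finished reloading the table
   */
  static double completionPercentage(Map<String, Boolean> reloadCompleteByServer) {
    int totalServers = reloadCompleteByServer.size();
    if (totalServers == 0) {
      // No server hosts the table: report no progress instead of dividing by zero.
      return 0.0;
    }
    long completedServers = reloadCompleteByServer.values().stream().filter(Boolean::booleanValue).count();
    return 100.0 * completedServers / totalServers;
  }
}
```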





[GitHub] [pinot] Aravind-Suresh commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Aravind-Suresh (via GitHub)" <gi...@apache.org>.
Aravind-Suresh commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1300782094


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -509,6 +544,44 @@ public void addOrReplaceSegment(String tableNameWithType, String segmentName)
     }
   }
 
+  @Override
+  public void reloadTable(String tableNameWithType, boolean force, SegmentRefreshSemaphore segmentRefreshSemaphore) {
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+    AtomicReference<TableDataManager> toBeShutdown = new AtomicReference<>();
+    _tableDataManagerMap.computeIfPresent(tableNameWithType, (k, tdm) -> {
+      // create only if a table data manager doesn't exist
+      // or the current configuration and the new configuration are different
+      if (!force && tdm.getTableDataManagerConfig().getTableConfig().equals(tableConfig)) {
+        LOGGER.info("Not reloading table as the table data manager config is not changed");
+        return tdm;
+      }
+      // this will load all the segments and start a new consuming segment
+      // for the time being we will be consuming 2X the memory here
+      // once all the segments are loaded we swap the new table data manager with the old one
+      LOGGER.info("Recreating table data manager for table: {} as we received a reload table request",
+          tableNameWithType);
+      TableDataManager newTdm = createTableDataManager(k, tableConfig);
+      List<SegmentDataManager> segments = tdm.acquireAllSegments();
+      try {
+        snapshotAndAddSegments(newTdm, tableNameWithType);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      for (SegmentDataManager sdm : segments) {
+        tdm.releaseSegment(sdm);
+      }
+      LOGGER.info("All segments loaded for table: {}", tableNameWithType);
+      toBeShutdown.set(tdm);
+      return newTdm;
+    });
+    // shutting the old table data manager
+    TableDataManager oldTdm = toBeShutdown.get();

Review Comment:
   True. I've added a delayed shutdown for now. In a follow-up PR, we can track references accurately and shut down the old table data manager automatically once it is no longer referenced (reference counting).
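   Below is a minimal sketch of a delayed shutdown built on a `ScheduledExecutorService`. `ShutdownableTableDataManager` is a hypothetical stand-in for the old table data manager. The delay is a parameter because no value is given here. A fixed delay is only a heuristic: a query that runs longer than the grace period can still reach a shut-down manager, which is the gap the reference-counting follow-up would close.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the old table data manager; only its shutdown hook matters here. */
interface ShutdownableTableDataManager {
  void shutDown();
}

/** Hypothetical sketch: shut the replaced manager down after a grace period. */
final class DelayedShutdown {
  private final ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor();

  /** Schedules the old manager's shutdown so queries that already acquired it can finish first. */
  ScheduledFuture<?> scheduleShutdown(ShutdownableTableDataManager oldTdm, long delay, TimeUnit unit) {
    return _scheduler.schedule(oldTdm::shutDown, delay, unit);
  }

  /** Stops accepting new work; by default, already-scheduled delayed tasks still run. */
  void close() {
    _scheduler.shutdown();
  }
}
```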





[GitHub] [pinot] Aravind-Suresh commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Aravind-Suresh (via GitHub)" <gi...@apache.org>.
Aravind-Suresh commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1281375112


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -104,6 +110,70 @@ public Response pauseConsumption(
     }
   }
 
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/reload")
+  @ApiOperation(value = "Reloads the table across all the servers", notes = "This would reconstruct the table data "
+      + "manager in case of configuration changes. Example usage: trigger after converting the upsert mode "
+      + "from full to partial.")
+  public SuccessResponse reloadTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") @Nullable
+      String tableName,
+      @ApiParam(value = "Whether to force the reload (if table config is not updated, reload will not be done)")
+      @QueryParam("force") boolean force,
+      @Context HttpHeaders httpHeaders, @Context Request request) {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validate(tableNameWithType);
+    // checking if another reload job exists
+    // if so, fail the request with the ongoing reload job's id
+    Map<String, Map<String, String>> reloadJobs =
+        _pinotHelixResourceManager.getAllJobsForTable(tableNameWithType,
+            Collections.singleton(ControllerJobType.RELOAD_TABLE));
+    if (reloadJobs.size() > 0) {
+      String jobId = reloadJobs.keySet().iterator().next();

Review Comment:
   Removed it. I wanted to prevent multiple reload jobs from running at the same time, but that isn't possible at the moment: these controller jobs have no associated state, and, as you said, this call returns all historical and current jobs. So I removed this validation.
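   This illustrates why the check could not work as written. `reloadJobs.size() > 0` is true as soon as any historical reload job exists. The sketch assumes a hypothetical `jobStatus` metadata key with an `IN_PROGRESS` value, which the current controller jobs do not have. Only with such a state field could ongoing jobs be told apart from finished ones.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical sketch: the "one reload at a time" check only works if jobs record their state. */
final class ReloadJobGuard {
  // Assumed metadata key and value; the controller jobs discussed above store no such state.
  static final String JOB_STATUS_KEY = "jobStatus";
  static final String IN_PROGRESS = "IN_PROGRESS";

  private ReloadJobGuard() {
  }

  /** Returns the ids of reload jobs that are still running, given jobId -> job metadata. */
  static List<String> ongoingJobIds(Map<String, Map<String, String>> reloadJobs) {
    return reloadJobs.entrySet().stream()
        .filter(entry -> IN_PROGRESS.equals(entry.getValue().get(JOB_STATUS_KEY)))
        .map(Map.Entry::getKey)
        .sorted()
        .collect(Collectors.toList());
  }
}
```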





Re: [PR] Reload Table API to reload table data [pinot]

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana closed pull request #11183: Reload Table API to reload table data
URL: https://github.com/apache/pinot/pull/11183




[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1281411894


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -242,6 +242,30 @@ public void addRealtimeSegment(String realtimeTableName, String segmentName)
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }
 
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // we need to do an atomic snapshot here and then add segments because

Review Comment:
   Oh, I didn't notice that this is within `computeIfPresent()`. So the race condition doesn't exist, but all queries will be blocked while it runs. To solve this, we still need to use a lock so that queries can keep acquiring the old table data manager.
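   The comment suggests a lock. A different technique is sketched here only for comparison: do the slow rebuild outside the map's per-bin lock, then publish the result with `ConcurrentHashMap.replace(key, expectedOld, newValue)`. `SwapOutsideLock` and `rebuildAndSwap` are hypothetical names. In Pinot the value would be a table data manager rather than a generic `V`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/**
 * Hypothetical sketch (not the PR's code): build the replacement outside the map's bin lock,
 * then publish it with a compare-and-swap so readers keep using the old value meanwhile.
 */
final class SwapOutsideLock<V> {
  private final ConcurrentHashMap<String, V> _map = new ConcurrentHashMap<>();

  void put(String key, V value) {
    _map.put(key, value);
  }

  V get(String key) {
    return _map.get(key);
  }

  /**
   * Rebuilds the value for {@code key} without holding any map lock during {@code rebuild}.
   * Returns the previous value on success (so the caller can shut it down later), or null if
   * the key is absent or another writer replaced the value first.
   */
  V rebuildAndSwap(String key, Function<V, V> rebuild) {
    V oldValue = _map.get(key);
    if (oldValue == null) {
      return null;
    }
    // Slow work (e.g. loading segments) happens here; concurrent get() calls still see oldValue.
    V newValue = rebuild.apply(oldValue);
    return _map.replace(key, oldValue, newValue) ? oldValue : null;
  }
}
```

   If the swap fails because another writer changed the entry, the caller has to retry or discard (and shut down) the newly built value. The explicit lock suggested in the comment avoids that retry path at the cost of serializing writers.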





[GitHub] [pinot] codecov-commenter commented on pull request #11183: Reload Table API to reload table data

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #11183:
URL: https://github.com/apache/pinot/pull/11183#issuecomment-1659591542

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#11183](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (76ed7f6) into [master](https://app.codecov.io/gh/apache/pinot/commit/98e482c91139a00a4d24b5a6350e98945461998f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (98e482c) will **increase** coverage by `0.00%`.
   > Report is 14 commits behind head on master.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@            Coverage Diff            @@
   ##           master   #11183     +/-   ##
   =========================================
     Coverage    0.11%    0.11%             
   =========================================
     Files        2226     2175     -51     
     Lines      119357   117332   -2025     
     Branches    18059    17844    -215     
   =========================================
     Hits          137      137             
   + Misses     119200   117175   -2025     
     Partials       20       20             
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1temurin11 | `?` | |
   | integration1temurin17 | `?` | |
   | integration1temurin20 | `?` | |
   | unittests1temurin11 | `?` | |
   | unittests1temurin20 | `?` | |
   | unittests2temurin11 | `?` | |
   | unittests2temurin17 | `0.11% <0.00%> (-0.01%)` | :arrow_down: |
   | unittests2temurin20 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files Changed](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...ache/pinot/common/messages/TableReloadMessage.java](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWVzc2FnZXMvVGFibGVSZWxvYWRNZXNzYWdlLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...mmon/metadata/controllerjob/ControllerJobType.java](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0YWRhdGEvY29udHJvbGxlcmpvYi9Db250cm9sbGVySm9iVHlwZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...apache/pinot/controller/BaseControllerStarter.java](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9CYXNlQ29udHJvbGxlclN0YXJ0ZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...ller/api/resources/PinotRealtimeTableResource.java](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1Bpbm90UmVhbHRpbWVUYWJsZVJlc291cmNlLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...ler/api/resources/PinotSegmentRestletResource.java](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1Bpbm90U2VnbWVudFJlc3RsZXRSZXNvdXJjZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../pinot/controller/api/resources/ResourceUtils.java](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1Jlc291cmNlVXRpbHMuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...urces/ServerReloadControllerJobStatusResponse.java](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1NlcnZlclJlbG9hZENvbnRyb2xsZXJKb2JTdGF0dXNSZXNwb25zZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ntroller/helix/core/PinotHelixResourceManager.java](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL1Bpbm90SGVsaXhSZXNvdXJjZU1hbmFnZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [.../core/realtime/PinotLLCRealtimeSegmentManager.java](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL3JlYWx0aW1lL1Bpbm90TExDUmVhbHRpbWVTZWdtZW50TWFuYWdlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../pinot/core/data/manager/BaseTableDataManager.java](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvQmFzZVRhYmxlRGF0YU1hbmFnZXIuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | ... and [4 more](https://app.codecov.io/gh/apache/pinot/pull/11183?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | |
   
   ... and [88 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/11183/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




Re: [PR] Reload Table API to reload table data [pinot]

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana commented on PR #11183:
URL: https://github.com/apache/pinot/pull/11183#issuecomment-1876246952

   Closing this since Aravind has left Uber and is unable to work on it. I have created a new PR that is the same as this one but rebased on the latest master: #12216




[GitHub] [pinot] Aravind-Suresh commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Aravind-Suresh (via GitHub)" <gi...@apache.org>.
Aravind-Suresh commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1281374635


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -104,6 +110,70 @@ public Response pauseConsumption(
     }
   }
 
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/reload")
+  @ApiOperation(value = "Reloads the table across all the servers", notes = "This would reconstruct the table data "
+      + "manager in case of configuration changes. Example usage: trigger after converting the upsert mode "
+      + "from full to partial.")
+  public SuccessResponse reloadTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") @Nullable
+      String tableName,
+      @ApiParam(value = "Whether to force the reload (if table config is not updated, reload will not be done)")
+      @QueryParam("force") boolean force,
+      @Context HttpHeaders httpHeaders, @Context Request request) {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);

Review Comment:
   Good point. I updated this API to take an optional `tableTypeStr` param.
   
   If it isn't passed, both the offline and realtime versions of the table (whichever exist) are reloaded.
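   This sketch shows how the optional type parameter could resolve to concrete tables. It assumes Pinot's `<rawName>_OFFLINE` / `<rawName>_REALTIME` naming and a caller-supplied set of existing tables. `ReloadTargetResolver` is hypothetical, and the PR's actual implementation is not shown in this thread. Rejecting unknown type strings is a design choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch: resolve which typed tables to reload from an optional type param. */
final class ReloadTargetResolver {
  private ReloadTargetResolver() {
  }

  /**
   * @param rawTableName   table name without a type suffix, e.g. "myTable"
   * @param tableTypeStr   "OFFLINE", "REALTIME" (case-insensitive), or null when no type was passed
   * @param existingTables table names with type that currently exist
   */
  static List<String> resolve(String rawTableName, String tableTypeStr, Set<String> existingTables) {
    boolean offline = tableTypeStr == null || "OFFLINE".equalsIgnoreCase(tableTypeStr);
    boolean realtime = tableTypeStr == null || "REALTIME".equalsIgnoreCase(tableTypeStr);
    if (!offline && !realtime) {
      throw new IllegalArgumentException("Unknown table type: " + tableTypeStr);
    }
    List<String> candidates = new ArrayList<>();
    if (offline) {
      candidates.add(rawTableName + "_OFFLINE");
    }
    if (realtime) {
      candidates.add(rawTableName + "_REALTIME");
    }
    // Keep only the variants that actually exist ("whichever is available").
    candidates.removeIf(name -> !existingTables.contains(name));
    return candidates;
  }
}
```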





[GitHub] [pinot] Aravind-Suresh commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Aravind-Suresh (via GitHub)" <gi...@apache.org>.
Aravind-Suresh commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1281278417


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -242,6 +242,30 @@ public void addRealtimeSegment(String realtimeTableName, String segmentName)
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }
 
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // we need to do an atomic snapshot here and then add segments because

Review Comment:
   That particular race condition is avoided because this logic runs inside `_tableDataManagerMap.computeIfPresent()`, which locks the corresponding hash node until the logic completes.
   
   I've seen similar patterns elsewhere in the code, so I followed the same approach.
   
   I'll clarify this in an in-code comment. Let me know if I'm missing something.
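   The atomicity this comment relies on can be checked with a small standalone demo. This is plain `ConcurrentHashMap`, not Pinot code, and the key name is illustrative. Several threads perform read-modify-write updates on one key through `computeIfPresent`, and because each invocation runs atomically, no update is lost.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Demo: ConcurrentHashMap.computeIfPresent runs each remapping function atomically,
 * so concurrent read-modify-write updates on the same key are serialized and none is lost.
 */
final class ComputeIfPresentAtomicity {
  private ComputeIfPresentAtomicity() {
  }

  static int incrementConcurrently(int threads, int incrementsPerThread) {
    ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    map.put("myTable_REALTIME", 0);
    List<Thread> workers = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      Thread worker = new Thread(() -> {
        for (int i = 0; i < incrementsPerThread; i++) {
          // Other compute* calls on this key wait until this remapping function returns.
          map.computeIfPresent("myTable_REALTIME", (key, value) -> value + 1);
        }
      });
      workers.add(worker);
      worker.start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return map.get("myTable_REALTIME");
  }
}
```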





[GitHub] [pinot] Aravind-Suresh commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Aravind-Suresh (via GitHub)" <gi...@apache.org>.
Aravind-Suresh commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1300760260


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -242,6 +242,30 @@ public void addRealtimeSegment(String realtimeTableName, String segmentName)
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }
 
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // we need to do an atomic snapshot here and then add segments because

Review Comment:
   The `getTableDataManager` method uses `get()`, which doesn't lock the underlying hash node. Only the add/delete/reload methods call `compute*` on the map, which takes the lock. So queries won't be blocked by this. The only remaining case is the one you called out below, about queries still running against the old table data manager, so I'll add a delayed shutdown there.
   
   Let me know if I'm missing something.
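   The read path described here can be demonstrated in isolation. This uses a plain `ConcurrentHashMap`, not Pinot code, and the key and the `oldTdm`/`newTdm` strings are placeholders. While one thread is still inside a `computeIfPresent` remapping function, `get()` on the same key returns immediately with the old value.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/**
 * Demo: ConcurrentHashMap.get() does not wait for an in-progress computeIfPresent() on the
 * same key; it returns the value that was current before the remapping function finished.
 */
final class GetDuringCompute {
  private GetDuringCompute() {
  }

  /** Returns {value seen by get() during the compute, value after the compute finished}. */
  static String[] observe() {
    ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
    map.put("myTable_REALTIME", "oldTdm");
    CountDownLatch insideCompute = new CountDownLatch(1);
    CountDownLatch allowFinish = new CountDownLatch(1);

    Thread writer = new Thread(() -> map.computeIfPresent("myTable_REALTIME", (key, oldValue) -> {
      insideCompute.countDown(); // the writer now holds the lock for this key's bin
      awaitQuietly(allowFinish); // simulate slow work such as loading all segments
      return "newTdm";
    }));
    writer.start();

    awaitQuietly(insideCompute);
    String seenDuringCompute = map.get("myTable_REALTIME"); // lock-free read, returns immediately
    allowFinish.countDown();
    joinQuietly(writer);
    return new String[] {seenDuringCompute, map.get("myTable_REALTIME")};
  }

  private static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  private static void joinQuietly(Thread thread) {
    try {
      thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```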





[GitHub] [pinot] Aravind-Suresh commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Aravind-Suresh (via GitHub)" <gi...@apache.org>.
Aravind-Suresh commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1281277544


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -170,11 +172,13 @@ public class PinotLLCRealtimeSegmentManager {
   private final int _deepstoreUploadRetryTimeoutMs;
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private final Executor _executor;
+  private final HttpConnectionManager _connectionManager;
 
   private volatile boolean _isStopping = false;
 
   public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics) {
+      ControllerMetrics controllerMetrics, Executor executor, HttpConnectionManager connectionManager) {

Review Comment:
   Good catch! I made this change while trying an alternate way of implementing `reloadTable`. I'll remove it.





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1281409133


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java:
##########
@@ -20,15 +20,25 @@
 
 import java.util.Map;
 
+
 public class ServerReloadControllerJobStatusResponse {
   private double _timeElapsedInMinutes;
   private double _estimatedTimeRemainingInMinutes;
   private int _totalSegmentCount;
+  private int _totalCount;

Review Comment:
   Since the target is a table, which is always a single entity, I don't think this response should be reused. If we want to reuse it, we need to track the segments already loaded into the new table data manager.
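   Here is a minimal sketch of the tracking described above. It assumes a hypothetical server-side `TableReloadProgress` object that the reload path updates each time a segment is added to the new table data manager. The `estimatedMinutesRemaining` method mirrors the remaining-time formula from the `getReloadJobStatus` code removed in the diff quoted earlier in this thread (remaining segments divided by loaded segments, times elapsed time, or -1 until something has loaded).

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical per-table reload tracker, so segment-based status could be reported for a table reload. */
final class TableReloadProgress {
  private final int _totalSegments;
  private final AtomicInteger _loadedSegments = new AtomicInteger();

  TableReloadProgress(int totalSegments) {
    if (totalSegments < 0) {
      throw new IllegalArgumentException("totalSegments must be >= 0");
    }
    _totalSegments = totalSegments;
  }

  /** Called after each segment has been added to the new table data manager. */
  void onSegmentLoaded() {
    _loadedSegments.incrementAndGet();
  }

  int loadedSegments() {
    return _loadedSegments.get();
  }

  boolean isComplete() {
    return _loadedSegments.get() >= _totalSegments;
  }

  /** Linear extrapolation from the rate so far; -1 means no estimate is possible yet. */
  double estimatedMinutesRemaining(double elapsedMinutes) {
    int loaded = _loadedSegments.get();
    if (loaded == 0) {
      return -1;
    }
    int remaining = Math.max(0, _totalSegments - loaded);
    return ((double) remaining / loaded) * elapsedMinutes;
  }
}
```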



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1281409809


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -696,44 +696,9 @@ public ServerReloadControllerJobStatusResponse getReloadJobStatus(
       totalSegments += entry.getValue().size();
     }
     serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
-    serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size());
-    serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
-
-    for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
-      String responseString = streamResponse.getValue();
-      try {
-        ServerReloadControllerJobStatusResponse response =
-            JsonUtils.stringToObject(responseString, ServerReloadControllerJobStatusResponse.class);
-        serverReloadControllerJobStatusResponse.setSuccessCount(
-            serverReloadControllerJobStatusResponse.getSuccessCount() + response.getSuccessCount());
-      } catch (Exception e) {
-        serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
-            serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1
-        );
-      }
-    }
-
-    // Add ZK fields
-    serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);
-
-    // Add derived fields
-    long submissionTime =
-        Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
-    double timeElapsedInMinutes = ((double) System.currentTimeMillis() - (double) submissionTime) / (1000.0 * 60.0);
-    int remainingSegments = serverReloadControllerJobStatusResponse.getTotalSegmentCount()
-        - serverReloadControllerJobStatusResponse.getSuccessCount();
-
-    double estimatedRemainingTimeInMinutes = -1;
-    if (serverReloadControllerJobStatusResponse.getSuccessCount() > 0) {
-      estimatedRemainingTimeInMinutes =
-          ((double) remainingSegments / (double) serverReloadControllerJobStatusResponse.getSuccessCount())
-              * timeElapsedInMinutes;
-    }
-
-    serverReloadControllerJobStatusResponse.setTimeElapsedInMinutes(timeElapsedInMinutes);
-    serverReloadControllerJobStatusResponse.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
-
-    return serverReloadControllerJobStatusResponse;
+    serverReloadControllerJobStatusResponse.setTotalCount(totalSegments);
+    return ResourceUtils.buildServerReloadControllerJobStatusResponse(controllerJobZKMetadata,

Review Comment:
   I don't think that is accurate, though. All servers will more or less complete at the same time if the segments are evenly distributed.
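The status code removed in this hunk estimated the remaining time by extrapolating linearly from the segments already reloaded: remaining = (totalSegmentCount - successCount) / successCount × timeElapsedInMinutes, or -1 when nothing has succeeded yet. Below is a standalone sketch of that arithmetic. The class and method names are illustrative and are not Pinot's. It also shows one way to read the concern above: if progress were counted per server and all servers finish together, successCount stays at 0 until the end, so the estimate is -1 for almost the entire job.

```java
public class ReloadEta {
  /**
   * Linear estimate of the remaining minutes, mirroring the removed controller logic:
   * (remaining / succeeded) * elapsed. Returns -1 when nothing has succeeded yet.
   */
  static double estimateRemainingMinutes(int totalCount, int successCount, double elapsedMinutes) {
    if (successCount <= 0) {
      return -1;
    }
    int remaining = totalCount - successCount;
    return ((double) remaining / (double) successCount) * elapsedMinutes;
  }

  public static void main(String[] args) {
    // 100 segments, 25 reloaded after 10 minutes: 75 / 25 * 10 = 30 minutes left
    System.out.println(estimateRemainingMinutes(100, 25, 10.0));
    // Per-server progress where 4 servers all finish together: no successes until the end
    System.out.println(estimateRemainingMinutes(4, 0, 10.0));
  }
}
```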





[GitHub] [pinot] Aravind-Suresh commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Aravind-Suresh (via GitHub)" <gi...@apache.org>.
Aravind-Suresh commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1300781779


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java:
##########
@@ -438,4 +439,49 @@ public int getReplication() {
     }
     return replication;
   }
+
+  @Override
+  public boolean equals(Object o) {

Review Comment:
   Yes, this is not required if we use the ZNode version.





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1281223068


##########
pinot-common/src/main/java/org/apache/pinot/common/messages/TableReloadMessage.java:
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.common.messages;
+
+import java.util.UUID;
+import org.apache.helix.model.Message;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+
+/**
+ * This (Helix) message is sent from the controller to brokers when a request is received to reload the table.
+ *
+ * NOTE: We keep the table name as a separate key instead of using the Helix PARTITION_NAME so that this message can be
+ *       used for any resource.
+ */
+public class TableReloadMessage extends Message {
+    public static final String RELOAD_TABLE_MSG_SUB_TYPE = "RELOAD_TABLE";

Review Comment:
   (format) Please reformat all the changes with [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide)



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/controllerjob/ControllerJobType.java:
##########
@@ -19,5 +19,5 @@
 package org.apache.pinot.common.metadata.controllerjob;
 
 public enum ControllerJobType {
-  RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE
+  RELOAD_SEGMENT, FORCE_COMMIT, TABLE_REBALANCE, RELOAD_TABLE

Review Comment:
   (minor) Move it following `RELOAD_SEGMENT`



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -242,6 +242,30 @@ public void addRealtimeSegment(String realtimeTableName, String segmentName)
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }
 
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // we need to do an atomic snapshot here and then add segments because

Review Comment:
   This has a race condition: while we create the new table data manager, other operations (add/remove/replace segment) can still be applied to the old table data manager, and they will not be reflected in the new one. We need to lock the old table data manager until it is fully replaced.
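One way to close the gap described above is to make segment-level writes and the table-level swap mutually exclusive for each table. The following is a minimal sketch that assumes a per-table `ReentrantReadWriteLock`. Segment operations take the read lock, so they can still run concurrently with each other. The reload takes the write lock, so no add/remove can land on the old manager between the snapshot and the swap. `SegmentStore` is a hypothetical stand-in for `TableDataManager`, and none of these names are Pinot APIs.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GuardedTableReload {
  /** Hypothetical stand-in for a table data manager: just a set of segment names. */
  static final class SegmentStore {
    final Set<String> segments = ConcurrentHashMap.newKeySet();
  }

  private final ConcurrentHashMap<String, SegmentStore> _stores = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, ReadWriteLock> _locks = new ConcurrentHashMap<>();

  private ReadWriteLock lockFor(String table) {
    return _locks.computeIfAbsent(table, t -> new ReentrantReadWriteLock());
  }

  /** Segment-level write: shares the read lock, so it can never overlap with a reload. */
  void addSegment(String table, String segment) {
    ReadWriteLock lock = lockFor(table);
    lock.readLock().lock();
    try {
      // Resolve the manager inside the lock so we never write to one that is being replaced
      _stores.computeIfAbsent(table, t -> new SegmentStore()).segments.add(segment);
    } finally {
      lock.readLock().unlock();
    }
  }

  /** Table-level reload: exclusive lock held from the snapshot until the new store is published. */
  void reloadTable(String table) {
    ReadWriteLock lock = lockFor(table);
    lock.writeLock().lock();
    try {
      SegmentStore old = _stores.get(table);
      if (old == null) {
        return;
      }
      SegmentStore fresh = new SegmentStore();
      fresh.segments.addAll(old.segments); // the snapshot is stable: no writer can run now
      _stores.put(table, fresh);
    } finally {
      lock.writeLock().unlock();
    }
  }

  Set<String> segmentsOf(String table) {
    SegmentStore store = _stores.get(table);
    if (store == null) {
      return Set.of();
    }
    return Set.copyOf(store.segments);
  }

  public static void main(String[] args) throws InterruptedException {
    GuardedTableReload mgr = new GuardedTableReload();
    Thread writer = new Thread(() -> {
      for (int i = 0; i < 1000; i++) {
        mgr.addSegment("t_REALTIME", "seg_" + i);
      }
    });
    writer.start();
    for (int i = 0; i < 50; i++) {
      mgr.reloadTable("t_REALTIME");
    }
    writer.join();
    // Every add is visible in the final store; none were lost to a swapped-out manager
    System.out.println(mgr.segmentsOf("t_REALTIME").size()); // 1000
  }
}
```

Using the read lock for writers may look inverted. Here the lock guards the identity of the manager rather than its contents, so many segment writers can share it and only the swap needs exclusivity.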



##########
pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java:
##########
@@ -420,10 +420,14 @@ private void setUpPinotController() {
         new PinotHelixTaskResourceManager(_helixResourceManager, new TaskDriver(_helixParticipantManager),
             _config.getPinotTaskExpireTimeInMs());
 
+    _connectionManager = new MultiThreadedHttpConnectionManager();
+    _connectionManager.getParams().setConnectionTimeout(_config.getServerAdminRequestTimeoutSeconds() * 1000);
+
     // Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
     LOGGER.info("Starting realtime segment manager");
     _pinotLLCRealtimeSegmentManager =
-        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics);
+        new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics, _executorService,

Review Comment:
   Revert the changes in this file



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -170,11 +172,13 @@ public class PinotLLCRealtimeSegmentManager {
   private final int _deepstoreUploadRetryTimeoutMs;
   private final FileUploadDownloadClient _fileUploadDownloadClient;
   private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
+  private final Executor _executor;
+  private final HttpConnectionManager _connectionManager;
 
   private volatile boolean _isStopping = false;
 
   public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager helixResourceManager, ControllerConf controllerConf,
-      ControllerMetrics controllerMetrics) {
+      ControllerMetrics controllerMetrics, Executor executor, HttpConnectionManager connectionManager) {

Review Comment:
   These 2 fields are not used. Please revert the related changes



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -104,6 +110,70 @@ public Response pauseConsumption(
     }
   }
 
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/reload")
+  @ApiOperation(value = "Reloads the table across all the servers", notes = "This would reconstruct the table data"
+      + "manager in case of configuration changes. Example usage: trigger after converting the upsert mode"
+      + "from full to partial.")
+  public SuccessResponse reloadTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") @Nullable
+      String tableName,
+      @ApiParam(value = "Whether to force the reload (if table config is not updated, reload will not be done)")
+      @QueryParam("force") boolean force,
+      @Context HttpHeaders httpHeaders, @Context Request request) {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);

Review Comment:
   This should apply to both offline and real-time tables.
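Typed Pinot table names carry an `_OFFLINE` or `_REALTIME` suffix, so supporting both types means resolving the raw name to whichever typed tables exist, optionally filtered by a type parameter. Below is a sketch of that resolution. The `resolve` helper is hypothetical, and `existingTables` stands in for the cluster's table list. A later revision in this thread uses `ResourceUtils.getExistingTableNamesWithType` for this job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class TypedTableNames {
  /**
   * Returns the typed table names that exist for a raw table name.
   * tableType is "OFFLINE", "REALTIME" (case-insensitive), or null for both.
   */
  static List<String> resolve(String rawTableName, String tableType, Set<String> existingTables) {
    List<String> result = new ArrayList<>();
    for (String type : new String[]{"OFFLINE", "REALTIME"}) {
      if (tableType != null && !tableType.equalsIgnoreCase(type)) {
        continue; // caller asked for the other type only
      }
      String typed = rawTableName + "_" + type;
      if (existingTables.contains(typed)) {
        result.add(typed);
      }
    }
    return result; // empty means the API should answer 404
  }

  public static void main(String[] args) {
    Set<String> existing = Set.of("orders_OFFLINE", "orders_REALTIME", "clicks_REALTIME");
    System.out.println(resolve("orders", null, existing));      // [orders_OFFLINE, orders_REALTIME]
    System.out.println(resolve("clicks", null, existing));      // [clicks_REALTIME]
    System.out.println(resolve("orders", "offline", existing)); // [orders_OFFLINE]
  }
}
```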



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -104,6 +110,70 @@ public Response pauseConsumption(
     }
   }
 
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/reload")
+  @ApiOperation(value = "Reloads the table across all the servers", notes = "This would reconstruct the table data"
+      + "manager in case of configuration changes. Example usage: trigger after converting the upsert mode"
+      + "from full to partial.")
+  public SuccessResponse reloadTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") @Nullable
+      String tableName,
+      @ApiParam(value = "Whether to force the reload (if table config is not updated, reload will not be done)")
+      @QueryParam("force") boolean force,
+      @Context HttpHeaders httpHeaders, @Context Request request) {
+    String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validate(tableNameWithType);
+    // checking if another reload job exists
+    // if so, fail the request with the ongoing reload job's id
+    Map<String, Map<String, String>> reloadJobs =
+        _pinotHelixResourceManager.getAllJobsForTable(tableNameWithType,
+            Collections.singleton(ControllerJobType.RELOAD_TABLE));
+    if (reloadJobs.size() > 0) {
+      String jobId = reloadJobs.keySet().iterator().next();

Review Comment:
   This doesn't seem correct. We should still allow a table reload even if the table was reloaded before. We track both the current jobs and the old finished jobs.
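Because the job lookup returns finished jobs as well as running ones, rejecting the request whenever the map is non-empty would block every reload after the first. The sketch below rejects only when some job is still in progress. It assumes a hypothetical per-job `status` key. As the TODO later in this thread notes, the reload job metadata did not have such a field at this point.

```java
import java.util.Map;
import java.util.Optional;

public class ReloadJobGate {
  // Hypothetical metadata key and value; the PR's job metadata had no status field yet
  static final String STATUS_KEY = "status";
  static final String IN_PROGRESS = "IN_PROGRESS";

  /** Returns the id of a still-running job, if any; finished jobs do not block a new reload. */
  static Optional<String> findRunningJob(Map<String, Map<String, String>> jobsById) {
    for (Map.Entry<String, Map<String, String>> job : jobsById.entrySet()) {
      if (IN_PROGRESS.equals(job.getValue().get(STATUS_KEY))) {
        return Optional.of(job.getKey());
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> jobs = Map.of(
        "job-1", Map.of(STATUS_KEY, "DONE"),
        "job-2", Map.of(STATUS_KEY, "DONE"));
    // Only finished jobs exist, so a new reload is allowed
    System.out.println(findRunningJob(jobs).isPresent()); // false
  }
}
```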



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java:
##########
@@ -20,15 +20,25 @@
 
 import java.util.Map;
 
+
 public class ServerReloadControllerJobStatusResponse {
   private double _timeElapsedInMinutes;
   private double _estimatedTimeRemainingInMinutes;
   private int _totalSegmentCount;
+  private int _totalCount;

Review Comment:
   What does total count stand for? It seems you use it for both the segment count and the server count, but we already have stats for both of them.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -104,6 +110,70 @@ public Response pauseConsumption(
     }
   }
 
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)

Review Comment:
   Add `@Authorize` annotation to these APIs



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java:
##########
@@ -438,4 +439,49 @@ public int getReplication() {
     }
     return replication;
   }
+
+  @Override
+  public boolean equals(Object o) {

Review Comment:
   No need to override them



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -696,44 +696,9 @@ public ServerReloadControllerJobStatusResponse getReloadJobStatus(
       totalSegments += entry.getValue().size();
     }
     serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
-    serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size());
-    serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
-
-    for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
-      String responseString = streamResponse.getValue();
-      try {
-        ServerReloadControllerJobStatusResponse response =
-            JsonUtils.stringToObject(responseString, ServerReloadControllerJobStatusResponse.class);
-        serverReloadControllerJobStatusResponse.setSuccessCount(
-            serverReloadControllerJobStatusResponse.getSuccessCount() + response.getSuccessCount());
-      } catch (Exception e) {
-        serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
-            serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1
-        );
-      }
-    }
-
-    // Add ZK fields
-    serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);
-
-    // Add derived fields
-    long submissionTime =
-        Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
-    double timeElapsedInMinutes = ((double) System.currentTimeMillis() - (double) submissionTime) / (1000.0 * 60.0);
-    int remainingSegments = serverReloadControllerJobStatusResponse.getTotalSegmentCount()
-        - serverReloadControllerJobStatusResponse.getSuccessCount();
-
-    double estimatedRemainingTimeInMinutes = -1;
-    if (serverReloadControllerJobStatusResponse.getSuccessCount() > 0) {
-      estimatedRemainingTimeInMinutes =
-          ((double) remainingSegments / (double) serverReloadControllerJobStatusResponse.getSuccessCount())
-              * timeElapsedInMinutes;
-    }
-
-    serverReloadControllerJobStatusResponse.setTimeElapsedInMinutes(timeElapsedInMinutes);
-    serverReloadControllerJobStatusResponse.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
-
-    return serverReloadControllerJobStatusResponse;
+    serverReloadControllerJobStatusResponse.setTotalCount(totalSegments);
+    return ResourceUtils.buildServerReloadControllerJobStatusResponse(controllerJobZKMetadata,

Review Comment:
   Can this logic be shared? For table reload there is a single target, and I don't see the point of tracking the percentage.



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -509,6 +544,44 @@ public void addOrReplaceSegment(String tableNameWithType, String segmentName)
     }
   }
 
+  @Override
+  public void reloadTable(String tableNameWithType, boolean force, SegmentRefreshSemaphore segmentRefreshSemaphore) {
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+    AtomicReference<TableDataManager> toBeShutdown = new AtomicReference<>();
+    _tableDataManagerMap.computeIfPresent(tableNameWithType, (k, tdm) -> {
+      // create only if a table data manager doesn't exist
+      // or the current configuration and the new configuration are different
+      if (!force && tdm.getTableDataManagerConfig().getTableConfig().equals(tableConfig)) {

Review Comment:
   We can compare the ZNode version of the table config to identify whether it has changed.
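ZooKeeper increments a znode's data version on every successful write, so a version mismatch is a reliable "changed" signal and avoids overriding `TableConfig.equals`. Below is a sketch of the decision. It assumes the data manager remembers the config version it was built from. The `ManagerState` record and its `builtFromVersion` field are hypothetical.

```java
public class ReloadDecision {
  /** Hypothetical record of the table config version a data manager was created from. */
  record ManagerState(String tableNameWithType, int builtFromVersion) { }

  /**
   * Rebuild when forced, or when the config znode has been written since the manager was built.
   * Any write bumps the version, so inequality means the config may have changed.
   */
  static boolean shouldRebuild(ManagerState current, int latestZnodeVersion, boolean force) {
    return force || current.builtFromVersion() != latestZnodeVersion;
  }

  public static void main(String[] args) {
    ManagerState state = new ManagerState("orders_REALTIME", 3);
    System.out.println(shouldRebuild(state, 3, false)); // false: config untouched
    System.out.println(shouldRebuild(state, 4, false)); // true: config rewritten
    System.out.println(shouldRebuild(state, 3, true));  // true: forced
  }
}
```

One trade-off: rewriting identical content also bumps the version, so this check can trigger an unnecessary rebuild, but it never misses a real change.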



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -509,6 +544,44 @@ public void addOrReplaceSegment(String tableNameWithType, String segmentName)
     }
   }
 
+  @Override
+  public void reloadTable(String tableNameWithType, boolean force, SegmentRefreshSemaphore segmentRefreshSemaphore) {
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+    AtomicReference<TableDataManager> toBeShutdown = new AtomicReference<>();
+    _tableDataManagerMap.computeIfPresent(tableNameWithType, (k, tdm) -> {
+      // create only if a table data manager doesn't exist
+      // or the current configuration and the new configuration are different
+      if (!force && tdm.getTableDataManagerConfig().getTableConfig().equals(tableConfig)) {
+        LOGGER.info("Not reloading table as the table data manager config is not changed");
+        return tdm;
+      }
+      // this will load all the segments and start a new consuming segment
+      // for the time being we will be consuming 2X the memory here
+      // once all the segments are loaded we swap the new table data manager with the old one
+      LOGGER.info("Recreating table data manager for table: {} as we received a reload table request",
+          tableNameWithType);
+      TableDataManager newTdm = createTableDataManager(k, tableConfig);
+      List<SegmentDataManager> segments = tdm.acquireAllSegments();
+      try {
+        snapshotAndAddSegments(newTdm, tableNameWithType);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      for (SegmentDataManager sdm : segments) {
+        tdm.releaseSegment(sdm);
+      }
+      LOGGER.info("All segments loaded for table: {}", tableNameWithType);
+      toBeShutdown.set(tdm);
+      return newTdm;
+    });
+    // shutting the old table data manager
+    TableDataManager oldTdm = toBeShutdown.get();

Review Comment:
   We cannot directly shut down the data manager:
   1. Segments are not released yet
   2. There might be queries still on the old data manager
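Both points come down to one rule: shut the old manager down only after every holder has released it. Below is a minimal reference-counting sketch. It assumes queries and the reload path acquire and release the old manager. `RefCountedManager` is hypothetical and is not Pinot's API, although the hunk above already uses a similar acquire/release pattern at segment level (`acquireAllSegments`, `releaseSegment`).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class RefCountedManager {
  // Starts at 1: the reference held by the owning map until the manager is retired
  private final AtomicInteger _refCount = new AtomicInteger(1);
  private final AtomicBoolean _shutDown = new AtomicBoolean(false);

  /** Called by a query before using the manager; fails once the manager has been fully released. */
  boolean tryAcquire() {
    while (true) {
      int count = _refCount.get();
      if (count == 0) {
        return false; // already released; the caller must look up the replacement
      }
      if (_refCount.compareAndSet(count, count + 1)) {
        return true;
      }
    }
  }

  /** Called by a query when done, and once by the owner when the manager is swapped out. */
  void release() {
    if (_refCount.decrementAndGet() == 0) {
      shutDown();
    }
  }

  private void shutDown() {
    // Runs once, when the last reference is released; real cleanup would go here
    _shutDown.set(true);
  }

  boolean isShutDown() {
    return _shutDown.get();
  }

  public static void main(String[] args) {
    RefCountedManager old = new RefCountedManager();
    boolean acquired = old.tryAcquire(); // an in-flight query
    old.release();                       // owner retires it after the swap
    System.out.println(acquired + " " + old.isShutDown()); // true false
    old.release();                       // the query finishes
    System.out.println(old.isShutDown()); // true
    System.out.println(old.tryAcquire()); // false: late callers must use the new manager
  }
}
```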





[GitHub] [pinot] Aravind-Suresh commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Aravind-Suresh (via GitHub)" <gi...@apache.org>.
Aravind-Suresh commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1281277201


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java:
##########
@@ -20,15 +20,25 @@
 
 import java.util.Map;
 
+
 public class ServerReloadControllerJobStatusResponse {
   private double _timeElapsedInMinutes;
   private double _estimatedTimeRemainingInMinutes;
   private int _totalSegmentCount;
+  private int _totalCount;

Review Comment:
   This class was used by the reload-all-segments API; I wanted to share the same response class here and tried to generalize it. Progress is successCount / totalCount (I didn't want to overload totalSegmentCount for reloadTable, since that field is specific to segments rather than to the table as a whole).
   
   For table reload, progress is the number of servers that have reloaded their table.





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1307681623


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -218,22 +226,51 @@ public synchronized void shutDown() {
   public void addRealtimeSegment(String realtimeTableName, String segmentName)
       throws Exception {
     LOGGER.info("Adding segment: {} to table: {}", segmentName, realtimeTableName);
-    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);
+    Pair<TableConfig, ZNRecord> tableConfigInfo =
+        ZKMetadataProvider.getTableConfigWithZNRecord(_propertyStore, realtimeTableName);
+    TableConfig tableConfig = tableConfigInfo.getLeft();
     Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", realtimeTableName);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
     Preconditions.checkState(schema != null, "Failed to find schema for table: %s", realtimeTableName);
     SegmentZKMetadata zkMetadata =
         ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, realtimeTableName, segmentName);
     Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName,
         realtimeTableName);
-    _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> createTableDataManager(k, tableConfig))
+    _tableDataManagerMap.computeIfAbsent(realtimeTableName,
+            k -> createTableDataManager(k, tableConfig, tableConfigInfo.getRight()))
         .addSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), zkMetadata);
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }
 
-  private TableDataManager createTableDataManager(String tableNameWithType, TableConfig tableConfig) {
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // we need to do an atomic snapshot here and then add segments because
+    // of the following race condition
+    // say we have 8 kafka partitions and we are ingesting into 8 consuming segments at the moment
+    // say there were 8 sealed segments before this (one for each partition)
+    // now, if we end up iterating through the 16 segments and add them one by one
+    // say during this progress one of them becomes committed and the state would be updated
+    // then we would read the stale zk state (the commit happens after we read this config)
+    // and add the committed segment as consuming segment
+    // to avoid such race conditions, this method does a snapshot-ed read of all the segments
+    // for this table in one shot
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);

Review Comment:
   Why do we read the table config again here? We should pass in the one from the caller to ensure the version is correctly compared and set.



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -310,6 +311,13 @@ public static TableConfig getTableConfig(ZkHelixPropertyStore<ZNRecord> property
         AccessOption.PERSISTENT));
   }
 
+  public static Pair<TableConfig, ZNRecord> getTableConfigWithZNRecord(ZkHelixPropertyStore<ZNRecord> propertyStore,
+      String tableNameWithType) {
+    ZNRecord znRecord = propertyStore.get(constructPropertyStorePathForResourceConfig(tableNameWithType), null,

Review Comment:
   Can you double check if this will set the node version into the `ZNRecord`? I think you'll need to pass in a `Stat` to gather that info
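In ZooKeeper, a node's version lives in its `Stat`, and a read reports it only when the caller supplies a `Stat` to fill. The call under review already uses the three-argument `get(path, stat, options)` overload, with `null` for the stat, so no version comes back. The safe pattern is to read the data and the version in the same call and pass them downstream together. The sketch below models this with in-memory stand-ins so it runs without a cluster. `FakeStore` and `VersionHolder` are hypothetical and are not ZooKeeper or Helix classes, and the paths are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class VersionedRead {
  /** Hypothetical stand-in for ZooKeeper's Stat: the caller passes it in, the read fills it. */
  static final class VersionHolder {
    int version = -1;
  }

  /** Hypothetical in-memory store that versions each path the way a znode is versioned. */
  static final class FakeStore {
    private final Map<String, String> _data = new HashMap<>();
    private final Map<String, Integer> _versions = new HashMap<>();

    synchronized void set(String path, String value) {
      _data.put(path, value);
      // Versions start at 0 on create and increase by one on every later write
      _versions.merge(path, 0, (old, ignored) -> old + 1);
    }

    /** Mirrors get(path, stat, options): the version is reported only if a holder is supplied. */
    synchronized String get(String path, VersionHolder holder) {
      if (holder != null && _versions.containsKey(path)) {
        holder.version = _versions.get(path);
      }
      return _data.get(path);
    }
  }

  /** Data and the version it was read at, captured in one call. */
  record VersionedConfig(String config, int version) { }

  static VersionedConfig readWithVersion(FakeStore store, String path) {
    VersionHolder holder = new VersionHolder();
    String config = store.get(path, holder);
    return new VersionedConfig(config, holder.version);
  }

  public static void main(String[] args) {
    FakeStore store = new FakeStore();
    store.set("/tables/orders_REALTIME", "{\"upsert\":\"FULL\"}");
    store.set("/tables/orders_REALTIME", "{\"upsert\":\"PARTIAL\"}");
    System.out.println(readWithVersion(store, "/tables/orders_REALTIME"));
    // With a null holder, as in the call under review, no version information comes back
    System.out.println(store.get("/tables/orders_REALTIME", null));
  }
}
```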



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java:
##########
@@ -86,6 +89,31 @@ public void reset() {
     LOGGER.info("Reset called");
   }
 
+  private class ReloadTableMessageHandler extends MessageHandler {
+    final String _tableNameWithType;
+    final boolean _force;
+
+    ReloadTableMessageHandler(TableReloadMessage tableReloadMessage, NotificationContext context) {
+      super(tableReloadMessage, context);
+      _tableNameWithType = tableReloadMessage.getTableNameWithType();
+      _force = tableReloadMessage.isForce();
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      _instanceDataManager.reloadTable(_tableNameWithType, _force, _segmentRefreshSemaphore);
+      HelixTaskResult result = new HelixTaskResult();
+      result.setSuccess(true);
+      return result;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+      LOGGER.error("Got error while refreshing table config for table: {} (error code: {}, error type: {})",

Review Comment:
   (minor) Revise the error message



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -242,6 +242,30 @@ public void addRealtimeSegment(String realtimeTableName, String segmentName)
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }
 
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // we need to do an atomic snapshot here and then add segments because

Review Comment:
   I did some research, and it seems the behavior of `get()` during `compute()` on the same key is Java-version dependent (it might block), but it is typically non-blocking.
   Assuming it is non-blocking, we need to make all other write operations (e.g. add segment, remove segment, reload segment) blocking so that they are not applied to the old data manager.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -104,6 +109,72 @@ public Response pauseConsumption(
     }
   }
 
+  @POST
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/{tableName}/reload")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.RELOAD_TABLE)
+  @ApiOperation(value = "Reloads the table across all the servers", notes = "This would reconstruct the table data"
+      + "manager in case of configuration changes. Example usage: trigger after converting the upsert mode"
+      + "from full to partial.")
+  public SuccessResponse reloadTable(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName")
+      String tableName,
+      @ApiParam(value = "Whether to force the reload (if table config is not updated, reload will not be done)")
+      @QueryParam("force") boolean force,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      @Context HttpHeaders httpHeaders, @Context Request request) {
+    List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
+        tableName, Constants.validateTableType(tableTypeStr), LOGGER);
+    if (tableNamesWithType.isEmpty()) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find table: " + tableName,
+          Response.Status.NOT_FOUND);
+    }
+    // TODO: check if another ongoing reload job exists and fail the request
+    // this cannot be done right now because reload controller job doesn't have a status field
+    StringBuilder response = new StringBuilder();
+    for (String tableNameWithType: tableNamesWithType) {
+      Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadTable(tableNameWithType, force);
+      boolean zkJobMetaWriteSuccess = false;
+      if (msgInfo.getLeft() > 0) {
+        try {
+          if (_pinotHelixResourceManager.addNewReloadTableJob(tableNameWithType, msgInfo.getRight(),
+              msgInfo.getLeft())) {
+            zkJobMetaWriteSuccess = true;
+          } else {
+            LOGGER.error("Failed to add reload table job meta into zookeeper for table: {}", tableNameWithType);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}", tableNameWithType, e);
+        }
+        response.append(String.format("Submitted reload table job for table: %s, with id: %s, sent %d reload messages."
+                + " Job meta ZK storage status: %s", tableNameWithType, msgInfo.getRight(), msgInfo.getLeft(),
+            zkJobMetaWriteSuccess ? "SUCCESS" : "FAILED")).append(". ");
+      } else {
+        throw new ControllerApplicationException(LOGGER, "Failed to find table: " + tableNameWithType,
+            Response.Status.NOT_FOUND);
+      }
+    }
+    return new SuccessResponse(response.toString().trim());
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/tables/tableReloadStatus/{jobId}")

Review Comment:
   Need `@Authorize` here as well



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -218,22 +226,51 @@ public synchronized void shutDown() {
   public void addRealtimeSegment(String realtimeTableName, String segmentName)
       throws Exception {
     LOGGER.info("Adding segment: {} to table: {}", segmentName, realtimeTableName);
-    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);
+    Pair<TableConfig, ZNRecord> tableConfigInfo =
+        ZKMetadataProvider.getTableConfigWithZNRecord(_propertyStore, realtimeTableName);
+    TableConfig tableConfig = tableConfigInfo.getLeft();
     Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", realtimeTableName);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
     Preconditions.checkState(schema != null, "Failed to find schema for table: %s", realtimeTableName);
     SegmentZKMetadata zkMetadata =
         ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, realtimeTableName, segmentName);
     Preconditions.checkState(zkMetadata != null, "Failed to find ZK metadata for segment: %s, table: %s", segmentName,
         realtimeTableName);
-    _tableDataManagerMap.computeIfAbsent(realtimeTableName, k -> createTableDataManager(k, tableConfig))
+    _tableDataManagerMap.computeIfAbsent(realtimeTableName,
+            k -> createTableDataManager(k, tableConfig, tableConfigInfo.getRight()))
         .addSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema), zkMetadata);
     LOGGER.info("Added segment: {} to table: {}", segmentName, realtimeTableName);
   }
 
-  private TableDataManager createTableDataManager(String tableNameWithType, TableConfig tableConfig) {
+  private void snapshotAndAddSegments(TableDataManager tableDataManager, String realtimeTableName)
+      throws Exception {
+    // we need to do an atomic snapshot here and then add segments because
+    // of the following race condition
+    // say we have 8 kafka partitions and we are ingesting into 8 consuming segments at the moment
+    // say there were 8 sealed segments before this (one for each partition)
+    // now, if we end up iterating through the 16 segments and add them one by one
+    // say during this progress one of them becomes committed and the state would be updated
+    // then we would read the stale zk state (the commit happens after we read this config)
+    // and add the committed segment as consuming segment
+    // to avoid such race conditions, this method does a snapshot-ed read of all the segments
+    // for this table in one shot
+    TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, realtimeTableName);
+    Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", realtimeTableName);
+    Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableConfig);
+    List<SegmentZKMetadata> segments = ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, realtimeTableName);
+    for (SegmentZKMetadata segment : segments) {
+      String segmentName = segment.getSegmentName();
+      tableDataManager.addSegment(segmentName, new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema),

Review Comment:
   This API is used for RT tables only. It will throw an exception for OFFLINE tables.
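Since `addRealtimeSegment` is the realtime-only path, a rebuild that re-adds segments must choose the loading path by table type. The sketch below dispatches on the `_REALTIME`/`_OFFLINE` suffix of the typed table name. The "realtime:" and "offline:" labels are hypothetical placeholders for the two segment-loading paths, not Pinot method names.

```java
import java.util.ArrayList;
import java.util.List;

public class SegmentAddDispatch {
  enum TableType { OFFLINE, REALTIME }

  /** Derives the table type from the typed table name suffix. */
  static TableType typeOf(String tableNameWithType) {
    if (tableNameWithType.endsWith("_REALTIME")) {
      return TableType.REALTIME;
    }
    if (tableNameWithType.endsWith("_OFFLINE")) {
      return TableType.OFFLINE;
    }
    throw new IllegalArgumentException("Not a typed table name: " + tableNameWithType);
  }

  /** Records which (hypothetical) loading path each segment would be routed to. */
  static List<String> planAdds(String tableNameWithType, List<String> segmentNames) {
    TableType type = typeOf(tableNameWithType);
    List<String> plan = new ArrayList<>();
    for (String segment : segmentNames) {
      // REALTIME segments need the realtime path (consuming vs committed handling);
      // OFFLINE segments must not go through it, since it rejects offline tables
      plan.add((type == TableType.REALTIME ? "realtime:" : "offline:") + segment);
    }
    return plan;
  }

  public static void main(String[] args) {
    System.out.println(planAdds("orders_REALTIME", List.of("s0", "s1"))); // [realtime:s0, realtime:s1]
    System.out.println(planAdds("orders_OFFLINE", List.of("s0")));        // [offline:s0]
  }
}
```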





[GitHub] [pinot] Aravind-Suresh commented on a diff in pull request #11183: Reload Table API to reload table data

Posted by "Aravind-Suresh (via GitHub)" <gi...@apache.org>.
Aravind-Suresh commented on code in PR #11183:
URL: https://github.com/apache/pinot/pull/11183#discussion_r1300763301


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -696,44 +696,9 @@ public ServerReloadControllerJobStatusResponse getReloadJobStatus(
       totalSegments += entry.getValue().size();
     }
     serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
-    serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size());
-    serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
-
-    for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
-      String responseString = streamResponse.getValue();
-      try {
-        ServerReloadControllerJobStatusResponse response =
-            JsonUtils.stringToObject(responseString, ServerReloadControllerJobStatusResponse.class);
-        serverReloadControllerJobStatusResponse.setSuccessCount(
-            serverReloadControllerJobStatusResponse.getSuccessCount() + response.getSuccessCount());
-      } catch (Exception e) {
-        serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(
-            serverReloadControllerJobStatusResponse.getTotalServerCallsFailed() + 1
-        );
-      }
-    }
-
-    // Add ZK fields
-    serverReloadControllerJobStatusResponse.setMetadata(controllerJobZKMetadata);
-
-    // Add derived fields
-    long submissionTime =
-        Long.parseLong(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
-    double timeElapsedInMinutes = ((double) System.currentTimeMillis() - (double) submissionTime) / (1000.0 * 60.0);
-    int remainingSegments = serverReloadControllerJobStatusResponse.getTotalSegmentCount()
-        - serverReloadControllerJobStatusResponse.getSuccessCount();
-
-    double estimatedRemainingTimeInMinutes = -1;
-    if (serverReloadControllerJobStatusResponse.getSuccessCount() > 0) {
-      estimatedRemainingTimeInMinutes =
-          ((double) remainingSegments / (double) serverReloadControllerJobStatusResponse.getSuccessCount())
-              * timeElapsedInMinutes;
-    }
-
-    serverReloadControllerJobStatusResponse.setTimeElapsedInMinutes(timeElapsedInMinutes);
-    serverReloadControllerJobStatusResponse.setEstimatedTimeRemainingInMinutes(estimatedRemainingTimeInMinutes);
-
-    return serverReloadControllerJobStatusResponse;
+    serverReloadControllerJobStatusResponse.setTotalCount(totalSegments);
+    return ResourceUtils.buildServerReloadControllerJobStatusResponse(controllerJobZKMetadata,

Review Comment:
   Makes sense. I will remove this related refactoring and introduce a boolean state that tracks whether or not all servers have completed the operation.
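   The block removed in the diff above aggregated per-server reload responses into a job status: it summed success counts, counted failed or unparseable server calls, and estimated the remaining time as `remainingSegments / successCount * timeElapsedInMinutes` (or -1 before any segment succeeded). Below is a minimal Java sketch of that aggregation together with the proposed completion flag. It is not Pinot's `ServerReloadControllerJobStatusResponse`. The `ServerReply` and `JobStatus` types and the per-server `completed` field are hypothetical, and a `null` reply models a failed or unparseable call.

```java
import java.util.List;

public class ReloadStatusSketch {
  // Hypothetical per-server reply; null in the input list models a failed call.
  static final class ServerReply {
    final int successCount;
    final boolean completed; // hypothetical: server reports it finished its segments

    ServerReply(int successCount, boolean completed) {
      this.successCount = successCount;
      this.completed = completed;
    }
  }

  // Hypothetical aggregated job status.
  static final class JobStatus {
    int successCount;
    int failedCalls;
    double estimatedRemainingMinutes;
    boolean completedByAllServers;
  }

  static JobStatus aggregate(List<ServerReply> replies, int totalSegments, double elapsedMinutes) {
    JobStatus status = new JobStatus();
    boolean allCompleted = true;
    for (ServerReply reply : replies) {
      if (reply == null) {
        status.failedCalls++;
        allCompleted = false; // completion cannot be confirmed for a server that did not answer
        continue;
      }
      status.successCount += reply.successCount;
      allCompleted &= reply.completed;
    }
    int remaining = totalSegments - status.successCount;
    // Same extrapolation as the removed code: -1 until at least one segment succeeded.
    status.estimatedRemainingMinutes =
        status.successCount > 0 ? ((double) remaining / status.successCount) * elapsedMinutes : -1;
    status.completedByAllServers = allCompleted;
    return status;
  }
}
```

   Treating a failed call as "not completed" is a design choice made for this sketch. It keeps the flag conservative, so an unreachable server can never make the job look finished.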


