You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/06/03 12:00:02 UTC

[GitHub] [pinot] saurabhd336 opened a new pull request, #8828: (WIP) Add server API for task status

saurabhd336 opened a new pull request, #8828:
URL: https://github.com/apache/pinot/pull/8828

   Instructions:
   1. The PR has to be tagged with at least one of the following labels (*):
      1. `feature`
      2. `bugfix`
      3. `performance`
      4. `ui`
      5. `backward-incompat`
      6. `release-notes` (**)
   2. Remove these instructions before publishing the PR.
    
   (*) Other labels to consider:
   - `testing`
   - `dependencies`
   - `docker`
   - `kubernetes`
   - `observability`
   - `security`
   - `code-style`
   - `extension-point`
   - `refactor`
   - `cleanup`
   
   (**) Use `release-notes` label for scenarios like:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r928533794


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -425,9 +429,17 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    if (msgInfo.getLeft() > 0) {
+      try {
+        _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",
+            tableNameWithType, segmentName, e);
+      }
+      return new SuccessResponse("Sent " + msgInfo + " reload messages");

Review Comment:
   Ack. Added both the job id as well as success / failure status of ZK write



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -563,14 +662,24 @@ public SuccessResponse reloadAllSegments(
     if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) {
       tableTypeFromRequest = TableType.OFFLINE;
     }
-    List<String> tableNamesWithType = ResourceUtils
-        .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER);
-    Map<String, Integer> numMessagesSentPerTable = new LinkedHashMap<>();
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
+            LOGGER);
+    Map<String, Pair<Integer, String>> perTableMsgData = new LinkedHashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
-      int numMsgSent = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
-      numMessagesSentPerTable.put(tableNameWithType, numMsgSent);
+      Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
+      perTableMsgData.put(tableNameWithType, msgInfo);
+      // Store in ZK
+      try {
+        if (!_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, msgInfo.getRight(),
+            msgInfo.getLeft())) {
+          LOGGER.error("Failed to add reload all segments job meta into zookeeper for table {}", tableNameWithType);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload all segments job meta into zookeeper for table {}", tableNameWithType, e);

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931815219


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  /**
+   * Returns the ZK metdata for the given jobId
+   * @param jobId the id of the job
+   * @return Map representing the job's ZK properties
+   */
+  public Map<String, String> getControllerJobZKMetadata(String jobId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(jobId);
+    } catch (ZkNoNodeException e) {
+      LOGGER.warn("Could not find job metadata for id: {}", jobId, e);
+    }
+    return null;
+  }
+
+  /**
+   * Returns a Map of jobId to job's ZK metadata for the given table
+   * @param tableNameWithType the table for which jobs are to be fetched
+   * @return A Map of jobId to job properties
+   */
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) {
+    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
+      Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields();
+      return controllerJobs.entrySet().stream().filter(
+          job -> job.getValue().get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE)
+              .equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    } catch (ZkNoNodeException e) {
+      LOGGER.warn("Could not find controller job node for table : {}", tableNameWithType, e);
+    }
+
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Adds a new reload segment job metadata into ZK
+   * @param tableNameWithType Table for which job is to be added
+   * @param segmentName Name of the segment being reloaded
+   * @param jobId job's UUID
+   * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
+      int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  /**
+   * Adds a new reload segment job metadata into ZK
+   * @param tableNameWithType Table for which job is to be added
+   * @param jobId job's UUID
+   * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE,
+        ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  private boolean addReloadJobToZK(String tableNameWithType, String taskId, Map<String, String> jobMetadata) {

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931804308


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +599,99 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("segments/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
+          Status.NOT_FOUND);
+    }
+
+    String tableNameWithType =
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE);
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE)
+        .equals(ControllerJobType.RELOAD_SEGMENT.toString())) {
+      // No need to query servers where this segment is not supposed to be hosted
+      serverToSegments = serverToSegments.entrySet().stream().filter(kv -> kv.getValue()
+              .contains(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME)))
+          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      singleSegmentName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+    }
+
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint =
+          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + "?reloadJobTimestamp="
+              + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME);
+      if (singleSegmentName != null) {
+        reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + singleSegmentName;
+      }
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    ServerReloadControllerJobStatusResponse serverReloadControllerJobStatusResponse =
+        new ServerReloadControllerJobStatusResponse();
+    serverReloadControllerJobStatusResponse.setSuccessCount(0);
+    serverReloadControllerJobStatusResponse.setTotalSegmentCount(
+        _pinotHelixResourceManager.getSegmentsCount(tableNameWithType));

Review Comment:
   This is a new function that I had added
   https://github.com/apache/pinot/pull/8828/files#diff-2417e5cd604d78cf5622d8d00deaa5bcada5100e2980345ed80ac234ed5ff27eR2662
   
   idealState.getPartitionSet().size() does include all replicas right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r902143102


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +556,70 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableNameWithType}/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = "Get status for a submitted reload "
+      + "operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableNameWithType") String tableNameWithType,

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: (WIP) Add server API for task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r893069831


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadTaskStatusCache.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+
+public class SegmentReloadTaskStatusCache {
+
+  public static class SegmentReloadStatusValue {
+    private final long _totalSegmentCount;
+    private final AtomicLong _successCount;
+
+    SegmentReloadStatusValue(long totalSegmentCount) {
+      _totalSegmentCount = totalSegmentCount;
+      _successCount = new AtomicLong(0);
+    }
+
+    @JsonIgnore
+    public void incrementSuccess() {
+      _successCount.addAndGet(1);
+    }
+
+    public long getTotalSegmentCount() {
+      return _totalSegmentCount;
+    }
+
+    public long getSuccessCount() {
+      return _successCount.get();
+    }
+  }
+
+  private SegmentReloadTaskStatusCache() {
+  }
+
+  private static final Map<String, SegmentReloadStatusValue> SEGMENT_RELOAD_STATUS_MAP = new ConcurrentHashMap<>();

Review Comment:
   Should I replace this map with a LRU map instead then? I think that might achieve something close to this and we can specify max size when initialising the map



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: (WIP) Add server API for task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r893072567


##########
pinot-common/src/main/java/org/apache/pinot/common/messages/SegmentReloadMessage.java:
##########
@@ -35,6 +35,9 @@ public class SegmentReloadMessage extends Message {
 
   private static final String FORCE_DOWNLOAD_KEY = "forceDownload";
 
+  // todo (saurabh) : getMsgId() returns different id on the server side than the one being set at controller. Check

Review Comment:
   Yeah the getMsgId does fetch the record's zk id but for some reason I was seeing this difference on server and controller side. I'll look into this more and update the PR accordingly if we are able to use msgId itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: (WIP) Add server API for task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r896973453


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +548,54 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = "Get status for a submitted reload "
+      + "operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(

Review Comment:
   Although I could not find "retention" specification when creating ZNodes. Is there any example already in place where we might be doing that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] mcvsubbu commented on pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on PR #8828:
URL: https://github.com/apache/pinot/pull/8828#issuecomment-1159334315

   > @mcvsubbu So nothing about the core functionality wrt reloading segments has been changed here. It's still largely a fire and forget operation, using helix messages that are handled by servers. This PR just associates each reload op with a trackable id, writes some additional meta about the task into ZK, and provides APIs which can be used to 1) Get the status of a given realod op task id 2) Get a historical list of reload tasks.
   
   When you start using new znodes, then we need to worry about upgrades, cleaning up of znodes, etc.  Please write up a design document on this before proceeding.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r905720173


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -556,6 +556,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class Task {

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] npawar commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
npawar commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r905354263


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -556,6 +556,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class Task {

Review Comment:
   rename all "Task" in this file to "Job" (and also in PinotHelixResourceManager)



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java:
##########
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.util.Map;
+
+public class ServerReloadControllerJobStatusResponse {
+  private double _timeElapsedInMinutes;
+  private double _estimatedTimeRemainingInMinutes;
+  private int _totalSegmentCount;
+  private int _successCount;
+  private int _totalServersQueried;
+  private int _totalServerCallsFailed;
+  private Map<String, String> _taskMetadata;

Review Comment:
   jobMetadata or just metadata?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] mcvsubbu commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r900345503


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -425,9 +431,16 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    if (msgInfo.getLeft() > 0) {
+      try {
+        _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to add job meta into zookeeper ", e);

Review Comment:
   Add segment name & tablename in the log message. 
   
   Will not repeat for other log messages



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +556,70 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableNameWithType}/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = "Get status for a submitted reload "
+      + "operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableNameWithType") String tableNameWithType,

Review Comment:
   I believe most other APIs have table name and table type as independent parameters. We should keep all APIs that way, and move towards this model if that is not already the case



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r935162064


##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java:
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.util.List;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@Api(tags = "Tasks")
+@Path("/")
+public class ControllerJobStatusResource {
+
+  @Inject
+  private ServerInstance _serverInstance;
+
+  @GET
+  @Path("/controllerJob/reloadStatus/{tableNameWithType}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Task status", notes = "Return status of the given task")
+  public String taskStatus(@PathParam("tableNameWithType") String tableNameWithType,

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: (WIP) Add server API for task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r896414574


##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TaskStatusResource.java:
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import org.apache.pinot.server.starter.helix.SegmentReloadTaskStatusCache;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@Api(tags = "Tasks")
+@Path("/")
+public class TaskStatusResource {
+  @GET
+  @Path("/task/status/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Task status", notes = "Return status of the given task")
+  public String taskStatus(@PathParam("taskId") String taskId)
+      throws Exception {
+    SegmentReloadTaskStatusCache.SegmentReloadStatusValue segmentReloadStatusValue =
+        SegmentReloadTaskStatusCache.getStatus(taskId);
+
+    return JsonUtils.objectToString(segmentReloadStatusValue);

Review Comment:
   Yes. It returns "null" string



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on pull request #8828: (WIP) Add server API for task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on PR #8828:
URL: https://github.com/apache/pinot/pull/8828#issuecomment-1154958296

   I've updated the PR to 
   1) Add taskMeta to ZK.
   2) Add an API to return all tasks for a table.
   
   Yet to add
   1) Retention on task ZNode
   2) Track status for single segment reload
   
   I was hoping to get the approach validated first, for all segments API. Once we are aligned on this, I'll update the PR to include changes for single segment reload.
   
   cc: @Jackie-Jiang @npawar 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] codecov-commenter commented on pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #8828:
URL: https://github.com/apache/pinot/pull/8828#issuecomment-1156332105

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/8828?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#8828](https://codecov.io/gh/apache/pinot/pull/8828?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (56fc4ee) into [master](https://codecov.io/gh/apache/pinot/commit/f3bde9ff674bc50ca296e22a0a8df2ded0168fa1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (f3bde9f) will **decrease** coverage by `54.73%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #8828       +/-   ##
   =============================================
   - Coverage     69.60%   14.87%   -54.74%     
   + Complexity     4997      170     -4827     
   =============================================
     Files          1806     1764       -42     
     Lines         94202    92297     -1905     
     Branches      14050    13830      -220     
   =============================================
   - Hits          65571    13726    -51845     
   - Misses        24072    77583    +53511     
   + Partials       4559      988     -3571     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `?` | |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `14.87% <0.00%> (-0.56%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/8828?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...he/pinot/common/messages/SegmentReloadMessage.java](https://codecov.io/gh/apache/pinot/pull/8828/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWVzc2FnZXMvU2VnbWVudFJlbG9hZE1lc3NhZ2UuamF2YQ==) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...ache/pinot/common/metadata/ZKMetadataProvider.java](https://codecov.io/gh/apache/pinot/pull/8828/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0YWRhdGEvWktNZXRhZGF0YVByb3ZpZGVyLmphdmE=) | `0.00% <0.00%> (-65.93%)` | :arrow_down: |
   | [...rg/apache/pinot/common/metadata/task/TaskType.java](https://codecov.io/gh/apache/pinot/pull/8828/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0YWRhdGEvdGFzay9UYXNrVHlwZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...che/pinot/common/metadata/task/TaskZKMetadata.java](https://codecov.io/gh/apache/pinot/pull/8828/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0YWRhdGEvdGFzay9UYXNrWktNZXRhZGF0YS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ler/api/resources/PinotSegmentRestletResource.java](https://codecov.io/gh/apache/pinot/pull/8828/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1Bpbm90U2VnbWVudFJlc3RsZXRSZXNvdXJjZS5qYXZh) | `10.18% <0.00%> (-17.06%)` | :arrow_down: |
   | [...oller/api/resources/PinotTableRestletResource.java](https://codecov.io/gh/apache/pinot/pull/8828/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1Bpbm90VGFibGVSZXN0bGV0UmVzb3VyY2UuamF2YQ==) | `45.60% <0.00%> (-6.91%)` | :arrow_down: |
   | [.../api/resources/ServerReloadTaskStatusResponse.java](https://codecov.io/gh/apache/pinot/pull/8828/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9hcGkvcmVzb3VyY2VzL1NlcnZlclJlbG9hZFRhc2tTdGF0dXNSZXNwb25zZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ntroller/helix/core/PinotHelixResourceManager.java](https://codecov.io/gh/apache/pinot/pull/8828/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9jb3JlL1Bpbm90SGVsaXhSZXNvdXJjZU1hbmFnZXIuamF2YQ==) | `63.19% <0.00%> (-3.81%)` | :arrow_down: |
   | [...va/org/apache/pinot/spi/utils/CommonConstants.java](https://codecov.io/gh/apache/pinot/pull/8828/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQ29tbW9uQ29uc3RhbnRzLmphdmE=) | `0.00% <0.00%> (-28.58%)` | :arrow_down: |
   | [...src/main/java/org/apache/pinot/sql/FilterKind.java](https://codecov.io/gh/apache/pinot/pull/8828/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcWwvRmlsdGVyS2luZC5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1393 more](https://codecov.io/gh/apache/pinot/pull/8828/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/pinot/pull/8828?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/pinot/pull/8828?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [f3bde9f...56fc4ee](https://codecov.io/gh/apache/pinot/pull/8828?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r898590984


##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -110,6 +115,10 @@ public static String constructPropertyStorePathForInstancePartitions(String inst
     return StringUtil.join("/", PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, instancePartitionsName);
   }
 
+  public static String constructPropertyStorePathForTaskResource(String resourceName) {

Review Comment:
   ```suggestion
     public static String constructPropertyStorePathForTask(String resourceName) {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -563,16 +646,26 @@ public SuccessResponse reloadAllSegments(
     if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) {
       tableTypeFromRequest = TableType.OFFLINE;
     }
-    List<String> tableNamesWithType = ResourceUtils
-        .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER);
-    Map<String, Integer> numMessagesSentPerTable = new LinkedHashMap<>();
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
+            LOGGER);
+    Map<String, Pair<Integer, String>> perTableMsgData = new LinkedHashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
-      int numMsgSent = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
-      numMessagesSentPerTable.put(tableNameWithType, numMsgSent);
+      Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
+      perTableMsgData.put(tableNameWithType, msgInfo);
+      // Store in ZK
+      try {
+        _pinotHelixResourceManager.addNewReloadAllSegmentsTask(tableNameWithType, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to store task meta in zookepper ", e);
+      }
     }
-    return new SuccessResponse("Sent " + numMessagesSentPerTable + " reload messages");
+    return new SuccessResponse("Sent " + perTableMsgData + " reload messages");
   }
 
+

Review Comment:
   (nit) revert the empty lines



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -98,6 +99,10 @@ public static InstanceZKMetadata getInstanceZKMetadata(ZkHelixPropertyStore<ZNRe
     return new InstanceZKMetadata(znRecord);
   }
 
+  public static String constructPropertyStorePathForTask(String resourceName, String taskId) {

Review Comment:
   Suggest not keeping per-task level ZK metadata. It might create too many ZNode, and can be easily left over if not handled properly



##########
pinot-common/src/main/java/org/apache/pinot/common/messages/SegmentReloadMessage.java:
##########
@@ -35,6 +35,9 @@ public class SegmentReloadMessage extends Message {
 
   private static final String FORCE_DOWNLOAD_KEY = "forceDownload";
 
+  // todo (saurabh) : getMsgId() returns different id on the server side than the one being set at controller. Check

Review Comment:
   Any update on this?



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadTaskStatusCache.java:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.cache.CacheBuilder;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+
+public class SegmentReloadTaskStatusCache {
+
+  public static class SegmentReloadStatusValue {
+    private final long _totalSegmentCount;
+    private final AtomicLong _successCount;
+
+    SegmentReloadStatusValue(long totalSegmentCount) {
+      _totalSegmentCount = totalSegmentCount;
+      _successCount = new AtomicLong(0);
+    }
+
+    @JsonIgnore
+    public void incrementSuccess() {
+      _successCount.addAndGet(1);
+    }
+
+    public long getTotalSegmentCount() {
+      return _totalSegmentCount;
+    }
+
+    public long getSuccessCount() {
+      return _successCount.get();
+    }
+  }
+
+  private SegmentReloadTaskStatusCache() {
+  }
+
+  private static final ConcurrentMap<String, SegmentReloadStatusValue> SEGMENT_RELOAD_STATUS_MAP =
+      CacheBuilder.newBuilder()

Review Comment:
   Will this create a LRU cache?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930664772


##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.util.List;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@Api(tags = "Tasks")
+@Path("/")
+public class ControllerJobStatusResource {
+
+  @Inject
+  private ServerInstance _serverInstance;
+
+  @GET
+  @Path("/controllerJob/reloadStatus/{tableNameWithType}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Task status", notes = "Return status of the given task")
+  public String taskStatus(@PathParam("tableNameWithType") String tableNameWithType,
+      @QueryParam("reloadJobTimestamp") long reloadJobSubmissionTimestamp,
+      @QueryParam("segmentName") String segmentName)
+      throws Exception {
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+
+    if (segmentName == null) {
+      // All segments
+      List<SegmentDataManager> allSegments = tableDataManager.acquireAllSegments();
+      try {
+        long successCount = 0;
+        for (SegmentDataManager segmentDataManager : allSegments) {
+          if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) {
+            successCount++;
+          }
+        }
+        SegmentReloadStatusValue segmentReloadStatusValue =
+            new SegmentReloadStatusValue(allSegments.size(), successCount);
+        return JsonUtils.objectToString(segmentReloadStatusValue);
+      } finally {
+        for (SegmentDataManager segmentDataManager : allSegments) {
+          tableDataManager.releaseSegment(segmentDataManager);
+        }
+      }
+    } else {
+      SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+      if (segmentDataManager == null) {
+        throw new WebApplicationException("Segment: " + segmentName + " is not found", Response.Status.NOT_FOUND);

Review Comment:
   With controller now filtering servers for the segment being queried using idealstate, this should be far less frequent.
   And yes the controller makes the API calls using `CompletionServiceHelper` which handles non 200 responses correctly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on PR #8828:
URL: https://github.com/apache/pinot/pull/8828#issuecomment-1203009290

   Let's add a `release-notes` section in the PR description to list down the new rest APIs added


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930663113


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1987,82 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String taskId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    if (_propertyStore.exists(controllerJobResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(taskId);
+    } else {
+      return null;
+    }
+  }
+
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) {
+    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    if (_propertyStore.exists(jobsResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
+      Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields();
+      return controllerJobs.entrySet().stream().filter(
+          job -> job.getValue().get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE)
+              .equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
+      int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE,
+        ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  private boolean addReloadJobToZK(String tableNameWithType, String taskId, Map<String, String> jobMetadata) {
+    String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    Stat stat = new Stat();
+    ZNRecord tableJobsZnRecord = _propertyStore.get(jobResourcePath, stat, AccessOption.PERSISTENT);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: (WIP) Add server API for task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r896697032


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +548,54 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = "Get status for a submitted reload "
+      + "operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(

Review Comment:
   Have added the API, anc the changes in realod API to store taskMetadata into ZK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930666999


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Task ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "controller.job.type";
+    public static final String CONTROLLER_JOB_TABLE_NAME_WITH_TYPE = "controller.job.table.name";
+    public static final String CONTROLLER_JOB_ID = "controller.job.id";
+    public static final String CONTROLLER_JOB_SUBMISSION_TIME = "controller.job.submission.time";
+    public static final String CONTROLLER_JOB_MESSAGES_COUNT = "controller.job.messages.count";
+
+    /**
+     * Segment reload task ZK props
+     */
+    public static final Integer MAXIMUM_RELOAD_JOBS_IN_ZK = 100;

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930665635


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Task ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "controller.job.type";
+    public static final String CONTROLLER_JOB_TABLE_NAME_WITH_TYPE = "controller.job.table.name";
+    public static final String CONTROLLER_JOB_ID = "controller.job.id";
+    public static final String CONTROLLER_JOB_SUBMISSION_TIME = "controller.job.submission.time";
+    public static final String CONTROLLER_JOB_MESSAGES_COUNT = "controller.job.messages.count";
+
+    /**
+     * Segment reload task ZK props

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930659917


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1987,82 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String taskId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    if (_propertyStore.exists(controllerJobResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r898862777


##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -110,6 +115,10 @@ public static String constructPropertyStorePathForInstancePartitions(String inst
     return StringUtil.join("/", PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, instancePartitionsName);
   }
 
+  public static String constructPropertyStorePathForTaskResource(String resourceName) {

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r898872892


##########
pinot-common/src/main/java/org/apache/pinot/common/messages/SegmentReloadMessage.java:
##########
@@ -35,6 +35,9 @@ public class SegmentReloadMessage extends Message {
 
   private static final String FORCE_DOWNLOAD_KEY = "forceDownload";
 
+  // todo (saurabh) : getMsgId() returns different id on the server side than the one being set at controller. Check

Review Comment:
   So helix, when sending a PARTICIPANT message, generates one message per participant to store into ZK. When doing that, it overrides the original message's id with a new random one
   https://github.com/apache/helix/blob/master/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java#L199
   
   It only retains the fields from the original message



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Add controller API for reload segment task status [pinot]

Posted by "walterddr (via GitHub)" <gi...@apache.org>.
walterddr commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r1359063941


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +599,107 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("segments/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
+          Status.NOT_FOUND);
+    }
+
+    String tableNameWithType =
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+    Map<String, List<String>> serverToSegments;
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.JOB_TYPE)
+        .equals(ControllerJobType.RELOAD_SEGMENT.toString())) {
+      // No need to query servers where this segment is not supposed to be hosted
+      singleSegmentName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+      serverToSegments = new HashMap<>();
+      List<String> segmentList = Arrays.asList(singleSegmentName);
+      _pinotHelixResourceManager.getServers(tableNameWithType, singleSegmentName).forEach(server -> {
+        serverToSegments.put(server, segmentList);
+      });
+    } else {
+      serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    }
+
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint =
+          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + "?reloadJobTimestamp="
+              + controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
+      if (singleSegmentName != null) {
+        reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + singleSegmentName;
+      }
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    ServerReloadControllerJobStatusResponse serverReloadControllerJobStatusResponse =
+        new ServerReloadControllerJobStatusResponse();
+    serverReloadControllerJobStatusResponse.setSuccessCount(0);
+
+    int totalSegments = 0;
+    for (Map.Entry<String, List<String>> entry: serverToSegments.entrySet()) {
+      totalSegments += entry.getValue().size();
+    }
+    serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
+    serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size());
+    serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
+
+    for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
+      String responseString = streamResponse.getValue();
+      try {
+        ServerReloadControllerJobStatusResponse response =
+            JsonUtils.stringToObject(responseString, ServerReloadControllerJobStatusResponse.class);

Review Comment:
   why are we using `ServerReloadControllerJobStatusResponse.class` in JsonUtil instead of `SegmentReloadStatusValue.class` ? does it guarantee to be always compatible? this seems like a risky assumption consider controller and server are upgraded separately during rolling upgrade



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930643047


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +598,98 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
+    String tableNameWithType = TableNameBuilder.forType(tableTypeFromRequest).tableNameWithType(tableName);
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER,
+          "Failed to find controller job: " + reloadJobId + " for table: " + tableNameWithType, Status.NOT_FOUND);
+    }
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE).equals(
+        ControllerJobType.RELOAD_SEGMENT.toString())) {
+      singleSegmentName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+    }
+
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();

Review Comment:
   `Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);`
   Is already only getting the servers from the ideal state?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931808446


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +599,99 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("segments/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
+          Status.NOT_FOUND);
+    }
+
+    String tableNameWithType =
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE);
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE)
+        .equals(ControllerJobType.RELOAD_SEGMENT.toString())) {
+      // No need to query servers where this segment is not supposed to be hosted

Review Comment:
   Ack. Transformed the set of string being returned by this function to create a map segmentName -> List of servers for keeping the code logic below this uniform



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931823872


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Controller job ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "jobType";

Review Comment:
   Ack



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Controller job ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "jobType";
+    public static final String CONTROLLER_JOB_TABLE_NAME_WITH_TYPE = "tableName";
+    public static final String CONTROLLER_JOB_ID = "jobId";
+    public static final String CONTROLLER_JOB_SUBMISSION_TIME = "submissionTime";
+    public static final String CONTROLLER_JOB_MESSAGES_COUNT = "messageCount";

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r898863653


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +548,54 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = "Get status for a submitted reload "
+      + "operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(

Review Comment:
   I've handled retention via explicitly cleaning up old tasks when new ones are created.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r899769745


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "Reload task id", required = true) @PathParam("taskId") String reloadTaskId)
+      throws Exception {
+    Map<String, String> taskZKMetadata = null;
+
+    // Call all servers to get status, collate and return
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, null, LOGGER);
+
+    Set<String> instances = new HashSet<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+      instances.addAll(serverToSegments.keySet());
+
+      if (taskZKMetadata == null) {
+        taskZKMetadata = _pinotHelixResourceManager.getTaskZKMetadata(tableNameWithType, reloadTaskId);
+      }
+    }
+
+    BiMap<String, String> serverEndPoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(instances);
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/task/status/" + reloadTaskId;
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =

Review Comment:
   This one won't since its a set of serverUrls and not list



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r899962024


##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -55,6 +55,7 @@ private ZKMetadataProvider() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ZKMetadataProvider.class);
   private static final String CLUSTER_TENANT_ISOLATION_ENABLED_KEY = "tenantIsolationEnabled";
+  private static final String PROPERTYSTORE_TASKS_PREFIX = "/TASKS";

Review Comment:
   CONTROLLER_JOB?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r899984959


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1975,6 +1977,105 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getTaskZKMetadata(String tableNameWithType, String taskId) {
+    String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(taskResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(taskId);
+    } else {
+      return null;
+    }
+  }
+
+  public Map<String, Map<String, String>> getAllTasksForTable(String tableNameWithType) {
+    String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord tableTaskRecord = _propertyStore.get(taskResourcePath, null, -1);
+      return tableTaskRecord.getMapFields();
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  public void addNewReloadSegmentTask(String tableNameWithType, String segmentName, String taskId,
+      int numberOfMessagesSent) {
+    Map<String, String> taskMetadata = new HashMap<>();
+    taskMetadata.put(CommonConstants.Task.TASK_ID, taskId);
+    taskMetadata.put(CommonConstants.Task.TASK_TYPE, TaskType.RELOAD_SEGMENT.toString());
+    taskMetadata.put(CommonConstants.Task.TASK_SUBMISSION_TIME, Long.toString(System.currentTimeMillis()));
+    taskMetadata.put(CommonConstants.Task.TASK_MESSAGE_COUNT, Integer.toString(numberOfMessagesSent));
+    taskMetadata.put(CommonConstants.Task.SEGMENT_RELOAD_TASK_SEGMENT_NAME, segmentName);
+
+    String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    ZNRecord tableTaskZnRecord;
+
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      tableTaskZnRecord = _propertyStore.get(taskResourcePath, null, -1);
+      Map<String, Map<String, String>> tasks = tableTaskZnRecord.getMapFields();
+      tasks.put(taskId, taskMetadata);
+      if (tasks.size() > CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK) {
+        tasks = tasks.
+            entrySet()
+            .stream()
+            .sorted(new Comparator<Map.Entry<String, Map<String, String>>>() {
+          @Override
+          public int compare(Map.Entry<String, Map<String, String>> v1, Map.Entry<String, Map<String, String>> v2) {
+            return Long.compare(Long.parseLong(v2.getValue().get(CommonConstants.Task.TASK_SUBMISSION_TIME)),
+                Long.parseLong(v1.getValue().get(CommonConstants.Task.TASK_SUBMISSION_TIME)));
+          }
+        })
+            .collect(Collectors.toList())
+            .subList(0, CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK)
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      }
+      tableTaskZnRecord.setMapFields(tasks);
+    } else {
+      tableTaskZnRecord = new ZNRecord(taskResourcePath);
+      tableTaskZnRecord.setMapField(taskId, taskMetadata);
+    }
+
+    _propertyStore.set(taskResourcePath, tableTaskZnRecord, AccessOption.PERSISTENT);
+  }
+
+  public void addNewReloadAllSegmentsTask(String tableNameWithType, String taskId, int numberOfMessagesSent) {
+    Map<String, String> taskMetadata = new HashMap<>();
+    taskMetadata.put(CommonConstants.Task.TASK_ID, taskId);
+    taskMetadata.put(CommonConstants.Task.TASK_TYPE, TaskType.RELOAD_ALL_SEGMENTS.toString());
+    taskMetadata.put(CommonConstants.Task.TASK_SUBMISSION_TIME, Long.toString(System.currentTimeMillis()));
+    taskMetadata.put(CommonConstants.Task.TASK_MESSAGE_COUNT, Integer.toString(numberOfMessagesSent));
+
+    String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    ZNRecord tableTaskZnRecord;
+
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      tableTaskZnRecord = _propertyStore.get(taskResourcePath, null, -1);
+      Map<String, Map<String, String>> tasks = tableTaskZnRecord.getMapFields();
+      tasks.put(taskId, taskMetadata);
+      if (tasks.size() > CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK) {
+        tasks = tasks.

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on PR #8828:
URL: https://github.com/apache/pinot/pull/8828#issuecomment-1159336488

   > When you start using new znodes, then we need to worry about upgrades, cleaning up of znodes, etc. Please write up a design document on this before proceeding.
   
   Sounds good. I'll share the design doc for this. 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931814889


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  /**
+   * Returns the ZK metdata for the given jobId
+   * @param jobId the id of the job
+   * @return Map representing the job's ZK properties
+   */
+  public Map<String, String> getControllerJobZKMetadata(String jobId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(jobId);
+    } catch (ZkNoNodeException e) {
+      LOGGER.warn("Could not find job metadata for id: {}", jobId, e);
+    }
+    return null;
+  }
+
+  /**
+   * Returns a Map of jobId to job's ZK metadata for the given table
+   * @param tableNameWithType the table for which jobs are to be fetched
+   * @return A Map of jobId to job properties
+   */
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) {
+    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
+      Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields();
+      return controllerJobs.entrySet().stream().filter(
+          job -> job.getValue().get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE)
+              .equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    } catch (ZkNoNodeException e) {
+      LOGGER.warn("Could not find controller job node for table : {}", tableNameWithType, e);
+    }
+
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Adds a new reload segment job metadata into ZK
+   * @param tableNameWithType Table for which job is to be added
+   * @param segmentName Name of the segment being reloaded
+   * @param jobId job's UUID
+   * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
+      int numberOfMessagesSent) {

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931812899


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -597,14 +713,31 @@ public SuccessResponse reloadAllSegments(
     if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) {
       tableTypeFromRequest = TableType.OFFLINE;
     }
-    List<String> tableNamesWithType = ResourceUtils
-        .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER);
-    Map<String, Integer> numMessagesSentPerTable = new LinkedHashMap<>();
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
+            LOGGER);
+    Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
-      int numMsgSent = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
-      numMessagesSentPerTable.put(tableNameWithType, numMsgSent);
+      Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
+      Map<String, String> tableReloadMeta = new HashMap<>();
+      tableReloadMeta.put("numberOfMessagesSent", String.valueOf(msgInfo.getLeft()));
+      tableReloadMeta.put("reloadMessageId", msgInfo.getRight());

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r935175091


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -2366,16 +2382,56 @@ public void testJDBCClient()
     Assert.assertTrue(resultSet.getLong(1) > 0);
   }
 
-  private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromController(int controllerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
 
-  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort);
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
+
+  private String reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) {
+    String jobId = null;
+    try {
+      String response =
+          sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload),
+              null);
+      String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      JsonNode tableLevelDetails =
+          JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType);
+      String isZKWriteSuccess = tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText();
+
+      if (isZKWriteSuccess.equals("SUCCESS")) {
+        // We can validate reload status API now
+        jobId = tableLevelDetails.get("reloadJobId").asText();
+        String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forControllerJobStatus(jobId));
+        JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
+
+        // Validate all fields are present
+        assertEquals(jobStatus.get("metadata").get("jobId").asText(), jobId);
+        assertEquals(jobStatus.get("metadata").get("jobType").asText(), "RELOAD_ALL_SEGMENTS");
+        assertEquals(jobStatus.get("metadata").get("tableName").asText(), tableNameWithType);
+      }
+    } catch (Exception e) {

Review Comment:
   ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931841990


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -2366,16 +2367,44 @@ public void testJDBCClient()
     Assert.assertTrue(resultSet.getLong(1) > 0);
   }
 
-  private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromController(int controllerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
 
-  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort);
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
+
+  private void reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) {
+    try {
+      String response =
+          sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload),
+              null);
+      String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      JsonNode tableLevelDetails =
+          JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType);
+      String isZKWriteSuccess = tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText();
+
+      if (isZKWriteSuccess.equals("SUCCESS")) {

Review Comment:
   Ack. added in new test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r935176283


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -2366,16 +2382,56 @@ public void testJDBCClient()
     Assert.assertTrue(resultSet.getLong(1) > 0);
   }
 
-  private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromController(int controllerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
 
-  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort);
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
+
+  private String reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) {
+    String jobId = null;
+    try {
+      String response =
+          sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload),
+              null);
+      String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      JsonNode tableLevelDetails =
+          JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType);
+      String isZKWriteSuccess = tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText();
+
+      if (isZKWriteSuccess.equals("SUCCESS")) {

Review Comment:
   ack. I was hoping to avoid cases where ZK metadata write fails, and we unnecessarily fail the test even though reload job was successful. But I guess it makes sense to validate the ZK write as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r928534270


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -563,14 +662,24 @@ public SuccessResponse reloadAllSegments(
     if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) {
       tableTypeFromRequest = TableType.OFFLINE;
     }
-    List<String> tableNamesWithType = ResourceUtils
-        .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER);
-    Map<String, Integer> numMessagesSentPerTable = new LinkedHashMap<>();
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
+            LOGGER);
+    Map<String, Pair<Integer, String>> perTableMsgData = new LinkedHashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
-      int numMsgSent = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
-      numMessagesSentPerTable.put(tableNameWithType, numMsgSent);
+      Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
+      perTableMsgData.put(tableNameWithType, msgInfo);
+      // Store in ZK
+      try {
+        if (!_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, msgInfo.getRight(),
+            msgInfo.getLeft())) {
+          LOGGER.error("Failed to add reload all segments job meta into zookeeper for table {}", tableNameWithType);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload all segments job meta into zookeeper for table {}", tableNameWithType, e);
+      }
     }
-    return new SuccessResponse("Sent " + numMessagesSentPerTable + " reload messages");
+    return new SuccessResponse("Sent " + perTableMsgData + " reload messages");

Review Comment:
   Ack. Changed this to return both job id as well as zk meta write success / failure status



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r928533442


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -425,9 +429,17 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    if (msgInfo.getLeft() > 0) {
+      try {
+        _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",

Review Comment:
   Ack. Modified the response for this API



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930641884


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +598,98 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)

Review Comment:
   Makes sense. Changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: (WIP) Add server API for task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r893068727


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +548,54 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = "Get status for a submitted reload "
+      + "operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(

Review Comment:
   So that API just returns the list of active reload task ids and description like # of messages sent, table names etc?
   Agree that might be useful. But should that list be cleaned up by some sort of periodic task? Or no need?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r897574348


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2159,7 +2159,7 @@ public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMeta
     sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
   }
 
-  public int reloadAllSegments(String tableNameWithType, boolean forceDownload) {
+  public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean forceDownload) {

Review Comment:
   Have added these changes as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on PR #8828:
URL: https://github.com/apache/pinot/pull/8828#issuecomment-1159331570

   @mcvsubbu So nothing about the core functionality wrt reloading segments has been changed here. It's still largely a fire and forget operation, using helix messages that are handled by servers. This PR just associates each reload op with a trackable id, writes some additional meta about the task into ZK, and provides APIs which can be used to 1) Get the status of a given realod op task id 2) Get a historical list of reload tasks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r902143005


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -425,9 +431,16 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    if (msgInfo.getLeft() > 0) {
+      try {
+        _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to add job meta into zookeeper ", e);

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r898744800


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadTaskStatusCache.java:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import com.google.common.cache.CacheBuilder;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+
+public class SegmentReloadTaskStatusCache {
+
+  public static class SegmentReloadStatusValue {
+    private final long _totalSegmentCount;
+    private final AtomicLong _successCount;
+
+    SegmentReloadStatusValue(long totalSegmentCount) {
+      _totalSegmentCount = totalSegmentCount;
+      _successCount = new AtomicLong(0);
+    }
+
+    @JsonIgnore
+    public void incrementSuccess() {
+      _successCount.addAndGet(1);
+    }
+
+    public long getTotalSegmentCount() {
+      return _totalSegmentCount;
+    }
+
+    public long getSuccessCount() {
+      return _successCount.get();
+    }
+  }
+
+  private SegmentReloadTaskStatusCache() {
+  }
+
+  private static final ConcurrentMap<String, SegmentReloadStatusValue> SEGMENT_RELOAD_STATUS_MAP =
+      CacheBuilder.newBuilder()

Review Comment:
   Yes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] npawar commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
npawar commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r900607520


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "Reload task id", required = true) @PathParam("taskId") String reloadTaskId)
+      throws Exception {
+    Map<String, String> taskZKMetadata = null;
+
+    // Call all servers to get status, collate and return
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, null, LOGGER);
+
+    Set<String> instances = new HashSet<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+      instances.addAll(serverToSegments.keySet());
+
+      if (taskZKMetadata == null) {
+        taskZKMetadata = _pinotHelixResourceManager.getTaskZKMetadata(tableNameWithType, reloadTaskId);
+      }
+    }
+
+    BiMap<String, String> serverEndPoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(instances);
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/task/status/" + reloadTaskId;
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =

Review Comment:
   But the realtime and offline table can be on completely different servers



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r898872892


##########
pinot-common/src/main/java/org/apache/pinot/common/messages/SegmentReloadMessage.java:
##########
@@ -35,6 +35,9 @@ public class SegmentReloadMessage extends Message {
 
   private static final String FORCE_DOWNLOAD_KEY = "forceDownload";
 
+  // todo (saurabh) : getMsgId() returns different id on the server side than the one being set at controller. Check

Review Comment:
   So helix, when sending a PARTICIPANT message, generates one message per partition to store into ZK. When doing that, it overrides the original message's id with a new random one
   https://github.com/apache/helix/blob/master/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java#L199
   
   It only retains the fields from the original message



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r929429595


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -459,9 +464,25 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    boolean zkJobMetaWriteSuccess = false;
+    if (msgInfo.getLeft() > 0) {
+      try {
+        if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft())) {
+          zkJobMetaWriteSuccess = true;
+        } else {
+          LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",

Review Comment:
   (nit) We usually put `:` for easier searching purpose, same for other places
   ```suggestion
             LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}, segment: {}",
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1987,82 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String taskId) {

Review Comment:
   Annotate with `@Nullable`.
   Let's add some javadoc for the public method. Same for other public methods



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Task ZK props

Review Comment:
   (nit)
   ```suggestion
        * Controller job ZK props
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Task ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "controller.job.type";
+    public static final String CONTROLLER_JOB_TABLE_NAME_WITH_TYPE = "controller.job.table.name";
+    public static final String CONTROLLER_JOB_ID = "controller.job.id";
+    public static final String CONTROLLER_JOB_SUBMISSION_TIME = "controller.job.submission.time";
+    public static final String CONTROLLER_JOB_MESSAGES_COUNT = "controller.job.messages.count";
+
+    /**
+     * Segment reload task ZK props

Review Comment:
   (nit)
   ```suggestion
        * Segment reload job ZK props
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1987,82 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String taskId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    if (_propertyStore.exists(controllerJobResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(taskId);
+    } else {
+      return null;
+    }
+  }
+
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) {
+    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    if (_propertyStore.exists(jobsResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
+      Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields();
+      return controllerJobs.entrySet().stream().filter(
+          job -> job.getValue().get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE)
+              .equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
+      int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE,
+        ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  private boolean addReloadJobToZK(String tableNameWithType, String taskId, Map<String, String> jobMetadata) {
+    String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    Stat stat = new Stat();
+    ZNRecord tableJobsZnRecord = _propertyStore.get(jobResourcePath, stat, AccessOption.PERSISTENT);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);

Review Comment:
   Move this line into `addNewReloadSegmentJob()` and `addNewReloadAllSegmentsJob()` to be more readable. We want to keep the metadata creation part together



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.util.List;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@Api(tags = "Tasks")
+@Path("/")
+public class ControllerJobStatusResource {
+
+  @Inject
+  private ServerInstance _serverInstance;
+
+  @GET
+  @Path("/controllerJob/reloadStatus/{tableNameWithType}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Task status", notes = "Return status of the given task")
+  public String taskStatus(@PathParam("tableNameWithType") String tableNameWithType,
+      @QueryParam("reloadJobTimestamp") long reloadJobSubmissionTimestamp,
+      @QueryParam("segmentName") String segmentName)
+      throws Exception {
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+
+    if (segmentName == null) {
+      // All segments
+      List<SegmentDataManager> allSegments = tableDataManager.acquireAllSegments();
+      try {
+        long successCount = 0;
+        for (SegmentDataManager segmentDataManager : allSegments) {
+          if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) {
+            successCount++;
+          }
+        }
+        SegmentReloadStatusValue segmentReloadStatusValue =
+            new SegmentReloadStatusValue(allSegments.size(), successCount);
+        return JsonUtils.objectToString(segmentReloadStatusValue);
+      } finally {
+        for (SegmentDataManager segmentDataManager : allSegments) {
+          tableDataManager.releaseSegment(segmentDataManager);
+        }
+      }
+    } else {
+      SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+      if (segmentDataManager == null) {
+        throw new WebApplicationException("Segment: " + segmentName + " is not found", Response.Status.NOT_FOUND);

Review Comment:
   Can we handle this properly on the controller side? Should we return 0/0 instead?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +598,98 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)

Review Comment:
   `jobId` itself should be enough? The `tableName` can be extracted from the ZK metadata



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +598,98 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
+    String tableNameWithType = TableNameBuilder.forType(tableTypeFromRequest).tableNameWithType(tableName);
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER,
+          "Failed to find controller job: " + reloadJobId + " for table: " + tableNameWithType, Status.NOT_FOUND);
+    }
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE).equals(
+        ControllerJobType.RELOAD_SEGMENT.toString())) {
+      singleSegmentName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+    }
+
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();

Review Comment:
   We might not want to query all the servers, especially in large shared cluster. We can get the ideal state of the table, then use it to find the servers to query. We should also use the ideal state to determine the total number of segments to be reloaded because when there are segments moving around, server might not have the correct view on the segments to be reloaded



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Task ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "controller.job.type";

Review Comment:
   We usually use camel case as the key name in the ZNode. We may also simplify the key as it is always under the context of controller job. E.g. `jobType`, `jobId`, `tableName`, `submissionTimeMs`, `messagesCount`, `reloadSegmentName`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1987,82 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String taskId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    if (_propertyStore.exists(controllerJobResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);

Review Comment:
   We can save one redundant ZK access and also avoid race condition, same for other places
   ```suggestion
       ZNRecord controllerJobRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
       if (controllerJobRecord != null) {
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Task ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "controller.job.type";
+    public static final String CONTROLLER_JOB_TABLE_NAME_WITH_TYPE = "controller.job.table.name";
+    public static final String CONTROLLER_JOB_ID = "controller.job.id";
+    public static final String CONTROLLER_JOB_SUBMISSION_TIME = "controller.job.submission.time";
+    public static final String CONTROLLER_JOB_MESSAGES_COUNT = "controller.job.messages.count";
+
+    /**
+     * Segment reload task ZK props
+     */
+    public static final Integer MAXIMUM_RELOAD_JOBS_IN_ZK = 100;

Review Comment:
   This one applies to all jobs, instead of just the reload jobs



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931813815


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  /**
+   * Returns the ZK metdata for the given jobId
+   * @param jobId the id of the job
+   * @return Map representing the job's ZK properties
+   */
+  public Map<String, String> getControllerJobZKMetadata(String jobId) {

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931814630


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  /**
+   * Returns the ZK metdata for the given jobId
+   * @param jobId the id of the job
+   * @return Map representing the job's ZK properties
+   */
+  public Map<String, String> getControllerJobZKMetadata(String jobId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);

Review Comment:
   Ack. Yeah with AccessOption.PERSISTENT _propertyStore.get() does not throw `ZkNoNodeException`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r935176493


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -427,6 +428,21 @@ public void testUploadSegmentRefreshOnly()
     cleanupTestTableDataManager(offlineTableName);
   }
 
+  @Test
+  public void testReloadStatusApi() {
+    String reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
+    if (reloadJobId != null) {
+      // Spin until validated
+      TestUtils.waitForCondition(aVoid -> {
+        try {
+          return validateReloadJobSuccess(reloadJobId);

Review Comment:
   Ack. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r934899690


##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java:
##########
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.util.List;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@Api(tags = "Tasks")
+@Path("/")
+public class ControllerJobStatusResource {
+
+  @Inject
+  private ServerInstance _serverInstance;
+
+  @GET
+  @Path("/controllerJob/reloadStatus/{tableNameWithType}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Task status", notes = "Return status of the given task")
+  public String taskStatus(@PathParam("tableNameWithType") String tableNameWithType,

Review Comment:
   Revise the API doc and method name to reflect that this method is for the reload status check



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -2366,16 +2382,56 @@ public void testJDBCClient()
     Assert.assertTrue(resultSet.getLong(1) > 0);
   }
 
-  private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromController(int controllerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
 
-  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort);
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
+
+  private String reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) {
+    String jobId = null;
+    try {
+      String response =
+          sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload),
+              null);
+      String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      JsonNode tableLevelDetails =
+          JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType);
+      String isZKWriteSuccess = tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText();
+
+      if (isZKWriteSuccess.equals("SUCCESS")) {
+        // We can validate reload status API now
+        jobId = tableLevelDetails.get("reloadJobId").asText();
+        String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forControllerJobStatus(jobId));
+        JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
+
+        // Validate all fields are present
+        assertEquals(jobStatus.get("metadata").get("jobId").asText(), jobId);
+        assertEquals(jobStatus.get("metadata").get("jobType").asText(), "RELOAD_ALL_SEGMENTS");
+        assertEquals(jobStatus.get("metadata").get("tableName").asText(), tableNameWithType);
+      }
+    } catch (Exception e) {

Review Comment:
   We don't need to catch the exception. Instead we may directly throwing it out



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -427,6 +428,21 @@ public void testUploadSegmentRefreshOnly()
     cleanupTestTableDataManager(offlineTableName);
   }
 
+  @Test
+  public void testReloadStatusApi() {
+    String reloadJobId = reloadOfflineTableAndValidateResponse(getTableName(), false);
+    if (reloadJobId != null) {
+      // Spin until validated
+      TestUtils.waitForCondition(aVoid -> {
+        try {
+          return validateReloadJobSuccess(reloadJobId);

Review Comment:
   What I meant is that we can add the `validateReloadJobSuccess()` after every `reloadOfflineTableAndValidateResponse()` in other methods. All the segments should be successfully reloaded after the `TestUtils.waitForCondition()` check is passed



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -2366,16 +2382,56 @@ public void testJDBCClient()
     Assert.assertTrue(resultSet.getLong(1) > 0);
   }
 
-  private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromController(int controllerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
 
-  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort);
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
+
+  private String reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) {
+    String jobId = null;
+    try {
+      String response =
+          sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload),
+              null);
+      String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      JsonNode tableLevelDetails =
+          JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType);
+      String isZKWriteSuccess = tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText();
+
+      if (isZKWriteSuccess.equals("SUCCESS")) {

Review Comment:
   Can we directly do `assertEquals()` here? It should return `SUCCESS` and we do want to verify this behavior



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931808446


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +599,99 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("segments/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
+          Status.NOT_FOUND);
+    }
+
+    String tableNameWithType =
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE);
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE)
+        .equals(ControllerJobType.RELOAD_SEGMENT.toString())) {
+      // No need to query servers where this segment is not supposed to be hosted

Review Comment:
   Ack. Transformed the set of string being returned by this function to create a map server -> List of segments for keeping the code logic below this uniform



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r912613712


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1977,6 +1979,80 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String tableNameWithType, String taskId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType);
+    if (_propertyStore.exists(controllerJobResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(taskId);
+    } else {
+      return null;
+    }
+  }
+
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) {
+    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType);
+    if (_propertyStore.exists(jobsResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
+      return tableJobsRecord.getMapFields();
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  public void addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
+      int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
+    addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  public void addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE,
+        ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  private void addReloadJobToZK(String tableNameWithType, String taskId, Map<String, String> taskMetadata) {
+    String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType);
+    ZNRecord tableJobsZnRecord;
+
+    if (_propertyStore.exists(jobResourcePath, AccessOption.PERSISTENT)) {

Review Comment:
   Ack. Should we be retrying in case of a version mismatch? For now I've not added retries, in line with other version based _propertyStore updates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r912614040


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -425,9 +429,17 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    if (msgInfo.getLeft() > 0) {
+      try {
+        _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",
+            tableNameWithType, segmentName, e);
+      }
+      return new SuccessResponse("Sent " + msgInfo + " reload messages");

Review Comment:
   Wanted to return the job id as part of the response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r910907030


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java:
##########
@@ -27,6 +27,15 @@
  */
 public abstract class SegmentDataManager {
   private int _referenceCount = 1;
+  private long _segmentLoadTimeInMillisEpoch = System.currentTimeMillis();

Review Comment:
   (minor) `_loadTimeMs` for simplicity, same for the getter and setter



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1977,6 +1979,80 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String tableNameWithType, String taskId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType);
+    if (_propertyStore.exists(controllerJobResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(taskId);
+    } else {
+      return null;
+    }
+  }
+
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) {
+    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType);
+    if (_propertyStore.exists(jobsResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
+      return tableJobsRecord.getMapFields();
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  public void addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
+      int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
+    addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  public void addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE,
+        ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  private void addReloadJobToZK(String tableNameWithType, String taskId, Map<String, String> taskMetadata) {
+    String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob(tableNameWithType);
+    ZNRecord tableJobsZnRecord;
+
+    if (_propertyStore.exists(jobResourcePath, AccessOption.PERSISTENT)) {

Review Comment:
   No need to check if the record exists. We can directly `get()` and check if the return is `null`. It can also prevent race condition.
   When the record exist, we should track the record version (passing a `Stat` into the `get()`), and check the expected version when setting the record to prevent race condition



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -425,9 +429,17 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    if (msgInfo.getLeft() > 0) {
+      try {
+        _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",
+            tableNameWithType, segmentName, e);
+      }
+      return new SuccessResponse("Sent " + msgInfo + " reload messages");

Review Comment:
   ```suggestion
         return new SuccessResponse("Sent " + msgInfo.getLeft() + " reload messages");
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -425,9 +429,17 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    if (msgInfo.getLeft() > 0) {
+      try {
+        _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",

Review Comment:
   Suggest reflecting this back to the user, instead of just logging an error and return success. User won't be able to read the reload status



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8828: (WIP) Add server API for task status

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r892882977


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -245,7 +245,7 @@ public void reloadSegment(String tableNameWithType, String segmentName, boolean
   }
 
   @Override
-  public void reloadAllSegments(String tableNameWithType, boolean forceDownload,
+  public void reloadAllSegments(String taskId, String tableNameWithType, boolean forceDownload,

Review Comment:
   Let's also add `taskId` to the single `reloadSegment()` method



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadTaskStatusCache.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+
+public class SegmentReloadTaskStatusCache {
+
+  public static class SegmentReloadStatusValue {
+    private final long _totalSegmentCount;
+    private final AtomicLong _successCount;
+
+    SegmentReloadStatusValue(long totalSegmentCount) {
+      _totalSegmentCount = totalSegmentCount;
+      _successCount = new AtomicLong(0);
+    }
+
+    @JsonIgnore
+    public void incrementSuccess() {
+      _successCount.addAndGet(1);
+    }
+
+    public long getTotalSegmentCount() {
+      return _totalSegmentCount;
+    }
+
+    public long getSuccessCount() {
+      return _successCount.get();
+    }
+  }
+
+  private SegmentReloadTaskStatusCache() {
+  }
+
+  private static final Map<String, SegmentReloadStatusValue> SEGMENT_RELOAD_STATUS_MAP = new ConcurrentHashMap<>();

Review Comment:
   We can consider adding a size limit to the cache, and once the entry count reaches the limit, we remove the earliest entries



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TaskStatusResource.java:
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import org.apache.pinot.server.starter.helix.SegmentReloadTaskStatusCache;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@Api(tags = "Tasks")
+@Path("/")
+public class TaskStatusResource {
+  @GET
+  @Path("/task/status/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Task status", notes = "Return status of the given task")
+  public String taskStatus(@PathParam("taskId") String taskId)
+      throws Exception {
+    SegmentReloadTaskStatusCache.SegmentReloadStatusValue segmentReloadStatusValue =
+        SegmentReloadTaskStatusCache.getStatus(taskId);
+
+    return JsonUtils.objectToString(segmentReloadStatusValue);

Review Comment:
   Will this handle `null` status properly?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +548,54 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = "Get status for a submitted reload "
+      + "operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "Reload task id", required = true) @PathParam("taskId") String reloadTaskId)
+      throws Exception {
+    // Call all servers to get status, collate and return
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, null, LOGGER);
+
+    Set<String> instances = new HashSet<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+      instances.addAll(serverToSegments.keySet());
+    }
+
+    BiMap<String, String> serverEndPoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(instances);
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/task/status/" + reloadTaskId;
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 1000);
+
+    ServerReloadTaskStatusResponse serverReloadTaskStatusResponse = new ServerReloadTaskStatusResponse();
+    serverReloadTaskStatusResponse.setSuccessCount(0);
+    serverReloadTaskStatusResponse.setTotalSegmentCount(0);
+    for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
+      ServerReloadTaskStatusResponse response =
+          JsonUtils.stringToObject(streamResponse.getValue(), ServerReloadTaskStatusResponse.class);

Review Comment:
   `null` response need to be properly handled



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2159,7 +2159,7 @@ public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMeta
     sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
   }
 
-  public int reloadAllSegments(String tableNameWithType, boolean forceDownload) {
+  public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean forceDownload) {

Review Comment:
   Let's also track task id for single segment reload



##########
pinot-common/src/main/java/org/apache/pinot/common/messages/SegmentReloadMessage.java:
##########
@@ -35,6 +35,9 @@ public class SegmentReloadMessage extends Message {
 
   private static final String FORCE_DOWNLOAD_KEY = "forceDownload";
 
+  // todo (saurabh) : getMsgId() returns different id on the server side than the one being set at controller. Check

Review Comment:
   Interesting. The msgId should just be the ZNode id



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +548,54 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = "Get status for a submitted reload "
+      + "operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(

Review Comment:
   We should consider storing the task id and reload info (table name / segment name) into a ZK node, and add another API to read the reload tasks in the cluster in case user loses the response of the reload call. We can also put other useful info there, e.g. number of messages sent, which can be included in the status check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r918342988


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -425,9 +429,17 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    if (msgInfo.getLeft() > 0) {
+      try {
+        _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",
+            tableNameWithType, segmentName, e);
+      }
+      return new SuccessResponse("Sent " + msgInfo + " reload messages");

Review Comment:
   Then let's change the response message to include both job id and the messages sent, something like `"Submitted reload job: %s, sent %d reload messages"`



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -109,6 +110,10 @@ public static String constructPropertyStorePathForInstancePartitions(String inst
     return StringUtil.join("/", PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, instancePartitionsName);
   }
 
+  public static String constructPropertyStorePathForControllerJob(String resourceName) {
+    return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, resourceName);

Review Comment:
   This will create one ZNode per table, which is too much IMO. We are just storing the recent controller jobs in the ZNode, and there shouldn't be a lot of them running in parallel, so we can have one single ZNode for all the tables. Keeping one single node will help track all the running jobs, and also clean up the old jobs.
   We can also consider having one node per job type to isolate the metadata for different jobs. All the entries for the same job type should have the same format, which is easier to manage



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -563,14 +662,24 @@ public SuccessResponse reloadAllSegments(
     if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) {
       tableTypeFromRequest = TableType.OFFLINE;
     }
-    List<String> tableNamesWithType = ResourceUtils
-        .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER);
-    Map<String, Integer> numMessagesSentPerTable = new LinkedHashMap<>();
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
+            LOGGER);
+    Map<String, Pair<Integer, String>> perTableMsgData = new LinkedHashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
-      int numMsgSent = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
-      numMessagesSentPerTable.put(tableNameWithType, numMsgSent);
+      Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
+      perTableMsgData.put(tableNameWithType, msgInfo);
+      // Store in ZK
+      try {
+        if (!_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, msgInfo.getRight(),
+            msgInfo.getLeft())) {
+          LOGGER.error("Failed to add reload all segments job meta into zookeeper for table {}", tableNameWithType);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload all segments job meta into zookeeper for table {}", tableNameWithType, e);
+      }
     }
-    return new SuccessResponse("Sent " + numMessagesSentPerTable + " reload messages");
+    return new SuccessResponse("Sent " + perTableMsgData + " reload messages");

Review Comment:
   Let's make the response more clear to the user



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -425,9 +429,17 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    if (msgInfo.getLeft() > 0) {
+      try {
+        _pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",

Review Comment:
   ^^



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -563,14 +662,24 @@ public SuccessResponse reloadAllSegments(
     if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) {
       tableTypeFromRequest = TableType.OFFLINE;
     }
-    List<String> tableNamesWithType = ResourceUtils
-        .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER);
-    Map<String, Integer> numMessagesSentPerTable = new LinkedHashMap<>();
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
+            LOGGER);
+    Map<String, Pair<Integer, String>> perTableMsgData = new LinkedHashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
-      int numMsgSent = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
-      numMessagesSentPerTable.put(tableNameWithType, numMsgSent);
+      Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
+      perTableMsgData.put(tableNameWithType, msgInfo);
+      // Store in ZK
+      try {
+        if (!_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, msgInfo.getRight(),
+            msgInfo.getLeft())) {
+          LOGGER.error("Failed to add reload all segments job meta into zookeeper for table {}", tableNameWithType);
+        }
+      } catch (Exception e) {
+        LOGGER.error("Failed to add reload all segments job meta into zookeeper for table {}", tableNameWithType, e);

Review Comment:
   Reflect the errors back to the user



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930640033


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -459,9 +464,25 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    boolean zkJobMetaWriteSuccess = false;
+    if (msgInfo.getLeft() > 0) {
+      try {
+        if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft())) {
+          zkJobMetaWriteSuccess = true;
+        } else {
+          LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930662579


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1987,82 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String taskId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    if (_propertyStore.exists(controllerJobResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);

Review Comment:
   Helix may throw a ZkNoNodeException exception if options = -1. So I've handled that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930674996


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1987,82 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String taskId) {

Review Comment:
   Added javadoc. jobId must not be be null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931645241


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +599,99 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("segments/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
+          Status.NOT_FOUND);
+    }
+
+    String tableNameWithType =
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE);
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE)
+        .equals(ControllerJobType.RELOAD_SEGMENT.toString())) {
+      // No need to query servers where this segment is not supposed to be hosted

Review Comment:
   For single segment, we can use `_pinotHelixResourceManager.getServers()` instead



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -597,14 +713,31 @@ public SuccessResponse reloadAllSegments(
     if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) {
       tableTypeFromRequest = TableType.OFFLINE;
     }
-    List<String> tableNamesWithType = ResourceUtils
-        .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER);
-    Map<String, Integer> numMessagesSentPerTable = new LinkedHashMap<>();
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
+            LOGGER);
+    Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
-      int numMsgSent = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
-      numMessagesSentPerTable.put(tableNameWithType, numMsgSent);
+      Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
+      Map<String, String> tableReloadMeta = new HashMap<>();
+      tableReloadMeta.put("numberOfMessagesSent", String.valueOf(msgInfo.getLeft()));
+      tableReloadMeta.put("reloadMessageId", msgInfo.getRight());

Review Comment:
   ```suggestion
         tableReloadMeta.put("numMessagesSent", String.valueOf(msgInfo.getLeft()));
         tableReloadMeta.put("reloadJobId", msgInfo.getRight());
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  /**
+   * Returns the ZK metdata for the given jobId
+   * @param jobId the id of the job
+   * @return Map representing the job's ZK properties
+   */
+  public Map<String, String> getControllerJobZKMetadata(String jobId) {

Review Comment:
   Annotate the return as `@Nullable`



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -2366,16 +2367,44 @@ public void testJDBCClient()
     Assert.assertTrue(resultSet.getLong(1) > 0);
   }
 
-  private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromController(int controllerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
 
-  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort);
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
+
+  private void reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) {
+    try {
+      String response =
+          sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload),
+              null);
+      String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      JsonNode tableLevelDetails =
+          JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType);
+      String isZKWriteSuccess = tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText();
+
+      if (isZKWriteSuccess.equals("SUCCESS")) {

Review Comment:
   Let's verify that it is `SUCCESS`



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Controller job ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "jobType";
+    public static final String CONTROLLER_JOB_TABLE_NAME_WITH_TYPE = "tableName";
+    public static final String CONTROLLER_JOB_ID = "jobId";
+    public static final String CONTROLLER_JOB_SUBMISSION_TIME = "submissionTime";

Review Comment:
   (minor) Consider adding `Ms` to be more clear
   ```suggestion
       public static final String SUBMISSION_TIME_MS = "submissionTimeMs";
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Controller job ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "jobType";

Review Comment:
   (minor) We may also simplify the key name because it is under `ControllerJob` class
   ```suggestion
       public static final String JOB_TYPE = "jobType";
   ```



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -2366,16 +2367,44 @@ public void testJDBCClient()
     Assert.assertTrue(resultSet.getLong(1) > 0);
   }
 
-  private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromController(int controllerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
 
-  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort);
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
+
+  private void reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) {
+    try {
+      String response =
+          sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload),
+              null);
+      String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      JsonNode tableLevelDetails =
+          JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType);
+      String isZKWriteSuccess = tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText();
+
+      if (isZKWriteSuccess.equals("SUCCESS")) {
+        // We can validate reload status API now
+        String jobId = tableLevelDetails.get("reloadMessageId").asText();
+        String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forControllerJobStatus(jobId));
+        JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
+
+        // Validate all fields are present
+        assertEquals(jobStatus.get("metadata").get("jobId").asText(), jobId);
+        assertEquals(jobStatus.get("metadata").get("jobType").asText(), "RELOAD_ALL_SEGMENTS");
+        assertEquals(jobStatus.get("metadata").get("tableName").asText(), tableNameWithType);
+      }
+    } catch (Exception e) {
+      Assert.fail("Reload failed :" + e.getMessage());

Review Comment:
   (nit)
   ```suggestion
         Assert.fail("Reload failed: " + e.getMessage());
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +599,99 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("segments/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
+          Status.NOT_FOUND);
+    }
+
+    String tableNameWithType =
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE);
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE)
+        .equals(ControllerJobType.RELOAD_SEGMENT.toString())) {
+      // No need to query servers where this segment is not supposed to be hosted
+      serverToSegments = serverToSegments.entrySet().stream().filter(kv -> kv.getValue()
+              .contains(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME)))
+          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      singleSegmentName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+    }
+
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint =
+          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + "?reloadJobTimestamp="
+              + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME);
+      if (singleSegmentName != null) {
+        reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + singleSegmentName;
+      }
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    ServerReloadControllerJobStatusResponse serverReloadControllerJobStatusResponse =
+        new ServerReloadControllerJobStatusResponse();
+    serverReloadControllerJobStatusResponse.setSuccessCount(0);
+    serverReloadControllerJobStatusResponse.setTotalSegmentCount(
+        _pinotHelixResourceManager.getSegmentsCount(tableNameWithType));

Review Comment:
   This is incorrect because it doesn't count the replicas. We can count the total segments in `serverToSegments` for table reload, and total servers for single segment reload



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  /**
+   * Returns the ZK metdata for the given jobId
+   * @param jobId the id of the job
+   * @return Map representing the job's ZK properties
+   */
+  public Map<String, String> getControllerJobZKMetadata(String jobId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(jobId);
+    } catch (ZkNoNodeException e) {
+      LOGGER.warn("Could not find job metadata for id: {}", jobId, e);
+    }
+    return null;
+  }
+
+  /**
+   * Returns a Map of jobId to job's ZK metadata for the given table
+   * @param tableNameWithType the table for which jobs are to be fetched
+   * @return A Map of jobId to job properties
+   */
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) {
+    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
+      Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields();
+      return controllerJobs.entrySet().stream().filter(
+          job -> job.getValue().get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE)
+              .equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    } catch (ZkNoNodeException e) {
+      LOGGER.warn("Could not find controller job node for table : {}", tableNameWithType, e);
+    }
+
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Adds a new reload segment job metadata into ZK
+   * @param tableNameWithType Table for which job is to be added
+   * @param segmentName Name of the segment being reloaded
+   * @param jobId job's UUID
+   * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
+      int numberOfMessagesSent) {

Review Comment:
   (nit) we usually use `num` as a short version of `numberOf`
   ```suggestion
         int numMessagesSent) {
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Controller job ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "jobType";
+    public static final String CONTROLLER_JOB_TABLE_NAME_WITH_TYPE = "tableName";
+    public static final String CONTROLLER_JOB_ID = "jobId";
+    public static final String CONTROLLER_JOB_SUBMISSION_TIME = "submissionTime";
+    public static final String CONTROLLER_JOB_MESSAGES_COUNT = "messageCount";

Review Comment:
   (minor) Keep the key and value consistent
   ```suggestion
       public static final String MESSAGE_COUNT = "messageCount";
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  /**
+   * Returns the ZK metdata for the given jobId
+   * @param jobId the id of the job
+   * @return Map representing the job's ZK properties
+   */
+  public Map<String, String> getControllerJobZKMetadata(String jobId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);

Review Comment:
   We should not use option -1 here as it is not a valid option (this is not version). We usually use `AccessOption.PERSISTENT`. Same for other places, and we don't need to explicitly handle `ZkNoNodeException`



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -2366,16 +2367,44 @@ public void testJDBCClient()
     Assert.assertTrue(resultSet.getLong(1) > 0);
   }
 
-  private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromController(int controllerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
 
-  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort);
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
+
+  private void reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) {

Review Comment:
   Let's also test the job status after reloading is done



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1988,111 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  /**
+   * Returns the ZK metdata for the given jobId
+   * @param jobId the id of the job
+   * @return Map representing the job's ZK properties
+   */
+  public Map<String, String> getControllerJobZKMetadata(String jobId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(jobId);
+    } catch (ZkNoNodeException e) {
+      LOGGER.warn("Could not find job metadata for id: {}", jobId, e);
+    }
+    return null;
+  }
+
+  /**
+   * Returns a Map of jobId to job's ZK metadata for the given table
+   * @param tableNameWithType the table for which jobs are to be fetched
+   * @return A Map of jobId to job properties
+   */
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) {
+    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    try {
+      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
+      Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields();
+      return controllerJobs.entrySet().stream().filter(
+          job -> job.getValue().get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE)
+              .equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    } catch (ZkNoNodeException e) {
+      LOGGER.warn("Could not find controller job node for table : {}", tableNameWithType, e);
+    }
+
+    return Collections.emptyMap();
+  }
+
+  /**
+   * Adds a new reload segment job metadata into ZK
+   * @param tableNameWithType Table for which job is to be added
+   * @param segmentName Name of the segment being reloaded
+   * @param jobId job's UUID
+   * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
+      int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  /**
+   * Adds a new reload segment job metadata into ZK
+   * @param tableNameWithType Table for which job is to be added
+   * @param jobId job's UUID
+   * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata
+   * @return boolean representing success / failure of the ZK write step
+   */
+  public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE,
+        ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  private boolean addReloadJobToZK(String tableNameWithType, String taskId, Map<String, String> jobMetadata) {

Review Comment:
   ```suggestion
     private boolean addReloadJobToZK(String jobId, Map<String, String> jobMetadata) {
   ```



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.util.List;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@Api(tags = "Tasks")
+@Path("/")
+public class ControllerJobStatusResource {
+
+  @Inject
+  private ServerInstance _serverInstance;
+
+  @GET
+  @Path("/controllerJob/reloadStatus/{tableNameWithType}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Task status", notes = "Return status of the given task")
+  public String taskStatus(@PathParam("tableNameWithType") String tableNameWithType,
+      @QueryParam("reloadJobTimestamp") long reloadJobSubmissionTimestamp,
+      @QueryParam("segmentName") String segmentName)
+      throws Exception {
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+
+    if (segmentName == null) {
+      // All segments
+      List<SegmentDataManager> allSegments = tableDataManager.acquireAllSegments();
+      try {
+        long successCount = 0;
+        for (SegmentDataManager segmentDataManager : allSegments) {
+          if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) {
+            successCount++;
+          }
+        }
+        SegmentReloadStatusValue segmentReloadStatusValue =
+            new SegmentReloadStatusValue(allSegments.size(), successCount);
+        return JsonUtils.objectToString(segmentReloadStatusValue);
+      } finally {
+        for (SegmentDataManager segmentDataManager : allSegments) {
+          tableDataManager.releaseSegment(segmentDataManager);
+        }
+      }
+    } else {
+      SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+      if (segmentDataManager == null) {
+        throw new WebApplicationException("Segment: " + segmentName + " is not found", Response.Status.NOT_FOUND);

Review Comment:
   I feel returning `0/0` might be preferred so that we can differentiate server not responding vs server not having the segment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931822468


##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.util.List;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@Api(tags = "Tasks")
+@Path("/")
+public class ControllerJobStatusResource {
+
+  @Inject
+  private ServerInstance _serverInstance;
+
+  @GET
+  @Path("/controllerJob/reloadStatus/{tableNameWithType}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Task status", notes = "Return status of the given task")
+  public String taskStatus(@PathParam("tableNameWithType") String tableNameWithType,
+      @QueryParam("reloadJobTimestamp") long reloadJobSubmissionTimestamp,
+      @QueryParam("segmentName") String segmentName)
+      throws Exception {
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+
+    if (segmentName == null) {
+      // All segments
+      List<SegmentDataManager> allSegments = tableDataManager.acquireAllSegments();
+      try {
+        long successCount = 0;
+        for (SegmentDataManager segmentDataManager : allSegments) {
+          if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) {
+            successCount++;
+          }
+        }
+        SegmentReloadStatusValue segmentReloadStatusValue =
+            new SegmentReloadStatusValue(allSegments.size(), successCount);
+        return JsonUtils.objectToString(segmentReloadStatusValue);
+      } finally {
+        for (SegmentDataManager segmentDataManager : allSegments) {
+          tableDataManager.releaseSegment(segmentDataManager);
+        }
+      }
+    } else {
+      SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+      if (segmentDataManager == null) {
+        throw new WebApplicationException("Segment: " + segmentName + " is not found", Response.Status.NOT_FOUND);

Review Comment:
   Changed this to return 0 / 0



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931842396


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -2366,16 +2367,44 @@ public void testJDBCClient()
     Assert.assertTrue(resultSet.getLong(1) > 0);
   }
 
-  private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromController(int controllerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
 
-  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort);
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
+
+  private void reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) {
+    try {
+      String response =
+          sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload),
+              null);
+      String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
+      JsonNode tableLevelDetails =
+          JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType);
+      String isZKWriteSuccess = tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText();
+
+      if (isZKWriteSuccess.equals("SUCCESS")) {
+        // We can validate reload status API now
+        String jobId = tableLevelDetails.get("reloadMessageId").asText();
+        String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forControllerJobStatus(jobId));
+        JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
+
+        // Validate all fields are present
+        assertEquals(jobStatus.get("metadata").get("jobId").asText(), jobId);
+        assertEquals(jobStatus.get("metadata").get("jobType").asText(), "RELOAD_ALL_SEGMENTS");
+        assertEquals(jobStatus.get("metadata").get("tableName").asText(), tableNameWithType);
+      }
+    } catch (Exception e) {
+      Assert.fail("Reload failed :" + e.getMessage());

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931841855


##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -2366,16 +2367,44 @@ public void testJDBCClient()
     Assert.assertTrue(resultSet.getLong(1) > 0);
   }
 
-  private java.sql.Connection getJDBCConnectionFromController(int controllerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromController(int controllerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
 
-  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort) throws Exception {
+  private java.sql.Connection getJDBCConnectionFromBrokers(int controllerPort, int brokerPort)
+      throws Exception {
     PinotDriver pinotDriver = new PinotDriver();
     Properties jdbcProps = new Properties();
     jdbcProps.put(PinotConnection.BROKER_LIST, "localhost:" + brokerPort);
     return pinotDriver.connect("jdbc:pinot://localhost:" + controllerPort, jdbcProps);
   }
+
+  private void reloadOfflineTableAndValidateResponse(String tableName, boolean forceDownload) {

Review Comment:
   Ack. I added a new testcase to test success of a reload op



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r899980087


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "Reload task id", required = true) @PathParam("taskId") String reloadTaskId)
+      throws Exception {
+    Map<String, String> taskZKMetadata = null;
+
+    // Call all servers to get status, collate and return
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, null, LOGGER);
+
+    Set<String> instances = new HashSet<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);

Review Comment:
   But even the reload API, returns 1 jobId per tableType for hybrid table. I think expecting tableNameWithType for this API makes sense. Have changed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] mcvsubbu commented on pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on PR #8828:
URL: https://github.com/apache/pinot/pull/8828#issuecomment-1159089240

   Please change the description. This PR does much more than just get reload status. Earlier design for reload was to "fire and forget" (send a reload helix message and let servers handle it as they please). This one (if I understand right) changes the reload command to start a helix task. 
   
   have you considered what happens when controller  is upgraded to the new version but the servers are running old version, and the reload command is issued?  Please evaluate and add that to the PR description, and also mark this PR for release notes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] npawar commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
npawar commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r899689040


##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -55,6 +55,7 @@ private ZKMetadataProvider() {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ZKMetadataProvider.class);
   private static final String CLUSTER_TENANT_ISOLATION_ENABLED_KEY = "tenantIsolationEnabled";
+  private static final String PROPERTYSTORE_TASKS_PREFIX = "/TASKS";

Review Comment:
   could we name this something other than TASKS? Task is already a concept in helix task framework and as a result in minion, so I can see this getting very confusing.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")

Review Comment:
   s/tableName/tableNameWithType ?



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/task/TaskType.java:
##########
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metadata.task;
+
+public enum TaskType {

Review Comment:
   same here, there's already a string association between TaskType and minion tasks like MergeRollup, R2O etc.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "Reload task id", required = true) @PathParam("taskId") String reloadTaskId)
+      throws Exception {
+    Map<String, String> taskZKMetadata = null;
+
+    // Call all servers to get status, collate and return
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, null, LOGGER);
+
+    Set<String> instances = new HashSet<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);

Review Comment:
   in case of a hybrid table, these 2 lines would run twice and always be redundant for one of the tables. Curious, why you chose to not just have tableNameWithType in the params?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "Reload task id", required = true) @PathParam("taskId") String reloadTaskId)
+      throws Exception {
+    Map<String, String> taskZKMetadata = null;
+
+    // Call all servers to get status, collate and return
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, null, LOGGER);
+
+    Set<String> instances = new HashSet<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+      instances.addAll(serverToSegments.keySet());
+
+      if (taskZKMetadata == null) {
+        taskZKMetadata = _pinotHelixResourceManager.getTaskZKMetadata(tableNameWithType, reloadTaskId);
+      }
+    }
+
+    BiMap<String, String> serverEndPoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(instances);
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/task/status/" + reloadTaskId;
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =

Review Comment:
   this will also happen for an extra set of servers in hybrid case?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1975,6 +1977,105 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getTaskZKMetadata(String tableNameWithType, String taskId) {
+    String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(taskResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(taskId);
+    } else {
+      return null;
+    }
+  }
+
+  public Map<String, Map<String, String>> getAllTasksForTable(String tableNameWithType) {
+    String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord tableTaskRecord = _propertyStore.get(taskResourcePath, null, -1);
+      return tableTaskRecord.getMapFields();
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  public void addNewReloadSegmentTask(String tableNameWithType, String segmentName, String taskId,
+      int numberOfMessagesSent) {
+    Map<String, String> taskMetadata = new HashMap<>();
+    taskMetadata.put(CommonConstants.Task.TASK_ID, taskId);
+    taskMetadata.put(CommonConstants.Task.TASK_TYPE, TaskType.RELOAD_SEGMENT.toString());
+    taskMetadata.put(CommonConstants.Task.TASK_SUBMISSION_TIME, Long.toString(System.currentTimeMillis()));
+    taskMetadata.put(CommonConstants.Task.TASK_MESSAGE_COUNT, Integer.toString(numberOfMessagesSent));
+    taskMetadata.put(CommonConstants.Task.SEGMENT_RELOAD_TASK_SEGMENT_NAME, segmentName);
+
+    String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    ZNRecord tableTaskZnRecord;
+
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      tableTaskZnRecord = _propertyStore.get(taskResourcePath, null, -1);
+      Map<String, Map<String, String>> tasks = tableTaskZnRecord.getMapFields();
+      tasks.put(taskId, taskMetadata);
+      if (tasks.size() > CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK) {
+        tasks = tasks.
+            entrySet()
+            .stream()
+            .sorted(new Comparator<Map.Entry<String, Map<String, String>>>() {
+          @Override
+          public int compare(Map.Entry<String, Map<String, String>> v1, Map.Entry<String, Map<String, String>> v2) {
+            return Long.compare(Long.parseLong(v2.getValue().get(CommonConstants.Task.TASK_SUBMISSION_TIME)),
+                Long.parseLong(v1.getValue().get(CommonConstants.Task.TASK_SUBMISSION_TIME)));
+          }
+        })
+            .collect(Collectors.toList())
+            .subList(0, CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK)
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      }
+      tableTaskZnRecord.setMapFields(tasks);
+    } else {
+      tableTaskZnRecord = new ZNRecord(taskResourcePath);
+      tableTaskZnRecord.setMapField(taskId, taskMetadata);
+    }
+
+    _propertyStore.set(taskResourcePath, tableTaskZnRecord, AccessOption.PERSISTENT);
+  }
+
+  public void addNewReloadAllSegmentsTask(String tableNameWithType, String taskId, int numberOfMessagesSent) {
+    Map<String, String> taskMetadata = new HashMap<>();
+    taskMetadata.put(CommonConstants.Task.TASK_ID, taskId);
+    taskMetadata.put(CommonConstants.Task.TASK_TYPE, TaskType.RELOAD_ALL_SEGMENTS.toString());
+    taskMetadata.put(CommonConstants.Task.TASK_SUBMISSION_TIME, Long.toString(System.currentTimeMillis()));
+    taskMetadata.put(CommonConstants.Task.TASK_MESSAGE_COUNT, Integer.toString(numberOfMessagesSent));
+
+    String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    ZNRecord tableTaskZnRecord;
+
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      tableTaskZnRecord = _propertyStore.get(taskResourcePath, null, -1);
+      Map<String, Map<String, String>> tasks = tableTaskZnRecord.getMapFields();
+      tasks.put(taskId, taskMetadata);
+      if (tasks.size() > CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK) {
+        tasks = tasks.

Review Comment:
   extract this block into a common util to share across this and the above method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on PR #8828:
URL: https://github.com/apache/pinot/pull/8828#issuecomment-1161322942

   @mcvsubbu @npawar @Jackie-Jiang I've updated the description with the design doc link. Please do review 👍 
   https://docs.google.com/document/d/1Eqn2FDDIhCr8G2JFlifs5FjT0LsVPfpPTpdJIJvorwI/edit?usp=sharing


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang merged pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang merged PR #8828:
URL: https://github.com/apache/pinot/pull/8828


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r931859062


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +599,99 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("segments/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
+          Status.NOT_FOUND);
+    }
+
+    String tableNameWithType =
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE);
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE)
+        .equals(ControllerJobType.RELOAD_SEGMENT.toString())) {
+      // No need to query servers where this segment is not supposed to be hosted
+      serverToSegments = serverToSegments.entrySet().stream().filter(kv -> kv.getValue()
+              .contains(controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME)))
+          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      singleSegmentName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+    }
+
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint =
+          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + "?reloadJobTimestamp="
+              + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME);
+      if (singleSegmentName != null) {
+        reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + singleSegmentName;
+      }
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    ServerReloadControllerJobStatusResponse serverReloadControllerJobStatusResponse =
+        new ServerReloadControllerJobStatusResponse();
+    serverReloadControllerJobStatusResponse.setSuccessCount(0);
+    serverReloadControllerJobStatusResponse.setTotalSegmentCount(
+        _pinotHelixResourceManager.getSegmentsCount(tableNameWithType));

Review Comment:
   Realised the issue here. Fixed now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930647944


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +598,98 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
+    String tableNameWithType = TableNameBuilder.forType(tableTypeFromRequest).tableNameWithType(tableName);
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER,
+          "Failed to find controller job: " + reloadJobId + " for table: " + tableNameWithType, Status.NOT_FOUND);
+    }
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE).equals(
+        ControllerJobType.RELOAD_SEGMENT.toString())) {
+      singleSegmentName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+    }
+
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();

Review Comment:
   The other part makes sense. I've
   1) Used ideal state to calculate the total number of segments for the response
   2) If its a single segment reload job, I've filtered server endpoints to only those (the one) that hosts the segment. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r930665449


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Task ZK props

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r928533221


##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -109,6 +110,10 @@ public static String constructPropertyStorePathForInstancePartitions(String inst
     return StringUtil.join("/", PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, instancePartitionsName);
   }
 
+  public static String constructPropertyStorePathForControllerJob(String resourceName) {
+    return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX, resourceName);

Review Comment:
   Ack. I suppose since the tableName is part of the meta being stored itself and the jobId is guaranteed to be unique, we need not have to keep tableName as a key? I've flattened it out



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r899769988


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "Reload task id", required = true) @PathParam("taskId") String reloadTaskId)
+      throws Exception {
+    Map<String, String> taskZKMetadata = null;
+
+    // Call all servers to get status, collate and return
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, null, LOGGER);
+
+    Set<String> instances = new HashSet<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);

Review Comment:
   The reload API accepts tableName and type is optional. Hence thought of keeping it same.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r899978988


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")

Review Comment:
   ACK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r905719983


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerReloadControllerJobStatusResponse.java:
##########
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.util.Map;
+
+public class ServerReloadControllerJobStatusResponse {
+  private double _timeElapsedInMinutes;
+  private double _estimatedTimeRemainingInMinutes;
+  private int _totalSegmentCount;
+  private int _successCount;
+  private int _totalServersQueried;
+  private int _totalServerCallsFailed;
+  private Map<String, String> _taskMetadata;

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on PR #8828:
URL: https://github.com/apache/pinot/pull/8828#issuecomment-1165194291

   > The changed approach is clever and very light weight. It's just a little odd to think of this as "status of the exact reload id" because there's no correlation anymore between the exact fired reload and the status we're returning. Are there any other operations that would set that load time? Like a resetSegment? (I think not, but just confirming). Or any other gotchas that need to be called out?
   
   @npawar Yes and that's a idea with this change. Scenarios like server restarting in the middle of a reload op, a rebalance, etc can lead to this. Earlier with the tight coupling of task id -> status map at the server, there were issues with server restarts etc leading to missing status data at the server. With this approach, success of a reload op has simply been defined as the load time being > reload job submission time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #8828: (WIP) Add server API for task status

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r894830884


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +548,54 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = "Get status for a submitted reload "
+      + "operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(

Review Comment:
   Yes. We may put some default retention to keep the reload info (e.g. 1 day). I don't think we need a periodic task to clean up the ZK node. Instead, for each reload call, when we add new entry, we can clean up the expired ones.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: (WIP) Add server API for task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r896971986


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2159,7 +2159,7 @@ public void refreshSegment(String tableNameWithType, SegmentMetadata segmentMeta
     sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
   }
 
-  public int reloadAllSegments(String tableNameWithType, boolean forceDownload) {
+  public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean forceDownload) {

Review Comment:
   Ack. I'm planning to add it post getting a general agreement on the approach with all segments APIs first. Is that okay?



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -245,7 +245,7 @@ public void reloadSegment(String tableNameWithType, String segmentName, boolean
   }
 
   @Override
-  public void reloadAllSegments(String tableNameWithType, boolean forceDownload,
+  public void reloadAllSegments(String taskId, String tableNameWithType, boolean forceDownload,

Review Comment:
   Ack. I'm planning to add it post getting a general agreement on the approach with all segments APIs first. Is that okay?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: (WIP) Add server API for task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r896414321


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +548,54 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = "Get status for a submitted reload "
+      + "operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "Reload task id", required = true) @PathParam("taskId") String reloadTaskId)
+      throws Exception {
+    // Call all servers to get status, collate and return
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, null, LOGGER);
+
+    Set<String> instances = new HashSet<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+      instances.addAll(serverToSegments.keySet());
+    }
+
+    BiMap<String, String> serverEndPoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(instances);
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/task/status/" + reloadTaskId;
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 1000);
+
+    ServerReloadTaskStatusResponse serverReloadTaskStatusResponse = new ServerReloadTaskStatusResponse();
+    serverReloadTaskStatusResponse.setSuccessCount(0);
+    serverReloadTaskStatusResponse.setTotalSegmentCount(0);
+    for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
+      ServerReloadTaskStatusResponse response =
+          JsonUtils.stringToObject(streamResponse.getValue(), ServerReloadTaskStatusResponse.class);

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r898863942


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -563,16 +646,26 @@ public SuccessResponse reloadAllSegments(
     if (forceDownload && (tableTypeFromTableName == null && tableTypeFromRequest == null)) {
       tableTypeFromRequest = TableType.OFFLINE;
     }
-    List<String> tableNamesWithType = ResourceUtils
-        .getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER);
-    Map<String, Integer> numMessagesSentPerTable = new LinkedHashMap<>();
+    List<String> tableNamesWithType =
+        ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
+            LOGGER);
+    Map<String, Pair<Integer, String>> perTableMsgData = new LinkedHashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
-      int numMsgSent = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
-      numMessagesSentPerTable.put(tableNameWithType, numMsgSent);
+      Pair<Integer, String> msgInfo = _pinotHelixResourceManager.reloadAllSegments(tableNameWithType, forceDownload);
+      perTableMsgData.put(tableNameWithType, msgInfo);
+      // Store in ZK
+      try {
+        _pinotHelixResourceManager.addNewReloadAllSegmentsTask(tableNameWithType, msgInfo.getRight(),
+            msgInfo.getLeft());
+      } catch (Exception e) {
+        LOGGER.error("Failed to store task meta in zookepper ", e);
+      }
     }
-    return new SuccessResponse("Sent " + numMessagesSentPerTable + " reload messages");
+    return new SuccessResponse("Sent " + perTableMsgData + " reload messages");
   }
 
+

Review Comment:
   Ack



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] saurabhd336 commented on a diff in pull request #8828: Add controller API for reload segment task status

Posted by GitBox <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r912613868


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java:
##########
@@ -27,6 +27,15 @@
  */
 public abstract class SegmentDataManager {
   private int _referenceCount = 1;
+  private long _segmentLoadTimeInMillisEpoch = System.currentTimeMillis();

Review Comment:
   Ack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] Add controller API for reload segment task status [pinot]

Posted by "saurabhd336 (via GitHub)" <gi...@apache.org>.
saurabhd336 commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r1359344751


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +599,107 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("segments/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find controller job id: " + reloadJobId,
+          Status.NOT_FOUND);
+    }
+
+    String tableNameWithType =
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+    Map<String, List<String>> serverToSegments;
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.JOB_TYPE)
+        .equals(ControllerJobType.RELOAD_SEGMENT.toString())) {
+      // No need to query servers where this segment is not supposed to be hosted
+      singleSegmentName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+      serverToSegments = new HashMap<>();
+      List<String> segmentList = Arrays.asList(singleSegmentName);
+      _pinotHelixResourceManager.getServers(tableNameWithType, singleSegmentName).forEach(server -> {
+        serverToSegments.put(server, segmentList);
+      });
+    } else {
+      serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    }
+
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint =
+          endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + "?reloadJobTimestamp="
+              + controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
+      if (singleSegmentName != null) {
+        reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + singleSegmentName;
+      }
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 10000);
+
+    ServerReloadControllerJobStatusResponse serverReloadControllerJobStatusResponse =
+        new ServerReloadControllerJobStatusResponse();
+    serverReloadControllerJobStatusResponse.setSuccessCount(0);
+
+    int totalSegments = 0;
+    for (Map.Entry<String, List<String>> entry: serverToSegments.entrySet()) {
+      totalSegments += entry.getValue().size();
+    }
+    serverReloadControllerJobStatusResponse.setTotalSegmentCount(totalSegments);
+    serverReloadControllerJobStatusResponse.setTotalServersQueried(serverUrls.size());
+    serverReloadControllerJobStatusResponse.setTotalServerCallsFailed(serviceResponse._failedResponseCount);
+
+    for (Map.Entry<String, String> streamResponse : serviceResponse._httpResponses.entrySet()) {
+      String responseString = streamResponse.getValue();
+      try {
+        ServerReloadControllerJobStatusResponse response =
+            JsonUtils.stringToObject(responseString, ServerReloadControllerJobStatusResponse.class);

Review Comment:
   Agreed that this is risky assumption. We can change this to `SegmentReloadStatusValue.class`
   But, I think since the agreed upon upgrade sequence is controller -> broker -> servers, if new fields are added to `SegmentReloadStatusValue`, in the controller we'd always need to ensure the newer fields are null checked before accessing. Since during upgrade server can be on older version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org