Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/12/08 21:38:15 UTC

[GitHub] [incubator-pinot] npawar opened a new pull request #6336: Segment reset API

npawar opened a new pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336


   
   Adding a segment reset API, which disables and then re-enables a segment. It is useful for resetting stuck consumers, such as those reported in https://github.com/apache/incubator-pinot/issues/6308.
   **Note: tests are still pending.**
   
   1. If the segment is in ERROR state, invoking this API sends state transitions first to OFFLINE and then back to ONLINE/CONSUMING.
   2. If the segment is ONLINE/CONSUMING, invoking this API likewise sends state transitions first to OFFLINE and then back to ONLINE/CONSUMING.
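The two cases above differ only in how a replica is first taken to OFFLINE. A minimal sketch of that per-replica decision, assuming the states are the plain strings used in the list; `ResetPlan`, `Action` and `firstStep` are hypothetical names, not Pinot APIs:

```java
import java.util.Map;

// Hypothetical sketch: ResetPlan, Action and firstStep are illustrative names, not Pinot APIs.
final class ResetPlan {

  enum Action { RESET_PARTITION, DISABLE_PARTITION }

  /**
   * Picks the first step of a reset for one replica, given the segment's external-view
   * state map (instance -> state), which is null if the segment is absent from the view.
   */
  static Action firstStep(Map<String, String> externalViewStateMap, String instance) {
    if (externalViewStateMap == null || "ERROR".equals(externalViewStateMap.get(instance))) {
      // Case 1: an ERROR replica (or one with no external-view entry) is reset to OFFLINE.
      return Action.RESET_PARTITION;
    }
    // Case 2: an ONLINE/CONSUMING replica is disabled, which drives it to OFFLINE.
    return Action.DISABLE_PARTITION;
  }
}
```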
   
   Reset one segment:
   ```
   curl -X POST "http://localhost:9000/segments/transcript_REALTIME/transcript__1__3__20201208T1956Z/reset" -H "accept: application/json"
   {"status":"Successfully invoked segment reset"}
   ```
   Reset all segments:
   ```
   curl -X POST "http://localhost:9000/segments/transcript_REALTIME/reset" -H "accept: application/json"
   {"status":"Successfully invoked segment reset"}
   ```
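For callers outside curl, a small sketch of building the two reset URLs shown above. `ResetUrls` is a hypothetical helper; the segment name is percent-encoded because it is a path parameter (the server side decodes it):

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that builds the two reset endpoints used in the curl examples.
final class ResetUrls {

  /** POST target for resetting a single segment. */
  static URI segmentReset(String controllerBaseUrl, String tableNameWithType, String segmentName) {
    // URLEncoder does form encoding, so turn '+' back into '%20' for use in a path segment.
    String encodedSegment = URLEncoder.encode(segmentName, StandardCharsets.UTF_8).replace("+", "%20");
    return URI.create(controllerBaseUrl + "/segments/" + tableNameWithType + "/" + encodedSegment + "/reset");
  }

  /** POST target for resetting every segment of a table. */
  static URI tableReset(String controllerBaseUrl, String tableNameWithType) {
    return URI.create(controllerBaseUrl + "/segments/" + tableNameWithType + "/reset");
  }
}
```

With `java.net.http.HttpClient` (Java 11+), the request body can be empty: `HttpRequest.newBuilder(uri).POST(HttpRequest.BodyPublishers.noBody()).build()`.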


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kishoreg commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
kishoreg commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r539588567



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1768,6 +1770,63 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }

Review comment:
       Yes, we might have to wait for the external view (EV) to change/stabilize.





[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548076840



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);

Review comment:
       Can we keep the log messages consistent? Let us call it a segment instead of a partition. (Please check the other log messages as well.)

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);

Review comment:
       This should be a 4xx error (unless Pinot messed up really badly :-))

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }

Review comment:
       Please add a Thread.sleep here instead of a busy-wait loop. Suggestion:
   `Thread.sleep(Math.min(100, maxWaitTimeMillis / 10))`
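A sketch of the suggested polling pattern: check the condition, then sleep at most 100 ms (less for short timeouts) until the deadline. `ExternalViewWait.waitUntil` is a hypothetical helper, not part of Pinot; the condition would be something like "every replica in the external view is OFFLINE":

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper: poll a condition with a short sleep instead of spinning.
final class ExternalViewWait {

  /** Returns true if the condition became true within maxWaitTimeMs, false on timeout or interrupt. */
  static boolean waitUntil(BooleanSupplier condition, long maxWaitTimeMs) {
    // Sleep at most 100 ms between checks, a tenth of the budget for short waits, never 0 ms.
    long sleepMs = Math.max(1, Math.min(100, maxWaitTimeMs / 10));
    long deadline = System.currentTimeMillis() + maxWaitTimeMs;
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      if (System.currentTimeMillis() >= deadline) {
        return false;
      }
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }
}
```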

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Time in millis to wait for external view to converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {
+    segmentName = URIUtils.decode(segmentName);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName,
+          externalViewWaitTimeMs > 0 ? externalViewWaitTimeMs
+              : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      return new SuccessResponse("Successfully invoked segment reset");
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segment: %s in table: %s. %s", segmentName, tableNameWithType, e.getMessage()),
+          Status.NOT_FOUND);
+    }
+  }
+
+  /**
+   * Resets all segments of the given table
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets all segments of the table, by first disabling them, waiting for external view to stabilize, and finally enabling the segments",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetAllSegments(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Time in millis to wait for external view to converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetAllSegments(tableNameWithType, externalViewWaitTimeMs > 0 ? externalViewWaitTimeMs
+          : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      return new SuccessResponse("Successfully invoked segment reset");

Review comment:
       Better to include the table name in the message. Also, you may want to word it so that it clearly implies the reset has completed.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Time in millis to wait for external view to converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {

Review comment:
       All this parameter does is override the admin command wait time, so why not name it accordingly? We could then add it, under the same name, to any admin command now or later.
   Suggested:
   ```suggestion
         @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed") @QueryParam("maxWaitTimeMs") long externalViewWaitTimeMs) {
   ```
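Whatever the parameter is called, the resource falls back to the server admin request timeout when it is absent. A sketch of that fallback as a standalone helper (`WaitTimeDefaults` is hypothetical, and the timeout getter is assumed to return seconds as an `int`):

```java
// Hypothetical helper mirroring the fallback in the resource method.
final class WaitTimeDefaults {

  static long resolveMaxWaitTimeMs(long maxWaitTimeMs, int serverAdminRequestTimeoutSeconds) {
    // A missing primitive @QueryParam arrives as 0, so only positive values override the default.
    // Multiply as long so large timeouts cannot overflow int arithmetic.
    return maxWaitTimeMs > 0 ? maxWaitTimeMs : serverAdminRequestTimeoutSeconds * 1000L;
  }
}
```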

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);

Review comment:
       Don't you have to enable the partition (segment) again after this call?
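A sketch of the full sequence the question implies (take replicas OFFLINE, wait, re-enable), mirroring the later revision of the diff. `SegmentAdmin` is a hypothetical stand-in for the Helix `resetPartition`/`enablePartition` calls, not a real Helix or Pinot interface, and the external view is modeled as a supplier of instance-to-state maps:

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for the two Helix admin calls used in the diff.
interface SegmentAdmin {
  void reset(String instance, String segment);

  void setEnabled(boolean enabled, String instance, String segment);
}

final class SegmentReset {

  /** Returns false (skipping the enable, as the diff does) on timeout or interrupt. */
  static boolean resetSegment(SegmentAdmin admin, Set<String> instances, String segment,
      Supplier<Map<String, String>> externalViewStateMap, long maxWaitTimeMs) {
    // Step 1: take every replica to OFFLINE (reset if in ERROR, otherwise disable).
    Map<String, String> before = externalViewStateMap.get();
    for (String instance : instances) {
      if (before == null || "ERROR".equals(before.get(instance))) {
        admin.reset(instance, segment);
      } else {
        admin.setEnabled(false, instance, segment);
      }
    }
    // Step 2: poll the external view until all replicas report OFFLINE.
    long deadline = System.currentTimeMillis() + maxWaitTimeMs;
    while (true) {
      Map<String, String> now = externalViewStateMap.get();
      if (now != null && instances.stream().allMatch(i -> "OFFLINE".equals(now.get(i)))) {
        break;
      }
      if (System.currentTimeMillis() >= deadline) {
        return false;
      }
      try {
        Thread.sleep(Math.max(1, Math.min(100, maxWaitTimeMs / 10)));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    // Step 3: re-enable so Helix moves the replicas back to ONLINE/CONSUMING.
    for (String instance : instances) {
      admin.setEnabled(true, instance, segment);
    }
    return true;
  }
}
```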

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();

Review comment:
       Suggest naming the variables with `segments` instead of `partitions`. We already have two other meanings of partitions, which is confusing enough (stream partitions, and partitioning of data in segment assignment).
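A sketch of the grouping step in `resetAllSegments` with the suggested naming applied. Plain maps stand in for the ideal state and external view (segment -> instance -> state); `ResetGrouping` and its field names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: group segments per instance by the first step each replica needs.
final class ResetGrouping {
  final Map<String, Set<String>> instanceToSegmentsToReset = new HashMap<>();
  final Map<String, Set<String>> instanceToSegmentsToDisable = new HashMap<>();

  ResetGrouping(Map<String, Map<String, String>> idealState, Map<String, Map<String, String>> externalView) {
    for (Map.Entry<String, Map<String, String>> entry : idealState.entrySet()) {
      String segment = entry.getKey();
      Map<String, String> externalViewStateMap = externalView.get(segment);
      for (String instance : entry.getValue().keySet()) {
        // Same rule as the single-segment case: missing EV entry or ERROR -> reset, else disable.
        boolean needsReset = externalViewStateMap == null || "ERROR".equals(externalViewStateMap.get(instance));
        Map<String, Set<String>> target = needsReset ? instanceToSegmentsToReset : instanceToSegmentsToDisable;
        target.computeIfAbsent(instance, k -> new TreeSet<>()).add(segment);
      }
    }
  }
}
```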

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions

Review comment:
       Not sure if we can start off with no external view at this point, since Helix will still be processing the reset calls.
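One way to address this concern, sketched under the assumption that a missing external view (or a missing entry for a segment) should mean "not converged yet" rather than a failure. `PendingSegments` is a hypothetical tracker fed one external-view snapshot per poll:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical tracker of replicas that have not yet reached OFFLINE.
final class PendingSegments {
  private final Map<String, Set<String>> segmentToPendingInstances = new HashMap<>();

  PendingSegments(Map<String, Set<String>> segmentToInstances) {
    segmentToInstances.forEach((segment, instances) -> segmentToPendingInstances.put(segment, new HashSet<>(instances)));
  }

  /** Applies one snapshot (segment -> instance -> state, may be null); true once every replica is OFFLINE. */
  boolean update(Map<String, Map<String, String>> externalView) {
    if (externalView == null) {
      return segmentToPendingInstances.isEmpty(); // view not available yet: keep waiting
    }
    segmentToPendingInstances.entrySet().removeIf(entry -> {
      Map<String, String> stateMap = externalView.get(entry.getKey());
      if (stateMap != null) {
        entry.getValue().removeIf(instance -> "OFFLINE".equals(stateMap.get(instance)));
      }
      return entry.getValue().isEmpty();
    });
    return segmentToPendingInstances.isEmpty();
  }
}
```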

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Time in millis to wait for external view to converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {
+    segmentName = URIUtils.decode(segmentName);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);

Review comment:
       Please make sure that this error message shows up on the console or in the curl output if the table type is not given. Sometimes the precondition check's error message does not show up, and we get a 5xx error (this should be a 4xx error).
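A sketch of validating the table type up front so the caller gets a readable 400 instead of a generic failure. The suffix check is an assumption standing in for Pinot's own table-name parsing; `TableNameCheck` is hypothetical:

```java
// Hypothetical validator: a table name with type must end in _OFFLINE or _REALTIME.
final class TableNameCheck {

  /** Returns null if the name is valid, otherwise the message to return with a 400. */
  static String validate(String tableNameWithType) {
    if (tableNameWithType.endsWith("_OFFLINE") || tableNameWithType.endsWith("_REALTIME")) {
      return null;
    }
    return String.format("Table name must include the type suffix (_OFFLINE or _REALTIME): %s", tableNameWithType);
  }
}
```

In the resource, a non-null message could be thrown as a `ControllerApplicationException` with `Status.BAD_REQUEST` before entering the `try` block, whose `catch` currently maps every failure to `NOT_FOUND`.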

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s");

Review comment:
       Should be a 4xx error.
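A sketch of separating "not found" from genuine server failures so each maps to the right status code. `SegmentNotFoundException` and `ResetErrors` are hypothetical, and mapping a wait timeout to 500 is a design assumption. The not-found check also passes both format arguments, which the diff's `Preconditions` call omits:

```java
import java.util.Set;

// Hypothetical exception type for missing tables/segments (a client error).
final class SegmentNotFoundException extends RuntimeException {
  SegmentNotFoundException(String message) {
    super(message);
  }
}

final class ResetErrors {

  static int statusFor(Throwable t) {
    if (t instanceof SegmentNotFoundException) {
      return 404; // the caller asked for something that does not exist
    }
    if (t instanceof IllegalArgumentException) {
      return 400; // malformed request
    }
    return 500; // e.g. timed out waiting for the external view
  }

  static void checkSegmentExists(Set<String> instanceSet, String segmentName, String tableNameWithType) {
    if (instanceSet == null || instanceSet.isEmpty()) {
      throw new SegmentNotFoundException(String.format(
          "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType));
    }
  }
}
```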

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s");
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Iterator<Map.Entry<String, Set<String>>> iterator = partitionInstancesToCheck.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<String, Set<String>> entryToCheck = iterator.next();
+        String partitionToCheck = entryToCheck.getKey();
+        Set<String> instancesToCheck = entryToCheck.getValue();
+        Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(partitionToCheck);
+        if (newExternalViewStateMap == null) {
+          continue;
+        }
+        boolean allOffline = true;
+        for (String instance : instancesToCheck) {
+          if (!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
+            allOffline = false;
+            break;
+          }
+        }
+        if (allOffline) {
+          iterator.remove();
+        }
+      }
+    }
+    if (!partitionInstancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of segments of table: %s",
+          tableNameWithType));
+    }
+
+    // Enable partitions
+    LOGGER.info("Enabling partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, partitions);

Review comment:
       Why do we have an enable here and another one at line 1869? Can you clarify again whether Helix expects two of these in the reset API?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet),
+        "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Iterator<Map.Entry<String, Set<String>>> iterator = partitionInstancesToCheck.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<String, Set<String>> entryToCheck = iterator.next();
+        String partitionToCheck = entryToCheck.getKey();
+        Set<String> instancesToCheck = entryToCheck.getValue();
+        Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(partitionToCheck);
+        if (newExternalViewStateMap == null) {
+          continue;
+        }
+        boolean allOffline = true;
+        for (String instance : instancesToCheck) {
+          if (!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
+            allOffline = false;
+            break;
+          }
+        }
+        if (allOffline) {
+          iterator.remove();
+        }
+      }
+    }

Review comment:
       Please add a sleep here like in the other case, instead of busy-waiting.
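One way to address this, sketched under the assumption that a small generic helper is acceptable (`ExternalViewWaiter` and `waitForCondition` are hypothetical names, not existing Pinot utilities): re-check the condition, sleep between checks, and report a timeout to the caller.

```java
import java.util.function.BooleanSupplier;

final class ExternalViewWaiter {
  // Evaluates `condition` every `intervalMs` until it returns true or `timeoutMs`
  // elapses. Returns true if the condition was met, false on timeout. Sleeping
  // between checks avoids issuing back-to-back external view reads.
  static boolean waitForCondition(BooleanSupplier condition, long timeoutMs, long intervalMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (true) {
      if (condition.getAsBoolean()) {
        return true;
      }
      long remainingMs = deadline - System.currentTimeMillis();
      if (remainingMs <= 0) {
        return false;
      }
      // Never sleep past the deadline.
      Thread.sleep(Math.min(intervalMs, remainingMs));
    }
  }
}
```

In `resetAllSegments`, the condition would re-read the external view and prune `partitionInstancesToCheck`, returning true once it is empty; a `false` return would map to the existing timeout `IllegalStateException`.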





[GitHub] [incubator-pinot] npawar commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548250420



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet),
+        "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Iterator<Map.Entry<String, Set<String>>> iterator = partitionInstancesToCheck.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<String, Set<String>> entryToCheck = iterator.next();
+        String partitionToCheck = entryToCheck.getKey();
+        Set<String> instancesToCheck = entryToCheck.getValue();
+        Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(partitionToCheck);
+        if (newExternalViewStateMap == null) {
+          continue;
+        }
+        boolean allOffline = true;
+        for (String instance : instancesToCheck) {
+          if (!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
+            allOffline = false;
+            break;
+          }
+        }
+        if (allOffline) {
+          iterator.remove();
+        }
+      }
+    }
+    if (!partitionInstancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of segments of table: %s",
+          tableNameWithType));
+    }
+
+    // Enable partitions
+    LOGGER.info("Enabling partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, partitions);

Review comment:
       The call at line 1869 is a disable, not an enable.
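Because the final phase of `resetAllSegments` passes both `resetInstanceToPartitionsMap` and `disableInstanceToPartitionsMap` to the same `enablePartition(true, ...)` call, the two maps could be merged so the code reads as one disable-or-reset pass followed by one enable pass. A minimal sketch, with `EnablePlan.merge` as a hypothetical helper:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class EnablePlan {
  // Merges two instance -> partitions maps into a new map without mutating the inputs.
  // In the enable phase both the reset and the disabled partitions receive the same
  // enablePartition(true, ...) call, so one merged map allows one call per instance.
  static Map<String, Set<String>> merge(Map<String, Set<String>> a, Map<String, Set<String>> b) {
    Map<String, Set<String>> merged = new HashMap<>();
    addAll(merged, a);
    addAll(merged, b);
    return merged;
  }

  private static void addAll(Map<String, Set<String>> target, Map<String, Set<String>> source) {
    for (Map.Entry<String, Set<String>> entry : source.entrySet()) {
      target.computeIfAbsent(entry.getKey(), k -> new TreeSet<>()).addAll(entry.getValue());
    }
  }
}
```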





[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r538870662



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1768,6 +1770,63 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet),
+        "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);

Review comment:
       Can you document what this API call does (or is supposed to do)? Can it throw exceptions that we need to catch (or not catch)?
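A sketch of how the call could be documented and guarded. `PartitionResetter` is a hypothetical stand-in for the one `HelixAdmin` method used here, with the same parameter order as `resetPartition(clusterName, instanceName, resourceName, partitionNames)`. The Javadoc states our understanding of Helix's behavior (reset moves ERROR replicas back to the state model's initial state, and an unchecked `HelixException` is thrown when, for example, the instance is not live or a partition is not in ERROR state); this should be verified against the Helix version Pinot uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the subset of HelixAdmin used below.
interface PartitionResetter {
  void resetPartition(String clusterName, String instanceName, String resourceName,
      List<String> partitionNames);
}

final class SafeReset {
  /**
   * Asks Helix to move the given partitions on each instance from ERROR back to the
   * state model's initial state (OFFLINE for segments). Assumption: Helix signals
   * failures (instance not live, partition not in ERROR state, ...) with an unchecked
   * exception. Failures are caught per instance so one bad instance does not abort
   * the rest; the failed instance names are returned in sorted order.
   */
  static List<String> resetOnInstances(PartitionResetter admin, String clusterName,
      Map<String, List<String>> instanceToPartitions, String resourceName) {
    List<String> failedInstances = new ArrayList<>();
    for (Map.Entry<String, List<String>> entry : instanceToPartitions.entrySet()) {
      try {
        admin.resetPartition(clusterName, entry.getKey(), resourceName, entry.getValue());
      } catch (RuntimeException e) {
        failedInstances.add(entry.getKey());
      }
    }
    Collections.sort(failedInstances);
    return failedInstances;
  }
}
```

Whether to skip failed instances or fail the whole request is a policy choice; returning the failures lets the REST layer report a partial reset instead of silently continuing.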

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1768,6 +1770,63 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet),
+        "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);

Review comment:
       Don't we need to wait until the external view stabilizes before calling enable again?
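The condition to wait on can be expressed as the set of replicas whose external view state is not yet OFFLINE; enabling is only safe once that set is empty. A minimal sketch, with `OfflineCheck.instancesNotYetOffline` as a hypothetical helper that treats a missing state map as "nothing confirmed yet":

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class OfflineCheck {
  static final String OFFLINE = "OFFLINE";

  // Returns the instances whose external view state for the segment is not yet OFFLINE.
  // A null state map (segment absent from the external view) confirms nothing, so
  // every instance is still pending.
  static Set<String> instancesNotYetOffline(Set<String> instances, Map<String, String> evStateMap) {
    Set<String> pending = new TreeSet<>(instances);
    if (evStateMap == null) {
      return pending;
    }
    pending.removeIf(instance -> OFFLINE.equals(evStateMap.get(instance)));
    return pending;
  }
}
```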

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1768,6 +1770,63 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet),
+        "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }

Review comment:
       I remember we used to have this disable/enable (aka toggle) in the segments API before, and we had a lot of problems with it. We used it for refreshing segments, and then moved to using the Helix refresh message instead.





[GitHub] [incubator-pinot] npawar commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r539584954



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1768,6 +1770,63 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet),
+        "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }

Review comment:
       The problem with using SegmentRefreshMessage is that it cannot bring about a state change in the external view. A consuming segment in ERROR state will remain in ERROR state even after it is refreshed, unless we go through a disable/enable of the partition.
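A toy model of this argument (hypothetical, not Helix or Pinot code, and deliberately simplified: it ignores that Helix may itself drive an enabled replica back toward its target state after a reset). The external view only changes when a state transition runs, and a refresh message triggers none:

```java
final class ToySegmentStateModel {
  enum State { OFFLINE, ONLINE, CONSUMING, ERROR }

  private final State targetState;
  // The state the external view would report for this replica.
  private State state;

  ToySegmentStateModel(State initialState, State targetState) {
    this.state = initialState;
    this.targetState = targetState;
  }

  // A refresh message reloads segment data on the server but runs no state
  // transition, so the reported state is unchanged (ERROR stays ERROR).
  void onRefreshMessage() {
    // intentionally no state change
  }

  // resetPartition (for ERROR replicas) or disable (for healthy ones) ends in OFFLINE.
  void resetOrDisable() {
    state = State.OFFLINE;
  }

  // Re-enabling lets the controller drive OFFLINE -> target (ONLINE or CONSUMING).
  void enable() {
    if (state == State.OFFLINE) {
      state = targetState;
    }
  }

  State externalViewState() {
    return state;
  }
}
```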





[GitHub] [incubator-pinot] npawar commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548249903



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s",
+            segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions

Review comment:
      Not sure what you mean. This is just a safeguard against the table being deleted in between.
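A minimal sketch of the safeguard being described, assuming the external view is modeled as a plain map (partition to instance-to-state). `ExternalViewGuardSketch` and its `Supplier` parameter are hypothetical stand-ins for re-calling `getTableExternalView` inside the wait loop; the point is that a null result fails fast instead of being polled until the timeout.

```java
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch (not Pinot code): re-reading the external view inside the
// wait loop and failing fast if it has disappeared (e.g. the table was deleted).
final class ExternalViewGuardSketch {
  // fetcher stands in for getTableExternalView(tableNameWithType); the external
  // view is modeled as partition -> (instance -> state).
  static Map<String, Map<String, String>> fetchOrFail(Supplier<Map<String, Map<String, String>>> fetcher,
      String tableNameWithType) {
    Map<String, Map<String, String>> externalView = fetcher.get();
    if (externalView == null) {
      // Same outcome as Preconditions.checkState(newExternalView != null, ...).
      throw new IllegalStateException("Could not find external view for table: " + tableNameWithType);
    }
    return externalView;
  }
}
```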






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548253767



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Time in millis to wait for external view to converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {

Review comment:
      This is not a generic max wait time, but a wait time for the external view to stabilize.
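Since the parameter bounds only the wait for the external view, a sketch of that bounded wait may help. This is an illustrative variant, not the PR's loop: it assumes a `Supplier` in place of re-reading the segment's external-view state map, and it sleeps between polls (`pollIntervalMs` is a hypothetical parameter) rather than re-reading in a tight loop.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch (not the PR's code): wait up to externalViewWaitTimeMs for
// every replica of a segment to show OFFLINE in the external view.
final class WaitForOfflineSketch {
  // stateMapFetcher stands in for getTableExternalView(...).getStateMap(segmentName);
  // it may return null while the segment has no external-view entry.
  // Returns the replicas that never reached OFFLINE (empty on success).
  static Set<String> waitForOffline(Set<String> instances, Supplier<Map<String, String>> stateMapFetcher,
      long externalViewWaitTimeMs, long pollIntervalMs) {
    Set<String> remaining = new HashSet<>(instances);
    long deadline = System.currentTimeMillis() + externalViewWaitTimeMs;
    while (true) {
      Map<String, String> stateMap = stateMapFetcher.get();
      if (stateMap != null) {
        remaining.removeIf(instance -> "OFFLINE".equals(stateMap.get(instance)));
      }
      if (remaining.isEmpty() || System.currentTimeMillis() >= deadline) {
        return remaining;
      }
      try {
        // Pause between polls instead of re-reading the external view in a tight spin.
        Thread.sleep(pollIntervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return remaining;
      }
    }
  }
}
```

One design difference from the diff: this version checks the external view at least once even when `externalViewWaitTimeMs` is 0, so replicas that are already OFFLINE do not cause a spurious timeout.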






[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#issuecomment-741130249


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=h1) Report
   > Merging [#6336](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=desc) (d4952f9) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `1.35%`.
   > The diff coverage is `56.80%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6336/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6336      +/-   ##
   ==========================================
   - Coverage   66.44%   65.09%   -1.36%     
   ==========================================
     Files        1075     1298     +223     
     Lines       54773    62883    +8110     
     Branches     8168     9142     +974     
   ==========================================
   + Hits        36396    40931    +4535     
   - Misses      15700    19030    +3330     
   - Partials     2677     2922     +245     
   ```
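The summary numbers in the report can be cross-checked by hand: lines = hits + misses + partials, and coverage = hits / lines. The sketch below recomputes them in integer hundredths of a percent; truncating rather than rounding is an assumption that happens to reproduce the 66.44% and 65.09% figures, and `CoverageSketch` is an illustrative name.

```java
import java.util.Locale;

// Hypothetical sketch: recomputing the Codecov summary figures from the raw counts.
final class CoverageSketch {
  // Every tracked line is a hit, a miss, or a partial.
  static long totalLines(long hits, long misses, long partials) {
    return hits + misses + partials;
  }

  // Coverage in hundredths of a percent, truncated toward zero (6509 means 65.09%).
  // Integer arithmetic avoids floating-point rounding surprises.
  static long coverageHundredths(long hits, long lines) {
    return hits * 10_000L / lines;
  }

  static String format(long hundredths) {
    return String.format(Locale.ROOT, "%d.%02d%%", hundredths / 100, hundredths % 100);
  }
}
```

By the same arithmetic the unrounded change is about -1.358 points, which would explain why the headline says 1.35% (truncated) while the table says -1.36% (rounded); this is an inference from the numbers, not documented Codecov behavior.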
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `65.09% <56.80%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `66.66% <0.00%> (+9.52%)` | :arrow_up: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <0.00%> (-13.29%)` | :arrow_down: |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <0.00%> (-51.10%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `73.80% <ø> (+0.63%)` | :arrow_up: |
   | [...common/config/tuner/NoOpTableTableConfigTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL05vT3BUYWJsZVRhYmxlQ29uZmlnVHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ot/common/config/tuner/RealTimeAutoIndexTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL1JlYWxUaW1lQXV0b0luZGV4VHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | ... and [1151 more](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=footer). Last update [b8bc74f...d4952f9](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] codecov-io commented on pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#issuecomment-741130249


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=h1) Report
   > Merging [#6336](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=desc) (2855944) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `1.13%`.
   > The diff coverage is `45.12%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6336/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6336      +/-   ##
   ==========================================
   - Coverage   66.44%   65.31%   -1.14%     
   ==========================================
     Files        1075     1283     +208     
     Lines       54773    62034    +7261     
     Branches     8168     9011     +843     
   ==========================================
   + Hits        36396    40515    +4119     
   - Misses      15700    18628    +2928     
   - Partials     2677     2891     +214     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `65.31% <45.12%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `66.66% <0.00%> (+9.52%)` | :arrow_up: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <0.00%> (-13.29%)` | :arrow_down: |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <0.00%> (-51.10%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `73.80% <ø> (+0.63%)` | :arrow_up: |
   | [.../apache/pinot/common/exception/QueryException.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL1F1ZXJ5RXhjZXB0aW9uLmphdmE=) | `90.27% <ø> (+5.55%)` | :arrow_up: |
   | [...pinot/common/function/AggregationFunctionType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vQWdncmVnYXRpb25GdW5jdGlvblR5cGUuamF2YQ==) | `100.00% <ø> (ø)` | |
   | ... and [1131 more](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=footer). Last update [41a3fc4...2855944](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] npawar commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r549536070



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s",
+            segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions

Review comment:
      An empty EV will pass this check, but a null EV will not. I don't think we should try to support a null EV; I'm not sure what it would mean for making reset/disable calls.
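To illustrate the empty-versus-null distinction, here is a hedged standalone sketch of the grouping step in `resetAllSegments`, with ideal state and external view modeled as plain maps (an assumption; the real code uses Helix `IdealState`/`ExternalView`). An empty external view has no state map for any partition, so every replica lands in the reset batch, while a null external view is rejected outright. `ResetAllPlanSketch` is an illustrative name.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch (not Pinot code) of the grouping step in resetAllSegments:
// partitions are bucketed per instance into "reset" and "disable" batches.
final class ResetAllPlanSketch {
  final Map<String, Set<String>> resetInstanceToPartitions = new HashMap<>();
  final Map<String, Set<String>> disableInstanceToPartitions = new HashMap<>();

  // idealState: partition -> replica instances.
  // externalView: partition -> (instance -> state); null models a missing external view.
  static ResetAllPlanSketch plan(Map<String, Set<String>> idealState,
      Map<String, Map<String, String>> externalView) {
    if (externalView == null) {
      // A null external view is rejected up front, as in the diff's Preconditions check.
      throw new IllegalStateException("Could not find external view");
    }
    ResetAllPlanSketch plan = new ResetAllPlanSketch();
    for (Map.Entry<String, Set<String>> entry : idealState.entrySet()) {
      String partition = entry.getKey();
      // An empty external view has no state map for any partition, so every replica is reset.
      Map<String, String> stateMap = externalView.get(partition);
      for (String instance : entry.getValue()) {
        boolean reset = stateMap == null || "ERROR".equals(stateMap.get(instance));
        Map<String, Set<String>> target =
            reset ? plan.resetInstanceToPartitions : plan.disableInstanceToPartitions;
        target.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
      }
    }
    return plan;
  }
}
```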






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r549535850



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Time in millis to wait for external view to converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {
+    segmentName = URIUtils.decode(segmentName);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);

Review comment:
       It does show up.
   ```
   {
     "code": 404,
     "error": "Failed to reset segments in table: foo_REALTIME. Could not find ideal state for table: foo_REALTIME"
   }
   ```
   
   Also, all the places where you've commented about 4xx are already throwing 4xx.
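One way to picture the 4xx behavior described above is a small mapping from the thrown exception to a status code and the JSON body shown in the comment. This is a hypothetical sketch, not the controller's actual error handling: classifying on the `Could not find` message prefix, and treating everything else (such as the external-view timeout) as a 500, are assumptions made for brevity; a dedicated exception type would be sturdier.

```java
// Hypothetical sketch (not the controller's actual code): mapping a failed reset
// to a status code and the JSON error shape quoted above.
final class ResetErrorSketch {
  static int statusFor(Exception e) {
    String message = e.getMessage();
    // Missing ideal state / external view / segment: the requested resource does not exist.
    if (e instanceof IllegalStateException && message != null && message.startsWith("Could not find")) {
      return 404;
    }
    // Anything else, e.g. the external-view wait timing out, is treated as a server-side failure.
    return 500;
  }

  static String errorJson(int code, String tableNameWithType, Exception e) {
    String error = "Failed to reset segments in table: " + tableNameWithType + ". " + e.getMessage();
    // Minimal JSON string escaping for backslashes and double quotes.
    String escaped = error.replace("\\", "\\\\").replace("\"", "\\\"");
    return "{\"code\": " + code + ", \"error\": \"" + escaped + "\"}";
  }
}
```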






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548313544



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +354,61 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again",
+      notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Time in millis to wait for external view to converge") @QueryParam("externalViewWaitTimeMs") long externalViewWaitTimeMs) {

Review comment:
      I am trying to avoid documenting what the external view means. We know it as Pinot devs, but it is probably better to minimize its exposure to beginner users (and I think that is who we are targeting here). For all practical purposes, the EV wait translates to the total wait time of the admin command, so reusing that same term will save us some explanation. That's my thought; I am flexible on this.
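If the parameter is documented as the overall wait time, its handling when omitted also matters. In the diff it is a primitive `long` `@QueryParam` with no `@DefaultValue`, so JAX-RS passes 0 when the caller leaves it out; with a 0 wait, the polling loop never runs, `instancesToCheck` is never pruned, and the call throws the timeout error. A minimal sketch of resolving an effective wait time follows; `WaitTimeParamSketch`, the 10 s default, and the 5 min cap are illustrative assumptions, not values from the PR.

```java
// Hypothetical sketch: resolving the wait-time query parameter before polling.
// The default and cap are illustrative values, not taken from the PR.
final class WaitTimeParamSketch {
  static final long DEFAULT_WAIT_TIME_MS = 10_000L;
  static final long MAX_WAIT_TIME_MS = 5 * 60_000L;

  static long effectiveWaitTimeMs(long requestedMs) {
    if (requestedMs <= 0) {
      // Omitted (JAX-RS supplies 0 for a primitive long) or non-positive: use the default
      // instead of timing out immediately.
      return DEFAULT_WAIT_TIME_MS;
    }
    // Bound how long a single admin request can hold a controller thread.
    return Math.min(requestedMs, MAX_WAIT_TIME_MS);
  }
}
```

Annotating the parameter with JAX-RS `@DefaultValue` would be an alternative to resolving the default in code.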

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s",
+            segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions

Review comment:
       I see, yes, that is good.
   
   My question was: will we ever get into this with an empty EV in a _valid_ case? Say a beginner starts running Pinot with things badly configured, so that the EV never appears. They then correct the config and click 'reset'. Is this a valid scenario? If it is, don't we want to support it?
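If that scenario should be supported, one option is to apply the per-segment rule the diff already uses (no external view state map, or an ERROR replica, means reset; anything else means disable) even when the table-level external view is absent, instead of failing the `externalView != null` precondition. A hypothetical sketch of that classification; `ResetPlan` is an illustrative name, and the `ERROR` string is assumed to match `SegmentStateModel`:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: split a segment's replicas into "reset" and "disable" groups.
final class ResetPlan {
  // Assumed to match the state name used by Pinot's SegmentStateModel.
  static final String ERROR = "ERROR";

  final Set<String> instancesToReset = new HashSet<>();
  final Set<String> instancesToDisable = new HashSet<>();

  private ResetPlan() {
  }

  /**
   * @param idealStateInstances instances hosting the segment according to the ideal state
   * @param externalViewStateMap instance-to-state map for the segment, or null when the external view
   *     (or the segment's entry in it) does not exist, e.g. because it never appeared
   */
  static ResetPlan forSegment(Set<String> idealStateInstances, Map<String, String> externalViewStateMap) {
    ResetPlan plan = new ResetPlan();
    for (String instance : idealStateInstances) {
      // Same rule as the diff: no state map at all, or an ERROR replica, goes through reset;
      // every other replica is disabled.
      if (externalViewStateMap == null || ERROR.equals(externalViewStateMap.get(instance))) {
        plan.instancesToReset.add(instance);
      } else {
        plan.instancesToDisable.add(instance);
      }
    }
    return plan;
  }
}
```

With this, the caller would pass `null` as the state map when `getTableExternalView` returns null, rather than throwing.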

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s",
+            segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();

Review comment:
       Yes, when we make calls to Helix we have to use "partition". But within Pinot we always refer to a segment as a segment. It will really reduce confusion if you keep the `segment` term whenever we refer to one.
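A sketch of what that could look like for the all-segments path: keep `segment` names inside Pinot code and convert to Helix's partition list only at the admin call. This assumes, as the diff does, that a segment name is passed unchanged as the Helix partition name; `SegmentGrouping` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: use Pinot's "segment" vocabulary internally and Helix's "partition"
// vocabulary only at the call boundary.
final class SegmentGrouping {
  private SegmentGrouping() {
  }

  /** Inverts segment -> instances (as read from the ideal state) into instance -> segments. */
  static Map<String, Set<String>> instanceToSegments(Map<String, Set<String>> segmentToInstances) {
    Map<String, Set<String>> instanceToSegments = new HashMap<>();
    for (Map.Entry<String, Set<String>> entry : segmentToInstances.entrySet()) {
      String segmentName = entry.getKey();
      for (String instance : entry.getValue()) {
        // TreeSet keeps each instance's segments in a stable, sorted order.
        instanceToSegments.computeIfAbsent(instance, i -> new TreeSet<>()).add(segmentName);
      }
    }
    return instanceToSegments;
  }

  /** Converts segment names into the list of partition names that Helix admin calls take. */
  static List<String> toHelixPartitionNames(Set<String> segmentNames) {
    // A segment name is used directly as the Helix partition name.
    return new ArrayList<>(segmentNames);
  }
}
```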

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s",
+            segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);

Review comment:
       My bad. I did not see the 'false' argument there. (God, I wish Helix had gone the extra inch to provide a disablePartition API :-)
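Such a wrapper is easy to add locally. A hypothetical sketch: `PartitionAdmin` is a stand-in interface whose single method has the same shape as the `enablePartition(enabled, clusterName, instanceName, resourceName, partitionNames)` call in the diff, so the example compiles without Helix on the classpath.

```java
import java.util.List;

// Hypothetical stand-in for the admin client; its method mirrors the enablePartition call in the diff.
interface PartitionAdmin {
  void enablePartition(boolean enabled, String clusterName, String instanceName, String resourceName,
      List<String> partitionNames);
}

// Hypothetical helpers that make the intent obvious at the call site.
final class PartitionAdminHelpers {
  private PartitionAdminHelpers() {
  }

  /** Disables the given partitions on one instance; reads better than enablePartition(false, ...). */
  static void disablePartitions(PartitionAdmin admin, String clusterName, String instanceName,
      String resourceName, List<String> partitionNames) {
    admin.enablePartition(false, clusterName, instanceName, resourceName, partitionNames);
  }

  /** Re-enables the given partitions on one instance. */
  static void enablePartitions(PartitionAdmin admin, String clusterName, String instanceName,
      String resourceName, List<String> partitionNames) {
    admin.enablePartition(true, clusterName, instanceName, resourceName, partitionNames);
  }
}
```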

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s",
+            segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
+    Map<String, Set<String>> partitionInstancesToCheck = new HashMap<>();
+
+    for (String partition : idealState.getPartitionSet()) {
+      Set<String> instanceSet = idealState.getInstanceSet(partition);
+      Map<String, String> externalViewStateMap = externalView.getStateMap(partition);
+      for (String instance : instanceSet) {
+        if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+          resetInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        } else {
+          disableInstanceToPartitionsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(partition);
+        }
+      }
+      partitionInstancesToCheck.put(partition, new HashSet<>(instanceSet));
+    }
+
+    // First, disable/reset the partitions
+    LOGGER.info("Disabling/resetting partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partitions of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!partitionInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Iterator<Map.Entry<String, Set<String>>> iterator = partitionInstancesToCheck.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<String, Set<String>> entryToCheck = iterator.next();
+        String partitionToCheck = entryToCheck.getKey();
+        Set<String> instancesToCheck = entryToCheck.getValue();
+        Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(partitionToCheck);
+        if (newExternalViewStateMap == null) {
+          continue;
+        }
+        boolean allOffline = true;
+        for (String instance : instancesToCheck) {
+          if (!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
+            allOffline = false;
+            break;
+          }
+        }
+        if (allOffline) {
+          iterator.remove();
+        }
+      }
+    }
+    if (!partitionInstancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of segments of table: %s",
+          tableNameWithType));
+    }
+
+    // Enable partitions
+    LOGGER.info("Enabling partitions of table: {}", tableNameWithType);
+    for (Map.Entry<String, Set<String>> entry : resetInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, partitions);
+    }
+    for (Map.Entry<String, Set<String>> entry : disableInstanceToPartitionsMap.entrySet()) {
+      ArrayList<String> partitions = Lists.newArrayList(entry.getValue());
+      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, partitions);

Review comment:
       Agreed. My bad.






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548252132



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s",
+            segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);

Review comment:
       I did not follow your comment. That is exactly what happens a few lines below, after the external view stabilizes.
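For illustration, the wait-then-enable step could also pause between polls, whereas the loop in the diff re-reads the external view back to back until the deadline. A minimal sketch under that assumption; `OfflineWaiter`, the supplier of the segment's state map, and `pollIntervalMs` are hypothetical:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch of the "wait until OFFLINE, then enable" step with a pause between polls.
final class OfflineWaiter {
  // Assumed to match the OFFLINE state name in Pinot's SegmentStateModel.
  static final String OFFLINE = "OFFLINE";

  private OfflineWaiter() {
  }

  /**
   * Polls the segment's external view state map until every given instance reports OFFLINE or the
   * deadline passes. Returns true only if all instances went OFFLINE in time.
   */
  static boolean waitForAllOffline(Supplier<Map<String, String>> stateMapSupplier, Set<String> instances,
      long maxWaitTimeMs, long pollIntervalMs) {
    Set<String> pending = new HashSet<>(instances);
    long deadline = System.currentTimeMillis() + maxWaitTimeMs;
    while (true) {
      Map<String, String> stateMap = stateMapSupplier.get();
      // A missing state map means the external view has not caught up yet; keep waiting.
      if (stateMap != null) {
        pending.removeIf(instance -> OFFLINE.equals(stateMap.get(instance)));
      }
      if (pending.isEmpty()) {
        return true;
      }
      if (System.currentTimeMillis() >= deadline) {
        return false;
      }
      try {
        // Sleep instead of spinning, so the external view is not re-read in a tight loop.
        Thread.sleep(pollIntervalMs);
      } catch (InterruptedException e) {
        // Preserve the interrupt for the caller and give up waiting.
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }
}
```

Only when this returns `true` would the caller go on to enable the segment, which matches the diff's behavior of skipping the enable on timeout.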






[GitHub] [incubator-pinot] kishoreg commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
kishoreg commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r549953735



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +355,67 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again", notes = "Resets a segment by disabling and then enabling the segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    segmentName = URIUtils.decode(segmentName);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName,
+          maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      return new SuccessResponse(
+          String.format("Successfully reset segment: %s of table: %s", segmentName, tableNameWithType));
+    } catch (IllegalStateException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()),
+          Status.NOT_FOUND);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segment: %s of table: %s. %s", segmentName, tableNameWithType, e.getMessage()),
+          Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Resets all segments of the given table
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets all segments of the table, by first disabling them, waiting for external view to stabilize, and finally enabling the segments", notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetAllSegments(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetAllSegments(tableNameWithType,

Review comment:
       What exception is thrown if it takes longer than that, for whatever reason?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +355,67 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again", notes = "Resets a segment by disabling and then enabling the segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    segmentName = URIUtils.decode(segmentName);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName,
+          maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      return new SuccessResponse(
+          String.format("Successfully reset segment: %s of table: %s", segmentName, tableNameWithType));
+    } catch (IllegalStateException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()),
+          Status.NOT_FOUND);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segment: %s of table: %s. %s", segmentName, tableNameWithType, e.getMessage()),
+          Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Resets all segments of the given table
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets all segments of the table, by first disabling them, waiting for external view to stabilize, and finally enabling the segments", notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetAllSegments(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetAllSegments(tableNameWithType,
+          maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      return new SuccessResponse(String.format("Successfully reset all segments of table: %s", tableNameWithType));
+    } catch (IllegalStateException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()),
+          Status.NOT_FOUND);
+    } catch (Exception e) {

Review comment:
       Can we handle the timeout exception separately, and make sure we return the right message: that the reset timed out, but it might still complete in the background?
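In the code quoted in this thread, a timeout surfaces as the same `IllegalStateException` as a missing ideal state or external view, so it is reported as `NOT_FOUND`. A hypothetical sketch of the separation asked for above, using the JDK's checked `java.util.concurrent.TimeoutException` for the timeout path. The `ResetFailure` class and the 504 status are illustrative choices, not Pinot's API. The message also says that the segments were not re-enabled, since the diff skips the enable step on timeout.

```java
import java.util.concurrent.TimeoutException;

// Hypothetical sketch: map reset failures to a status code and a user-facing message.
final class ResetFailure {
  final int httpStatus;
  final String message;

  private ResetFailure(int httpStatus, String message) {
    this.httpStatus = httpStatus;
    this.message = message;
  }

  static ResetFailure from(String tableNameWithType, Exception e) {
    if (e instanceof TimeoutException) {
      // Assumed status choice: 504 says the controller gave up waiting, not that the request was invalid.
      return new ResetFailure(504, String.format(
          "Timed out waiting for segments of table: %s to go OFFLINE; they were not re-enabled. Pending state "
              + "transitions may still complete in the background, so check the segment states and retry the "
              + "reset. %s", tableNameWithType, e.getMessage()));
    }
    if (e instanceof IllegalStateException) {
      // Missing ideal state, external view, or segment.
      return new ResetFailure(404,
          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()));
    }
    // The generic fallback must come last.
    return new ResetFailure(500,
        String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()));
  }
}
```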






[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#issuecomment-741130249


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=h1) Report
   > Merging [#6336](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=desc) (415a6c7) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **increase** coverage by `6.89%`.
   > The diff coverage is `73.16%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6336/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6336      +/-   ##
   ==========================================
   + Coverage   66.44%   73.34%   +6.89%     
   ==========================================
     Files        1075     1318     +243     
     Lines       54773    64104    +9331     
     Branches     8168     9329    +1161     
   ==========================================
   + Hits        36396    47019   +10623     
   + Misses      15700    14032    -1668     
   - Partials     2677     3053     +376     
   ```
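The Codecov totals can be checked by hand: Codecov computes coverage as hits / (hits + misses + partials), so partially covered lines count as uncovered. A small worked check against the first report's counts follows. The two-decimal display rule (flooring) is inferred from these figures rather than taken from Codecov's documentation; it reproduces `66.44%` and `+6.89%` here, where rounding to nearest would give `66.45%` and `+6.90%`.

```java
// Worked check of the Codecov totals; floor2 is an inferred display rule, not a documented one.
public class CoverageMath {

  /** Coverage percentage: hits / (hits + misses + partials). */
  public static double coveragePercent(long hits, long misses, long partials) {
    return 100.0 * hits / (hits + misses + partials);
  }

  /** Floors to two decimal places (toward negative infinity). */
  public static double floor2(double value) {
    return Math.floor(value * 100.0) / 100.0;
  }

  public static void main(String[] args) {
    double master = coveragePercent(36396, 15700, 2677); // 54773 lines on master
    double pr = coveragePercent(47019, 14032, 3053);     // 64104 lines on the PR head
    // prints: master=66.44 pr=73.34 delta=6.89
    System.out.printf("master=%.2f pr=%.2f delta=%.2f%n", floor2(master), floor2(pr), floor2(pr - master));
  }
}
```

The same calculation applied to the unit-test-only report (41662 hits of 64104 lines) gives its `64.99%` total and `-1.46%` delta.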
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `44.28% <38.69%> (?)` | |
   | unittests | `64.99% <56.80%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `44.44% <0.00%> (-4.40%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `78.57% <ø> (+5.40%)` | :arrow_up: |
   | [...common/config/tuner/NoOpTableTableConfigTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL05vT3BUYWJsZVRhYmxlQ29uZmlnVHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ot/common/config/tuner/RealTimeAutoIndexTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL1JlYWxUaW1lQXV0b0luZGV4VHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../common/config/tuner/TableConfigTunerRegistry.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL1RhYmxlQ29uZmlnVHVuZXJSZWdpc3RyeS5qYXZh) | `72.00% <ø> (ø)` | |
   | [.../apache/pinot/common/exception/QueryException.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL1F1ZXJ5RXhjZXB0aW9uLmphdmE=) | `90.27% <ø> (+5.55%)` | :arrow_up: |
   | [...pinot/common/function/AggregationFunctionType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vQWdncmVnYXRpb25GdW5jdGlvblR5cGUuamF2YQ==) | `100.00% <ø> (ø)` | |
   | ... and [1122 more](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=footer). Last update [b8bc74f...415a6c7](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#issuecomment-741130249


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=h1) Report
   > Merging [#6336](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=desc) (d4952f9) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **increase** coverage by `7.30%`.
   > The diff coverage is `73.16%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6336/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6336      +/-   ##
   ==========================================
   + Coverage   66.44%   73.75%   +7.30%     
   ==========================================
     Files        1075     1298     +223     
     Lines       54773    62883    +8110     
     Branches     8168     9142     +974     
   ==========================================
   + Hits        36396    46381    +9985     
   + Misses      15700    13489    -2211     
   - Partials     2677     3013     +336     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `45.03% <38.69%> (?)` | |
   | unittests | `65.09% <56.80%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `44.44% <0.00%> (-4.40%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `78.57% <ø> (+5.40%)` | :arrow_up: |
   | [...common/config/tuner/NoOpTableTableConfigTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL05vT3BUYWJsZVRhYmxlQ29uZmlnVHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ot/common/config/tuner/RealTimeAutoIndexTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL1JlYWxUaW1lQXV0b0luZGV4VHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../common/config/tuner/TableConfigTunerRegistry.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL1RhYmxlQ29uZmlnVHVuZXJSZWdpc3RyeS5qYXZh) | `72.00% <ø> (ø)` | |
   | [.../apache/pinot/common/exception/QueryException.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL1F1ZXJ5RXhjZXB0aW9uLmphdmE=) | `90.27% <ø> (+5.55%)` | :arrow_up: |
   | [...pinot/common/function/AggregationFunctionType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vQWdncmVnYXRpb25GdW5jdGlvblR5cGUuamF2YQ==) | `100.00% <ø> (ø)` | |
   | ... and [1097 more](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=footer). Last update [b8bc74f...d4952f9](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#issuecomment-741130249


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=h1) Report
   > Merging [#6336](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=desc) (7b42d94) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `22.00%`.
   > The diff coverage is `38.60%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6336/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #6336       +/-   ##
   ===========================================
   - Coverage   66.44%   44.44%   -22.01%     
   ===========================================
     Files        1075     1314      +239     
     Lines       54773    63760     +8987     
     Branches     8168     9286     +1118     
   ===========================================
   - Hits        36396    28338     -8058     
   - Misses      15700    33064    +17364     
   + Partials     2677     2358      -319     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `44.44% <38.60%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...org/apache/pinot/broker/queryquota/HitCounter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9IaXRDb3VudGVyLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...che/pinot/broker/queryquota/MaxHitRateTracker.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9NYXhIaXRSYXRlVHJhY2tlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ache/pinot/broker/queryquota/QueryQuotaEntity.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9RdWVyeVF1b3RhRW50aXR5LmphdmE=) | `0.00% <0.00%> (-50.00%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ceselector/StrictReplicaGroupInstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL1N0cmljdFJlcGxpY2FHcm91cEluc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...roker/routing/segmentpruner/TimeSegmentPruner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL1RpbWVTZWdtZW50UHJ1bmVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...roker/routing/segmentpruner/interval/Interval.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL2ludGVydmFsL0ludGVydmFsLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...r/routing/segmentpruner/interval/IntervalTree.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL2ludGVydmFsL0ludGVydmFsVHJlZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | ... and [1317 more](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=footer). Last update [b8bc74f...7b42d94](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#issuecomment-741130249


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=h1) Report
   > Merging [#6336](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=desc) (415a6c7) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `1.45%`.
   > The diff coverage is `56.80%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6336/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6336      +/-   ##
   ==========================================
   - Coverage   66.44%   64.99%   -1.46%     
   ==========================================
     Files        1075     1318     +243     
     Lines       54773    64104    +9331     
     Branches     8168     9329    +1161     
   ==========================================
   + Hits        36396    41662    +5266     
   - Misses      15700    19474    +3774     
   - Partials     2677     2968     +291     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `64.99% <56.80%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `66.66% <0.00%> (+9.52%)` | :arrow_up: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <0.00%> (-13.29%)` | :arrow_down: |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <0.00%> (-51.10%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `73.80% <ø> (+0.63%)` | :arrow_up: |
   | [...common/config/tuner/NoOpTableTableConfigTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL05vT3BUYWJsZVRhYmxlQ29uZmlnVHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ot/common/config/tuner/RealTimeAutoIndexTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL1JlYWxUaW1lQXV0b0luZGV4VHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | ... and [1177 more](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=footer). Last update [b8bc74f...415a6c7](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] npawar merged pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
npawar merged pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336


   




[GitHub] [incubator-pinot] npawar commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r550008061



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +355,67 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again", notes = "Resets a segment by disabling and then enabling the segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    segmentName = URIUtils.decode(segmentName);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName,
+          maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      return new SuccessResponse(
+          String.format("Successfully reset segment: %s of table: %s", segmentName, tableNameWithType));
+    } catch (IllegalStateException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()),
+          Status.NOT_FOUND);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segment: %s of table: %s. %s", segmentName, tableNameWithType, e.getMessage()),
+          Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Resets all segments of the given table
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets all segments of the table, by first disabling them, waiting for external view to stabilize, and finally enabling the segments", notes = "Resets all segments by disabling and then enabling them")
+  public SuccessResponse resetAllSegments(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetAllSegments(tableNameWithType,

Review comment:
       `Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. Skipping enable of segment`
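The suggested message fits a disable, wait, enable sequence in which the enable step is skipped on timeout. Below is a minimal sketch of that sequence, assuming a hypothetical `Cluster` interface in place of the real Helix/Pinot admin calls and simple polling of the external view; the method names and the 10 ms poll interval are illustrative only.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of the reset flow; Cluster stands in for the real admin calls.
public class SegmentResetSketch {

  public interface Cluster {
    void disableSegment(String tableNameWithType, String segmentName);

    void enableSegment(String tableNameWithType, String segmentName);

    /** True once the external view shows the segment OFFLINE on every replica. */
    boolean isExternalViewOffline(String tableNameWithType, String segmentName);
  }

  /** Polls the condition until it holds or maxWaitTimeMs elapses. */
  static boolean waitFor(BooleanSupplier condition, long maxWaitTimeMs, long pollIntervalMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + maxWaitTimeMs;
    while (!condition.getAsBoolean()) {
      if (System.currentTimeMillis() >= deadline) {
        return false;
      }
      Thread.sleep(pollIntervalMs);
    }
    return true;
  }

  public static void resetSegment(Cluster cluster, String tableNameWithType, String segmentName,
      long maxWaitTimeMs) throws InterruptedException, TimeoutException {
    cluster.disableSegment(tableNameWithType, segmentName);
    boolean offline = waitFor(() -> cluster.isExternalViewOffline(tableNameWithType, segmentName),
        maxWaitTimeMs, 10L);
    if (!offline) {
      // Do not enable a segment whose OFFLINE transition has not been observed yet.
      throw new TimeoutException(String.format(
          "Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. "
              + "Skipping enable of segment", segmentName, tableNameWithType));
    }
    cluster.enableSegment(tableNameWithType, segmentName);
  }
}
```

Skipping the enable step on timeout avoids issuing an enable while the disable has not yet been observed in the external view; the trade-off is that the segment stays disabled until the reset is retried.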






[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#issuecomment-741130249


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=h1) Report
   > Merging [#6336](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=desc) (b85aec7) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `21.95%`.
   > The diff coverage is `38.69%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6336/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #6336       +/-   ##
   ===========================================
   - Coverage   66.44%   44.49%   -21.96%     
   ===========================================
     Files        1075     1314      +239     
     Lines       54773    63754     +8981     
     Branches     8168     9286     +1118     
   ===========================================
   - Hits        36396    28366     -8030     
   - Misses      15700    33026    +17326     
   + Partials     2677     2362      -315     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `44.49% <38.69%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...org/apache/pinot/broker/queryquota/HitCounter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9IaXRDb3VudGVyLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...che/pinot/broker/queryquota/MaxHitRateTracker.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9NYXhIaXRSYXRlVHJhY2tlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ache/pinot/broker/queryquota/QueryQuotaEntity.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9RdWVyeVF1b3RhRW50aXR5LmphdmE=) | `0.00% <0.00%> (-50.00%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ceselector/StrictReplicaGroupInstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL1N0cmljdFJlcGxpY2FHcm91cEluc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...roker/routing/segmentpruner/TimeSegmentPruner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL1RpbWVTZWdtZW50UHJ1bmVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...roker/routing/segmentpruner/interval/Interval.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL2ludGVydmFsL0ludGVydmFsLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...r/routing/segmentpruner/interval/IntervalTree.java](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL2ludGVydmFsL0ludGVydmFsVHJlZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | ... and [1314 more](https://codecov.io/gh/apache/incubator-pinot/pull/6336/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=footer). Last update [b8bc74f...7b42d94](https://codecov.io/gh/apache/incubator-pinot/pull/6336?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] npawar commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r550005832



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
##########
@@ -355,6 +355,67 @@ public SuccessResponse reloadSegment(
     }
   }
 
+  /**
+   * Resets the segment of the table, by disabling and then enabling it.
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again", notes = "Resets a segment by disabling and then enabling the segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    segmentName = URIUtils.decode(segmentName);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName,
+          maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      return new SuccessResponse(
+          String.format("Successfully reset segment: %s of table: %s", segmentName, tableNameWithType));
+    } catch (IllegalStateException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()),
+          Status.NOT_FOUND);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segment: %s of table: %s. %s", segmentName, tableNameWithType, e.getMessage()),
+          Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Resets all segments of the given table
+   * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state,
+   * thus effective in resetting segments or consumers in error states.
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Resets all segments of the table, by first disabling them, waiting for external view to stabilize, and finally enabling the segments", notes = "Resets a segment by disabling and then enabling a segment")
+  public SuccessResponse resetAllSegments(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      _pinotHelixResourceManager.resetAllSegments(tableNameWithType,
+          maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      return new SuccessResponse(String.format("Successfully reset all segments of table: %s", tableNameWithType));
+    } catch (IllegalStateException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()),
+          Status.NOT_FOUND);
+    } catch (Exception e) {

Review comment:
       In case of a timeout, the message will be `Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. Skipping enable of segment.`
   As currently implemented, we return the message from the exception. For timeouts, a 5xx response code is generally recommended, so I kept it under the general exception handler, which also returns a 5xx.
   
   If there is a timeout, the task will not complete: we skip the enable step (as indicated in the message). In that case, the user would have to invoke the API again with a larger timeout.






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r539586744



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1768,6 +1770,63 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }

Review comment:
       Oh, I know that. I am not suggesting that we use a message here. I am just pointing out that we had problems with the toggle API before. I forget the details, but it probably had to do with not waiting until the EV (external view) stabilized.
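The "wait until EV stabilized" step is the crux of the toggle problem. The wait loop in the revision of `resetSegment` quoted in this thread re-reads the external view with no pause between reads. A minimal sketch of a bounded poll with a back-off interval is shown below; `stateMapSupplier`, `pollIntervalMs` and the class name are hypothetical stand-ins (the supplier abstracts re-reading the segment's external-view state map), not Helix or Pinot APIs.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

public class ExternalViewWait {
  /**
   * Polls until every instance reports targetState, or until timeoutMs elapses.
   * Returns true on success, false on timeout or interrupt.
   * stateMapSupplier is a hypothetical stand-in for re-reading the external view; it may return null.
   */
  static boolean waitForState(Supplier<Map<String, String>> stateMapSupplier, Set<String> instances,
      String targetState, long timeoutMs, long pollIntervalMs) {
    Set<String> pending = new HashSet<>(instances);
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (true) {
      Map<String, String> stateMap = stateMapSupplier.get();
      if (stateMap != null) {
        // Drop instances that have already converged to the target state.
        pending.removeIf(instance -> targetState.equals(stateMap.get(instance)));
      }
      if (pending.isEmpty()) {
        return true;
      }
      long remainingMs = deadline - System.currentTimeMillis();
      if (remainingMs <= 0) {
        return false;
      }
      try {
        // Back off between reads instead of re-reading in a tight loop.
        Thread.sleep(Math.min(pollIntervalMs, remainingMs));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
  }

  public static void main(String[] args) {
    boolean ok = waitForState(() -> Map.of("server_1", "OFFLINE", "server_2", "OFFLINE"),
        Set.of("server_1", "server_2"), "OFFLINE", 1000, 50);
    System.out.println(ok);
  }
}
```

Only when this returns true would the enable step run; a false result corresponds to the "skip enable" path discussed elsewhere in the thread.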






[GitHub] [incubator-pinot] npawar commented on a change in pull request #6336: Segment reset API

Posted by GitBox <gi...@apache.org>.
npawar commented on a change in pull request #6336:
URL: https://github.com/apache/incubator-pinot/pull/6336#discussion_r548249157



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -1777,6 +1779,143 @@ public int reloadSegment(String tableNameWithType, String segmentName) {
     return numMessagesSent;
   }
 
+  /**
+   * Resets a segment by disabling and then enabling the segment
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions
+        .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+    List<String> partitions = Lists.newArrayList(segmentName);
+
+    // First, disable or reset partition
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Resetting partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, partitions);
+      } else {
+        LOGGER.info("Disabling partition: {} of table: {}", segmentName, tableNameWithType);
+        _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, partitions);
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of partition: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap == null) {
+        continue;
+      }
+      instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new IllegalStateException(String.format(
+          "Timed out waiting for external view to stabilize after disable/reset call. Skipping enable of partition: %s of table: %s",
+          segmentName, tableNameWithType));
+    }
+
+    // Enable partition
+    LOGGER.info("Enabling partition: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, partitions);
+    }
+  }
+
+  /**
+   * Resets all segments of a table by disabling and then enabling the segments
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();

Review comment:
       I want to keep the name "partitions", because the calls being made are `resetPartition` and `enablePartition`.
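The quoted diff stops right after `resetInstanceToPartitionsMap` is declared. Because `resetPartition` and `enablePartition`, as called in the diff, take a single instance plus a list of partitions, grouping partitions per instance lets `resetAllSegments` issue one call per instance. The sketch below applies the same per-instance rule as `resetSegment` (external view missing or `ERROR` means reset, otherwise disable). It is hypothetical: ideal state and external view are modeled as plain `segment -> (instance -> state)` maps, and `disableInstanceToPartitionsMap`, `ResetPlan` and `plan` are invented names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ResetPlanSketch {
  static final String ERROR = "ERROR";

  /** Two hypothetical groupings keyed by instance: partitions to reset and partitions to disable. */
  static final class ResetPlan {
    final Map<String, Set<String>> resetInstanceToPartitionsMap = new HashMap<>();
    final Map<String, Set<String>> disableInstanceToPartitionsMap = new HashMap<>();
  }

  /**
   * idealStateMap: segment -> (instance -> state) from the ideal state.
   * externalViewMap: segment -> (instance -> state) from the external view; a segment may be absent.
   */
  static ResetPlan plan(Map<String, Map<String, String>> idealStateMap,
      Map<String, Map<String, String>> externalViewMap) {
    ResetPlan plan = new ResetPlan();
    for (Map.Entry<String, Map<String, String>> entry : idealStateMap.entrySet()) {
      String segment = entry.getKey();
      Map<String, String> evStateMap = externalViewMap.get(segment);
      for (String instance : entry.getValue().keySet()) {
        // Same rule as resetSegment: no external view entry or ERROR -> reset, else -> disable.
        boolean reset = evStateMap == null || ERROR.equals(evStateMap.get(instance));
        Map<String, Set<String>> target =
            reset ? plan.resetInstanceToPartitionsMap : plan.disableInstanceToPartitionsMap;
        target.computeIfAbsent(instance, k -> new HashSet<>()).add(segment);
      }
    }
    return plan;
  }

  public static void main(String[] args) {
    ResetPlan p = plan(
        Map.of("seg1", Map.of("s1", "ONLINE", "s2", "ONLINE"), "seg2", Map.of("s1", "ONLINE")),
        Map.of("seg1", Map.of("s1", "ERROR", "s2", "ONLINE")));
    System.out.println(p.resetInstanceToPartitionsMap.get("s1").size());
    System.out.println(p.disableInstanceToPartitionsMap.get("s2"));
  }
}
```

Each map entry then corresponds to one `resetPartition` or `enablePartition(false, ...)` call for that instance, with the set converted to a list of partitions.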



