Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/10/07 00:04:40 UTC

[GitHub] [pinot] npawar commented on a diff in pull request #9515: Exposing consumer's record lag in /consumingSegmentsInfo

npawar commented on code in PR #9515:
URL: https://github.com/apache/pinot/pull/9515#discussion_r989567029


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -819,6 +823,24 @@ public long getLastConsumedTimestamp() {
     return _lastLogTime;
   }
 
+  @Override
+  public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
+    String partitionGroupId = String.valueOf(_partitionGroupId);
+    return Collections.singletonMap(partitionGroupId, new ConsumerPartitionState(partitionGroupId, getCurrentOffset(),
+        getLastConsumedTimestamp(), fetchLatestStreamOffset(5_000)));
+  }
+
+  @Override
+  public Map<String, PartitionLagState> getPartitionToLagState(List<ConsumerPartitionState> consumerPartitionState) {

Review Comment:
   Could we keep this simple and have the parameter be a `Map<String, ConsumerPartitionState>`, matching what `getConsumerPartitionState()` returns?
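
   A rough sketch of the shape this suggests, not the PR's actual code. `PartitionState` is a simplified, hypothetical stand-in for `ConsumerPartitionState`, the `String` lag value stands in for `PartitionLagState`, and the "UNKNOWN" marker and the subtraction-based lag are assumptions made for illustration only.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class LagStateSketch {

  // Hypothetical stand-in for ConsumerPartitionState; the real class has other fields and types.
  static final class PartitionState {
    final long currentOffset;
    final Long upstreamLatestOffset; // null when the latest stream offset could not be fetched

    PartitionState(long currentOffset, Long upstreamLatestOffset) {
      this.currentOffset = currentOffset;
      this.upstreamLatestOffset = upstreamLatestOffset;
    }
  }

  // Takes the same Map shape that the state getter returns, so callers need no conversion.
  static Map<String, String> getPartitionToLag(Map<String, PartitionState> partitionToState) {
    Map<String, String> partitionToLag = new HashMap<>();
    for (Map.Entry<String, PartitionState> entry : partitionToState.entrySet()) {
      PartitionState state = entry.getValue();
      // Assumption: lag is the numeric difference between the two offsets.
      String lag = state.upstreamLatestOffset == null
          ? "UNKNOWN"
          : String.valueOf(state.upstreamLatestOffset - state.currentOffset);
      partitionToLag.put(entry.getKey(), lag);
    }
    return partitionToLag;
  }

  public static void main(String[] args) {
    // The singleton map built by the state getter can be passed straight through.
    Map<String, PartitionState> state = Collections.singletonMap("0", new PartitionState(100L, 150L));
    System.out.println(getPartitionToLag(state)); // prints {0=50}
  }
}
```

   With a `Map` parameter, the output of one method feeds the other directly, and the partition id stays the lookup key on both sides.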



##########
pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java:
##########
@@ -32,15 +32,21 @@ public class SegmentConsumerInfo {
   private final String _consumerState;
   private final long _lastConsumedTimestamp;
   private final Map<String, String> _partitionToOffsetMap;
+  private final Map<String, String> _partitionToUpstreamLatestMap;
+  private final Map<String, String> _partitionToOffsetLag;
 
   public SegmentConsumerInfo(@JsonProperty("segmentName") String segmentName,
       @JsonProperty("consumerState") String consumerState,
       @JsonProperty("lastConsumedTimestamp") long lastConsumedTimestamp,
-      @JsonProperty("partitionToOffsetMap") Map<String, String> partitionToOffsetMap) {
+      @JsonProperty("partitionToOffsetMap") Map<String, String> partitionToOffsetMap,
+      @JsonProperty("partitionToUpstreamLatestMap") Map<String, String> partitionToUpstreamLatestMap,
+      @JsonProperty("partitionToOffsetLag") Map<String, String> partitionToOffsetLagMap) {
     _segmentName = segmentName;
     _consumerState = consumerState;
     _lastConsumedTimestamp = lastConsumedTimestamp;
     _partitionToOffsetMap = partitionToOffsetMap;
+    _partitionToUpstreamLatestMap = partitionToUpstreamLatestMap;

Review Comment:
   Rename `_partitionToUpstreamLatestMap` to `_partitionToUpstreamLatestOffsetMap`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java:
##########
@@ -205,15 +205,19 @@ static public class ConsumingSegmentInfo {
     public long _lastConsumedTimestamp;
     @JsonProperty("partitionToOffsetMap")
     public Map<String, String> _partitionToOffsetMap;
+    @JsonProperty("partitionToOffsetLagMap")
+    public Map<String, String> _partitionToOffsetLagMap;

Review Comment:
   It would be helpful to also return the `upstreamLatestOffset` here. Offset lag on its own might not always be indicative, given that offsets might not be contiguous.
   Another thought: instead of adding so many `partitionTo*` maps, we could have a single `Map<String, ConsumingSegmentInfoReader.PartitionOffsetInfo> _partitionToOffsetInfo`, where the value object holds the current consumer offset, the latest stream offset, and the lag.
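
   A minimal sketch of what such a value object could look like. Only the name `PartitionOffsetInfo` comes from the suggestion above; the field names, the `String` types (chosen to mirror the existing `Map<String, String>` fields), and the `merge` helper are assumptions. The Jackson `@JsonProperty` annotations the real class would need are left out so the example compiles on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PartitionOffsetInfoSketch {

  // Hypothetical value object grouping the three per-partition values.
  static final class PartitionOffsetInfo {
    private final String _currentOffset;
    private final String _latestUpstreamOffset;
    private final String _offsetLag;

    PartitionOffsetInfo(String currentOffset, String latestUpstreamOffset, String offsetLag) {
      _currentOffset = currentOffset;
      _latestUpstreamOffset = latestUpstreamOffset;
      _offsetLag = offsetLag;
    }

    String getCurrentOffset() {
      return _currentOffset;
    }

    String getLatestUpstreamOffset() {
      return _latestUpstreamOffset;
    }

    String getOffsetLag() {
      return _offsetLag;
    }
  }

  // Hypothetical helper: folds three parallel partitionTo* maps into one map of value objects.
  // Partitions missing from the upstream or lag map get null for that field.
  static Map<String, PartitionOffsetInfo> merge(Map<String, String> partitionToOffsetMap,
      Map<String, String> partitionToUpstreamLatestOffsetMap, Map<String, String> partitionToOffsetLagMap) {
    Map<String, PartitionOffsetInfo> partitionToOffsetInfo = new LinkedHashMap<>();
    for (Map.Entry<String, String> entry : partitionToOffsetMap.entrySet()) {
      String partition = entry.getKey();
      partitionToOffsetInfo.put(partition, new PartitionOffsetInfo(entry.getValue(),
          partitionToUpstreamLatestOffsetMap.get(partition), partitionToOffsetLagMap.get(partition)));
    }
    return partitionToOffsetInfo;
  }
}
```

   One map keyed by partition keeps the three values together in the response, so clients do not have to join parallel maps themselves.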



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -134,6 +150,35 @@ public Response getConsumptionStatus(
     }
   }
 
+  @GET
+  @Path("/tables/{tableName}/consumingSegmentsInfo")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Returns state of consuming segments", notes = "Gets the status of consumers from all servers")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap getConsumingSegmentsInfo2(

Review Comment:
   Drop the "2" from the end of this method name?



