Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/11/15 00:44:30 UTC

[GitHub] [pinot] navina opened a new pull request, #9800: Adding a consumer lag as metric via a periodic task in controller

navina opened a new pull request, #9800:
URL: https://github.com/apache/pinot/pull/9800

   ## Description
   
   This PR enables Pinot to publish consumer lag as a metric for realtime tables. The metric is emitted by a periodic task in the controller that queries the `/consumingSegmentsInfo` API and records the maximum consuming lag across each partition's replicas.
   Currently, it publishes the following metrics:
   - `MAX_RECORDS_LAG`
   - `MAX_AVAILABILITY_LAG_MS`
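
   The "max lag among a partition's replicas" aggregation can be sketched roughly as follows. This is only an illustration under assumptions: each replica is modelled as a plain map from partition id to records lag, and `MaxLagSketch` and `maxLagPerPartition` are hypothetical names, not types from this PR.

   ```java
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class MaxLagSketch {
     /**
      * Hypothetical helper: each input map is one replica's view (partition id -> records lag).
      * Returns the largest lag reported for each partition across all replicas.
      */
     public static Map<String, Long> maxLagPerPartition(List<Map<String, Long>> replicaLags) {
       Map<String, Long> maxLag = new HashMap<>();
       for (Map<String, Long> replica : replicaLags) {
         // merge() stores the value if the key is new, otherwise keeps the larger of the two.
         replica.forEach((partition, lag) -> maxLag.merge(partition, lag, Long::max));
       }
       return maxLag;
     }

     public static void main(String[] args) {
       Map<String, Long> replicaA = Map.of("0", 120L, "1", 5L);
       Map<String, Long> replicaB = Map.of("0", 80L, "1", 40L);
       System.out.println(maxLagPerPartition(List.of(replicaA, replicaB)));
     }
   }
   ```

   Taking the maximum rather than the average means one slow replica is enough to raise the reported lag, which seems to be the intent of a `MAX_*` metric.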
   
   
   The task can be configured to run at a given frequency so that it does not overwhelm the servers (and, through them, the data source).
   
   Labels: `observability`
   
   ## Release Notes
   * Consumer lag for realtime tables can be monitored using metrics `MAX_RECORDS_LAG` and `MAX_AVAILABILITY_LAG_MS` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] navina commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
navina commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1319109856

   Summarizing the discussion (or re-discussion) with @mayankshriv  / @snleee / @npawar :
   
   1. We all agree that periodic tasks on the controller are not built for continuous or frequently run jobs. Moreover, this job will increase intra-cluster traffic and can hurt the performance of Pinot components and possibly even the upstream source (e.g. Kafka).
   2. We all agree that this is not the best approach for emitting consumer lag metrics. It would be better to emit metrics on the server side and aggregate them in the monitoring layer. However, aggregating in the monitoring layer and defining the alerting rules has its challenges, especially during ongoing cluster operations. This has been a challenge in the past with other server-side metrics like `LLC_PARTITION_CONSUMING`.
   
   Here is the plan of action:
   1. Let's leave this periodic task as an "opt-in" task in the controller. I will add a controller config that will define whether to enable this task or not. By default, it will be turned off.
   2. I will take a stab at adding the lag metric from the server side and create a follow-up PR. 
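
   As a rough sketch of what an opt-in flag that defaults to off could look like: the property key below is purely hypothetical (this thread does not name the actual controller config), and `java.util.Properties` stands in for the controller configuration class.

   ```java
   import java.util.Properties;

   public class OptInTaskSketch {
     // Hypothetical key for illustration only; the real config name is not given in this thread.
     public static final String ENABLE_KEY = "example.realtimeConsumerMonitor.enabled";

     /** Returns true only when the flag is explicitly set to "true"; a missing key means disabled. */
     public static boolean isEnabled(Properties conf) {
       return Boolean.parseBoolean(conf.getProperty(ENABLE_KEY, "false"));
     }

     public static void main(String[] args) {
       Properties conf = new Properties();
       System.out.println("enabled by default? " + isEnabled(conf));
       conf.setProperty(ENABLE_KEY, "true");
       System.out.println("enabled after opt-in? " + isEnabled(conf));
     }
   }
   ```

   With this shape, the task would only be registered when `isEnabled` returns true, so clusters that never set the key see no extra intra-cluster traffic.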
   
   I would like to keep both options open for use in production so that we can observe how these metrics/handlers work out under various scenarios.
   
   @mcvsubbu if LinkedIn is also working on the lag metrics, can you please share the design and the timeline for this? I want to make sure the design aligns and works with the existing OSS APIs.


[GitHub] [pinot] snleee commented on a diff in pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
snleee commented on code in PR #9800:
URL: https://github.com/apache/pinot/pull/9800#discussion_r1023558425


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java:
##########
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.httpclient.SimpleHttpConnectionManager;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeConsumerMonitor extends ControllerPeriodicTask<RealtimeConsumerMonitor.Context> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConsumerMonitor.class);
+  private final ConsumingSegmentInfoReader _consumingSegmentInfoReader;
+
+  @VisibleForTesting
+  public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics,
+      ConsumingSegmentInfoReader consumingSegmentInfoReader) {
+    super("RealtimeConsumerMonitor", controllerConf.getRealtimeConsumerMonitorRunFrequency(),
+        controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(), pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _consumingSegmentInfoReader = consumingSegmentInfoReader;
+  }
+
+  public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics,
+      ExecutorService executorService) {
+    this(controllerConf, pinotHelixResourceManager, leadControllerManager, controllerMetrics,
+        new ConsumingSegmentInfoReader(executorService, new SimpleHttpConnectionManager(), pinotHelixResourceManager));
+  }
+
+  @Override
+  protected void setUpTask() {
+    LOGGER.info("Setting up RealtimeConsumerMonitor task");
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType) {
+    if (!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType))) {
+      return;
+    }
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap segmentsInfoMap =
+          _consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 10000);
+      Map<String, List<Long>> partitionToLagSet = new HashMap<>();
+      Map<String, List<Long>> partitionToAvailabilityLagSet = new HashMap<>();
+
+      for (List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> info
+          : segmentsInfoMap._segmentToConsumingInfoMap.values()) {
+        info.forEach(segment -> {
+          segment._partitionOffsetInfo._recordsLagMap.forEach((k, v) -> {
+            if (!PartitionLagState.NOT_CALCULATED.equals(v)) {
+              try {
+                long recordsLag = Long.parseLong(v);
+                partitionToLagSet.computeIfAbsent(k, k1 -> new ArrayList<>());
+                partitionToLagSet.get(k).add(recordsLag);
+              } catch (NumberFormatException nfe) {
+                // skip this as we are unable to parse the lag string

Review Comment:
   In what scenario can this happen? I guess we don't want to log here because it would produce too many lines?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java:
##########
@@ -0,0 +1,122 @@
+  @Override
+  protected void processTable(String tableNameWithType) {
+    if (!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType))) {
+      return;
+    }
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap segmentsInfoMap =
+          _consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 10000);

Review Comment:
   It looks like `10000` configures the timeout. Can we use a `static final int` constant for this, e.g. `DEFAULT_TIMEOUT_MS`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java:
##########
@@ -0,0 +1,122 @@
+  @Override
+  protected void processTable(String tableNameWithType) {
+    if (!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType))) {
+      return;
+    }
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap segmentsInfoMap =
+          _consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 10000);
+      Map<String, List<Long>> partitionToLagSet = new HashMap<>();
+      Map<String, List<Long>> partitionToAvailabilityLagSet = new HashMap<>();
+
+      for (List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> info
+          : segmentsInfoMap._segmentToConsumingInfoMap.values()) {
+        info.forEach(segment -> {
+          segment._partitionOffsetInfo._recordsLagMap.forEach((k, v) -> {
+            if (!PartitionLagState.NOT_CALCULATED.equals(v)) {
+              try {
+                long recordsLag = Long.parseLong(v);
+                partitionToLagSet.computeIfAbsent(k, k1 -> new ArrayList<>());

Review Comment:
   I think we can simplify these two lines into the following:
   
   ```
   partitionToLagSet.computeIfAbsent(k, k1 -> new ArrayList<>()).add(recordsLag);
   ```
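
   For reference, a small self-contained sketch of why the chained form works: `Map.computeIfAbsent` returns the existing or newly created value, so `add` can be called on the result directly. The class and method names here are hypothetical and only for illustration.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   public class ComputeIfAbsentSketch {
     /** Groups lag values by partition key using the chained computeIfAbsent(...).add(...) form. */
     public static Map<String, List<Long>> group(String[] partitions, long[] lags) {
       Map<String, List<Long>> partitionToLags = new HashMap<>();
       for (int i = 0; i < partitions.length; i++) {
         // computeIfAbsent returns the list mapped to the key, creating it on first use.
         partitionToLags.computeIfAbsent(partitions[i], k -> new ArrayList<>()).add(lags[i]);
       }
       return partitionToLags;
     }

     public static void main(String[] args) {
       System.out.println(group(new String[]{"0", "1", "0"}, new long[]{10L, 20L, 30L}));
     }
   }
   ```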



[GitHub] [pinot] mcvsubbu commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1315993390

   > > Moreover, when a table is rebalanced, the consuming segments get moved around. This can lead to prolonged stale value that will mostly cause noise.
   > 
   > When a table is rebalanced, each server can get notified and if they no longer serve a partition, they can remove the corresponding gauge metric.
   
   I believe we do invoke the code to remove a metric each time a partition completes consumption.
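
   A minimal sketch of the idea, assuming a simple in-memory registry keyed by partition id. `PartitionGaugeSketch` and its methods are hypothetical stand-ins; the real server code would go through its metrics library instead.

   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicLong;

   public class PartitionGaugeSketch {
     // Hypothetical registry: partition id -> latest lag value exposed as a gauge.
     private final Map<Integer, AtomicLong> _lagByPartition = new ConcurrentHashMap<>();

     /** Updates (or creates) the gauge for a partition this server is consuming. */
     public void recordLag(int partition, long lag) {
       _lagByPartition.computeIfAbsent(partition, p -> new AtomicLong()).set(lag);
     }

     /** Drops the gauge once the server no longer serves the partition, so no stale value lingers. */
     public void onPartitionReleased(int partition) {
       _lagByPartition.remove(partition);
     }

     public boolean hasGauge(int partition) {
       return _lagByPartition.containsKey(partition);
     }

     public static void main(String[] args) {
       PartitionGaugeSketch gauges = new PartitionGaugeSketch();
       gauges.recordLag(3, 100L);
       gauges.onPartitionReleased(3);
       System.out.println("gauge still present? " + gauges.hasGauge(3));
     }
   }
   ```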


[GitHub] [pinot] navina commented on a diff in pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
navina commented on code in PR #9800:
URL: https://github.com/apache/pinot/pull/9800#discussion_r1024602698


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java:
##########
@@ -0,0 +1,122 @@
+  @Override
+  protected void processTable(String tableNameWithType) {
+    if (!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType))) {
+      return;
+    }
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap segmentsInfoMap =
+          _consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 10000);
+      Map<String, List<Long>> partitionToLagSet = new HashMap<>();
+      Map<String, List<Long>> partitionToAvailabilityLagSet = new HashMap<>();
+
+      for (List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> info
+          : segmentsInfoMap._segmentToConsumingInfoMap.values()) {
+        info.forEach(segment -> {
+          segment._partitionOffsetInfo._recordsLagMap.forEach((k, v) -> {
+            if (!PartitionLagState.NOT_CALCULATED.equals(v)) {
+              try {
+                long recordsLag = Long.parseLong(v);
+                partitionToLagSet.computeIfAbsent(k, k1 -> new ArrayList<>());
+                partitionToLagSet.get(k).add(recordsLag);
+              } catch (NumberFormatException nfe) {
+                // skip this as we are unable to parse the lag string

Review Comment:
   There are connectors that don't yet calculate lag or may not have a lag value (for example, Kinesis will only have availability lag and not records lag). While the default for any lag value is `NOT_CALCULATED`, it could be anything that the connector plugin returns. This just safeguards against invalid long values.
   
   I can add a log line here to indicate that this exception happened. Printing the stack trace would create too many lines.
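
   A rough sketch of that safeguard with a single log line and no stack trace. The `NOT_CALCULATED` literal below is a stand-in for `PartitionLagState.NOT_CALCULATED` (its exact value is an assumption), and `LagParseSketch` and `parseLag` are hypothetical names.

   ```java
   import java.util.OptionalLong;

   public class LagParseSketch {
     // Stand-in for PartitionLagState.NOT_CALCULATED; the literal value is assumed here.
     public static final String NOT_CALCULATED = "NOT_CALCULATED";

     /** Returns the parsed lag, or empty if the connector reported no usable value. */
     public static OptionalLong parseLag(String partition, String rawLag) {
       if (rawLag == null || NOT_CALCULATED.equals(rawLag)) {
         return OptionalLong.empty();
       }
       try {
         return OptionalLong.of(Long.parseLong(rawLag));
       } catch (NumberFormatException nfe) {
         // One short line per bad value; the stack trace is deliberately not printed.
         System.err.println("Skipping unparseable lag '" + rawLag + "' for partition " + partition);
         return OptionalLong.empty();
       }
     }

     public static void main(String[] args) {
       System.out.println(parseLag("0", "42"));
       System.out.println(parseLag("1", "UNKNOWN"));
     }
   }
   ```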



[GitHub] [pinot] sajjad-moradi commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
sajjad-moradi commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1315990338

   > We can make this an opt-in periodic task. I added it as a separate task to have better control over the frequency of this task.
   
   This is a useful metric to have. Ideally we'd want the gauge metric to be updated pretty frequently, e.g. within about a minute. I'm not sure running the periodic task every minute or so is a good idea!
   If we choose to emit the metric on the server side, then we can update the gauge as soon as the events are consumed. It is then up to the metrics and monitoring system (outside Pinot) to aggregate the metric values (e.g. finding the max value) across the replicas of each partition.


[GitHub] [pinot] codecov-commenter commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1314633766

   # [Codecov](https://codecov.io/gh/apache/pinot/pull/9800?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#9800](https://codecov.io/gh/apache/pinot/pull/9800?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (3a16078) into [master](https://codecov.io/gh/apache/pinot/commit/78504b941331681a8b5bd27e37c176a97e5bbcca?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (78504b9) will **decrease** coverage by `43.15%`.
   > The diff coverage is `80.95%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master    #9800       +/-   ##
   =============================================
   - Coverage     68.69%   25.53%   -43.16%     
   + Complexity     5423       44     -5379     
   =============================================
     Files          1961     1950       -11     
     Lines        104982   104691      -291     
     Branches      15883    15847       -36     
   =============================================
   - Hits          72115    26736    -45379     
   - Misses        27799    75239    +47440     
   + Partials       5068     2716     -2352     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1 | `25.53% <80.95%> (?)` | |
   | integration2 | `?` | |
   | unittests1 | `?` | |
   | unittests2 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pinot/pull/9800?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...g/apache/pinot/common/metrics/AbstractMetrics.java](https://codecov.io/gh/apache/pinot/pull/9800/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9BYnN0cmFjdE1ldHJpY3MuamF2YQ==) | `79.04% <0.00%> (+1.88%)` | :arrow_up: |
   | [...ot/controller/util/ConsumingSegmentInfoReader.java](https://codecov.io/gh/apache/pinot/pull/9800/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci91dGlsL0NvbnN1bWluZ1NlZ21lbnRJbmZvUmVhZGVyLmphdmE=) | `68.08% <ø> (-21.28%)` | :arrow_down: |
   | [...org/apache/pinot/spi/stream/PartitionLagState.java](https://codecov.io/gh/apache/pinot/pull/9800/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvc3RyZWFtL1BhcnRpdGlvbkxhZ1N0YXRlLmphdmE=) | `0.00% <ø> (ø)` | |
   | [...inot/controller/helix/RealtimeConsumerMonitor.java](https://codecov.io/gh/apache/pinot/pull/9800/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9oZWxpeC9SZWFsdGltZUNvbnN1bWVyTW9uaXRvci5qYXZh) | `85.41% <85.41%> (ø)` | |
   | [...g/apache/pinot/common/metrics/ControllerGauge.java](https://codecov.io/gh/apache/pinot/pull/9800/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vbWV0cmljcy9Db250cm9sbGVyR2F1Z2UuamF2YQ==) | `98.03% <100.00%> (+0.08%)` | :arrow_up: |
   | [...apache/pinot/controller/BaseControllerStarter.java](https://codecov.io/gh/apache/pinot/pull/9800/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9CYXNlQ29udHJvbGxlclN0YXJ0ZXIuamF2YQ==) | `76.20% <100.00%> (-5.92%)` | :arrow_down: |
   | [...va/org/apache/pinot/controller/ControllerConf.java](https://codecov.io/gh/apache/pinot/pull/9800/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3QtY29udHJvbGxlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29udHJvbGxlci9Db250cm9sbGVyQ29uZi5qYXZh) | `50.18% <100.00%> (-6.35%)` | :arrow_down: |
   | [...in/java/org/apache/pinot/spi/utils/BytesUtils.java](https://codecov.io/gh/apache/pinot/pull/9800/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdXRpbHMvQnl0ZXNVdGlscy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/trace/BaseRecording.java](https://codecov.io/gh/apache/pinot/pull/9800/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdHJhY2UvQmFzZVJlY29yZGluZy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...java/org/apache/pinot/spi/trace/NoOpRecording.java](https://codecov.io/gh/apache/pinot/pull/9800/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cGlub3Qtc3BpL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9zcGkvdHJhY2UvTm9PcFJlY29yZGluZy5qYXZh) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | ... and [1479 more](https://codecov.io/gh/apache/pinot/pull/9800/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   




[GitHub] [pinot] mcvsubbu commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1319119805

   > 
   
   [I thought all periodic tasks are opt-in. Can't you just not configure it at all, or set the time interval to 0 or something like that?]
   
   Anyway, yes, we are going to work on it; the timeline is the next few weeks.
   
   There is no design doc, but this is what we will be doing:
   - One metric per table per consuming host, basically the value for the worst partition.
   - The metric gets destroyed when the partition is not consuming (as all metrics do today).
   - When a new host starts to consume, the metric gets created on the new host (as metrics do today).
   
   I will let Juan add more details once he has them (or just post a PR).
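
   A minimal sketch of the plan above, for illustration only. `TableLagTracker` and its methods are hypothetical names, not Pinot APIs, and the actual server-side PR may look quite different. It keeps the latest lag per partition on one host, exposes the worst value per table, and drops the table entry once no partition is consuming.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Pinot's API: tracks per-partition lag on one host
// and exposes the worst value per table.
final class TableLagTracker {
  // table name -> (partition id -> latest lag observed on this host)
  private final Map<String, Map<Integer, Long>> _lagByTable = new ConcurrentHashMap<>();

  /** Records the latest lag for a partition that this host is consuming. */
  void updateLag(String table, int partition, long lag) {
    _lagByTable.computeIfAbsent(table, t -> new ConcurrentHashMap<>()).put(partition, lag);
  }

  /** Forgets a partition; the table entry disappears with its last partition. */
  void stopConsuming(String table, int partition) {
    _lagByTable.computeIfPresent(table, (t, lags) -> {
      lags.remove(partition);
      // Returning null removes the table entry, i.e. the gauge is destroyed.
      return lags.isEmpty() ? null : lags;
    });
  }

  /** Worst lag for the table on this host, or null when no gauge exists here. */
  Long worstLag(String table) {
    Map<Integer, Long> lags = _lagByTable.get(table);
    if (lags == null) {
      return null;
    }
    return lags.values().stream().max(Long::compare).orElse(null);
  }
}
```

   A real implementation would register `worstLag` as a gauge callback with the server metrics registry instead of returning a nullable `Long`.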




[GitHub] [pinot] navina commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
navina commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1317882435

   > Just for my understanding, can you elaborate what you mean by unclean shutdown?
   
   For example, say the server crashed for whatever reason (maybe it ran out of memory) while the user was rebalancing the table, and the consuming segments were moved to a different server.
   




[GitHub] [pinot] sajjad-moradi commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
sajjad-moradi commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1315978874

   > Moreover, when a table is rebalanced, the consuming segments get moved around. This can lead to prolonged stale value that will mostly cause noise.
   
   When a table is rebalanced, each server can get notified and if they no longer serve a partition, they can remove the corresponding gauge metric.
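
   A rough sketch of that cleanup, under the assumption that the server is handed the set of partitions it still serves after a rebalance. `PartitionGaugeRegistry` and `onAssignmentChange` are hypothetical names used only for illustration; Pinot's real metrics classes are not shown here.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Pinot's API: per-partition lag gauges on one server.
final class PartitionGaugeRegistry {
  // partition id -> latest lag value backing the gauge
  private final Map<Integer, Long> _lagGauges = new ConcurrentHashMap<>();

  /** Creates or updates the gauge for a partition this server consumes. */
  void report(int partition, long lag) {
    _lagGauges.put(partition, lag);
  }

  /**
   * Called with the partitions this server still serves after a rebalance.
   * Drops every other gauge and returns the ids that were dropped.
   */
  Set<Integer> onAssignmentChange(Set<Integer> assignedPartitions) {
    Set<Integer> removed = new TreeSet<>(_lagGauges.keySet());
    removed.removeAll(assignedPartitions);
    _lagGauges.keySet().removeAll(removed);
    return removed;
  }

  boolean hasGauge(int partition) {
    return _lagGauges.containsKey(partition);
  }
}
```

   This only covers the clean path, where the server is alive to receive the notification.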




[GitHub] [pinot] snleee commented on a diff in pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
snleee commented on code in PR #9800:
URL: https://github.com/apache/pinot/pull/9800#discussion_r1026931794


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java:
##########
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.httpclient.SimpleHttpConnectionManager;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeConsumerMonitor extends ControllerPeriodicTask<RealtimeConsumerMonitor.Context> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConsumerMonitor.class);
+  private final ConsumingSegmentInfoReader _consumingSegmentInfoReader;
+
+  @VisibleForTesting
+  public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics,
+      ConsumingSegmentInfoReader consumingSegmentInfoReader) {
+    super("RealtimeConsumerMonitor", controllerConf.getRealtimeConsumerMonitorRunFrequency(),
+        controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(), pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _consumingSegmentInfoReader = consumingSegmentInfoReader;
+  }
+
+  public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics,
+      ExecutorService executorService) {
+    this(controllerConf, pinotHelixResourceManager, leadControllerManager, controllerMetrics,
+        new ConsumingSegmentInfoReader(executorService, new SimpleHttpConnectionManager(), pinotHelixResourceManager));
+  }
+
+  @Override
+  protected void setUpTask() {
+    LOGGER.info("Setting up RealtimeConsumerMonitor task");
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType) {
+    if (!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType))) {
+      return;
+    }
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap segmentsInfoMap =
+          _consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 10000);
+      Map<String, List<Long>> partitionToLagSet = new HashMap<>();
+      Map<String, List<Long>> partitionToAvailabilityLagSet = new HashMap<>();
+
+      for (List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> info
+          : segmentsInfoMap._segmentToConsumingInfoMap.values()) {
+        info.forEach(segment -> {
+          segment._partitionOffsetInfo._recordsLagMap.forEach((k, v) -> {
+            if (!PartitionLagState.NOT_CALCULATED.equals(v)) {
+              try {
+                long recordsLag = Long.parseLong(v);
+                partitionToLagSet.computeIfAbsent(k, k1 -> new ArrayList<>());
+                partitionToLagSet.get(k).add(recordsLag);
+              } catch (NumberFormatException nfe) {
+                // skip this as we are unable to parse the lag string

Review Comment:
   thanks!





[GitHub] [pinot] mcvsubbu commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1315994609

   > > We can make this an opt-in periodic task. I added it as a separate task to have better control over the frequency of this task.
   > 
   > This is a useful metric to have. Ideally we'd want the gauge metric to be updated pretty frequently, like in matter of one minute. I'm not sure running the periodic task every minute or so is a good idea! If we choose to emit the metric on the server side, then we can change the gauge as soon as the events are consumed. It's just up to the metric & monitoring system (outside pinot) to aggregate the metric values (e.g. finding max value) for different replicas of each partition.
   
   Agreed.
   
   We should be emitting this metric every few minutes so as to detect lags quickly and act on it.




[GitHub] [pinot] snleee commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
snleee commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1320584678

   @mcvsubbu @jugomezv I will check this in, since this PR doesn't introduce any invasive change given that the feature is disabled by default. 
   
   @navina Let's loop back once the server side solution from LinkedIn is posted. We can review it together. Thanks for driving the discussion!




[GitHub] [pinot] snleee merged pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
snleee merged PR #9800:
URL: https://github.com/apache/pinot/pull/9800




[GitHub] [pinot] navina commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
navina commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1315777608

   > Why do we need to emit this from the controller? We can aggregate the metrics that is being emitted from the servers, right?
   > 
   > cc: @sajjad-moradi
   
   We could emit metrics from the servers and then try to compute the max in the monitoring layer. This is hard, as we would have to find the max lag among all replicas that ever existed for a given partition. I am not familiar with a way to find the max value among only the current replica set for a given partition. Moreover, when a table is rebalanced, the consuming segments get moved around. This can lead to prolonged stale values that will mostly cause noise.
   
   We can make this an opt-in periodic task. I added it as a separate task to have better control over the frequency of this task. 
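
   For reference, a simplified sketch of the controller-side reduction: take the lag strings reported by each current replica, skip anything that was not calculated or does not parse, and keep the max per partition. `MaxLagAggregator` is a hypothetical helper, and the literal `"NOT_CALCULATED"` is an assumption standing in for `PartitionLagState.NOT_CALCULATED`; the code in the PR is structured differently.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: reduces per-replica lag strings
// to a single max value per partition.
final class MaxLagAggregator {
  // Assumed placeholder for lag values a server could not compute.
  static final String NOT_CALCULATED = "NOT_CALCULATED";

  private MaxLagAggregator() {
  }

  /** Returns partition id -> max lag across replicas, ignoring unusable values. */
  static Map<String, Long> maxLagPerPartition(List<Map<String, String>> replicaLagMaps) {
    Map<String, Long> maxByPartition = new HashMap<>();
    for (Map<String, String> lagMap : replicaLagMaps) {
      lagMap.forEach((partition, lag) -> {
        if (NOT_CALCULATED.equals(lag)) {
          return;
        }
        try {
          maxByPartition.merge(partition, Long.parseLong(lag), Long::max);
        } catch (NumberFormatException e) {
          // Skip values that are not numeric.
        }
      });
    }
    return maxByPartition;
  }

  /** Returns the worst lag across all partitions, or 0 when nothing was reported. */
  static long maxLag(Map<String, Long> maxByPartition) {
    return maxByPartition.values().stream().mapToLong(Long::longValue).max().orElse(0L);
  }
}
```

   Because only the replicas that answered the current query are considered, a replica that was moved away by a rebalance cannot leave a stale value behind.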




[GitHub] [pinot] sajjad-moradi commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
sajjad-moradi commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1317871498

   > This works well in a stable state and clean operations. But this doesn't cover cases of unclean shutdown / crashes in production and it has generally been observed to be not very reliable.
   
   Just for my understanding, can you elaborate what you mean by unclean shutdown?
   




[GitHub] [pinot] mcvsubbu commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1317727618

   I see that this has already been approved for merge. We intend to submit a PR soon that will handle this at the server level, since we need alerting on lag sooner rather than later. If you choose to, you can wait for that PR before merging this one. 
   
   




[GitHub] [pinot] navina commented on pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
navina commented on PR #9800:
URL: https://github.com/apache/pinot/pull/9800#issuecomment-1317817172

   > I'm not sure running the periodic task every minute or so is a good idea!
   
   Agreed here :) We will likely not set it to query every minute.
   
   > If we choose to emit the metric on the server side, then we can change the gauge as soon as the events are consumed. It's just up to the metric & monitoring system (outside pinot) to aggregate the metric values (e.g. finding max value) for different replicas of each partition.
   
   Agreed that we can detect it sooner, but there doesn't seem to be a good way to aggregate it in the monitoring layer in the presence of a rebalance (clean/unclean) or a redistribution of consuming segments for any other reason. 
   We have also noted that sometimes all consuming segments get into the ERROR state (maybe the consumer crashed or hung) and yet the monitoring metric `LLC_PARTITION_CONSUMING` doesn't detect it [ @npawar may have more context ].
   Moreover, adding a metric in the segment data manager feels like tiptoeing through a minefield.
   
   A much cleaner way would be to emit at partition level from the connector plugin directly or from server (without involving the server tag, but a stable replica id tag). I believe there are some dependency issues to be sorted out before getting there. 
   
   > I believe we do invoke the code to remove a metric each time a partition completes consumption.
   
   This works well in a stable state and clean operations. But this doesn't cover cases of unclean shutdown / crashes in production and it has generally been observed to be not very reliable. 
   




[GitHub] [pinot] navina commented on a diff in pull request #9800: Adding a consumer lag as metric via a periodic task in controller

Posted by GitBox <gi...@apache.org>.
navina commented on code in PR #9800:
URL: https://github.com/apache/pinot/pull/9800#discussion_r1025798743


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitor.java:
##########
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import org.apache.commons.httpclient.SimpleHttpConnectionManager;
+import org.apache.pinot.common.metrics.ControllerGauge;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class RealtimeConsumerMonitor extends ControllerPeriodicTask<RealtimeConsumerMonitor.Context> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConsumerMonitor.class);
+  private final ConsumingSegmentInfoReader _consumingSegmentInfoReader;
+
+  @VisibleForTesting
+  public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics,
+      ConsumingSegmentInfoReader consumingSegmentInfoReader) {
+    super("RealtimeConsumerMonitor", controllerConf.getRealtimeConsumerMonitorRunFrequency(),
+        controllerConf.getRealtimeConsumerMonitorInitialDelayInSeconds(), pinotHelixResourceManager,
+        leadControllerManager, controllerMetrics);
+    _consumingSegmentInfoReader = consumingSegmentInfoReader;
+  }
+
+  public RealtimeConsumerMonitor(ControllerConf controllerConf, PinotHelixResourceManager pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics controllerMetrics,
+      ExecutorService executorService) {
+    this(controllerConf, pinotHelixResourceManager, leadControllerManager, controllerMetrics,
+        new ConsumingSegmentInfoReader(executorService, new SimpleHttpConnectionManager(), pinotHelixResourceManager));
+  }
+
+  @Override
+  protected void setUpTask() {
+    LOGGER.info("Setting up RealtimeConsumerMonitor task");
+  }
+
+  @Override
+  protected void processTable(String tableNameWithType) {
+    if (!TableType.REALTIME.equals(TableNameBuilder.getTableTypeFromTableName(tableNameWithType))) {
+      return;
+    }
+    try {
+      ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap segmentsInfoMap =
+          _consumingSegmentInfoReader.getConsumingSegmentsInfo(tableNameWithType, 10000);
+      Map<String, List<Long>> partitionToLagSet = new HashMap<>();
+      Map<String, List<Long>> partitionToAvailabilityLagSet = new HashMap<>();
+
+      for (List<ConsumingSegmentInfoReader.ConsumingSegmentInfo> info
+          : segmentsInfoMap._segmentToConsumingInfoMap.values()) {
+        info.forEach(segment -> {
+          segment._partitionOffsetInfo._recordsLagMap.forEach((k, v) -> {
+            if (!PartitionLagState.NOT_CALCULATED.equals(v)) {
+              try {
+                long recordsLag = Long.parseLong(v);
+                partitionToLagSet.computeIfAbsent(k, k1 -> new ArrayList<>());
+                partitionToLagSet.get(k).add(recordsLag);
+              } catch (NumberFormatException nfe) {
+                // skip this as we are unable to parse the lag string

Review Comment:
   I didn't add a log line here as it is likely to get very noisy. We can always cross-check by running the same query through our REST API `/consumingSegmentsInfo`.
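
   If visibility into skipped values is ever needed without a log line per value, one option is to count them and report a single summary per table per run. This is only a sketch of that idea: `LagParseStats` is a hypothetical class, not something in this PR.

```java
import java.util.OptionalLong;

// Hypothetical sketch, not part of the PR: counts unparsable lag values so
// that at most one summary line needs to be logged per table per run.
final class LagParseStats {
  private int _parsed;
  private int _skipped;

  /** Returns the parsed lag, or empty when the value is not a number. */
  OptionalLong parse(String rawLag) {
    try {
      long lag = Long.parseLong(rawLag);
      _parsed++;
      return OptionalLong.of(lag);
    } catch (NumberFormatException e) {
      // Counted instead of logged to avoid one log line per bad value.
      _skipped++;
      return OptionalLong.empty();
    }
  }

  int parsedCount() {
    return _parsed;
  }

  int skippedCount() {
    return _skipped;
  }

  /** One-line summary suitable for a single log statement per table. */
  String summary(String tableNameWithType) {
    return String.format("Table %s: parsed %d lag values, skipped %d", tableNameWithType, _parsed, _skipped);
  }
}
```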


