You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by su...@apache.org on 2018/12/11 21:10:37 UTC

[incubator-pinot] branch master updated: [PINOT-7476] Add metrics to track cases where segment refresh/reloads fail (#3606)

This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 569022a  [PINOT-7476] Add metrics to track cases where segment refresh/reloads fail (#3606)
569022a is described below

commit 569022ad64960561a32b401f7ec1b2e6f0d6dc69
Author: Sunitha Beeram <sb...@linkedin.com>
AuthorDate: Tue Dec 11 13:10:32 2018 -0800

    [PINOT-7476] Add metrics to track cases where segment refresh/reloads fail (#3606)
    
    * [PINOT-7476] Add metrics to track cases where segment refresh/reloads fail
---
 .../linkedin/pinot/common/metrics/ServerMeter.java |  4 +++-
 .../core/segment/index/loader/LoaderUtils.java     |  2 +-
 .../server/starter/helix/HelixServerStarter.java   |  4 ++--
 .../helix/SegmentMessageHandlerFactory.java        | 27 ++++++++++++++++++----
 4 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java
index 4cda98d..44193fd 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java
@@ -54,7 +54,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   NUM_SEGMENTS_QUERIED("numSegmentsQueried", false),
   NUM_SEGMENTS_PROCESSED("numSegmentsProcessed", false),
   NUM_SEGMENTS_MATCHED("numSegmentsMatched", false),
-  NUM_MISSING_SEGMENTS("segments", false);
+  NUM_MISSING_SEGMENTS("segments", false),
+  RELOAD_FAILURES("segments", false),
+  REFRESH_FAILURES("segments", false);
 
   private final String meterName;
   private final String unit;
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java
index b8c30e4..d69130b 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java
@@ -82,7 +82,7 @@ public class LoaderUtils {
    * Try to recover a segment from reload failures (reloadSegment() method in HelixInstanceDataManager). This has no
    * effect for normal segments.
    * <p>Reload failures include normal failures like Java exceptions (called in reloadSegment() finally block) and hard
-   * failures such as server restart during reload and JVM crush (called before trying to load segment from the index
+   * failures such as server restart during reload and JVM crash (called before trying to load segment from the index
    * directory).
    * <p>The following failure scenarios could happen (use atomic renaming operation to classify scenarios):
    * <ul>
diff --git a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java
index f63725e..3cb99bd 100644
--- a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java
+++ b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java
@@ -163,13 +163,13 @@ public class HelixServerStarter {
     _adminApiApplication.start(adminApiPort);
     setAdminApiPort(adminApiPort);
 
+    final ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     // Register message handler factory
     SegmentMessageHandlerFactory messageHandlerFactory =
-        new SegmentMessageHandlerFactory(fetcherAndLoader, _serverInstance.getInstanceDataManager());
+        new SegmentMessageHandlerFactory(fetcherAndLoader, _serverInstance.getInstanceDataManager(), serverMetrics);
     _helixManager.getMessagingService()
         .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory);
 
-    final ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     serverMetrics.addCallbackGauge("helix.connected", () -> _helixManager.isConnected() ? 1L : 0L);
     _helixManager.addPreConnectCallback(
         () -> serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L));
diff --git a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index ddaf778..a14c8c6 100644
--- a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -15,8 +15,11 @@
  */
 package com.linkedin.pinot.server.starter.helix;
 
+import com.linkedin.pinot.common.Utils;
 import com.linkedin.pinot.common.messages.SegmentRefreshMessage;
 import com.linkedin.pinot.common.messages.SegmentReloadMessage;
+import com.linkedin.pinot.common.metrics.ServerMeter;
+import com.linkedin.pinot.common.metrics.ServerMetrics;
 import com.linkedin.pinot.core.data.manager.InstanceDataManager;
 import java.util.concurrent.Semaphore;
 import org.apache.helix.NotificationContext;
@@ -37,11 +40,13 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
 
   private final SegmentFetcherAndLoader _fetcherAndLoader;
   private final InstanceDataManager _instanceDataManager;
+  private final ServerMetrics _metrics;
 
   public SegmentMessageHandlerFactory(SegmentFetcherAndLoader fetcherAndLoader,
-      InstanceDataManager instanceDataManager) {
+      InstanceDataManager instanceDataManager, ServerMetrics metrics) {
     _fetcherAndLoader = fetcherAndLoader;
     _instanceDataManager = instanceDataManager;
+    _metrics = metrics;
     int maxParallelRefreshThreads = instanceDataManager.getMaxParallelRefreshThreads();
     if (maxParallelRefreshThreads > 0) {
       _refreshThreadSemaphore = new Semaphore(maxParallelRefreshThreads, true);
@@ -75,9 +80,9 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
     String msgSubType = message.getMsgSubType();
     switch (msgSubType) {
       case SegmentRefreshMessage.REFRESH_SEGMENT_MSG_SUB_TYPE:
-        return new SegmentRefreshMessageHandler(new SegmentRefreshMessage(message), context);
+        return new SegmentRefreshMessageHandler(new SegmentRefreshMessage(message), _metrics, context);
       case SegmentReloadMessage.RELOAD_SEGMENT_MSG_SUB_TYPE:
-        return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), context);
+        return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), _metrics, context);
       default:
         throw new UnsupportedOperationException("Unsupported user defined message sub type: " + msgSubType);
     }
@@ -97,12 +102,15 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
   private class SegmentRefreshMessageHandler extends MessageHandler {
     private final String _segmentName;
     private final String _tableNameWithType;
+    private final ServerMetrics _metrics;
     private final Logger _logger;
 
-    public SegmentRefreshMessageHandler(SegmentRefreshMessage refreshMessage, NotificationContext context) {
+    public SegmentRefreshMessageHandler(SegmentRefreshMessage refreshMessage, ServerMetrics metrics,
+        NotificationContext context) {
       super(refreshMessage, context);
       _segmentName = refreshMessage.getPartitionName();
       _tableNameWithType = refreshMessage.getResourceName();
+      _metrics = metrics;
       _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + SegmentRefreshMessageHandler.class);
     }
 
@@ -115,6 +123,9 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
         // The number of retry times depends on the retry count in SegmentOperations.
         _fetcherAndLoader.addOrReplaceOfflineSegment(_tableNameWithType, _segmentName);
         result.setSuccess(true);
+      } catch (Exception e) {
+        _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REFRESH_FAILURES, 1);
+        Utils.rethrowException(e);
       } finally {
         releaseSema();
       }
@@ -130,12 +141,15 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
   private class SegmentReloadMessageHandler extends MessageHandler {
     private final String _segmentName;
     private final String _tableNameWithType;
+    private final ServerMetrics _metrics;
     private final Logger _logger;
 
-    public SegmentReloadMessageHandler(SegmentReloadMessage segmentReloadMessage, NotificationContext context) {
+    public SegmentReloadMessageHandler(SegmentReloadMessage segmentReloadMessage, ServerMetrics metrics,
+        NotificationContext context) {
       super(segmentReloadMessage, context);
       _segmentName = segmentReloadMessage.getPartitionName();
       _tableNameWithType = segmentReloadMessage.getResourceName();
+      _metrics = metrics;
       _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + SegmentReloadMessageHandler.class);
     }
 
@@ -146,6 +160,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
       try {
         if (_segmentName.equals("")) {
           acquireSema("ALL", _logger);
+          // NOTE: reloadAllSegments() aborts if any segment reload encounters an unhandled exception, which can
+          // lead to inconsistent state across segments
           _instanceDataManager.reloadAllSegments(_tableNameWithType);
         } else {
           // Reload one segment
@@ -154,6 +170,7 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
         }
         helixTaskResult.setSuccess(true);
       } catch (Throwable e) {
+        _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.RELOAD_FAILURES, 1);
         // catch all Errors and Exceptions: if we only catch Exception, Errors go completely unhandled
         // (without any corresponding logs to indicate failure!) in the callable path
         throw new RuntimeException(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org