You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/02/05 02:21:24 UTC

[nifi] branch main updated: NIFI-9639: Determine how long it takes to find cluster coordinator and perform DNS lookup when sending heartbeats and include in the logs

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 94668de  NIFI-9639: Determine how long it takes to find cluster coordinator and perform DNS lookup when sending heartbeats and include in the logs
94668de is described below

commit 94668dee6e0da404a8ba7ba03a5d13ce315ce196
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jan 28 10:17:11 2022 -0500

    NIFI-9639: Determine how long it takes to find cluster coordinator and perform DNS lookup when sending heartbeats and include in the logs
    
    This closes #5748
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../protocol/AbstractNodeProtocolSender.java       | 84 ++++++++++++++--------
 .../cluster/protocol/jaxb/JaxbProtocolContext.java | 23 +++---
 .../protocol/message/CommsTimingDetails.java       | 66 +++++++++++++++++
 .../protocol/message/HeartbeatResponseMessage.java | 16 +++--
 .../cluster/ClusterProtocolHeartbeater.java        | 36 +++++++---
 .../election/CuratorLeaderElectionManager.java     | 43 ++++++-----
 6 files changed, 195 insertions(+), 73 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
index c55b41e..33322ee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/AbstractNodeProtocolSender.java
@@ -19,6 +19,7 @@ package org.apache.nifi.cluster.protocol;
 
 import org.apache.nifi.cluster.protocol.message.ClusterWorkloadRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ClusterWorkloadResponseMessage;
+import org.apache.nifi.cluster.protocol.message.CommsTimingDetails;
 import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
 import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
@@ -30,7 +31,11 @@ import org.apache.nifi.io.socket.SocketUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.Objects;
@@ -39,10 +44,15 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
     private static final Logger logger = LoggerFactory.getLogger(AbstractNodeProtocolSender.class);
     private final SocketConfiguration socketConfiguration;
     private final ProtocolContext<ProtocolMessage> protocolContext;
+    private final ProtocolMessageMarshaller<ProtocolMessage> marshaller;
+    private final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller;
 
     public AbstractNodeProtocolSender(final SocketConfiguration socketConfiguration, final ProtocolContext<ProtocolMessage> protocolContext) {
         this.socketConfiguration = socketConfiguration;
         this.protocolContext = protocolContext;
+
+        marshaller = protocolContext.createMarshaller();
+        unmarshaller = protocolContext.createUnmarshaller();
     }
 
     @Override
@@ -111,24 +121,24 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
 
     @Override
     public HeartbeatResponseMessage heartbeat(final HeartbeatMessage msg, final String address) throws ProtocolException {
-        final String hostname;
-        final int port;
-        try {
-            final String[] parts = address.split(":");
-            hostname = parts[0];
-            port = Integer.parseInt(parts[1]);
-        } catch (final Exception e) {
-            throw new IllegalArgumentException("Cannot send heartbeat to address [" + address + "]. Address must be in <hostname>:<port> format");
-        }
+        final CommsTimingDetails timingDetails = new CommsTimingDetails();
+
+        final String[] parts = address.split(":");
+        final String hostname = parts[0];
+        final int port = Integer.parseInt(parts[1]);
+
+        final ProtocolMessage responseMessage = sendProtocolMessage(msg, hostname, port, timingDetails);
 
-        final ProtocolMessage responseMessage = sendProtocolMessage(msg, hostname, port);
         if (MessageType.HEARTBEAT_RESPONSE == responseMessage.getType()) {
-            return (HeartbeatResponseMessage) responseMessage;
+            final HeartbeatResponseMessage heartbeatResponseMessage = (HeartbeatResponseMessage) responseMessage;
+            heartbeatResponseMessage.setCommsTimingDetails(timingDetails);
+            return heartbeatResponseMessage;
         }
 
         throw new ProtocolException("Expected message type '" + MessageType.HEARTBEAT_RESPONSE + "' but found '" + responseMessage.getType() + "'");
     }
 
+
     @Override
     public ClusterWorkloadResponseMessage clusterWorkload(final ClusterWorkloadRequestMessage msg) throws ProtocolException {
         final InetSocketAddress serviceAddress;
@@ -137,7 +147,8 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
         } catch (IOException e) {
             throw new ProtocolException("Failed to getServiceAddress due to " + e, e);
         }
-        final ProtocolMessage responseMessage = sendProtocolMessage(msg, serviceAddress.getHostName(), serviceAddress.getPort());
+
+        final ProtocolMessage responseMessage = sendProtocolMessage(msg, serviceAddress.getHostName(), serviceAddress.getPort(), new CommsTimingDetails());
         if (MessageType.CLUSTER_WORKLOAD_RESPONSE == responseMessage.getType()) {
             return (ClusterWorkloadResponseMessage) responseMessage;
         }
@@ -162,38 +173,55 @@ public abstract class AbstractNodeProtocolSender implements NodeProtocolSender {
         return socketConfiguration;
     }
 
-    private ProtocolMessage sendProtocolMessage(final ProtocolMessage msg, final String hostname, final int port) {
-        Socket socket = null;
-        try {
-            try {
-                socket = SocketUtils.createSocket(new InetSocketAddress(hostname, port), socketConfiguration);
-            } catch (IOException e) {
-                throw new ProtocolException("Failed to send message to Cluster Coordinator due to: " + e, e);
-            }
+    private ProtocolMessage sendProtocolMessage(final ProtocolMessage msg, final String hostname, final int port, final CommsTimingDetails timingDetails) {
+        final long dnsLookupStart = System.currentTimeMillis();
+        final InetSocketAddress socketAddress = new InetSocketAddress(hostname, port);
+
+        final long connectStart = System.currentTimeMillis();
+        try (final Socket socket = SocketUtils.createSocket(socketAddress, socketConfiguration);
+             final InputStream in = new BufferedInputStream(socket.getInputStream());
+             final OutputStream out = new BufferedOutputStream(socket.getOutputStream())) {
 
+            final long sendStart = System.currentTimeMillis();
             try {
                 // marshal message to output stream
-                final ProtocolMessageMarshaller<ProtocolMessage> marshaller = protocolContext.createMarshaller();
-                marshaller.marshal(msg, socket.getOutputStream());
+                marshaller.marshal(msg, out);
             } catch (final IOException ioe) {
-                throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message due to: " + ioe, ioe);
+                throw new ProtocolException("Failed marshalling '" + msg.getType() + "' protocol message", ioe);
             }
 
+            // Read the first byte to see how long it takes, and then reset it so that
+            // we can consume it in the unmarshaller. This provides us helpful information in logs if heartbeats are not being produced/consumed
+            // as frequently/efficiently as we'd like.
+            final long receiveStart = System.currentTimeMillis();
+            in.mark(1);
+            in.read();
+            in.reset();
+
+            final long receiveFullStart = System.currentTimeMillis();
             final ProtocolMessage response;
             try {
                 // unmarshall response and return
-                final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
-                response = unmarshaller.unmarshal(socket.getInputStream());
+                response = unmarshaller.unmarshal(in);
             } catch (final IOException ioe) {
                 throw new ProtocolException("Failed unmarshalling '" + MessageType.CONNECTION_RESPONSE + "' protocol message from "
-                        + socket.getRemoteSocketAddress() + " due to: " + ioe, ioe);
+                        + socket.getRemoteSocketAddress(), ioe);
             }
 
+            final long receiveEnd = System.currentTimeMillis();
+
+            timingDetails.setDnsLookupMillis(connectStart - dnsLookupStart);
+            timingDetails.setConnectMillis(sendStart - connectStart);
+            timingDetails.setSendRequestMillis(receiveStart - sendStart);
+            timingDetails.setReceiveFirstByteMillis(receiveFullStart - receiveStart);
+            timingDetails.setReceiveFullResponseMillis(receiveEnd - receiveStart);
+
             return response;
-        } finally {
-            SocketUtils.closeQuietly(socket);
+        } catch (IOException e) {
+            throw new ProtocolException("Failed to send message to Cluster Coordinator", e);
         }
     }
 
+
     protected abstract InetSocketAddress getServiceAddress() throws IOException;
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
index 23d45d1..34da349 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/jaxb/JaxbProtocolContext.java
@@ -16,6 +16,17 @@
  */
 package org.apache.nifi.cluster.protocol.jaxb;
 
+import org.apache.nifi.cluster.protocol.ProtocolContext;
+import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
+import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
+import org.apache.nifi.security.xml.XmlUtils;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -25,16 +36,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.stream.XMLStreamException;
-import javax.xml.stream.XMLStreamReader;
-import org.apache.nifi.cluster.protocol.ProtocolContext;
-import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
-import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
-import org.apache.nifi.security.xml.XmlUtils;
 
 /**
  * Implements a context for communicating internally amongst the cluster using
@@ -83,7 +84,7 @@ public class JaxbProtocolContext<T> implements ProtocolContext {
                     dos.writeInt(msgBytes.size());
 
                     // write message
-                    dos.write(msgBytes.toByteArray());
+                    msgBytes.writeTo(dos);
 
                     dos.flush();
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/CommsTimingDetails.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/CommsTimingDetails.java
new file mode 100644
index 0000000..0c96b06
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/CommsTimingDetails.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cluster.protocol.message;
+
+public class CommsTimingDetails {
+    private long dnsLookupMillis = -1;
+    private long connectMillis = -1;
+    private long sendRequestMillis = -1;
+    private long receiveFirstByteMillis = -1;
+    private long receiveFullResponseMillis = -1;
+
+    public long getDnsLookupMillis() {
+        return dnsLookupMillis;
+    }
+
+    public void setDnsLookupMillis(final long dnsLookupMillis) {
+        this.dnsLookupMillis = dnsLookupMillis;
+    }
+
+    public long getConnectMillis() {
+        return connectMillis;
+    }
+
+    public void setConnectMillis(final long connectMillis) {
+        this.connectMillis = connectMillis;
+    }
+
+    public long getSendRequestMillis() {
+        return sendRequestMillis;
+    }
+
+    public void setSendRequestMillis(final long sendRequestMillis) {
+        this.sendRequestMillis = sendRequestMillis;
+    }
+
+    public long getReceiveFirstByteMillis() {
+        return receiveFirstByteMillis;
+    }
+
+    public void setReceiveFirstByteMillis(final long receiveFirstByteMillis) {
+        this.receiveFirstByteMillis = receiveFirstByteMillis;
+    }
+
+    public long getReceiveFullResponseMillis() {
+        return receiveFullResponseMillis;
+    }
+
+    public void setReceiveFullResponseMillis(final long receiveFullResponseMillis) {
+        this.receiveFullResponseMillis = receiveFullResponseMillis;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
index e508185..8c6aa32 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/message/HeartbeatResponseMessage.java
@@ -17,18 +17,18 @@
 
 package org.apache.nifi.cluster.protocol.message;
 
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
 
 import javax.xml.bind.annotation.XmlRootElement;
-
-import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
+import java.util.ArrayList;
+import java.util.List;
 
 @XmlRootElement(name = "heartbeatResponse")
 public class HeartbeatResponseMessage extends ProtocolMessage {
 
     private List<NodeConnectionStatus> updatedNodeStatuses = new ArrayList<>();
     private String flowElectionMessage = null;
+    private CommsTimingDetails commsTimingDetails;
 
     @Override
     public MessageType getType() {
@@ -50,4 +50,12 @@ public class HeartbeatResponseMessage extends ProtocolMessage {
     public void setFlowElectionMessage(String flowElectionMessage) {
         this.flowElectionMessage = flowElectionMessage;
     }
+
+    public CommsTimingDetails getCommsTimingDetails() {
+        return commsTimingDetails;
+    }
+
+    public void setCommsTimingDetails(final CommsTimingDetails commsTimingDetails) {
+        this.commsTimingDetails = commsTimingDetails;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
index 50d0382..a6efd87 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/cluster/ClusterProtocolHeartbeater.java
@@ -16,15 +16,6 @@
  */
 package org.apache.nifi.controller.cluster;
 
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 import org.apache.nifi.cluster.coordination.ClusterCoordinator;
 import org.apache.nifi.cluster.coordination.node.ClusterRoles;
 import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
@@ -32,12 +23,23 @@ import org.apache.nifi.cluster.protocol.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.ProtocolException;
+import org.apache.nifi.cluster.protocol.message.CommsTimingDetails;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
 import org.apache.nifi.cluster.protocol.message.HeartbeatResponseMessage;
 import org.apache.nifi.controller.leader.election.LeaderElectionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
 /**
  * Uses Leader Election Manager in order to determine which node is the elected
  * Cluster Coordinator and to indicate that this node is part of the cluster.
@@ -68,11 +70,14 @@ public class ClusterProtocolHeartbeater implements Heartbeater {
         return heartbeatAddress;
     }
 
+
     @Override
     public synchronized void send(final HeartbeatMessage heartbeatMessage) throws IOException {
         final long sendStart = System.nanoTime();
-
+        final long findCoordinatorStart = System.nanoTime();
         final String heartbeatAddress = getHeartbeatAddress();
+        final long findCoordinatorNanos = System.nanoTime() - findCoordinatorStart;
+
         final HeartbeatResponseMessage responseMessage = protocolSender.heartbeat(heartbeatMessage, heartbeatAddress);
 
         final byte[] payloadBytes = heartbeatMessage.getHeartbeat().getPayload();
@@ -104,10 +109,19 @@ public class ClusterProtocolHeartbeater implements Heartbeater {
         final String flowElectionMessage = responseMessage.getFlowElectionMessage();
         final String formattedElectionMessage = flowElectionMessage == null ? "" : "; " + flowElectionMessage;
 
-        logger.info("Heartbeat created at {} and sent to {} at {}; send took {} millis{}",
+        final CommsTimingDetails timingDetails = responseMessage.getCommsTimingDetails();
+
+        logger.info("Heartbeat created at {} and sent to {} at {}; determining Cluster Coordinator took {} millis; DNS lookup for coordinator took {} millis; connecting to coordinator took {} " +
+                "millis; sending heartbeat took {} millis; receiving first byte from response took {} millis; receiving full response took {} millis; total time was {} millis{}",
             dateFormatter.format(new Date(heartbeatMessage.getHeartbeat().getCreatedTimestamp())),
             heartbeatAddress,
             dateFormatter.format(new Date()),
+            TimeUnit.NANOSECONDS.toMillis(findCoordinatorNanos),
+            timingDetails.getDnsLookupMillis(),
+            timingDetails.getConnectMillis(),
+            timingDetails.getSendRequestMillis(),
+            timingDetails.getReceiveFirstByteMillis(),
+            timingDetails.getReceiveFullResponseMillis(),
             sendMillis,
             formattedElectionMessage);
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
index 5fe8153..f1d729f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/leader/election/CuratorLeaderElectionManager.java
@@ -274,31 +274,36 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
         }
 
         final long startNanos = System.nanoTime();
-        Participant participant;
         try {
-            participant = role.getLeaderSelector().getLeader();
-        } catch (Exception e) {
-            logger.debug("Unable to determine leader for role '{}'; returning null", roleName);
-            return null;
-        }
 
-        if (participant == null) {
-            return null;
-        }
+            Participant participant;
+            try {
+                participant = role.getLeaderSelector().getLeader();
+            } catch (Exception e) {
+                logger.warn("Unable to determine leader for role '{}'; returning null", roleName, e);
+                return null;
+            }
 
-        final String participantId = participant.getId();
-        if (StringUtils.isEmpty(participantId)) {
-            return null;
-        }
+            if (participant == null) {
+                logger.debug("There is currently no elected leader for the {} role", roleName);
+                return null;
+            }
 
-        registerPollTime(System.nanoTime() - startNanos);
+            final String participantId = participant.getId();
+            if (StringUtils.isEmpty(participantId)) {
+                logger.debug("Found leader participant for role {} but the participantId was empty", roleName);
+                return null;
+            }
 
-        final String previousLeader = lastKnownLeader.put(roleName, participantId);
-        if (previousLeader != null && !previousLeader.equals(participantId)) {
-            onLeaderChanged(roleName);
-        }
+            final String previousLeader = lastKnownLeader.put(roleName, participantId);
+            if (previousLeader != null && !previousLeader.equals(participantId)) {
+                onLeaderChanged(roleName);
+            }
 
-        return participantId;
+            return participantId;
+        } finally {
+            registerPollTime(System.nanoTime() - startNanos);
+        }
     }
 
     private void registerPollTime(final long nanos) {