You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@asterixdb.apache.org by mb...@apache.org on 2018/02/14 05:44:20 UTC

asterixdb git commit: [ASTERIXDB-2284][CLUS] Ensure Node Failure on Heartbeat Miss

Repository: asterixdb
Updated Branches:
  refs/heads/master e8e78e24a -> bf74a319d


[ASTERIXDB-2284][CLUS] Ensure Node Failure on Heartbeat Miss

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Request the node that exceeded its allowed heartbeat misses
  to shut down, to ensure its failure.
- Ensure thread safety of lastHeartbeatNanoTime in
  NodeControllerState.

Change-Id: I121f85fd858484377a9d888d18c3069c239f00fc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2390
Sonar-Qube: Jenkins <je...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/bf74a319
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/bf74a319
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/bf74a319

Branch: refs/heads/master
Commit: bf74a319dbdfa3fea3007d3286f14a77fecac178
Parents: e8e78e2
Author: Murtadha Hubail <mh...@apache.org>
Authored: Wed Feb 14 04:37:15 2018 +0300
Committer: Michael Blow <mb...@apache.org>
Committed: Tue Feb 13 21:44:03 2018 -0800

----------------------------------------------------------------------
 .../hyracks/control/cc/NodeControllerState.java |  6 +---
 .../hyracks/control/cc/cluster/NodeManager.java | 30 ++++++++++++++------
 2 files changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bf74a319/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
index 06af01f..415ca81 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/NodeControllerState.java
@@ -141,7 +141,7 @@ public class NodeControllerState {
 
     private int rrdPtr;
 
-    private long lastHeartbeatNanoTime;
+    private volatile long lastHeartbeatNanoTime;
 
     private NodeCapacity capacity;
 
@@ -254,10 +254,6 @@ public class NodeControllerState {
         return System.nanoTime() - lastHeartbeatNanoTime;
     }
 
-    public long getLastHeartbeatNanoTime() {
-        return lastHeartbeatNanoTime;
-    }
-
     public NodeControllerRemoteProxy getNodeController() {
         return nodeController;
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/bf74a319/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
----------------------------------------------------------------------
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
index 98cf67a..8f73864 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java
@@ -48,9 +48,11 @@ import org.apache.hyracks.control.common.controllers.CCConfig;
 import org.apache.hyracks.control.common.ipc.CCNCFunctions.AbortCCJobsFunction;
 import org.apache.hyracks.ipc.api.IIPCHandle;
 import org.apache.hyracks.ipc.exceptions.IPCException;
+import org.apache.hyracks.util.annotations.NotThreadSafe;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+@NotThreadSafe
 public class NodeManager implements INodeManager {
     private static final Logger LOGGER = LogManager.getLogger();
 
@@ -99,7 +101,7 @@ public class NodeManager implements INodeManager {
         // Updates the node registry.
         if (nodeRegistry.containsKey(nodeId)) {
             LOGGER.warn("Node with name " + nodeId + " has already registered; failing the node then re-registering.");
-            removeDeadNode(nodeId);
+            failNonDeadNode(nodeId);
         } else {
             try {
                 // TODO(mblow): it seems we should close IPC handles when we're done with them (like here)
@@ -155,22 +157,23 @@ public class NodeManager implements INodeManager {
             Map.Entry<String, NodeControllerState> entry = nodeIterator.next();
             String nodeId = entry.getKey();
             NodeControllerState state = entry.getValue();
-            if (state.nanosSinceLastHeartbeat() >= deadNodeNanosThreshold) {
+            final long nanosSinceLastHeartbeat = state.nanosSinceLastHeartbeat();
+            if (nanosSinceLastHeartbeat >= deadNodeNanosThreshold) {
+                ensureNodeFailure(nodeId, state);
                 deadNodes.add(nodeId);
                 affectedJobIds.addAll(state.getActiveJobIds());
-                // Removes the node from node map.
                 nodeIterator.remove();
-                // Removes the node from IP map.
                 removeNodeFromIpAddressMap(nodeId, state);
-                // Updates the cluster capacity.
                 resourceManager.update(nodeId, new NodeCapacity(0L, 0));
-                LOGGER.info(entry.getKey() + " considered dead");
+                LOGGER.info("{} considered dead. Last heartbeat received {}ms ago. Max miss period: {}ms", nodeId,
+                        TimeUnit.NANOSECONDS.toMillis(nanosSinceLastHeartbeat),
+                        TimeUnit.NANOSECONDS.toMillis(deadNodeNanosThreshold));
             }
         }
         return Pair.of(deadNodes, affectedJobIds);
     }
 
-    public void removeDeadNode(String nodeId) throws HyracksException {
+    private void failNonDeadNode(String nodeId) throws HyracksException {
         NodeControllerState state = nodeRegistry.get(nodeId);
         Set<JobId> affectedJobIds = state.getActiveJobIds();
         // Removes the node from node map.
@@ -196,7 +199,6 @@ public class NodeManager implements INodeManager {
         nodeRegistry.forEach(nodeFunction::apply);
     }
 
-    // Removes the entry of the node in <code>ipAddressNodeNameMap</code>.
     private void removeNodeFromIpAddressMap(String nodeId, NodeControllerState ncState) throws HyracksException {
         InetAddress ipAddress = getIpAddress(ncState);
         Set<String> nodes = ipAddressNodeNameMap.get(ipAddress);
@@ -209,7 +211,6 @@ public class NodeManager implements INodeManager {
         }
     }
 
-    // Retrieves the IP address for a given node.
     private InetAddress getIpAddress(NodeControllerState ncState) throws HyracksException {
         String ipAddress = ncState.getNCConfig().getDataPublicAddress();
         try {
@@ -222,4 +223,15 @@ public class NodeManager implements INodeManager {
     private NodeCapacity getAdjustedNodeCapacity(NodeCapacity nodeCapacity) {
         return new NodeCapacity(nodeCapacity.getMemoryByteSize(), nodeCapacity.getCores() * nodeCoresMultiplier);
     }
+
+    private void ensureNodeFailure(String nodeId, NodeControllerState state) {
+        try {
+            LOGGER.info("Requesting node {} to shutdown to ensure failure", nodeId);
+            state.getNodeController().shutdown(false);
+            LOGGER.info("Request to shutdown failed node {} succeeded. false positive heartbeat miss indication",
+                    nodeId);
+        } catch (Exception ignore) {
+            LOGGER.debug(() -> "Ignoring failure on ensuring node " + nodeId + " has failed", ignore);
+        }
+    }
 }