You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by su...@apache.org on 2017/09/19 02:54:39 UTC

[2/4] incubator-trafodion git commit: [TRAFODION-2651] Fixed monitor-to-monitor communication by adding reconnect logic to better handle various communication glitches which previously resulted in false node down situations.

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/cluster.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx
index 585ff0a..c65116b 100644
--- a/core/sqf/monitor/linux/cluster.cxx
+++ b/core/sqf/monitor/linux/cluster.cxx
@@ -361,13 +361,14 @@ void CCluster::NodeReady( CNode *spareNode )
 // Assigns a new TMLeader if given pnid is same as TmLeaderNid 
 // TmLeader is a logical node num. 
 // pnid has gone down, so if that node was previously the TM leader, a new one needs to be chosen.
-void CCluster::AssignTmLeader(int pnid)
+void CCluster::AssignTmLeader( int pnid, bool checkProcess )
 {
     const char method_name[] = "CCluster::AssignTmLeader";
     TRACE_ENTRY;
 
     int i = 0;
     CNode *node = NULL;
+    CProcess *process = NULL;
 
     int TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid();
 
@@ -375,24 +376,50 @@ void CCluster::AssignTmLeader(int pnid)
     {
         node = LNode[TmLeaderNid]->GetNode();
 
-        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+        if (checkProcess)
         {
-            trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, isSoftNodeDown=%d\n"
-                        , method_name, __LINE__
-                        , node->GetPNid()
-                        , node->GetName()
-                        , NodePhaseString(node->GetPhase())
-                        , node->IsSoftNodeDown());
+            process = LNode[TmLeaderNid]->GetProcessLByType( ProcessType_DTM );
+            if (process)
+            {
+                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+                {
+                    if (node)
+                        trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, "
+                                      "isSoftNodeDown=%d, checkProcess=%d\n"
+                                    , method_name, __LINE__
+                                    , node->GetPNid()
+                                    , node->GetName()
+                                    , NodePhaseString(node->GetPhase())
+                                    , node->IsSoftNodeDown()
+                                    , checkProcess );
+                }
+                return;
+            }
+        }
+        else
+        {
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+            {
+                if (node)
+                    trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, "
+                                  "isSoftNodeDown=%d, checkProcess=%d\n"
+                                , method_name, __LINE__
+                                , node->GetPNid()
+                                , node->GetName()
+                                , NodePhaseString(node->GetPhase())
+                                , node->IsSoftNodeDown()
+                                , checkProcess );
+            }
+            return;
         }
-    
-        return;
     }
 
     node = Node[TmLeaderPNid];
 
     if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
     {
-        trace_printf("%s@%d" " - Node "  "%d" " TmLeader failed." "\n", method_name, __LINE__, TmLeaderNid);
+        trace_printf( "%s@%d" " - Node "  "%d" " TmLeader failed! (checkProcess=%d)\n"
+                    , method_name, __LINE__, TmLeaderNid, checkProcess );
     }
 
     for (i=0; i<GetConfigPNodesMax(); i++)
@@ -436,6 +463,15 @@ void CCluster::AssignTmLeader(int pnid)
 
         TmLeaderNid = node->GetFirstLNode()->GetNid();
 
+        if (checkProcess)
+        {
+            process = LNode[TmLeaderNid]->GetProcessLByType( ProcessType_DTM );
+            if (!process)
+            {
+                continue; // skip this node no DTM process exists
+            }
+        }
+
         if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
         {
             trace_printf("%s@%d" " - Node "  "%d" " is the new TmLeader." "\n", method_name, __LINE__, TmLeaderNid);
@@ -483,7 +519,11 @@ CCluster::CCluster (void)
       integratingPNid_(-1),
       joinComm_(MPI_COMM_NULL),
       joinSock_(-1),
-      seqNum_(0),
+      lastSeqNum_(0),
+      lowSeqNum_(0),
+      highSeqNum_(0),
+      reconnectSeqNum_(0),
+      seqNum_(1),
       waitForWatchdogExit_(false)
       ,checkSeqNum_(false)
       ,validateNodeDown_(false)
@@ -511,6 +551,9 @@ CCluster::CCluster (void)
                     method_name, __LINE__,
                     (checkSeqNum_ ? "enabled" : "disabled"));
     
+    CClusterConfig *clusterConfig = Nodes->GetClusterConfig();
+    configPNodesMax_ = clusterConfig->GetPNodesConfigMax();
+
     // Compute minimum "sync cycles" per second.   The minimum is 1/10
     // the expected number, assuming "next_test_delay" cycles per second (where
     // next_test_delay is in microseconds).
@@ -521,8 +564,6 @@ CCluster::CCluster (void)
     agMinElapsed_.tv_sec = 10000;
     agMinElapsed_.tv_nsec = 0;
 
-    tmSyncBuffer_ = Nodes->GetSyncBuffer();
-
     // Allocate structures for monitor point-to-point communications
     //
     //   The current approach is to allocate to a maximum number (MAX_NODES).
@@ -547,6 +588,7 @@ CCluster::CCluster (void)
     {
         comms_[i] = MPI_COMM_NULL;
         socks_[i] = -1;
+        sockPorts_[i] = -1;
     }
 
     env = getenv("SQ_MON_NODE_DOWN_VALIDATION");
@@ -647,7 +689,7 @@ unsigned long long CCluster::EnsureAndGetSeqNum(cluster_state_def_t nodestate[])
         {
             trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[i].seq_num, seqNum );
         }
-        if (nodestate[i].seq_num > 0)
+        if (nodestate[i].seq_num > 1)
         {
             if (seqNum == 0) 
             {
@@ -658,6 +700,10 @@ unsigned long long CCluster::EnsureAndGetSeqNum(cluster_state_def_t nodestate[])
                 assert(nodestate[i].seq_num == seqNum);
             }
         }
+        if (trace_settings & TRACE_RECOVERY)
+        {
+            trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[i].seq_num, seqNum );
+        }
     }
 
     TRACE_EXIT;
@@ -839,7 +885,7 @@ void CCluster::HardNodeDown (int pnid, bool communicate_state)
     if ( Emulate_Down )
     {
         IAmIntegrated = false;
-        AssignTmLeader(pnid);
+        AssignTmLeader(pnid, false);
     }
 
     TRACE_EXIT;
@@ -940,7 +986,7 @@ void CCluster::SoftNodeDown( int pnid )
     }
 
     IAmIntegrated = false;
-    AssignTmLeader(pnid);
+    AssignTmLeader(pnid, false);
 
     TRACE_EXIT;
 }
@@ -1303,9 +1349,10 @@ int CCluster::HardNodeUp( int pnid, char *node_name )
                             , method_name, __LINE__, node->GetName(), node->GetPNid(), StateString( node->GetState() ) );
                 for ( int i =0; i < Nodes->GetPNodesCount(); i++ )
                 {
-                    trace_printf( "%s@%d socks_[indexToPnid_[%d]=%d]=%d\n"
+                    trace_printf( "%s@%d socks_[indexToPnid_[%d]=%d]=%d, sockPorts_[indexToPnid_[%d]=%d]=%d\n"
                                 , method_name, __LINE__
-                                , i, indexToPnid_[i], socks_[indexToPnid_[i]] );
+                                , i, indexToPnid_[i], socks_[indexToPnid_[i]]
+                                , i, indexToPnid_[i], sockPorts_[indexToPnid_[i]] );
                 }
             }
             if ( MyNode->IsCreator() )
@@ -1616,8 +1663,9 @@ const char *SyncStateString( SyncState state)
 }
 
 
-void CCluster::AddTmsyncMsg (struct sync_def *sync,
-                             struct internal_msg_def *msg)
+void CCluster::AddTmsyncMsg( struct sync_buffer_def *tmSyncBuffer
+                           , struct sync_def *sync
+                           , struct internal_msg_def *msg)
 {
     const char method_name[] = "CCluster::AddTmsyncMsg";
     TRACE_ENTRY;
@@ -1647,12 +1695,12 @@ void CCluster::AddTmsyncMsg (struct sync_def *sync,
 
     // Insert the message size into the message header
     msg->replSize = msgSize;
-    tmSyncBuffer_->msgInfo.msg_count = 1;
-    tmSyncBuffer_->msgInfo.msg_offset += msgSize;
+    tmSyncBuffer->msgInfo.msg_count = 1;
+    tmSyncBuffer->msgInfo.msg_offset += msgSize;
 
     // Set end-of-buffer marker
     msg = (struct internal_msg_def *)
-        &tmSyncBuffer_->msg[tmSyncBuffer_->msgInfo.msg_offset];
+        &tmSyncBuffer->msg[tmSyncBuffer->msgInfo.msg_offset];
     msg->type = InternalType_Null;
 
     TRACE_EXIT;
@@ -2787,7 +2835,6 @@ void CCluster::InitializeConfigCluster( void )
     MPI_Comm_size (MPI_COMM_WORLD, &worldSize);    
     int rankToPnid[worldSize];
     CClusterConfig *clusterConfig = Nodes->GetClusterConfig();
-    configPNodesMax_ = clusterConfig->GetPNodesConfigMax();
     
     CurNodes = worldSize;
 
@@ -2931,7 +2978,7 @@ void CCluster::InitializeConfigCluster( void )
                     node->SetCommPort( &commPortNums[ i * MPI_MAX_PORT_NAME] );
                     node->SetSyncPort( &syncPortNums[ i * MPI_MAX_PORT_NAME] );
                     rankToPnid[i] = node->GetPNid();
-                    nodeStatus[i] = true;
+                    nodeStatus[rankToPnid[i]] = true;
 
                     if (trace_settings & TRACE_INIT)
                     {
@@ -2967,7 +3014,7 @@ void CCluster::InitializeConfigCluster( void )
             // Any nodes not in the initial MPI_COMM_WORLD are down.
             for (int i=0; i<GetConfigPNodesCount(); ++i)
             {
-                if ( nodeStatus[i] == false )
+                if ( nodeStatus[indexToPnid_[i]] == false )
                 {
                     if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
                         trace_printf( "%s@%d - nodeStatus[%d]=%d"
@@ -2982,7 +3029,7 @@ void CCluster::InitializeConfigCluster( void )
                     // assign new TmLeader if TMLeader node is dead.
                     if (TmLeaderPNid == indexToPnid_[i]) 
                     {
-                        AssignTmLeader(indexToPnid_[i]);
+                        AssignTmLeader(indexToPnid_[i], false);
                     }
                 }
                 else
@@ -3327,6 +3374,132 @@ void CCluster::SendReIntegrateStatus( STATE nodeState, int initErr )
     }
 }
 
+bool CCluster::PingSockPeer(CNode *node)
+{
+    const char method_name[] = "CCluster::PingSockPeer";
+    TRACE_ENTRY;
+
+    bool rs = true;
+    int  rc;
+    int  pingSock = -1;
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d - Pinging remote monitor %s, pnid=%d\n"
+                    , method_name, __LINE__
+                    , node->GetName(), node->GetPNid() );
+    }
+
+    // Attempt to connect with remote monitor
+    for (int i = 0; i < MAX_RECONN_PING_RETRY_COUNT; i++ )
+    {
+        pingSock = Monitor->Connect( node->GetCommPort() );
+        if ( pingSock < 0 )
+        {
+            sleep( MAX_RECONN_PING_WAIT_TIMEOUT );
+        }
+        else
+        {
+            break;
+        }
+    }
+    if ( pingSock < 0 )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - Can't connect to remote monitor %s, pnid=%d\n"
+                        , method_name, __LINE__
+                        , node->GetName(), node->GetPNid() );
+        }
+        return(false);
+    }
+
+    nodeId_t nodeInfo;
+
+    nodeInfo.pnid = MyPNID;
+    strcpy(nodeInfo.nodeName, MyNode->GetName());
+    strcpy(nodeInfo.commPort, MyNode->GetCommPort());
+    strcpy(nodeInfo.syncPort, MyNode->GetSyncPort());
+    nodeInfo.ping = true;
+    nodeInfo.creatorPNid = -1;
+    nodeInfo.creator = false;
+    nodeInfo.creatorShellPid = -1;
+    nodeInfo.creatorShellVerifier = -1;
+    
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "Sending my nodeInfo.pnid=%d\n"
+                      "        nodeInfo.nodeName=%s\n"
+                      "        nodeInfo.commPort=%s\n"
+                      "        nodeInfo.syncPort=%s\n"
+                      "        nodeInfo.creatorPNid=%d\n"
+                      "        nodeInfo.creator=%d\n"
+                      "        nodeInfo.creatorShellPid=%d\n"
+                      "        nodeInfo.creatorShellVerifier=%d\n"
+                      "        nodeInfo.ping=%d\n"
+                    , nodeInfo.pnid
+                    , nodeInfo.nodeName
+                    , nodeInfo.commPort
+                    , nodeInfo.syncPort
+                    , nodeInfo.creatorPNid
+                    , nodeInfo.creator
+                    , nodeInfo.creatorShellPid
+                    , nodeInfo.creatorShellVerifier
+                    , nodeInfo.ping );
+    }
+
+    rc = Monitor->SendSock( (char *) &nodeInfo
+                          , sizeof(nodeId_t)
+                          , pingSock );
+
+    if ( rc )
+    {
+        rs = false;
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], Cannot send ping node info to node %s: (%s)\n"
+                , method_name, node->GetName(), ErrorMsg(rc));
+        mon_log_write(MON_PINGSOCKPEER_1, SQ_LOG_ERR, buf);    
+    }
+    else
+    {
+        // Get info about connecting monitor
+        rc = Monitor->ReceiveSock( (char *) &nodeInfo
+                                 , sizeof(nodeId_t)
+                                 , pingSock );
+        if ( rc )
+        {   // Handle error
+            rs = false;
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s], Cannot receive ping node info from node %s: (%s)\n"
+                    , method_name, node->GetName(), ErrorMsg(rc));
+            mon_log_write(MON_PINGSOCKPEER_2, SQ_LOG_ERR, buf);    
+        }
+        else
+        {
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "Received from nodeInfo.pnid=%d\n"
+                              "        nodeInfo.nodeName=%s\n"
+                              "        nodeInfo.commPort=%s\n"
+                              "        nodeInfo.syncPort=%s\n"
+                              "        nodeInfo.ping=%d\n"
+                            , nodeInfo.pnid
+                            , nodeInfo.nodeName
+                            , nodeInfo.commPort
+                            , nodeInfo.syncPort
+                            , nodeInfo.ping );
+            }
+        }
+    }
+
+    close( pingSock );
+
+    TRACE_EXIT;
+    return( rs );
+}
+
 void CCluster::ReIntegrate( int initProblem )
 {
     const char method_name[] = "CCluster::ReIntegrate";
@@ -3607,6 +3780,10 @@ void CCluster::ReIntegrateSock( int initProblem )
     int rc;
     int existingCommFd;
     int existingSyncFd;
+    char commPort[MPI_MAX_PORT_NAME];
+    char syncPort[MPI_MAX_PORT_NAME];
+    char *pch1;
+    char *pch2;
 
     // Set bit indicating my node is up
     upNodes_.upNodes[MyPNID/MAX_NODE_BITMASK] |= (1ull << (MyPNID%MAX_NODE_BITMASK));
@@ -3642,12 +3819,6 @@ void CCluster::ReIntegrateSock( int initProblem )
 
     TEST_POINT( TP011_NODE_UP );
 
-    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
-    {
-        trace_printf("%s@%d Connected to creator monitor, sending my node info\n",
-                     method_name, __LINE__); 
-    }
-
     // Send this node's name and port number so creator monitor 
     // knows who we are, and set flag to let creator monitor it is the CREATOR.
     nodeId_t myNodeInfo;
@@ -3662,21 +3833,14 @@ void CCluster::ReIntegrateSock( int initProblem )
 
     if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
     {
-        trace_printf( "%s@%d - Sending my node info to creator monitor\n"
-                    , method_name, __LINE__);
-        trace_printf( "Node info for pnid=%d\n"
-                      "        myNodeInfo.nodeName=%s\n"
-                      "        myNodeInfo.commPort=%s\n"
-                      "        myNodeInfo.syncPort=%s\n"
-                      "        myNodeInfo.creatorPNid=%d\n"
-                      "        myNodeInfo.creator=%d\n"
-                      "        myNodeInfo.creatorShellPid=%d\n"
-                      "        myNodeInfo.creatorShellVerifier=%d\n"
+        trace_printf( "%s@%d - Connected to creator monitor, sending my info, "
+                      "node %d (%s), commPort=%s, syncPort=%s, creator=%d, "
+                      "creatorShellPid=%d:%d\n"
+                    , method_name, __LINE__
                     , myNodeInfo.pnid
                     , myNodeInfo.nodeName
                     , myNodeInfo.commPort
                     , myNodeInfo.syncPort
-                    , myNodeInfo.creatorPNid
                     , myNodeInfo.creator
                     , myNodeInfo.creatorShellPid
                     , myNodeInfo.creatorShellVerifier );
@@ -3774,10 +3938,33 @@ void CCluster::ReIntegrateSock( int initProblem )
             otherMonRank_[nodeInfo[i].pnid] = 0;
             ++CurNodes;
 
-            Node[nodeInfo[i].pnid]->SetCommPort( nodeInfo[i].commPort );
-            Node[nodeInfo[i].pnid]->SetSyncPort( nodeInfo[i].syncPort );
+            // Store port numbers for the node
+            strncpy(commPort, nodeInfo[i].commPort, MPI_MAX_PORT_NAME);
+            strncpy(syncPort, nodeInfo[i].syncPort, MPI_MAX_PORT_NAME);
+
+            Node[nodeInfo[i].pnid]->SetCommPort( commPort );
+            pch1 = strtok (commPort,":");
+            pch1 = strtok (NULL,":");
+            Node[nodeInfo[i].pnid]->SetCommSocketPort( atoi(pch1) );
+
+            Node[nodeInfo[i].pnid]->SetSyncPort( syncPort );
+            pch2 = strtok (syncPort,":");
+            pch2 = strtok (NULL,":");
+            Node[nodeInfo[i].pnid]->SetSyncSocketPort( atoi(pch2) );
+            sockPorts_[nodeInfo[i].pnid] = Node[nodeInfo[i].pnid]->GetSyncSocketPort();
+
             Node[nodeInfo[i].pnid]->SetState( State_Up );
 
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n"
+                            , method_name, __LINE__
+                            , Node[nodeInfo[i].pnid]->GetPNid()
+                            , Node[nodeInfo[i].pnid]->GetName()
+                            , pch1, atoi(pch1)
+                            , pch2, atoi(pch2) );
+            }
+
             // Tell creator we are ready to accept its connection
             int mypnid = MyPNID;
             rc = Monitor->SendSock( (char *) &mypnid
@@ -3897,10 +4084,34 @@ void CCluster::ReIntegrateSock( int initProblem )
 
             otherMonRank_[nodeInfo[i].pnid] = 0;
             ++CurNodes;
-            Node[nodeInfo[i].pnid]->SetCommPort( nodeInfo[i].commPort );
-            Node[nodeInfo[i].pnid]->SetSyncPort( nodeInfo[i].syncPort );
+
+            // Store port numbers for the node
+            strncpy(commPort, nodeInfo[i].commPort, MPI_MAX_PORT_NAME);
+            strncpy(syncPort, nodeInfo[i].syncPort, MPI_MAX_PORT_NAME);
+        
+            Node[nodeInfo[i].pnid]->SetCommPort( commPort );
+            pch1 = strtok (commPort,":");
+            pch1 = strtok (NULL,":");
+            Node[nodeInfo[i].pnid]->SetCommSocketPort( atoi(pch1) );
+        
+            Node[nodeInfo[i].pnid]->SetSyncPort( syncPort );
+            pch2 = strtok (syncPort,":");
+            pch2 = strtok (NULL,":");
+            Node[nodeInfo[i].pnid]->SetSyncSocketPort( atoi(pch2) );
+            sockPorts_[nodeInfo[i].pnid] = Node[nodeInfo[i].pnid]->GetSyncSocketPort();
+
             Node[nodeInfo[i].pnid]->SetState( State_Up );
 
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n"
+                            , method_name, __LINE__
+                            , Node[nodeInfo[i].pnid]->GetPNid()
+                            , Node[nodeInfo[i].pnid]->GetName()
+                            , pch1, atoi(pch1)
+                            , pch2, atoi(pch2) );
+            }
+
             // Connect to existing monitor
             existingSyncFd = AcceptSyncSock();
             if ( existingSyncFd < 0 )
@@ -3969,9 +4180,10 @@ void CCluster::ReIntegrateSock( int initProblem )
         }
         for ( int i =0; i < pnodeCount; i++ )
         {
-            trace_printf( "%s@%d socks_[%d]=%d\n"
+            trace_printf( "%s@%d socks_[%d]=%d, sockPorts_[%d]=%d\n"
                         , method_name, __LINE__
-                        , nodeInfo[i].pnid, socks_[nodeInfo[i].pnid]);
+                        , nodeInfo[i].pnid, socks_[nodeInfo[i].pnid]
+                        , nodeInfo[i].pnid, sockPorts_[nodeInfo[i].pnid]);
         }
         for ( int i =0; i < MAX_NODE_MASKS ; i++ )
         {
@@ -4224,16 +4436,24 @@ void CCluster::setNewSock( int pnid )
                 --CurNodes;
             }
 
-            if (trace_settings & TRACE_RECOVERY)
-            {
-                trace_printf("%s@%d - setting new communicator for pnid %d, "
-                             "otherRank=%d\n",
-                             method_name, __LINE__, it->pnid, it->otherRank);
-            }
-
+            CNode *node= Nodes->GetNode( it->pnid );
             socks_[it->pnid] = it->socket;
+            sockPorts_[it->pnid] = node->GetSyncSocketPort();
             otherMonRank_[it->pnid] = it->otherRank;
             ++CurNodes;
+
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d - Setting new communicator for %d (%s), "
+                              "socks_[%d]=%d, sockPorts_[%d]=%d, otherMonRank_[%d]=%d\n"
+                            , method_name, __LINE__
+                            , node->GetPNid()
+                            , node->GetName()
+                            , it->pnid, socks_[it->pnid]
+                            , it->pnid, sockPorts_[it->pnid]
+                            , it->pnid, otherMonRank_[it->pnid] );
+            }
+
             // Set bit indicating node is up
             upNodes_.upNodes[it->pnid/MAX_NODE_BITMASK] |= (1ull << (it->pnid%MAX_NODE_BITMASK));
 
@@ -4419,20 +4639,9 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
     const char method_name[] = "CCluster::AllgatherSock";
     TRACE_ENTRY;
 
+    bool reconnecting = false;
     static int hdrSize = Nodes->GetSyncHdrSize( );
     int err = MPI_SUCCESS;
-    typedef struct
-    {
-        int p_sent;
-        int p_received;
-        int p_n2recv;
-        bool p_sending;
-        bool p_receiving;
-        int p_timeout_count;
-        bool p_initial_check;
-        char *p_buff;
-        struct timespec znodeFailedTime;
-    } peer_t;
     peer_t p[GetConfigPNodesMax()];
     memset( p, 0, sizeof(p) );
     tag = 0; // make compiler happy
@@ -4442,12 +4651,12 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
                                 ? (ZClient->GetSessionTimeout() * 2) : 120;
 
     int nsent = 0, nrecv = 0;
-    for ( int iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
+    for ( int iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
     {
-        peer_t *peer = &p[iPeer];
-        stats[iPeer].MPI_ERROR = MPI_SUCCESS;
-        stats[iPeer].count = 0;
-        if ( iPeer == MyPNID || socks_[iPeer] == -1 )
+        peer_t *peer = &p[indexToPnid_[iPeer]];
+        stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_SUCCESS;
+        stats[indexToPnid_[iPeer]].count = 0;
+        if ( indexToPnid_[iPeer] == MyPNID || socks_[indexToPnid_[iPeer]] == -1 )
         {
             peer->p_sending = peer->p_receiving = false;
             nsent++;
@@ -4460,11 +4669,31 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
             peer->p_timeout_count = 0;
             peer->p_initial_check = true;
             peer->p_n2recv = -1;
-            peer->p_buff = ((char *) rbuf) + (iPeer * CommBufSize);
+            peer->p_buff = ((char *) rbuf) + (indexToPnid_[iPeer] * CommBufSize);
+
             struct epoll_event event;
-            event.data.fd = socks_[iPeer];
+            event.data.fd = socks_[indexToPnid_[iPeer]];
             event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
-            EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[iPeer], &event );
+            EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[iPeer]], &event );
+        }
+    }
+
+    if (trace_settings & (TRACE_SYNC | TRACE_SYNC_DETAIL))
+    {
+        for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+        {
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                peer_t *peer = &p[indexToPnid_[i]];
+                trace_printf( "%s@%d" " - socks_[%d]=%d, "
+                              "peer->p_sending=%d, "
+                              "peer->p_receiving=%d\n"
+                            , method_name, __LINE__
+                            , indexToPnid_[i]
+                            , socks_[indexToPnid_[i]]
+                            , peer->p_sending
+                            , peer->p_receiving );
+            }
         }
     }
 
@@ -4493,8 +4722,8 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
         else
         {
             // default to 64 seconds
-            sv_epoll_wait_timeout = 4000;
-            sv_epoll_retry_count = 16;
+            sv_epoll_wait_timeout = 16000;
+            sv_epoll_retry_count = 4;
         }
 
         char buf[MON_STRING_BUF_SIZE];
@@ -4512,38 +4741,128 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
     struct epoll_event events[2*GetConfigPNodesMax() + 1];
     while ( 1 )
     {
-        int maxEvents = 2*GetConfigPNodesMax() - nsent - nrecv;
+reconnected:
+        int maxEvents = 2*GetConfigPNodesCount() - nsent - nrecv;
         if ( maxEvents == 0 ) break;
         int nw;
         int zerr = ZOK;
+
         while ( 1 )
         {
             nw = epoll_wait( epollFD_, events, maxEvents, sv_epoll_wait_timeout );
             if ( nw >= 0 || errno != EINTR ) break;
         }
+
         if ( nw == 0 )
-        {
-            for ( int iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
+        { // Timeout, no fd's ready
+            for ( int iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
             {
-                peer_t *peer = &p[iPeer];
-                if ( (iPeer != MyPNID) &&
-                    (socks_[iPeer] != -1) )
+                peer_t *peer = &p[indexToPnid_[iPeer]];
+                if ( (indexToPnid_[iPeer] != MyPNID) && (socks_[indexToPnid_[iPeer]] != -1) )
                 {
-                    if ( (peer->p_receiving) ||
-                        (peer->p_sending) )
+                    if ( (peer->p_receiving) || (peer->p_sending) )
                     {
                         if ( ! ZClientEnabled )
                         {
-                            peer->p_timeout_count++;
+                            if (peer->p_initial_check && !reconnecting)
+                            {
+                                peer->p_initial_check = false;
+                                clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
+                                peer->znodeFailedTime.tv_sec += sessionTimeout;
+                                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                {
+                                    trace_printf( "%s@%d" " - Znode Fail Time %ld(secs)\n"
+                                                , method_name, __LINE__
+                                                , peer->znodeFailedTime.tv_sec);
+                                }
+                            }
     
                             if ( peer->p_timeout_count < sv_epoll_retry_count )
                             {
+                                peer->p_timeout_count++;
+
+                                if (IsRealCluster)
+                                {
+                                    if (trace_settings & TRACE_RECOVERY)
+                                    {
+                                        trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+                                                      " timeout count=%d,"
+                                                      " sending=%d,"
+                                                      " receiving=%d\n"
+                                                    , method_name, __LINE__
+                                                    , Node[indexToPnid_[iPeer]]->GetName(), iPeer
+                                                    , peer->p_timeout_count
+                                                    , peer->p_sending
+                                                    , peer->p_receiving);
+                                    }
+                                    // Attempt reconnect to all peers
+                                    err = AllgatherSockReconnect( stats );
+                                    // Redrive IOs on live peer connections
+                                    nsent = 0; nrecv = 0;
+                                    for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+                                    {
+                                        peer_t *peer = &p[indexToPnid_[i]];
+                                        if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+                                        {
+                                            peer->p_sending = peer->p_receiving = false;
+                                            nsent++;
+                                            nrecv++;
+                                        }
+                                        else
+                                        {
+                                            peer->p_sending = peer->p_receiving = true;
+                                            peer->p_sent = peer->p_received = 0;
+                                            peer->p_timeout_count = 0;
+                                            peer->p_n2recv = -1;
+                                            peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+                                
+                                            struct epoll_event event;
+                                            event.data.fd = socks_[indexToPnid_[i]];
+                                            event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+                                            EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[i]], &event );
+                                        }
+                                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                        {
+                                            trace_printf( "%s@%d" " - socks_[%d]=%d, "
+                                                          "peer->p_sending=%d, "
+                                                          "peer->p_receiving=%d\n"
+                                                        , method_name, __LINE__
+                                                        , indexToPnid_[i]
+                                                        , socks_[indexToPnid_[i]]
+                                                        , peer->p_sending
+                                                        , peer->p_receiving );
+                                        }
+                                    }
+                                    reconnectSeqNum_ = seqNum_;
+                                    reconnecting = true;
+                                    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                    {
+                                        trace_printf( "%s@%d" " - Reconnecting!\n"
+                                                    , method_name, __LINE__ );
+                                    }
+                                    goto reconnected;
+                                }
                                 continue;
                             }
+                            if (trace_settings & TRACE_RECOVERY)
+                            {
+                                trace_printf( "%s@%d - Peer timeout triggered: "
+                                              "peer->p_timeout_count %d, "
+                                              "sv_epoll_retry_count %d\n"
+                                              "        socks_[%d]=%d\n"
+                                              "        stats[%d].MPI_ERROR=%s\n"
+                                            , method_name, __LINE__
+                                            , peer->p_timeout_count
+                                            , sv_epoll_retry_count
+                                            , indexToPnid_[iPeer]
+                                            , socks_[indexToPnid_[iPeer]]
+                                            , iPeer
+                                            , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) );
+                            }
                         }
                         else
                         {
-                            if (peer->p_initial_check)
+                            if (peer->p_initial_check && !reconnecting)
                             {
                                 peer->p_initial_check = false;
                                 clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
@@ -4557,7 +4876,7 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
                                 
                             }
                             // If not expired, stay in the loop
-                            if ( ! ZClient->IsZNodeExpired( Node[iPeer]->GetName(), zerr ))
+                            if ( ! ZClient->IsZNodeExpired( Node[indexToPnid_[iPeer]]->GetName(), zerr ))
                             {
                                 if ( zerr == ZCONNECTIONLOSS || zerr == ZOPERATIONTIMEOUT )
                                 {
@@ -4572,10 +4891,71 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
    
                                         if ( peer->p_timeout_count < sv_epoll_retry_count )
                                         {
+                                            if (IsRealCluster)
+                                            {
+                                                if (trace_settings & TRACE_RECOVERY)
+                                                {
+                                                    trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+                                                                  " timeout count=%d,"
+                                                                  " sending=%d,"
+                                                                  " receiving=%d\n"
+                                                                , method_name, __LINE__
+                                                                , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                                                , peer->p_timeout_count
+                                                                , peer->p_sending
+                                                                , peer->p_receiving);
+                                                }
+                                                // Attempt reconnect to all peers
+                                                err = AllgatherSockReconnect( stats );
+                                                // Redrive IOs on live peer connections
+                                                nsent = 0; nrecv = 0;
+                                                for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+                                                {
+                                                    peer_t *peer = &p[indexToPnid_[i]];
+                                                    if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+                                                    {
+                                                        peer->p_sending = peer->p_receiving = false;
+                                                        nsent++;
+                                                        nrecv++;
+                                                    }
+                                                    else
+                                                    {
+                                                        peer->p_sending = peer->p_receiving = true;
+                                                        peer->p_sent = peer->p_received = 0;
+                                                        peer->p_timeout_count = 0;
+                                                        peer->p_n2recv = -1;
+                                                        peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+                                            
+                                                        struct epoll_event event;
+                                                        event.data.fd = socks_[indexToPnid_[i]];
+                                                        event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+                                                        EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event );
+                                                    }
+                                                    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                                    {
+                                                        trace_printf( "%s@%d" " - socks_[%d]=%d, "
+                                                                      "peer->p_sending=%d, "
+                                                                      "peer->p_receiving=%d\n"
+                                                                    , method_name, __LINE__
+                                                                    , indexToPnid_[i]
+                                                                    , socks_[indexToPnid_[i]]
+                                                                    , peer->p_sending
+                                                                    , peer->p_receiving );
+                                                    }
+                                                }
+                                                reconnectSeqNum_ = seqNum_;
+                                                reconnecting = true;
+                                                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                                {
+                                                    trace_printf( "%s@%d" " - Reconnecting!\n"
+                                                                , method_name, __LINE__ );
+                                                }
+                                                goto reconnected;
+                                            }
                                             continue;
                                         }
                                     }
-                                    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                    if (trace_settings & TRACE_RECOVERY)
                                     {
                                         trace_printf( "%s@%d - Znode Failed triggered\n"
                                                       "        Current Time    %ld(secs)\n"
@@ -4592,58 +4972,99 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
 
                                     if ( peer->p_timeout_count < sv_epoll_retry_count )
                                     {
+                                        if (IsRealCluster)
+                                        {
+                                            if (trace_settings & TRACE_RECOVERY)
+                                            {
+                                                trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+                                                              " timeout count=%d,"
+                                                              " sending=%d,"
+                                                              " receiving=%d\n"
+                                                            , method_name, __LINE__
+                                                            , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                                            , peer->p_timeout_count
+                                                            , peer->p_sending
+                                                            , peer->p_receiving);
+                                            }
+                                            // Attempt reconnect to all peers
+                                            err = AllgatherSockReconnect( stats );
+                                            // Redrive IOs on live peer connections
+                                            nsent = 0; nrecv = 0;
+                                            for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+                                            {
+                                                peer_t *peer = &p[indexToPnid_[i]];
+                                                if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+                                                {
+                                                    peer->p_sending = peer->p_receiving = false;
+                                                    nsent++;
+                                                    nrecv++;
+                                                }
+                                                else
+                                                {
+                                                    peer->p_sending = peer->p_receiving = true;
+                                                    peer->p_sent = peer->p_received = 0;
+                                                    peer->p_timeout_count = 0;
+                                                    peer->p_n2recv = -1;
+                                                    peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+                                        
+                                                    struct epoll_event event;
+                                                    event.data.fd = socks_[indexToPnid_[i]];
+                                                    event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+                                                    EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event );
+                                                }
+                                                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                                {
+                                                    trace_printf( "%s@%d" " - socks_[%d]=%d, "
+                                                                  "peer->p_sending=%d, "
+                                                                  "peer->p_receiving=%d\n"
+                                                                , method_name, __LINE__
+                                                                , indexToPnid_[i]
+                                                                , socks_[indexToPnid_[i]]
+                                                                , peer->p_sending
+                                                                , peer->p_receiving );
+                                                }
+                                            }
+                                            reconnectSeqNum_ = seqNum_;
+                                            reconnecting = true;
+                                            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                            {
+                                                trace_printf( "%s@%d" " - Reconnecting!\n"
+                                                            , method_name, __LINE__ );
+                                            }
+                                            goto reconnected;
+                                        }
                                         continue;
                                     }
                                 }
                             }
                         }
 
-                        char buf[MON_STRING_BUF_SIZE];
-                        snprintf( buf, sizeof(buf)
-                                , "[%s@%d] Not heard from peer=%d (node=%s) "
-                                  "(current seq # is %lld)\n"
-                                , method_name
-                                ,  __LINE__
-                                , iPeer
-                                , Node[iPeer]->GetName()
-                                , seqNum_ );
-                        mon_log_write( MON_CLUSTER_ALLGATHERSOCK_1, SQ_LOG_CRIT, buf );
-
-                        stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
-                        err = MPI_ERR_IN_STATUS;
-                        if ( peer->p_sending )
-                        {
-                            peer->p_sending = false;
-                            nsent++;
-                        }
-                        if ( peer->p_receiving )
+                        if (trace_settings & TRACE_RECOVERY)
                         {
-                            peer->p_receiving = false;
-                            nrecv++;
+                            trace_printf( "%s@%d - err=%d, socks_[%d]=%d, stats[%d].MPI_ERROR=%s\n"
+                                        , method_name, __LINE__
+                                        , err
+                                        , indexToPnid_[iPeer]
+                                        , socks_[indexToPnid_[iPeer]]
+                                        , indexToPnid_[iPeer]
+                                        , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) );
                         }
 
-                        // setup the epoll structures 
-                        struct epoll_event event;
-                        event.data.fd = socks_[iPeer];
-                        int op = 0;
-                        if ( !peer->p_sending && !peer->p_receiving )
-                        {
-                            op = EPOLL_CTL_DEL;
-                            event.events = 0;
-                        }
-                        else if ( peer->p_sending )
-                        {
-                            op = EPOLL_CTL_MOD;
-                            event.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
-                        }
-                        else if ( peer->p_receiving )
+                        if ( err == MPI_ERR_IN_STATUS
+                          && stats[indexToPnid_[iPeer]].MPI_ERROR == MPI_ERR_EXITED)
                         {
-                            op = EPOLL_CTL_MOD;
-                            event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
-                        }
-                        if ( op == EPOLL_CTL_DEL || op == EPOLL_CTL_MOD )
-                        {
-                            EpollCtl( epollFD_, op, socks_[iPeer], &event );
+                            // At this point, this peer is not responding and
+                            // reconnects failed or its znode expired
+                            char buf[MON_STRING_BUF_SIZE];
+                            snprintf( buf, sizeof(buf)
+                                    , "[%s@%d] Not heard from peer=%d (node=%s) "
+                                      "(current seq # is %lld)\n"
+                                    , method_name
+                                    ,  __LINE__
+                                    , indexToPnid_[iPeer]
+                                    , Node[indexToPnid_[iPeer]]->GetName()
+                                    , seqNum_ );
+                            mon_log_write( MON_CLUSTER_ALLGATHERSOCK_2, SQ_LOG_CRIT, buf );
                         }
                     }
                 }
@@ -4651,35 +5072,43 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
         }
  
         if ( nw < 0 )
-        {
+        { // Got an error
             char ebuff[256];
             char buf[MON_STRING_BUF_SIZE];
             snprintf( buf, sizeof(buf), "[%s@%d] epoll_wait(%d, %d) error: %s\n",
                 method_name, __LINE__, epollFD_, maxEvents,
                 strerror_r( errno, ebuff, 256 ) );
-            mon_log_write( MON_CLUSTER_ALLGATHERSOCK_2, SQ_LOG_CRIT, buf );
+            mon_log_write( MON_CLUSTER_ALLGATHERSOCK_3, SQ_LOG_CRIT, buf );
             MPI_Abort( MPI_COMM_SELF,99 );
         }
+
+        // Process fd's which are ready to initiate an IO or completed IO
         for ( int iEvent = 0; iEvent < nw; iEvent++ )
         {
             bool stateChange = false;
             int fd = events[iEvent].data.fd;
             int iPeer;
-            for ( iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
-            {
-                if ( events[iEvent].data.fd == socks_[iPeer] ) break;
+            for ( iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
+            { // Find corresponding peer by matching socket fd
+                if ( events[iEvent].data.fd == socks_[indexToPnid_[iPeer]] ) break;
             }
-            if ( iPeer < 0 || iPeer >= GetConfigPNodesMax() || iPeer == MyPNID
-                || socks_[iPeer] == -1
-                || (!p[iPeer].p_sending && !p[iPeer].p_receiving) )
+            if ( indexToPnid_[iPeer] < 0 || indexToPnid_[iPeer] >= GetConfigPNodesMax() || indexToPnid_[iPeer] == MyPNID
+                || socks_[indexToPnid_[iPeer]] == -1
+                || (!p[indexToPnid_[iPeer]].p_sending && !p[indexToPnid_[iPeer]].p_receiving) )
             {
                 char buf[MON_STRING_BUF_SIZE];
-                snprintf( buf, sizeof(buf), "[%s@%d] invalid peer %d\n",
-                    method_name, __LINE__, iPeer );
-                mon_log_write( MON_CLUSTER_ALLGATHERSOCK_3, SQ_LOG_CRIT, buf );
+                snprintf( buf, sizeof(buf)
+                        , "[%s@%d] Invalid peer %d, "
+                          "peer.p_sending=%d, "
+                          "peer.p_receiving=%d\n"
+                        , method_name, __LINE__
+                        , indexToPnid_[iPeer]
+                        , indexToPnid_[iPeer] >= GetConfigPNodesMax()?-1:p[indexToPnid_[iPeer]].p_sending
+                        , indexToPnid_[iPeer] >= GetConfigPNodesMax()?-1:p[indexToPnid_[iPeer]].p_receiving );
+                mon_log_write( MON_CLUSTER_ALLGATHERSOCK_4, SQ_LOG_CRIT, buf );
                 MPI_Abort( MPI_COMM_SELF,99 );
             }
-            peer_t *peer = &p[iPeer];
+            peer_t *peer = &p[indexToPnid_[iPeer]];
             if ( (events[iEvent].events & EPOLLERR) ||
                  (events[iEvent].events & EPOLLHUP) ||
                  ( !(events[iEvent].events & (EPOLLIN|EPOLLOUT))) )
@@ -4690,13 +5119,13 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
                 snprintf( buf, sizeof(buf)
                         , "[%s@%d] Error: peer=%d, events[%d].data.fd=%d, event[%d]=%s\n"
                         , method_name, __LINE__
-                        , iPeer
+                        , indexToPnid_[iPeer]
                         , iEvent
                         , events[iEvent].data.fd
                         , iEvent
                         , EpollEventString(events[iEvent].events) );
-                mon_log_write( MON_CLUSTER_ALLGATHERSOCK_4, SQ_LOG_CRIT, buf );
-                stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+                mon_log_write( MON_CLUSTER_ALLGATHERSOCK_5, SQ_LOG_CRIT, buf );
+                stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
                 err = MPI_ERR_IN_STATUS;
                 if ( peer->p_sending )
                 {
@@ -4708,12 +5137,11 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
                     peer->p_receiving = false;
                     nrecv++;
                 }
-                stateChange = 1;
+                stateChange = true;
                 goto early_exit;
             }
-            if ( peer->p_receiving
-                && events[iEvent].events & EPOLLIN )
-            {
+            if ( peer->p_receiving && events[iEvent].events & EPOLLIN )
+            { // Got receive (read) completion
                 int eagain_ok = 0;
 read_again:
                 char *r = &peer->p_buff[peer->p_received];
@@ -4729,6 +5157,26 @@ read_again:
                 int nr;
                 while ( 1 )
                 {
+                    if (trace_settings & TRACE_SYNC_DETAIL)
+                    {
+                        trace_printf( "%s@%d - EPOLLIN from %s(%d),"
+                                      " sending=%d,"
+                                      " receiving=%d (%d)"
+                                      " sent=%d,"
+                                      " received=%d"
+                                      " timeout_count=%d,"
+                                      " initial_check=%d,"
+                                      " n2recv=%d\n"
+                                    , method_name, __LINE__
+                                    , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                    , peer->p_sending
+                                    , peer->p_receiving, n2get
+                                    , peer->p_sent
+                                    , peer->p_received
+                                    , peer->p_timeout_count
+                                    , peer->p_initial_check
+                                    , peer->p_n2recv );
+                    }
                     nr = recv( fd, r, n2get, 0 );
                     if ( nr >= 0 || errno == EINTR ) break;
                 }
@@ -4746,8 +5194,8 @@ read_again:
                         snprintf( buf, sizeof(buf)
                                 , "[%s@%d] recv[%d](%d) error %d (%s)\n"
                                 , method_name, __LINE__
-                                , iPeer, nr , err, strerror(err) );
-                        mon_log_write( MON_CLUSTER_ALLGATHERSOCK_5, SQ_LOG_CRIT, buf );
+                                , indexToPnid_[iPeer], nr , err, strerror(err) );
+                        mon_log_write( MON_CLUSTER_ALLGATHERSOCK_6, SQ_LOG_CRIT, buf );
                         peer->p_receiving = false;
                         nrecv++;
                         if ( peer->p_sending )
@@ -4755,7 +5203,7 @@ read_again:
                             peer->p_sending = false;
                             nsent++;
                         }
-                        stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+                        stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
                         err = MPI_ERR_IN_STATUS;
                         stateChange = true;
                     }
@@ -4792,7 +5240,7 @@ read_again:
                             snprintf( buf, sizeof(buf),
                                 "[%s@%d] error n2recv %d\n",
                                 method_name, __LINE__, peer->p_n2recv );
-                            mon_log_write( MON_CLUSTER_ALLGATHERSOCK_6, SQ_LOG_CRIT, buf );
+                            mon_log_write( MON_CLUSTER_ALLGATHERSOCK_7, SQ_LOG_CRIT, buf );
                             MPI_Abort( MPI_COMM_SELF,99 );
                         }
                         if ( peer->p_n2recv == 0 )
@@ -4800,20 +5248,59 @@ read_again:
                             // this buffer is done
                             peer->p_receiving = false;
                             nrecv++;
-                            stats[iPeer].count = peer->p_received;
+                            stats[indexToPnid_[iPeer]].count = peer->p_received;
+                            if (trace_settings & TRACE_SYNC_DETAIL)
+                            {
+                                trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+                                              " sending=%d,"
+                                              " receiving=%d (%d)"
+                                              " sent=%d,"
+                                              " received=%d"
+                                              " timeout_count=%d,"
+                                              " initial_check=%d,"
+                                              " n2recv=%d\n"
+                                            , method_name, __LINE__
+                                            , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                            , peer->p_sending
+                                            , peer->p_receiving, n2get
+                                            , peer->p_sent
+                                            , peer->p_received
+                                            , peer->p_timeout_count
+                                            , peer->p_initial_check
+                                            , peer->p_n2recv );
+                            }
                             stateChange = true;
                         }
                     }
                 }
             }
-            if ( peer->p_sending
-                && events[iEvent].events & EPOLLOUT )
-            {
+            if ( peer->p_sending  && events[iEvent].events & EPOLLOUT )
+            { // Got send (write) completion
                 char *s = &((char *)sbuf)[peer->p_sent];
                 int n2send = nbytes - peer->p_sent;
                 int ns;
                 while ( 1 )
                 {
+                    if (trace_settings & TRACE_SYNC_DETAIL)
+                    {
+                        trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+                                      " sending=%d (%d),"
+                                      " receiving=%d"
+                                      " sent=%d,"
+                                      " received=%d"
+                                      " timeout_count=%d,"
+                                      " initial_check=%d,"
+                                      " n2recv=%d\n"
+                                    , method_name, __LINE__
+                                    , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                    , peer->p_sending, n2send
+                                    , peer->p_receiving
+                                    , peer->p_sent
+                                    , peer->p_received
+                                    , peer->p_timeout_count
+                                    , peer->p_initial_check
+                                    , peer->p_n2recv );
+                    }
                     ns = send( fd, s, n2send, 0 );
                     if ( ns >= 0 || errno != EINTR ) break;
                 }
@@ -4825,8 +5312,8 @@ read_again:
                     snprintf( buf, sizeof(buf)
                             , "[%s@%d] send[%d](%d) error=%d (%s)\n"
                             , method_name, __LINE__
-                            , iPeer, ns, err, strerror(err) );
-                    mon_log_write( MON_CLUSTER_ALLGATHERSOCK_7, SQ_LOG_CRIT, buf );
+                            , indexToPnid_[iPeer], ns, err, strerror(err) );
+                    mon_log_write( MON_CLUSTER_ALLGATHERSOCK_8, SQ_LOG_CRIT, buf );
                     peer->p_sending = false;
                     nsent++;
                     if ( peer->p_receiving )
@@ -4834,7 +5321,7 @@ read_again:
                         peer->p_receiving = false;
                         nrecv++;
                     }
-                    stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+                    stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
                     err = MPI_ERR_IN_STATUS;
                     stateChange = true;
                 }
@@ -4846,6 +5333,26 @@ read_again:
                         // finished sending to this destination
                         peer->p_sending = false;
                         nsent++;
+                        if (trace_settings & TRACE_SYNC_DETAIL)
+                        {
+                            trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+                                          " sending=%d (%d),"
+                                          " receiving=%d"
+                                          " sent=%d,"
+                                          " received=%d"
+                                          " timeout_count=%d,"
+                                          " initial_check=%d,"
+                                          " n2recv=%d\n"
+                                        , method_name, __LINE__
+                                        , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                        , peer->p_sending, n2send
+                                        , peer->p_receiving
+                                        , peer->p_sent
+                                        , peer->p_received
+                                        , peer->p_timeout_count
+                                        , peer->p_initial_check
+                                        , peer->p_n2recv );
+                        }
                         stateChange = true;
                     }
                 }
@@ -4854,7 +5361,7 @@ early_exit:
             if ( stateChange )
             {
                 struct epoll_event event;
-                event.data.fd = socks_[iPeer];
+                event.data.fd = socks_[indexToPnid_[iPeer]];
                 int op = 0;
                 if ( !peer->p_sending && !peer->p_receiving )
                 {
@@ -4888,49 +5395,465 @@ early_exit:
     return err;
 }
 
-// When we get a communication error for a point-to-point monitor communicator
-// verify that the other nodes in the cluster also lost communications
-// with that monitor.  If all nodes concur we consider that monitor
-// down.
-void CCluster::ValidateClusterState( cluster_state_def_t nodestate[],
-                                     bool haveDivergence)
+int CCluster::AllgatherSockReconnect( MPI_Status *stats )
 {
-    const char method_name[] = "CCluster::ValidateClusterState";
-
-    exitedMons_t::iterator it;
-    upNodes_t nodeMask;
+    const char method_name[] = "CCluster::AllgatherSockReconnect";
+    TRACE_ENTRY;
 
-    for ( int i =0; i < MAX_NODE_MASKS ; i++ )
-    {
-        nodeMask.upNodes[i] = 0;
-    }
+    int err = MPI_SUCCESS;
+    int idst;
+    int reconnectSock = -1;
+    CNode *node;
 
-    for ( it = exitedMons_.begin(); it != exitedMons_.end(); )
+    // Loop on each node in the cluster
+    for ( int i = 0; i < GetConfigPNodesMax(); i++ )
     {
-        if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT))
-        {
-            trace_printf("%s@%d checking exited pnid=%d, detecting pnid=%d, seqNum=%lld"
-                         " (current seqNum_=%lld)\n", method_name, __LINE__,
-                         it->exitedPnid, it->detectingPnid, it->seqNum, seqNum_);
-        }
-
-        if ( seqNum_ >= (it->seqNum + 2) )
+        // Loop on each adjacent node in the cluster
+        for ( int j = i+1; j < GetConfigPNodesMax(); j++ )
         {
-            char buf[MON_STRING_BUF_SIZE];
-            snprintf( buf, sizeof(buf), "[%s] Validating exited node %d, "
-                      "detected by node %d at seq #%lld "
-                      "(current seq # is %lld).\n",
-                      method_name, it->exitedPnid, it->detectingPnid,
-                      it->seqNum, seqNum_);
-            mon_log_write(MON_CLUSTER_VALIDATE_STATE_1, SQ_LOG_ERR, buf);
-
-            int concurringNodes = 0;
+            if ( i == MyPNID )
+            { // Current [i] node is my node, so connect to [j] node
 
-            // Check if all active nodes see the node as down.
-            nodeMask.upNodes[it->exitedPnid/MAX_NODE_BITMASK] = 1ull << (it->exitedPnid%MAX_NODE_BITMASK);
-            string setSeesUp;
-            string setSeesDown;
-            char nodeX[10];
+                idst = j;
+                node = Nodes->GetNode( idst );
+                if (!node) continue;
+                if (node->GetState() != State_Up)
+                {
+                    if (socks_[idst] != -1)
+                    { // Peer socket is still active
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                        {
+                            trace_printf( "%s@%d - Node %s (%d) is not up, "
+                                          "removing old socket from epoll set, "
+                                          "socks_[%d]=%d\n"
+                                        , method_name, __LINE__
+                                        , node->GetName(), node->GetPNid()
+                                        , idst, socks_[idst] );
+                        }
+                        // Remove old socket from epoll set, it may not be there
+                        struct epoll_event event;
+                        event.data.fd = socks_[idst];
+                        event.events = 0;
+                        EpollCtlDelete( epollFD_, socks_[idst], &event );
+                        socks_[idst] = -1;
+                    }
+                    continue;
+                }
+                reconnectSock = ConnectSockPeer( node, idst );
+                if (reconnectSock == -1)
+                {
+                    stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+                    stats[idst].count = 0;
+                    err = MPI_ERR_IN_STATUS;
+                    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                    {
+                        trace_printf( "%s@%d - Setting Node %s (%d) status to "
+                                      "stats[%d].MPI_ERROR=%s\n"
+                                    , method_name, __LINE__
+                                    , node->GetName(), node->GetPNid()
+                                    , idst
+                                    , ErrorMsg(stats[idst].MPI_ERROR) );
+                    }
+                    if (node->GetState() != State_Up)
+                    {
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                        {
+                            trace_printf( "%s@%d - Node %s (%d) is not up, "
+                                          "removing old socket from epoll set, "
+                                          "socks_[%d]=%d\n"
+                                        , method_name, __LINE__
+                                        , node->GetName(), node->GetPNid()
+                                        , idst, socks_[idst] );
+                        }
+                        // Remove old socket from epoll set, it may not be there
+                        struct epoll_event event;
+                        event.data.fd = socks_[idst];
+                        event.events = 0;
+                        EpollCtlDelete( epollFD_, socks_[idst], &event );
+                        socks_[idst] = -1;
+                    }
+                }
+            }
+            else if ( j == MyPNID )
+            { // Current [j] is my node, accept connection from peer [i] node
+
+                idst = i;
+                node = Nodes->GetNode( idst );
+                if (!node) continue;
+                if (node->GetState() != State_Up)
+                {
+                    if (socks_[idst] != -1)
+                    { // Peer socket is still active
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                        {
+                            trace_printf( "%s@%d - Node %s (%d) is not up, "
+                                          "removing old socket from epoll set, "
+                                          "socks_[%d]=%d\n"
+                                        , method_name, __LINE__
+                                        , node->GetName(), node->GetPNid()
+                                        , idst, socks_[idst] );
+                        }
+                        // Remove old socket from epoll set, it may not be there
+                        struct epoll_event event;
+                        event.data.fd = socks_[idst];
+                        event.events = 0;
+                        EpollCtlDelete( epollFD_, socks_[idst], &event );
+                        socks_[idst] = -1;
+                    }
+                    continue;
+                }
+                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                {
+                    trace_printf( "%s@%d - Pinging Node %s (%d) to see if it's up\n"
+                                , method_name, __LINE__
+                                , node->GetName(), node->GetPNid() );
+                }
+                if (PingSockPeer(node))
+                {
+                    reconnectSock = AcceptSockPeer( node, idst );
+                    if (reconnectSock == -1)
+                    {
+                        stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+                        stats[idst].count = 0;
+                        err = MPI_ERR_IN_STATUS;
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                        {
+                            trace_printf( "%s@%d - Setting Node %s (%d) status to "
+                                          "stats[%d].MPI_ERROR=%s\n"
+                                        , method_name, __LINE__
+                                        , node->GetName(), node->GetPNid()
+                                        , idst
+                                        , ErrorMsg(stats[idst].MPI_ERROR) );
+                        }
+                        if (node->GetState() != State_Up)
+                        {
+                            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                            {
+                                trace_printf( "%s@%d - Node %s (%d) is not up, "
+                                              "removing old socket from epoll set, "
+                                              "socks_[%d]=%d\n"
+                                            , method_name, __LINE__
+                                            , node->GetName(), node->GetPNid()
+                                            , idst, socks_[idst] );
+                            }
+                            // Remove old socket from epoll set, it may not be there
+                            struct epoll_event event;
+                            event.data.fd = socks_[idst];
+                            event.events = 0;
+                            EpollCtlDelete( epollFD_, socks_[idst], &event );
+                            socks_[idst] = -1;
+                        }
+                    }
+                }
+                else
+                {
+                    if (socks_[idst] != -1)
+                    {
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                        {
+                            trace_printf( "%s@%d - Node %s (%d) is not up, "
+                                          "removing old socket from epoll set, "
+                                          "socks_[%d]=%d\n"
+                                        , method_name, __LINE__
+                                        , node->GetName(), node->GetPNid()
+                                        , idst, socks_[idst] );
+                        }
+                        // Remove old socket from epoll set, it may not be there
+                        struct epoll_event event;
+                        event.data.fd = socks_[idst];
+                        event.events = 0;
+                        EpollCtlDelete( epollFD_, socks_[idst], &event );
+                    }
+                    reconnectSock = -1;
+                    stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+                    stats[idst].count = 0;
+                    err = MPI_ERR_IN_STATUS;
+                    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                    {
+                        trace_printf( "%s@%d - Setting Node %s (%d) status to "
+                                      "stats[%d].MPI_ERROR=%s\n"
+                                    , method_name, __LINE__
+                                    , node->GetName(), node->GetPNid()
+                                    , idst
+                                    , ErrorMsg(stats[idst].MPI_ERROR) );
+                    }
+                }
+            }
+            else
+            {
+                idst = -1;
+            }
+            if ( idst >= 0 
+              && reconnectSock != -1
+              && fcntl( socks_[idst], F_SETFL, O_NONBLOCK ) )
+            {
+                err = MPI_ERR_AMODE;
+                char ebuff[256];
+                char buf[MON_STRING_BUF_SIZE];
+                snprintf( buf, sizeof(buf), "[%s@%d] fcntl(NONBLOCK) error: %s\n",
+                    method_name, __LINE__, strerror_r( errno, ebuff, 256 ) );
+                mon_log_write( MON_CLUSTER_ALLGATHERSOCKRECONN_1, SQ_LOG_CRIT, buf );
+            }
+        }
+    }
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+        {
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d" " - socks_[%d]=%d, "
+                              "stats[%d].MPI_ERROR=%s\n"
+                            , method_name, __LINE__
+                            , indexToPnid_[i]
+                            , socks_[indexToPnid_[i]]
+                            , indexToPnid_[i]
+                            , ErrorMsg(stats[indexToPnid_[i]].MPI_ERROR) );
+            }
+        }
+        trace_printf( "%s@%d - Returning err=%d\n"
+                    , method_name, __LINE__, err );
+    }
+
+    TRACE_EXIT;
+    return( err );
+}
+
+int CCluster::AcceptSockPeer( CNode *node, int peer )
+{
+    const char method_name[] = "CCluster::AcceptSockPeer";
+    TRACE_ENTRY;
+
+    int rc = MPI_SUCCESS;
+    int reconnectSock = -1;
+    unsigned char srcaddr[4];
+    struct hostent *he;
+
+    // Get my host structure via my node name
+    he = gethostbyname( MyNode->GetName() );
+    if ( !he )
+    {
+        char ebuff[256];
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s@%d] gethostbyname(%s) error: %s\n"
+                , method_name, __LINE__
+                , MyNode->GetName()
+                , strerror_r( errno, ebuff, 256 ) );
+        mon_log_write( MON_CLUSTER_ACCEPTSOCKPEER_1, SQ_LOG_CRIT, buf );
+        abort();
+    }
+    else
+    {
+        if (trace_settings & TRACE_RECOVERY)
+        {
+            trace_printf( "%s@%d Accepting server socket: from %s(%d), src=%d.%d.%d.%d, port=%d\n"
+                        , method_name, __LINE__
+                        , node->GetName(), node->GetPNid()
+                        , (int)((unsigned char *)srcaddr)[0]
+                        , (int)((unsigned char *)srcaddr)[1]
+                        , (int)((unsigned char *)srcaddr)[2]
+                        , (int)((unsigned char *)srcaddr)[3]
+                        , MyNode->GetSyncSocketPort() );
+        }
+
+        // Accept connection from peer
+        reconnectSock = AcceptSock( syncSock_ );
+        if (reconnectSock != -1)
+        {
+            if (trace_settings & TRACE_RECOVERY)
+            {
+                trace_printf( "%s@%d Server %s(%d) accepted from client %s(%d), old socks_[%d]=%d, new socks_[%d]=%d\n"
+                            , method_name, __LINE__
+                            , MyNode->GetName(), MyPNID
+                            , node->GetName(), node->GetPNid()
+                            , peer, socks_[peer]
+                            , peer, reconnectSock);
+            }
+        }
+        else
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf), "[%s@%d] AcceptSock(%d) failed!\n",
+                method_name, __LINE__, syncSock_ );
+            mon_log_write( MON_CLUSTER_ACCEPTSOCKPEER_2, SQ_LOG_ERR, buf );
+            rc = -1;
+        }
+
+        if (socks_[peer] != -1)
+        {
+            // Remove old socket from epoll set, it may not be there
+            struct epoll_event event;
+            event.data.fd = socks_[peer];
+            event.events = 0;
+            EpollCtlDelete( epollFD_, socks_[peer], &event );
+        }
+        if (reconnectSock != -1)
+        {
+            socks_[peer] = reconnectSock;
+        }
+    }
+
+    TRACE_EXIT;
+    return rc;
+}
+
+int CCluster::ConnectSockPeer( CNode *node, int peer )
+{
+    const char method_name[] = "CCluster::ConnectSockPeer";
+    TRACE_ENTRY;
+
+    int rc = MPI_SUCCESS;
+    int reconnectSock = -1;
+    unsigned char srcaddr[4], dstaddr[4];
+    struct hostent *he;
+
+    // Get my host structure via my node name
+    he = gethostbyname( MyNode->GetName() );
+    if ( !he )
+    {
+        char ebuff[256];
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s@%d] gethostbyname(%s) error: %s\n"
+                , method_name, __LINE__
+                , MyNode->GetName()
+                , strerror_r( errno, ebuff, 256 ) );
+        mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_1, SQ_LOG_CRIT, buf );
+        abort();
+    }
+    else
+    {
+        // Initialize my source address structure
+        memcpy( srcaddr, he->h_addr, 4 );
+        // Get peer's host structure via its node name 
+        he = gethostbyname( node->GetName() );
+        if ( !he )
+        {
+            char ebuff[256];
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf),
+                "[%s@%d] gethostbyname(%s) error: %s\n",
+                method_name, __LINE__, node->GetName(),
+                strerror_r( errno, ebuff, 256 ) );
+            mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_2, SQ_LOG_CRIT, buf );
+            abort();
+        }
+        // Initialize peer's destination address structure
+        memcpy( dstaddr, he->h_addr, 4 );
+
+        if (trace_settings & TRACE_RECOVERY)
+        {
+            trace_printf( "%s@%d Creating client socket: src=%d.%d.%d.%d, "
+                          "dst(%s)=%d.%d.%d.%d, dst port=%d\n"
+                        , method_name, __LINE__
+                        , (int)((unsigned char *)srcaddr)[0]
+                        , (int)((unsigned char *)srcaddr)[1]
+                        , (int)((unsigned char *)srcaddr)[2]
+                        , (int)((unsigned char *)srcaddr)[3]
+                        ,  node->GetName()
+                        , (int)((unsigned char *)dstaddr)[0]
+                        , (int)((unsigned char *)dstaddr)[1]
+                        , (int)((unsigned char *)dstaddr)[2]
+                        , (int)((unsigned char *)dstaddr)[3]
+                        , sockPorts_[peer] );
+        }
+        // Connect to peer
+        reconnectSock = MkCltSock( srcaddr, dstaddr, sockPorts_[peer] );
+        if (reconnectSock != -1)
+        {
+            if (trace_settings & TRACE_RECOVERY)
+            {
+                trace_printf( "%s@%d Client %s(%d) connected to server %s(%d), old socks_[%d]=%d, new socks_[%d]=%d\n"
+                            , method_name, __LINE__
+                            , MyNode->GetName(), MyPNID
+                            , node->GetName(), node->GetPNid()
+                            , peer, socks_[peer]
+                            , peer, reconnectSock);
+            }
+        }
+        else
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s@%d] MkCltSock() src=%d.%d.%d.%d, "
+                      "dst(%s)=%d.%d.%d.%d failed!\n"
+                    , method_name, __LINE__
+                    , (int)((unsigned char *)srcaddr)[0]
+                    , (int)((unsigned char *)srcaddr)[1]
+                    , (int)((unsigned char *)srcaddr)[2]
+                    , (int)((unsigned char *)srcaddr)[3]
+                    ,  node->GetName()
+                    , (int)((unsigned char *)dstaddr)[0]
+                    , (int)((unsigned char *)dstaddr)[1]
+                    , (int)((unsigned char *)dstaddr)[2]
+                    , (int)((unsigned char *)dstaddr)[3] );
+            mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_3, SQ_LOG_ERR, buf );
+            rc = -1;
+        }
+
+        if (socks_[peer] != -1)
+        {
+            // Remove old socket from epoll set, it may not be there
+            struct epoll_event event;
+            event.data.fd = socks_[peer];
+            event.events = 0;
+            EpollCtlDelete( epollFD_, socks_[peer], &event );
+        }
+        if (reconnectSock != -1)
+        {
+            socks_[peer] = reconnectSock;
+        }
+    }
+
+    TRACE_EXIT;
+    return( rc );
+}
+
+// When we get a communication error for a point-to-point monitor communicator
+// verify that the other nodes in the cluster also lost communications
+// with that monitor.  If all nodes concur we consider that monitor
+// down.
+void CCluster::ValidateClusterState( cluster_state_def_t nodestate[],
+                                     bool haveDivergence)
+{
+    const char method_name[] = "CCluster::ValidateClusterState";
+
+    exitedMons_t::iterator it;
+    upNodes_t nodeMask;
+
+    for ( int i =0; i < MAX_NODE_MASKS ; i++ )
+    {
+        nodeMask.upNodes[i] = 0;
+    }
+
+    for ( it = exitedMons_.begin(); it != exitedMons_.end(); )
+    {
+        if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT))
+        {
+            trace_printf("%s@%d checking exited pnid=%d, detecting pnid=%d, seqNum=%lld"
+                         " (current seqNum_=%lld)\n", method_name, __LINE__,
+                         it->exitedPnid, it->detectingPnid, it->seqNum, seqNum_);
+        }
+
+        if ( seqNum_ >= (it->seqNum + 2) )
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf), "[%s] Validating exited node %d, "
+                      "detected by node %d at seq #%lld "
+                      "(current seq # is %lld).\n",
+                      method_name, it->exitedPnid, it->detectingPnid,
+                      it->seqNum, seqNum_);
+            mon_log_write(MON_CLUSTER_VALIDATE_STATE_1, SQ_LOG_ERR, buf);
+
+            int concurringNodes = 0;
+
+            // Check if all active nodes see the node as down.
+            nodeMask.upNodes[it->exitedPnid/MAX_NODE_BITMASK] = 1ull << (it->exitedPnid%MAX_NODE_BITMASK);
+            string setSeesUp;
+            string setSeesDown;
+            char nodeX[10];
 
             // Evaluate each active (up) node in the cluster
             int pnodesCount = 0;
@@ -5187,8 +6110,10 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
     const char method_name[] = "CCluster::ValidateSeqNum";
 
     unsigned long long seqNum;
-    unsigned long long seqNumBucket[256];
-    int seqNumCount[256];
+    unsigned long long loSeqNum = seqNum_;
+    unsigned long long hiSeqNum = seqNum_;
+    unsigned long long seqNumBucket[MAX_NODES];
+    int seqNumCount[MAX_NODES];
     int maxBucket = 0;
     bool found;
     int mostCountsIndex;
@@ -5198,10 +6123,26 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
     // Count occurrences of sequence numbers from other nodes
     for (int pnid = 0; pnid < GetConfigPNodesMax(); pnid++)
     {
+        CNode *node= Nodes->GetNode( pnid );
+        if (!node) continue;
+        if (node->GetState() != State_Up) continue;
+        
         seqNum = nodestate[pnid].seq_num;
 
+        if (trace_settings & TRACE_SYNC)
+        {
+            trace_printf( "%s@%d seqNum_=%lld, nodestate[%d].seq_num=%lld\n"
+                        , method_name, __LINE__
+                        , seqNum_
+                        , pnid
+                        , nodestate[pnid].seq_num );
+        }
+
         if (seqNum != 0)
         {
+            loSeqNum = (seqNum < loSeqNum) ? seqNum : loSeqNum;
+            hiSeqNum = (seqNum > hiSeqNum) ? seqNum : hiSeqNum;
+    
             found = false;
             for (int i=0; i<maxBucket; ++i)
             {
@@ -5239,14 +6180,22 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
         }
     }
 
+    lowSeqNum_  = loSeqNum;
+    highSeqNum_ = hiSeqNum;
+    
     if (trace_settings & TRACE_SYNC)
     {
         if ( seqNum_ != seqNumBucket[mostCountsIndex] )
         {
-            trace_printf("%s@%d Most common seq num=%lld (%d nodes), %d buckets"
-                         ", local seq num (%lld) did not match.\n",
-                         method_name, __LINE__, seqNumBucket[mostCountsIndex],
-                         seqNumCount[mostCountsIndex], maxBucket, seqNum_);
+            trace_printf( "%s@%d Most common seq num=%lld (%d nodes), "
+                          "%d buckets, low=%lld, high=%lld, local seq num (%lld) did not match.\n"
+                         , method_name, __LINE__
+                         , seqNumBucket[mostCountsIndex]
+                         , seqNumCount[mostCountsIndex]
+                         , maxBucket
+                         , lowSeqNum_
+                         , highSeqNum_
+                         , seqNum_ );
         }
     }
 
@@ -5268,7 +6217,7 @@ void CCluster::HandleDownNode( int pnid )
         trace_printf("%s@%d - Added down node to list, pnid=%d, name=(%s)\n", method_name, __LINE__, downNode->GetPNid(), downNode->GetName());
 
     // assign new TmLeader if TMLeader node is dead.
-    AssignTmLeader(pnid);
+    AssignTmLeader(pnid, false);
 
     // Build available list of spare nodes
     CNode *spareNode;
@@ -5373,6 +6322,7 @@ void CCluster::UpdateClusterState( bool &doShutdown,
     TRACE_ENTRY;
 
     struct sync_buffer_def *recvBuf;
+    struct sync_buffer_def *sendBuf = Nodes->GetSyncBuffer();
     STATE node_state;
     int change_nid;
     cluster_state_def_t nodestate[GetConfigPNodesMax()];
@@ -5430,13 +6380,14 @@ void CCluster::UpdateClusterState( bool &doShutdown,
         recvBuf = (struct sync_buffer_def *)
             (((char *) syncBuf) + index * CommBufSize);
 
-        if (trace_settings & TRACE_SYNC_DETAIL)
+        if (trace_settings & TRACE_SYNC)
         {
             int nr;
             MPI_Get_count(&status[index], MPI_CHAR, &nr);
             trace_printf("%s@%d - Received %d bytes from node %d, "
-                         "message count=%d\n",
+                         ", seq_num=%lld, message count=%d\n",
                          method_name, __LINE__, nr, index,
+                         recvBuf->nodeInfo.seq_num,
                          recvBuf->msgInfo.msg_count);
         }
 
@@ -5532,23 +6483,23 @@ void CCluster::UpdateClusterState( bool &doShutdown,
             Node[index]->SetState( State_Down );
             --CurNodes;
             // Clear bit in set of "up nodes"
-            upNodes_.upNodes[index/MAX_NODE_BITMASK] &= 
-                ~(1ull << (index%MAX_NODE_BITMASK));
+            upNodes_.upNodes[index/MAX_NODE_BITMASK] &= ~(1ull << (index%MAX_NODE_BITMASK));
         }
     }
 
-    if ( checkSeqNum_ && !ValidateSeqNum( nodestate ) && !enqueuedDown_ )
+    if ( (checkSeqNum_ || reconnectSeqNum_ != 0)
+      && !ValidateSeqNum( nodestate ) 
+      && !enqueuedDown_ )
     {
-        char buf[MON_STRING_BUF_SIZE];
-        snprintf(buf, sizeof(buf), "[%s] Sync cycle sequence number (%lld) "
-                 "incorrect.  Scheduling node down.\n", method_name, seqNum_);
-        mon_log_write(MON_CLU

<TRUNCATED>