You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by su...@apache.org on 2017/09/19 02:54:38 UTC

[1/4] incubator-trafodion git commit: [TRAFODION-2651] Fixed monitor-to-monitor communication by adding reconnect logic to better handle various communication glitches which previously resulted in false node down situations.

Repository: incubator-trafodion
Updated Branches:
  refs/heads/master e2789f97c -> aeb9ef223


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/cluster.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cluster.h b/core/sqf/monitor/linux/cluster.h
index c7d097b..e743341 100644
--- a/core/sqf/monitor/linux/cluster.h
+++ b/core/sqf/monitor/linux/cluster.h
@@ -57,6 +57,19 @@ enum MonXChngTags
     MON_XCHNG_DATA
 };
 
+typedef struct
+{
+    int p_sent;
+    int p_received;
+    int p_n2recv;
+    bool p_sending;
+    bool p_receiving;
+    int p_timeout_count;
+    bool p_initial_check;
+    char *p_buff;
+    struct timespec znodeFailedTime;
+} peer_t;
+
 class CNode;
 class CLNode;
 
@@ -100,7 +113,7 @@ public:
 #ifndef USE_BARRIER
     void ArmWakeUpSignal (void);
 #endif
-    void AssignTmLeader( int pnid );
+    void AssignTmLeader( int pnid, bool checkProcess );
     void stats();
     void CompleteSyncCycle()
         { syncCycle_.lock(); syncCycle_.wait(); syncCycle_.unlock(); }
@@ -118,7 +131,7 @@ public:
     void addNewSock(int nid, int otherRank, int sockFd );
 
     bool exchangeNodeData ( );
-    void exchangeTmSyncData ( struct sync_def *sync );
+    void exchangeTmSyncData ( struct sync_def *sync, bool bumpSync );
     int GetConfigPNodesCount() { return configPNodesCount_; }
     int GetConfigPNodesMax() { return configPNodesMax_; }
     bool ImAlive( bool needed=false, struct sync_def *sync = NULL );
@@ -191,12 +204,12 @@ protected:
 
     CNode  **Node;           // array of nodes
     CLNode **LNode;          // array of logical nodes
-
     int      TmSyncPNid;     // Physical Node ID of current TmSync operations master
 
 
-    void AddTmsyncMsg (struct sync_def *sync,
-                                 struct internal_msg_def *msg);
+    void AddTmsyncMsg( struct sync_buffer_def *tmSyncBuffer
+                     , struct sync_def *sync
+                     , struct internal_msg_def *msg);
     void AddReplData (struct internal_msg_def *msg);
     void AddMyNodeState ();
     void TraceTMSyncState(struct sync_buffer_def *recv_buffer,
@@ -252,6 +265,10 @@ private:
     int integratingPNid_;       // pnid of node when re-integration in progress
     MPI_Comm  joinComm_;        // new to creator communicator (1-to-1)
     int joinSock_;              // new to creator socket used at join phase
+    unsigned long long lastSeqNum_;
+    unsigned long long lowSeqNum_;
+    unsigned long long highSeqNum_;
+    unsigned long long reconnectSeqNum_;
     unsigned long long seqNum_;
     int cumulativeDelaySec_;
 
@@ -281,10 +298,6 @@ private:
 
     bool agTimeStats(struct timespec & ts_begin,
                      struct timespec & ts_end);
-    
-
-
-    struct sync_buffer_def *tmSyncBuffer_;
 
     // Size of send and receive buffers used for communication between monitors
     enum { CommBufSize = MAX_SYNC_SIZE };
@@ -323,6 +336,9 @@ private:
     int Allgather(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats);
     int AllgatherIB(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats);
     int AllgatherSock(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats);
+    int AllgatherSockReconnect( MPI_Status *stats );
+    int AcceptSockPeer( CNode *node, int peer );
+    int ConnectSockPeer( CNode *node, int peer );
 
     void ValidateClusterState( cluster_state_def_t nodestate[],
                                bool haveDivergence );
@@ -331,6 +347,7 @@ private:
     void HandleReintegrateError( int rc, int err,
                                  int nid, nodeId_t *nodeInfo,
                                  bool abort );
+    bool PingSockPeer(CNode *node);
     void ReIntegrateMPI( int initProblem );
     void ReIntegrateSock( int initProblem );
     void SendReIntegrateStatus( STATE nodeState, int status );
@@ -357,8 +374,9 @@ private:
 
     void InitClusterSocks( int worldSize, int myRank, char *nodeNames,int *rankToPnid );
     void InitServerSock( void );
-    int AcceptSock( int sock );
+    int  AcceptSock( int sock );
     void EpollCtl( int efd, int op, int fd, struct epoll_event *event );
+    void EpollCtlDelete( int efd, int fd, struct epoll_event *event );
     int  MkSrvSock( int *pport );
     int  MkCltSock( unsigned char srcip[4], unsigned char dstip[4], int port );
 

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/commaccept.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/commaccept.cxx b/core/sqf/monitor/linux/commaccept.cxx
index 9867e11..a9045af 100644
--- a/core/sqf/monitor/linux/commaccept.cxx
+++ b/core/sqf/monitor/linux/commaccept.cxx
@@ -195,6 +195,20 @@ bool CCommAccept::sendNodeInfoSock( int sockFd )
                     sizeof(nodeInfo[i].syncPort));
             nodeInfo[i].pnid = node->GetPNid();
             nodeInfo[i].creatorPNid = (nodeInfo[i].pnid == MyPNID) ? MyPNID : -1;
+
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d - Node info for pnid=%d (%s)\n"
+                              "        CommPort=%s\n"
+                              "        SyncPort=%s\n"
+                              "        creatorPNid=%d\n"
+                            , method_name, __LINE__
+                            , nodeInfo[i].pnid
+                            , nodeInfo[i].nodeName
+                            , nodeInfo[i].commPort
+                            , nodeInfo[i].syncPort
+                            , nodeInfo[i].creatorPNid );
+            }
         }
         else
         {
@@ -499,6 +513,7 @@ void CCommAccept::processNewSock( int joinFd )
     int rc;
     int integratingFd; 
     nodeId_t nodeId;
+    CNode *node;
 
     mem_log_write(CMonLog::MON_CONNTONEWMON_2);
 
@@ -516,25 +531,17 @@ void CCommAccept::processNewSock( int joinFd )
         return;
     }
 
-    if ( nodeId.creator )
-    {
-        // Indicate that this node is the creator monitor for the node up
-        // operation.
-        MyNode->SetCreator( true
-                          , nodeId.creatorShellPid
-                          , nodeId.creatorShellVerifier );
-    }
-    
     if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
     {
         trace_printf( "%s@%d - Accepted connection from pnid=%d\n"
-                      "        myNodeInfo.nodeName=%s\n"
-                      "        myNodeInfo.commPort=%s\n"
-                      "        myNodeInfo.syncPort=%s\n"
-                      "        myNodeInfo.creatorPNid=%d\n"
-                      "        myNodeInfo.creator=%d\n"
-                      "        myNodeInfo.creatorShellPid=%d\n"
-                      "        myNodeInfo.creatorShellVerifier=%d\n"
+                      "        nodeId.nodeName=%s\n"
+                      "        nodeId.commPort=%s\n"
+                      "        nodeId.syncPort=%s\n"
+                      "        nodeId.creatorPNid=%d\n"
+                      "        nodeId.creator=%d\n"
+                      "        nodeId.creatorShellPid=%d\n"
+                      "        nodeId.creatorShellVerifier=%d\n"
+                      "        nodeId.ping=%d\n"
                     , method_name, __LINE__
                     , nodeId.pnid
                     , nodeId.nodeName
@@ -543,16 +550,93 @@ void CCommAccept::processNewSock( int joinFd )
                     , nodeId.creatorPNid
                     , nodeId.creator
                     , nodeId.creatorShellPid
-                    , nodeId.creatorShellVerifier );
+                    , nodeId.creatorShellVerifier
+                    , nodeId.ping );
     }
 
-    CNode *node= Nodes->GetNode( nodeId.nodeName );
+    node= Nodes->GetNode( nodeId.nodeName );
+
+    if ( nodeId.ping )
+    {
+        // Reply with my node info
+        nodeId.pnid = MyPNID;
+        strcpy(nodeId.nodeName, MyNode->GetName());
+        strcpy(nodeId.commPort, MyNode->GetCommPort());
+        strcpy(nodeId.syncPort, MyNode->GetSyncPort());
+        nodeId.ping = true;
+        nodeId.creatorPNid = -1;
+        nodeId.creator = false;
+        nodeId.creatorShellPid = -1;
+        nodeId.creatorShellVerifier = -1;
+
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "Sending my nodeInfo.pnid=%d\n"
+                          "        nodeInfo.nodeName=%s\n"
+                          "        nodeInfo.commPort=%s\n"
+                          "        nodeInfo.syncPort=%s\n"
+                          "        nodeInfo.ping=%d\n"
+                        , nodeId.pnid
+                        , nodeId.nodeName
+                        , nodeId.commPort
+                        , nodeId.syncPort
+                        , nodeId.ping );
+        }
+    
+        rc = Monitor->SendSock( (char *) &nodeId
+                              , sizeof(nodeId_t)
+                              , joinFd );
+        if ( rc )
+        {
+            close( joinFd );
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s], Cannot send ping node info to node %s: (%s)\n"
+                    , method_name, node?node->GetName():"", ErrorMsg(rc));
+            mon_log_write(MON_COMMACCEPT_19, SQ_LOG_ERR, buf);    
+        }
+        return;
+    }
+    
+    if ( nodeId.creator )
+    {
+        // Indicate that this node is the creator monitor for the node up
+        // operation.
+        MyNode->SetCreator( true
+                          , nodeId.creatorShellPid
+                          , nodeId.creatorShellVerifier );
+    }
+    
     int pnid = -1;
     if ( node != NULL )
-    {   // Store port number for the node
+    {   // Store port numbers for the node
+        char commPort[MPI_MAX_PORT_NAME];
+        char syncPort[MPI_MAX_PORT_NAME];
+        strncpy(commPort, nodeId.commPort, MPI_MAX_PORT_NAME);
+        strncpy(syncPort, nodeId.syncPort, MPI_MAX_PORT_NAME);
+        char *pch1;
+        char *pch2;
         pnid = nodeId.pnid;
-        node->SetCommPort( nodeId.commPort );
-        node->SetSyncPort( nodeId.syncPort );
+
+        node->SetCommPort( commPort );
+        pch1 = strtok (commPort,":");
+        pch1 = strtok (NULL,":");
+        node->SetCommSocketPort( atoi(pch1) );
+
+        node->SetSyncPort( syncPort );
+        pch2 = strtok (syncPort,":");
+        pch2 = strtok (NULL,":");
+        node->SetSyncSocketPort( atoi(pch2) );
+
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n"
+                        , method_name, __LINE__
+                        , node->GetPNid()
+                        , node->GetName()
+                        , pch1, atoi(pch1)
+                        , pch2, atoi(pch2) );
+        }
     }
     else
     {

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/internal.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/internal.h b/core/sqf/monitor/linux/internal.h
index be13055..f69f424 100644
--- a/core/sqf/monitor/linux/internal.h
+++ b/core/sqf/monitor/linux/internal.h
@@ -443,6 +443,9 @@ typedef struct nodeId_s
     int  creatorShellPid;
     Verifier_t creatorShellVerifier;
     bool creator;  // NEW monitor set to true to tell creator it is the CREATOR
+    bool ping;     // The sending monitor sets this to true to tell the remote
+                   // monitor that the sender is only checking that the two can
+                   // communicate. Used during allgather reconnect.
 } nodeId_t;
 
 typedef struct nodeStatus_s

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/msgdef.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/msgdef.h b/core/sqf/monitor/linux/msgdef.h
index 44445bf..ed0f8e8 100644
--- a/core/sqf/monitor/linux/msgdef.h
+++ b/core/sqf/monitor/linux/msgdef.h
@@ -88,6 +88,8 @@
 #define MAX_PROCESS_NAME_STR 12
 #define MAX_PROCESS_PATH 256
 #define MAX_PROCESSOR_NAME 128
+#define MAX_RECONN_PING_WAIT_TIMEOUT 5
+#define MAX_RECONN_PING_RETRY_COUNT  3
 #define MAX_REASON_TEXT  256
 #define MAX_ROLEBUF_SIZE 84
 #define MAX_SEARCH_PATH  BUFSIZ

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/pnode.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/pnode.cxx b/core/sqf/monitor/linux/pnode.cxx
index 19d36f6..57ce0a6 100644
--- a/core/sqf/monitor/linux/pnode.cxx
+++ b/core/sqf/monitor/linux/pnode.cxx
@@ -418,7 +418,14 @@ int CNode::AssignNid(void)
     CLNode *lnode = AssignLNode();
     
     TRACE_EXIT;
-    return( lnode->Nid );
+    if (lnode)
+    {
+        return( lnode->Nid );
+    }
+    else
+    {
+        return -1;
+    }
 }
 
 CLNode *CNode::AssignLNode (void)
@@ -1416,6 +1423,7 @@ CNodeContainer::CNodeContainer( void )
                ,head_(NULL)
                ,tail_(NULL)
                ,syncBufferFreeSpace_(MAX_SYNC_SIZE)
+               ,lastSyncBuffer_(NULL)
                ,SyncBuffer(NULL)
 {
     const char method_name[] = "CNodeContainer::CNodeContainer";
@@ -1424,6 +1432,7 @@ CNodeContainer::CNodeContainer( void )
     // Add eyecatcher sequence as a debugging aid
     memcpy(&eyecatcher_, "NCTR", 4);
 
+    lastSyncBuffer_ = new struct sync_buffer_def;
     SyncBuffer = new struct sync_buffer_def;
 
     // Load cluster configuration
@@ -1454,6 +1463,10 @@ CNodeContainer::~CNodeContainer( void )
 
     const char method_name[] = "CNodeContainer::~CNodeContainer";
     TRACE_ENTRY;
+    if (lastSyncBuffer_)
+    {
+        delete lastSyncBuffer_;
+    }
     if (SyncBuffer)
     {
         delete SyncBuffer;
@@ -2955,7 +2968,10 @@ struct internal_msg_def *CNodeContainer::PopMsg( struct sync_buffer_def *recvBuf
     return msg;
 }
 
-
+void CNodeContainer::SaveMyLastSyncBuffer( void )
+{
+    memcpy( (void*)lastSyncBuffer_, (void*)SyncBuffer, sizeof(sync_buffer_def) );
+}
 
 bool CNodeContainer::SpaceAvail ( int msgSize )
 {

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/pnode.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/pnode.h b/core/sqf/monitor/linux/pnode.h
index 97b02db..6d07f4b 100644
--- a/core/sqf/monitor/linux/pnode.h
+++ b/core/sqf/monitor/linux/pnode.h
@@ -122,6 +122,7 @@ public:
                               + SyncBuffer->msgInfo.msg_offset; };
     inline int GetSyncHdrSize() { return  sizeof(cluster_state_def_t)
                                           + sizeof(msgInfo_t); };
+    struct sync_buffer_def * GetLastSyncBuffer() { return lastSyncBuffer_; };
     struct sync_buffer_def * GetSyncBuffer() { return SyncBuffer; };
     bool    IsShutdownActive( void );
     void    KillAll( CProcess *process );
@@ -131,6 +132,7 @@ public:
     struct internal_msg_def *PopMsg( struct sync_buffer_def *recvBuf );
     bool    SpaceAvail ( int msgSize );
     void    AddMsg (struct internal_msg_def *&msg, int msgSize );
+    void    SaveMyLastSyncBuffer( void );
     void    SetClusterConfig( CClusterConfig *clusterConfig ) { clusterConfig_ = clusterConfig; }
     void    SetupCluster( CNode ***pnode_list, CLNode ***lnode_list, int **indexToPnid );
     void    RemoveFromSpareNodesList( CNode *node );
@@ -156,6 +158,7 @@ private:
     CNode  *tail_;  // tail of physical nodes linked list
     size_t  syncBufferFreeSpace_;
 
+    struct sync_buffer_def *lastSyncBuffer_;
     struct sync_buffer_def *SyncBuffer;
 
     void    AddLNodes( CNode  *node );

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/reqtmleader.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqtmleader.cxx b/core/sqf/monitor/linux/reqtmleader.cxx
index 7783d5a..9cfe6ea 100644
--- a/core/sqf/monitor/linux/reqtmleader.cxx
+++ b/core/sqf/monitor/linux/reqtmleader.cxx
@@ -123,6 +123,25 @@ void CExtTmLeaderReq::performRequest()
 
             process = Nodes->GetLNode(tmLeaderNid)->GetProcessLByType( ProcessType_DTM );
 
+            if (!process)
+            {
+                int pnid = Nodes->GetLNode(tmLeaderNid)->GetNode()->GetPNid();
+
+                // If there is a Regroup in progress in the sync thread, 
+                // wait until it is completed. 
+                if ( !Emulate_Down )
+                {
+                    Monitor->EnterSyncCycle();
+                }
+                Monitor->AssignTmLeader( pnid, true );
+                tmLeaderNid = Monitor->GetTmLeader();
+                process = Nodes->GetLNode(tmLeaderNid)->GetProcessLByType( ProcessType_DTM );
+                if ( !Emulate_Down )
+                {
+                    Monitor->ExitSyncCycle();
+                }
+            }
+
             assert(process); 
 
             // populate the TM leader process info

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/shell.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/shell.cxx b/core/sqf/monitor/linux/shell.cxx
index c23c20c..7bbc6e3 100644
--- a/core/sqf/monitor/linux/shell.cxx
+++ b/core/sqf/monitor/linux/shell.cxx
@@ -98,6 +98,8 @@ bool MpiInitialized = false;
 bool SpareNodeColdStandby = true;
 bool ElasticityEnabled = true;
 
+int   lastDeathNid = -1;
+int   lastDeathPid = -1;
 bool  waitDeathPending = false;
 int   waitDeathNid;
 int   waitDeathPid;
@@ -993,6 +995,7 @@ void nodePendingComplete()
 void waitDeathComplete()
 {
     waitDeathLock.lock();
+    waitDeathPending = false;
     waitDeathLock.wakeOne();
     waitDeathLock.unlock();
 }
@@ -1094,7 +1097,6 @@ void recv_notice_msg(struct message_def *recv_msg, int )
         {
             if ( recv_msg->u.request.u.down.nid == waitDeathNid )
             {   // Node is down so process is down too.
-                waitDeathPending = false;
                 waitDeathComplete();
             }
         }
@@ -1127,7 +1129,6 @@ void recv_notice_msg(struct message_def *recv_msg, int )
         {
             if ( msg->u.request.u.quiesce.nid == waitDeathNid )
             {   // Node is quiesced so process is down too.
-                waitDeathPending = false;
                 waitDeathComplete();
             }
         }
@@ -1179,12 +1180,13 @@ void recv_notice_msg(struct message_def *recv_msg, int )
                     recv_msg->u.request.u.death.nid,
                     recv_msg->u.request.u.death.pid);
         }
+        lastDeathNid = recv_msg->u.request.u.death.nid;
+        lastDeathPid = recv_msg->u.request.u.death.pid;
         if ( waitDeathPending )
         {
             if ( recv_msg->u.request.u.death.nid == waitDeathNid
               && recv_msg->u.request.u.death.pid == waitDeathPid )
             {
-                waitDeathPending = false;
                 waitDeathComplete();
             }
         }
@@ -1604,7 +1606,6 @@ char *find_end_of_token (char *cmd, int maxlen, bool isEqDelim, bool isDashDelim
        && *ptr 
        && *ptr != ' ' 
        && *ptr != ',' 
-       && *ptr != ';' 
        && *ptr != '{' 
        && *ptr != '}' 
        && *ptr != ':')
@@ -2706,6 +2707,23 @@ void get_server_death (int nid, int pid)
         request_notice(nid, pid, transid);
     }
 
+    if ( lastDeathNid == nid
+      && lastDeathPid == pid )
+    {
+        if ( trace_settings & TRACE_SHELL_CMD )
+            trace_printf("%s@%d [%s] death message already received from nid=%d, "
+                         "pid=%d.\n", method_name, __LINE__, MyName, nid, pid);
+
+        lastDeathNid = -1;
+        lastDeathPid = -1;
+
+        if ( trace_settings & TRACE_SHELL_CMD )
+            trace_printf("%s@%d [%s] Exiting wait for process death\n",
+                         method_name, __LINE__, MyName);
+
+        return;
+    }
+    
     if ( trace_settings & TRACE_SHELL_CMD )
         trace_printf("%s@%d [%s] waiting for death message from nid=%d, "
                      "pid=%d.\n", method_name, __LINE__, MyName, nid, pid);

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/tmsync.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/tmsync.cxx b/core/sqf/monitor/linux/tmsync.cxx
index b72b896..3e72241 100644
--- a/core/sqf/monitor/linux/tmsync.cxx
+++ b/core/sqf/monitor/linux/tmsync.cxx
@@ -236,7 +236,7 @@ void CTmSync_Container::CommitTmDataBlock( int return_code )
         if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
            trace_printf("%s@%d" " - Getting all nodes TmSyncState\n", method_name, __LINE__);
 
-        ExchangeTmSyncState();
+        ExchangeTmSyncState( false );
         state = Nodes->GetTmState( SyncState_Commit );
         if (( state == SyncState_Abort ) ||
             ( state == SyncState_Null  )   )
@@ -267,7 +267,7 @@ void CTmSync_Container::CommitTmDataBlock( int return_code )
     // End the TM sync processing cycle for my node.
     MyNode->SetTmSyncState( SyncState_Null );
     MyNode->SetTmSyncNid( -1 );
-    ExchangeTmSyncState();
+    ExchangeTmSyncState( false );
     if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
        trace_printf("%s@%d" " - Physical Node " "%d"  " TmSyncState updated (" "%d" ")" "\n", method_name, __LINE__, MyPNID, MyNode->GetTmSyncState());
     
@@ -318,9 +318,9 @@ int CTmSync_Container::CoordinateTmDataBlock ( struct sync_def *sync )
                    trace_printf("%s@%d" " - Physical Node %d TmSyncState updated (nid=%d, state=%d)\n", method_name, __LINE__, MyPNID, MyNode->GetTmSyncNid(), MyNode->GetTmSyncState());
                 }
                 syncCycle_.lock();
-                exchangeTmSyncData( sync );
+                exchangeTmSyncData( sync, false );
                 syncCycle_.unlock();
-                ExchangeTmSyncState();
+                ExchangeTmSyncState( false );
                 if (( Monitor->TmSyncPNid == MyPNID                           ) &&
                     ( Nodes->GetTmState( SyncState_Start ) == SyncState_Start )   )
                 {
@@ -391,7 +391,7 @@ int CTmSync_Container::CoordinateTmDataBlock ( struct sync_def *sync )
                trace_printf("%s@%d" " - Physical Node " "%d"  " TmSyncState updated (" "%d" ")" "\n", method_name, __LINE__, MyPNID, MyNode->GetTmSyncState());
             }
             UnPackSyncData(sync);
-            ExchangeTmSyncState();
+            ExchangeTmSyncState( true );
         }
         else
         {
@@ -700,7 +700,7 @@ void CTmSync_Container::ProcessTmSyncReply( struct message_def * msg )
     TRACE_EXIT;
 }
 
-void CTmSync_Container::ExchangeTmSyncState( void )
+void CTmSync_Container::ExchangeTmSyncState( bool bumpSync )
 {
     struct sync_def sync;
 
@@ -714,7 +714,7 @@ void CTmSync_Container::ExchangeTmSyncState( void )
     sync.count = 0;
     sync.length = 0;
     syncCycle_.lock();
-    exchangeTmSyncData( &sync );
+    exchangeTmSyncData( &sync, bumpSync );
     syncCycle_.unlock();
 
     TRACE_EXIT;

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/tmsync.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/tmsync.h b/core/sqf/monitor/linux/tmsync.h
index 75d4f47..9d0aaf4 100644
--- a/core/sqf/monitor/linux/tmsync.h
+++ b/core/sqf/monitor/linux/tmsync.h
@@ -138,7 +138,7 @@ private:
 
     void EndTmSync( MSGTYPE type );
     void EndPendingTmSync( struct sync_def *sync );
-    void ExchangeTmSyncState( void );
+    void ExchangeTmSyncState( bool bumpSync );
     struct sync_def *PackSyncData( void );        // return sync_def for current TmSyncReqQ entries
     void SendUnsolicitedMessages( void );
     void UnPackSyncData( struct sync_def *sync ); // process sync_def received from another node

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/zclient.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/zclient.cxx b/core/sqf/monitor/linux/zclient.cxx
index aca0a85..23dca8a 100644
--- a/core/sqf/monitor/linux/zclient.cxx
+++ b/core/sqf/monitor/linux/zclient.cxx
@@ -708,9 +708,17 @@ int CZClient::InitializeZClient( void )
     TRACE_ENTRY;
 
     int rc;
+    int retries = 0;
 
     rc = MakeClusterZNodes();
 
+    while ( rc != ZOK && retries < ZOOKEEPER_RETRY_COUNT)
+    {
+        sleep(ZOOKEEPER_RETRY_WAIT);
+        retries++;
+        rc = MakeClusterZNodes();
+    }
+
     if ( rc == ZOK )
     {
         rc = RegisterMyNodeZNode();

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/sqenvcom.sh
----------------------------------------------------------------------
diff --git a/core/sqf/sqenvcom.sh b/core/sqf/sqenvcom.sh
index 6321ecf..bb1638e 100644
--- a/core/sqf/sqenvcom.sh
+++ b/core/sqf/sqenvcom.sh
@@ -47,10 +47,6 @@ export PRODUCT_COPYRIGHT_HEADER="2015-2017 Apache Software Foundation"
 ##############################################################
 export TRAFODION_ENABLE_AUTHENTICATION=${TRAFODION_ENABLE_AUTHENTICATION:-NO}
 
-# Uncomment Trafodion Configuration store type
-#export TRAF_CONFIG_DBSTORE=Sqlite
-#export TRAF_CONFIG_DBSTORE=Zookeeper
-
 # default SQ_IC to TCP if it is not set in sqenv.sh. Values are
 # IBV for infiniband, TCP for tcp
 export SQ_IC=${SQ_IC:-TCP}
@@ -681,10 +677,9 @@ export SQ_MON_KEEPINTVL=6
 export SQ_MON_KEEPCNT=5
 
 # Monitor sync thread epoll wait timeout is in seconds
-# Currently set to 64 seconds (4 second timeout, 16 retries)
-export SQ_MON_EPOLL_WAIT_TIMEOUT=4
-# Note: the retry count is ignored when SQ_MON_ZCLIENT_ENABLED=1 (the default)
-export SQ_MON_EPOLL_RETRY_COUNT=16
+# Currently set to 64 seconds (16 second timeout, 4 retries)
+export SQ_MON_EPOLL_WAIT_TIMEOUT=16
+export SQ_MON_EPOLL_RETRY_COUNT=4
 
 # Monitor Zookeeper client
 #  - A zero value disables the zclient logic in the monitor process.


[2/4] incubator-trafodion git commit: [TRAFODION-2651] Fixed monitor-to-monitor communication by adding reconnect logic to better handle various communication glitches which previously resulted in false node down situations.

Posted by su...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/cluster.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx
index 585ff0a..c65116b 100644
--- a/core/sqf/monitor/linux/cluster.cxx
+++ b/core/sqf/monitor/linux/cluster.cxx
@@ -361,13 +361,14 @@ void CCluster::NodeReady( CNode *spareNode )
 // Assigns a new TMLeader if given pnid is same as TmLeaderNid 
 // TmLeader is a logical node num. 
 // pnid has gone down, so if that node was previously the TM leader, a new one needs to be chosen.
-void CCluster::AssignTmLeader(int pnid)
+void CCluster::AssignTmLeader( int pnid, bool checkProcess )
 {
     const char method_name[] = "CCluster::AssignTmLeader";
     TRACE_ENTRY;
 
     int i = 0;
     CNode *node = NULL;
+    CProcess *process = NULL;
 
     int TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid();
 
@@ -375,24 +376,50 @@ void CCluster::AssignTmLeader(int pnid)
     {
         node = LNode[TmLeaderNid]->GetNode();
 
-        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+        if (checkProcess)
         {
-            trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, isSoftNodeDown=%d\n"
-                        , method_name, __LINE__
-                        , node->GetPNid()
-                        , node->GetName()
-                        , NodePhaseString(node->GetPhase())
-                        , node->IsSoftNodeDown());
+            process = LNode[TmLeaderNid]->GetProcessLByType( ProcessType_DTM );
+            if (process)
+            {
+                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+                {
+                    if (node)
+                        trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, "
+                                      "isSoftNodeDown=%d, checkProcess=%d\n"
+                                    , method_name, __LINE__
+                                    , node->GetPNid()
+                                    , node->GetName()
+                                    , NodePhaseString(node->GetPhase())
+                                    , node->IsSoftNodeDown()
+                                    , checkProcess );
+                }
+                return;
+            }
+        }
+        else
+        {
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+            {
+                if (node)
+                    trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, "
+                                  "isSoftNodeDown=%d, checkProcess=%d\n"
+                                , method_name, __LINE__
+                                , node->GetPNid()
+                                , node->GetName()
+                                , NodePhaseString(node->GetPhase())
+                                , node->IsSoftNodeDown()
+                                , checkProcess );
+            }
+            return;
         }
-    
-        return;
     }
 
     node = Node[TmLeaderPNid];
 
     if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
     {
-        trace_printf("%s@%d" " - Node "  "%d" " TmLeader failed." "\n", method_name, __LINE__, TmLeaderNid);
+        trace_printf( "%s@%d" " - Node "  "%d" " TmLeader failed! (checkProcess=%d)\n"
+                    , method_name, __LINE__, TmLeaderNid, checkProcess );
     }
 
     for (i=0; i<GetConfigPNodesMax(); i++)
@@ -436,6 +463,15 @@ void CCluster::AssignTmLeader(int pnid)
 
         TmLeaderNid = node->GetFirstLNode()->GetNid();
 
+        if (checkProcess)
+        {
+            process = LNode[TmLeaderNid]->GetProcessLByType( ProcessType_DTM );
+            if (!process)
+            {
+                continue; // skip this node: no DTM process exists on it
+            }
+        }
+
         if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
         {
             trace_printf("%s@%d" " - Node "  "%d" " is the new TmLeader." "\n", method_name, __LINE__, TmLeaderNid);
@@ -483,7 +519,11 @@ CCluster::CCluster (void)
       integratingPNid_(-1),
       joinComm_(MPI_COMM_NULL),
       joinSock_(-1),
-      seqNum_(0),
+      lastSeqNum_(0),
+      lowSeqNum_(0),
+      highSeqNum_(0),
+      reconnectSeqNum_(0),
+      seqNum_(1),
       waitForWatchdogExit_(false)
       ,checkSeqNum_(false)
       ,validateNodeDown_(false)
@@ -511,6 +551,9 @@ CCluster::CCluster (void)
                     method_name, __LINE__,
                     (checkSeqNum_ ? "enabled" : "disabled"));
     
+    CClusterConfig *clusterConfig = Nodes->GetClusterConfig();
+    configPNodesMax_ = clusterConfig->GetPNodesConfigMax();
+
     // Compute minimum "sync cycles" per second.   The minimum is 1/10
     // the expected number, assuming "next_test_delay" cycles per second (where
     // next_test_delay is in microseconds).
@@ -521,8 +564,6 @@ CCluster::CCluster (void)
     agMinElapsed_.tv_sec = 10000;
     agMinElapsed_.tv_nsec = 0;
 
-    tmSyncBuffer_ = Nodes->GetSyncBuffer();
-
     // Allocate structures for monitor point-to-point communications
     //
     //   The current approach is to allocate to a maximum number (MAX_NODES).
@@ -547,6 +588,7 @@ CCluster::CCluster (void)
     {
         comms_[i] = MPI_COMM_NULL;
         socks_[i] = -1;
+        sockPorts_[i] = -1;
     }
 
     env = getenv("SQ_MON_NODE_DOWN_VALIDATION");
@@ -647,7 +689,7 @@ unsigned long long CCluster::EnsureAndGetSeqNum(cluster_state_def_t nodestate[])
         {
             trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[i].seq_num, seqNum );
         }
-        if (nodestate[i].seq_num > 0)
+        if (nodestate[i].seq_num > 1)
         {
             if (seqNum == 0) 
             {
@@ -658,6 +700,10 @@ unsigned long long CCluster::EnsureAndGetSeqNum(cluster_state_def_t nodestate[])
                 assert(nodestate[i].seq_num == seqNum);
             }
         }
+        if (trace_settings & TRACE_RECOVERY)
+        {
+            trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[i].seq_num, seqNum );
+        }
     }
 
     TRACE_EXIT;
@@ -839,7 +885,7 @@ void CCluster::HardNodeDown (int pnid, bool communicate_state)
     if ( Emulate_Down )
     {
         IAmIntegrated = false;
-        AssignTmLeader(pnid);
+        AssignTmLeader(pnid, false);
     }
 
     TRACE_EXIT;
@@ -940,7 +986,7 @@ void CCluster::SoftNodeDown( int pnid )
     }
 
     IAmIntegrated = false;
-    AssignTmLeader(pnid);
+    AssignTmLeader(pnid, false);
 
     TRACE_EXIT;
 }
@@ -1303,9 +1349,10 @@ int CCluster::HardNodeUp( int pnid, char *node_name )
                             , method_name, __LINE__, node->GetName(), node->GetPNid(), StateString( node->GetState() ) );
                 for ( int i =0; i < Nodes->GetPNodesCount(); i++ )
                 {
-                    trace_printf( "%s@%d socks_[indexToPnid_[%d]=%d]=%d\n"
+                    trace_printf( "%s@%d socks_[indexToPnid_[%d]=%d]=%d, sockPorts_[indexToPnid_[%d]=%d]=%d\n"
                                 , method_name, __LINE__
-                                , i, indexToPnid_[i], socks_[indexToPnid_[i]] );
+                                , i, indexToPnid_[i], socks_[indexToPnid_[i]]
+                                , i, indexToPnid_[i], sockPorts_[indexToPnid_[i]] );
                 }
             }
             if ( MyNode->IsCreator() )
@@ -1616,8 +1663,9 @@ const char *SyncStateString( SyncState state)
 }
 
 
-void CCluster::AddTmsyncMsg (struct sync_def *sync,
-                             struct internal_msg_def *msg)
+void CCluster::AddTmsyncMsg( struct sync_buffer_def *tmSyncBuffer
+                           , struct sync_def *sync
+                           , struct internal_msg_def *msg)
 {
     const char method_name[] = "CCluster::AddTmsyncMsg";
     TRACE_ENTRY;
@@ -1647,12 +1695,12 @@ void CCluster::AddTmsyncMsg (struct sync_def *sync,
 
     // Insert the message size into the message header
     msg->replSize = msgSize;
-    tmSyncBuffer_->msgInfo.msg_count = 1;
-    tmSyncBuffer_->msgInfo.msg_offset += msgSize;
+    tmSyncBuffer->msgInfo.msg_count = 1;
+    tmSyncBuffer->msgInfo.msg_offset += msgSize;
 
     // Set end-of-buffer marker
     msg = (struct internal_msg_def *)
-        &tmSyncBuffer_->msg[tmSyncBuffer_->msgInfo.msg_offset];
+        &tmSyncBuffer->msg[tmSyncBuffer->msgInfo.msg_offset];
     msg->type = InternalType_Null;
 
     TRACE_EXIT;
@@ -2787,7 +2835,6 @@ void CCluster::InitializeConfigCluster( void )
     MPI_Comm_size (MPI_COMM_WORLD, &worldSize);    
     int rankToPnid[worldSize];
     CClusterConfig *clusterConfig = Nodes->GetClusterConfig();
-    configPNodesMax_ = clusterConfig->GetPNodesConfigMax();
     
     CurNodes = worldSize;
 
@@ -2931,7 +2978,7 @@ void CCluster::InitializeConfigCluster( void )
                     node->SetCommPort( &commPortNums[ i * MPI_MAX_PORT_NAME] );
                     node->SetSyncPort( &syncPortNums[ i * MPI_MAX_PORT_NAME] );
                     rankToPnid[i] = node->GetPNid();
-                    nodeStatus[i] = true;
+                    nodeStatus[rankToPnid[i]] = true;
 
                     if (trace_settings & TRACE_INIT)
                     {
@@ -2967,7 +3014,7 @@ void CCluster::InitializeConfigCluster( void )
             // Any nodes not in the initial MPI_COMM_WORLD are down.
             for (int i=0; i<GetConfigPNodesCount(); ++i)
             {
-                if ( nodeStatus[i] == false )
+                if ( nodeStatus[indexToPnid_[i]] == false )
                 {
                     if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
                         trace_printf( "%s@%d - nodeStatus[%d]=%d"
@@ -2982,7 +3029,7 @@ void CCluster::InitializeConfigCluster( void )
                     // assign new TmLeader if TMLeader node is dead.
                     if (TmLeaderPNid == indexToPnid_[i]) 
                     {
-                        AssignTmLeader(indexToPnid_[i]);
+                        AssignTmLeader(indexToPnid_[i], false);
                     }
                 }
                 else
@@ -3327,6 +3374,132 @@ void CCluster::SendReIntegrateStatus( STATE nodeState, int initErr )
     }
 }
 
+bool CCluster::PingSockPeer(CNode *node)
+{
+    const char method_name[] = "CCluster::PingSockPeer";
+    TRACE_ENTRY;
+    // Pings the monitor on 'node': connects to its comm port, sends our nodeId_t with ping=true, reads the reply.
+    bool rs = true;       // result; set to false when the send or the receive fails
+    int  rc;              // return code of SendSock / ReceiveSock
+    int  pingSock = -1;   // socket returned by Monitor->Connect(); < 0 means not connected
+    // Returns true only if connect, send and receive all succeed; false otherwise.
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d - Pinging remote monitor %s, pnid=%d\n"
+                    , method_name, __LINE__
+                    , node->GetName(), node->GetPNid() );
+    }
+
+    // Attempt to connect with the remote monitor, at most MAX_RECONN_PING_RETRY_COUNT times
+    for (int i = 0; i < MAX_RECONN_PING_RETRY_COUNT; i++ )
+    {
+        pingSock = Monitor->Connect( node->GetCommPort() );
+        if ( pingSock < 0 )
+        {
+            sleep( MAX_RECONN_PING_WAIT_TIMEOUT ); // pause before retrying; NOTE(review): also sleeps after the last failed attempt
+        }
+        else
+        {
+            break; // connected, stop retrying
+        }
+    }
+    if ( pingSock < 0 )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - Can't connect to remote monitor %s, pnid=%d\n"
+                        , method_name, __LINE__
+                        , node->GetName(), node->GetPNid() );
+        }
+        return(false); // every connect attempt failed; NOTE(review): this early return skips TRACE_EXIT
+    }
+
+    nodeId_t nodeInfo; // request sent to the peer; the same struct is reused below to hold the peer's reply
+    // Fill the request with this monitor's identity (MyPNID / MyNode) and flag it as a ping.
+    nodeInfo.pnid = MyPNID;
+    strcpy(nodeInfo.nodeName, MyNode->GetName());
+    strcpy(nodeInfo.commPort, MyNode->GetCommPort());
+    strcpy(nodeInfo.syncPort, MyNode->GetSyncPort());
+    nodeInfo.ping = true;
+    nodeInfo.creatorPNid = -1;
+    nodeInfo.creator = false;
+    nodeInfo.creatorShellPid = -1;
+    nodeInfo.creatorShellVerifier = -1;
+    // NOTE(review): the strcpy calls above are unbounded; the destination field sizes are not visible here - confirm.
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "Sending my nodeInfo.pnid=%d\n"
+                      "        nodeInfo.nodeName=%s\n"
+                      "        nodeInfo.commPort=%s\n"
+                      "        nodeInfo.syncPort=%s\n"
+                      "        nodeInfo.creatorPNid=%d\n"
+                      "        nodeInfo.creator=%d\n"
+                      "        nodeInfo.creatorShellPid=%d\n"
+                      "        nodeInfo.creatorShellVerifier=%d\n"
+                      "        nodeInfo.ping=%d\n"
+                    , nodeInfo.pnid
+                    , nodeInfo.nodeName
+                    , nodeInfo.commPort
+                    , nodeInfo.syncPort
+                    , nodeInfo.creatorPNid
+                    , nodeInfo.creator
+                    , nodeInfo.creatorShellPid
+                    , nodeInfo.creatorShellVerifier
+                    , nodeInfo.ping );
+    }
+    // Send the ping request over the freshly connected socket.
+    rc = Monitor->SendSock( (char *) &nodeInfo
+                          , sizeof(nodeId_t)
+                          , pingSock );
+
+    if ( rc ) // a non-zero rc is treated as a send failure: log it and report false
+    {
+        rs = false;
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], Cannot send ping node info to node %s: (%s)\n"
+                , method_name, node->GetName(), ErrorMsg(rc));
+        mon_log_write(MON_PINGSOCKPEER_1, SQ_LOG_ERR, buf);    
+    }
+    else
+    {
+        // Read the peer monitor's nodeId_t reply (overwrites the request held in nodeInfo)
+        rc = Monitor->ReceiveSock( (char *) &nodeInfo
+                                 , sizeof(nodeId_t)
+                                 , pingSock );
+        if ( rc )
+        {   // Receive failed: log it and report false
+            rs = false;
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s], Cannot receive ping node info from node %s: (%s)\n"
+                    , method_name, node->GetName(), ErrorMsg(rc));
+            mon_log_write(MON_PINGSOCKPEER_2, SQ_LOG_ERR, buf);    
+        }
+        else // reply received; its contents are only traced here, not validated
+        {
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "Received from nodeInfo.pnid=%d\n"
+                              "        nodeInfo.nodeName=%s\n"
+                              "        nodeInfo.commPort=%s\n"
+                              "        nodeInfo.syncPort=%s\n"
+                              "        nodeInfo.ping=%d\n"
+                            , nodeInfo.pnid
+                            , nodeInfo.nodeName
+                            , nodeInfo.commPort
+                            , nodeInfo.syncPort
+                            , nodeInfo.ping );
+            }
+        }
+    }
+    // The ping connection is not kept: close it whether or not the exchange succeeded.
+    close( pingSock );
+
+    TRACE_EXIT;
+    return( rs );
+}
+
 void CCluster::ReIntegrate( int initProblem )
 {
     const char method_name[] = "CCluster::ReIntegrate";
@@ -3607,6 +3780,10 @@ void CCluster::ReIntegrateSock( int initProblem )
     int rc;
     int existingCommFd;
     int existingSyncFd;
+    char commPort[MPI_MAX_PORT_NAME];
+    char syncPort[MPI_MAX_PORT_NAME];
+    char *pch1;
+    char *pch2;
 
     // Set bit indicating my node is up
     upNodes_.upNodes[MyPNID/MAX_NODE_BITMASK] |= (1ull << (MyPNID%MAX_NODE_BITMASK));
@@ -3642,12 +3819,6 @@ void CCluster::ReIntegrateSock( int initProblem )
 
     TEST_POINT( TP011_NODE_UP );
 
-    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
-    {
-        trace_printf("%s@%d Connected to creator monitor, sending my node info\n",
-                     method_name, __LINE__); 
-    }
-
     // Send this node's name and port number so creator monitor 
     // knows who we are, and set flag to let creator monitor it is the CREATOR.
     nodeId_t myNodeInfo;
@@ -3662,21 +3833,14 @@ void CCluster::ReIntegrateSock( int initProblem )
 
     if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
     {
-        trace_printf( "%s@%d - Sending my node info to creator monitor\n"
-                    , method_name, __LINE__);
-        trace_printf( "Node info for pnid=%d\n"
-                      "        myNodeInfo.nodeName=%s\n"
-                      "        myNodeInfo.commPort=%s\n"
-                      "        myNodeInfo.syncPort=%s\n"
-                      "        myNodeInfo.creatorPNid=%d\n"
-                      "        myNodeInfo.creator=%d\n"
-                      "        myNodeInfo.creatorShellPid=%d\n"
-                      "        myNodeInfo.creatorShellVerifier=%d\n"
+        trace_printf( "%s@%d - Connected to creator monitor, sending my info, "
+                      "node %d (%s), commPort=%s, syncPort=%s, creator=%d, "
+                      "creatorShellPid=%d:%d\n"
+                    , method_name, __LINE__
                     , myNodeInfo.pnid
                     , myNodeInfo.nodeName
                     , myNodeInfo.commPort
                     , myNodeInfo.syncPort
-                    , myNodeInfo.creatorPNid
                     , myNodeInfo.creator
                     , myNodeInfo.creatorShellPid
                     , myNodeInfo.creatorShellVerifier );
@@ -3774,10 +3938,33 @@ void CCluster::ReIntegrateSock( int initProblem )
             otherMonRank_[nodeInfo[i].pnid] = 0;
             ++CurNodes;
 
-            Node[nodeInfo[i].pnid]->SetCommPort( nodeInfo[i].commPort );
-            Node[nodeInfo[i].pnid]->SetSyncPort( nodeInfo[i].syncPort );
+            // Store port numbers for the node
+            strncpy(commPort, nodeInfo[i].commPort, MPI_MAX_PORT_NAME);
+            strncpy(syncPort, nodeInfo[i].syncPort, MPI_MAX_PORT_NAME);
+
+            Node[nodeInfo[i].pnid]->SetCommPort( commPort );
+            pch1 = strtok (commPort,":");
+            pch1 = strtok (NULL,":");
+            Node[nodeInfo[i].pnid]->SetCommSocketPort( atoi(pch1) );
+
+            Node[nodeInfo[i].pnid]->SetSyncPort( syncPort );
+            pch2 = strtok (syncPort,":");
+            pch2 = strtok (NULL,":");
+            Node[nodeInfo[i].pnid]->SetSyncSocketPort( atoi(pch2) );
+            sockPorts_[nodeInfo[i].pnid] = Node[nodeInfo[i].pnid]->GetSyncSocketPort();
+
             Node[nodeInfo[i].pnid]->SetState( State_Up );
 
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n"
+                            , method_name, __LINE__
+                            , Node[nodeInfo[i].pnid]->GetPNid()
+                            , Node[nodeInfo[i].pnid]->GetName()
+                            , pch1, atoi(pch1)
+                            , pch2, atoi(pch2) );
+            }
+
             // Tell creator we are ready to accept its connection
             int mypnid = MyPNID;
             rc = Monitor->SendSock( (char *) &mypnid
@@ -3897,10 +4084,34 @@ void CCluster::ReIntegrateSock( int initProblem )
 
             otherMonRank_[nodeInfo[i].pnid] = 0;
             ++CurNodes;
-            Node[nodeInfo[i].pnid]->SetCommPort( nodeInfo[i].commPort );
-            Node[nodeInfo[i].pnid]->SetSyncPort( nodeInfo[i].syncPort );
+
+            // Store port numbers for the node
+            strncpy(commPort, nodeInfo[i].commPort, MPI_MAX_PORT_NAME);
+            strncpy(syncPort, nodeInfo[i].syncPort, MPI_MAX_PORT_NAME);
+        
+            Node[nodeInfo[i].pnid]->SetCommPort( commPort );
+            pch1 = strtok (commPort,":");
+            pch1 = strtok (NULL,":");
+            Node[nodeInfo[i].pnid]->SetCommSocketPort( atoi(pch1) );
+        
+            Node[nodeInfo[i].pnid]->SetSyncPort( syncPort );
+            pch2 = strtok (syncPort,":");
+            pch2 = strtok (NULL,":");
+            Node[nodeInfo[i].pnid]->SetSyncSocketPort( atoi(pch2) );
+            sockPorts_[nodeInfo[i].pnid] = Node[nodeInfo[i].pnid]->GetSyncSocketPort();
+
             Node[nodeInfo[i].pnid]->SetState( State_Up );
 
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n"
+                            , method_name, __LINE__
+                            , Node[nodeInfo[i].pnid]->GetPNid()
+                            , Node[nodeInfo[i].pnid]->GetName()
+                            , pch1, atoi(pch1)
+                            , pch2, atoi(pch2) );
+            }
+
             // Connect to existing monitor
             existingSyncFd = AcceptSyncSock();
             if ( existingSyncFd < 0 )
@@ -3969,9 +4180,10 @@ void CCluster::ReIntegrateSock( int initProblem )
         }
         for ( int i =0; i < pnodeCount; i++ )
         {
-            trace_printf( "%s@%d socks_[%d]=%d\n"
+            trace_printf( "%s@%d socks_[%d]=%d, sockPorts_[%d]=%d\n"
                         , method_name, __LINE__
-                        , nodeInfo[i].pnid, socks_[nodeInfo[i].pnid]);
+                        , nodeInfo[i].pnid, socks_[nodeInfo[i].pnid]
+                        , nodeInfo[i].pnid, sockPorts_[nodeInfo[i].pnid]);
         }
         for ( int i =0; i < MAX_NODE_MASKS ; i++ )
         {
@@ -4224,16 +4436,24 @@ void CCluster::setNewSock( int pnid )
                 --CurNodes;
             }
 
-            if (trace_settings & TRACE_RECOVERY)
-            {
-                trace_printf("%s@%d - setting new communicator for pnid %d, "
-                             "otherRank=%d\n",
-                             method_name, __LINE__, it->pnid, it->otherRank);
-            }
-
+            CNode *node= Nodes->GetNode( it->pnid );
             socks_[it->pnid] = it->socket;
+            sockPorts_[it->pnid] = node->GetSyncSocketPort();
             otherMonRank_[it->pnid] = it->otherRank;
             ++CurNodes;
+
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d - Setting new communicator for %d (%s), "
+                              "socks_[%d]=%d, sockPorts_[%d]=%d, otherMonRank_[%d]=%d\n"
+                            , method_name, __LINE__
+                            , node->GetPNid()
+                            , node->GetName()
+                            , it->pnid, socks_[it->pnid]
+                            , it->pnid, sockPorts_[it->pnid]
+                            , it->pnid, otherMonRank_[it->pnid] );
+            }
+
             // Set bit indicating node is up
             upNodes_.upNodes[it->pnid/MAX_NODE_BITMASK] |= (1ull << (it->pnid%MAX_NODE_BITMASK));
 
@@ -4419,20 +4639,9 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
     const char method_name[] = "CCluster::AllgatherSock";
     TRACE_ENTRY;
 
+    bool reconnecting = false;
     static int hdrSize = Nodes->GetSyncHdrSize( );
     int err = MPI_SUCCESS;
-    typedef struct
-    {
-        int p_sent;
-        int p_received;
-        int p_n2recv;
-        bool p_sending;
-        bool p_receiving;
-        int p_timeout_count;
-        bool p_initial_check;
-        char *p_buff;
-        struct timespec znodeFailedTime;
-    } peer_t;
     peer_t p[GetConfigPNodesMax()];
     memset( p, 0, sizeof(p) );
     tag = 0; // make compiler happy
@@ -4442,12 +4651,12 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
                                 ? (ZClient->GetSessionTimeout() * 2) : 120;
 
     int nsent = 0, nrecv = 0;
-    for ( int iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
+    for ( int iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
     {
-        peer_t *peer = &p[iPeer];
-        stats[iPeer].MPI_ERROR = MPI_SUCCESS;
-        stats[iPeer].count = 0;
-        if ( iPeer == MyPNID || socks_[iPeer] == -1 )
+        peer_t *peer = &p[indexToPnid_[iPeer]];
+        stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_SUCCESS;
+        stats[indexToPnid_[iPeer]].count = 0;
+        if ( indexToPnid_[iPeer] == MyPNID || socks_[indexToPnid_[iPeer]] == -1 )
         {
             peer->p_sending = peer->p_receiving = false;
             nsent++;
@@ -4460,11 +4669,31 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
             peer->p_timeout_count = 0;
             peer->p_initial_check = true;
             peer->p_n2recv = -1;
-            peer->p_buff = ((char *) rbuf) + (iPeer * CommBufSize);
+            peer->p_buff = ((char *) rbuf) + (indexToPnid_[iPeer] * CommBufSize);
+
             struct epoll_event event;
-            event.data.fd = socks_[iPeer];
+            event.data.fd = socks_[indexToPnid_[iPeer]];
             event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
-            EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[iPeer], &event );
+            EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[iPeer]], &event );
+        }
+    }
+
+    if (trace_settings & (TRACE_SYNC | TRACE_SYNC_DETAIL))
+    {
+        for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+        {
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                peer_t *peer = &p[indexToPnid_[i]];
+                trace_printf( "%s@%d" " - socks_[%d]=%d, "
+                              "peer->p_sending=%d, "
+                              "peer->p_receiving=%d\n"
+                            , method_name, __LINE__
+                            , indexToPnid_[i]
+                            , socks_[indexToPnid_[i]]
+                            , peer->p_sending
+                            , peer->p_receiving );
+            }
         }
     }
 
@@ -4493,8 +4722,8 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
         else
         {
             // default to 64 seconds
-            sv_epoll_wait_timeout = 4000;
-            sv_epoll_retry_count = 16;
+            sv_epoll_wait_timeout = 16000;
+            sv_epoll_retry_count = 4;
         }
 
         char buf[MON_STRING_BUF_SIZE];
@@ -4512,38 +4741,128 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
     struct epoll_event events[2*GetConfigPNodesMax() + 1];
     while ( 1 )
     {
-        int maxEvents = 2*GetConfigPNodesMax() - nsent - nrecv;
+reconnected:
+        int maxEvents = 2*GetConfigPNodesCount() - nsent - nrecv;
         if ( maxEvents == 0 ) break;
         int nw;
         int zerr = ZOK;
+
         while ( 1 )
         {
             nw = epoll_wait( epollFD_, events, maxEvents, sv_epoll_wait_timeout );
             if ( nw >= 0 || errno != EINTR ) break;
         }
+
         if ( nw == 0 )
-        {
-            for ( int iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
+        { // Timeout, no fd's ready
+            for ( int iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
             {
-                peer_t *peer = &p[iPeer];
-                if ( (iPeer != MyPNID) &&
-                    (socks_[iPeer] != -1) )
+                peer_t *peer = &p[indexToPnid_[iPeer]];
+                if ( (indexToPnid_[iPeer] != MyPNID) && (socks_[indexToPnid_[iPeer]] != -1) )
                 {
-                    if ( (peer->p_receiving) ||
-                        (peer->p_sending) )
+                    if ( (peer->p_receiving) || (peer->p_sending) )
                     {
                         if ( ! ZClientEnabled )
                         {
-                            peer->p_timeout_count++;
+                            if (peer->p_initial_check && !reconnecting)
+                            {
+                                peer->p_initial_check = false;
+                                clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
+                                peer->znodeFailedTime.tv_sec += sessionTimeout;
+                                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                {
+                                    trace_printf( "%s@%d" " - Znode Fail Time %ld(secs)\n"
+                                                , method_name, __LINE__
+                                                , peer->znodeFailedTime.tv_sec);
+                                }
+                            }
     
                             if ( peer->p_timeout_count < sv_epoll_retry_count )
                             {
+                                peer->p_timeout_count++;
+
+                                if (IsRealCluster)
+                                {
+                                    if (trace_settings & TRACE_RECOVERY)
+                                    {
+                                        trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+                                                      " timeout count=%d,"
+                                                      " sending=%d,"
+                                                      " receiving=%d\n"
+                                                    , method_name, __LINE__
+                                                    , Node[indexToPnid_[iPeer]]->GetName(), iPeer
+                                                    , peer->p_timeout_count
+                                                    , peer->p_sending
+                                                    , peer->p_receiving);
+                                    }
+                                    // Attempt reconnect to all peers
+                                    err = AllgatherSockReconnect( stats );
+                                    // Redrive IOs on live peer connections
+                                    nsent = 0; nrecv = 0;
+                                    for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+                                    {
+                                        peer_t *peer = &p[indexToPnid_[i]];
+                                        if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+                                        {
+                                            peer->p_sending = peer->p_receiving = false;
+                                            nsent++;
+                                            nrecv++;
+                                        }
+                                        else
+                                        {
+                                            peer->p_sending = peer->p_receiving = true;
+                                            peer->p_sent = peer->p_received = 0;
+                                            peer->p_timeout_count = 0;
+                                            peer->p_n2recv = -1;
+                                            peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+                                
+                                            struct epoll_event event;
+                                            event.data.fd = socks_[indexToPnid_[i]];
+                                            event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+                                            EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[i]], &event );
+                                        }
+                                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                        {
+                                            trace_printf( "%s@%d" " - socks_[%d]=%d, "
+                                                          "peer->p_sending=%d, "
+                                                          "peer->p_receiving=%d\n"
+                                                        , method_name, __LINE__
+                                                        , indexToPnid_[i]
+                                                        , socks_[indexToPnid_[i]]
+                                                        , peer->p_sending
+                                                        , peer->p_receiving );
+                                        }
+                                    }
+                                    reconnectSeqNum_ = seqNum_;
+                                    reconnecting = true;
+                                    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                    {
+                                        trace_printf( "%s@%d" " - Reconnecting!\n"
+                                                    , method_name, __LINE__ );
+                                    }
+                                    goto reconnected;
+                                }
                                 continue;
                             }
+                            if (trace_settings & TRACE_RECOVERY)
+                            {
+                                trace_printf( "%s@%d - Peer timeout triggered: "
+                                              "peer->p_timeout_count %d, "
+                                              "sv_epoll_retry_count %d\n"
+                                              "        socks_[%d]=%d\n"
+                                              "        stats[%d].MPI_ERROR=%s\n"
+                                            , method_name, __LINE__
+                                            , peer->p_timeout_count
+                                            , sv_epoll_retry_count
+                                            , indexToPnid_[iPeer]
+                                            , socks_[indexToPnid_[iPeer]]
+                                            , iPeer
+                                            , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) );
+                            }
                         }
                         else
                         {
-                            if (peer->p_initial_check)
+                            if (peer->p_initial_check && !reconnecting)
                             {
                                 peer->p_initial_check = false;
                                 clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
@@ -4557,7 +4876,7 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
                                 
                             }
                             // If not expired, stay in the loop
-                            if ( ! ZClient->IsZNodeExpired( Node[iPeer]->GetName(), zerr ))
+                            if ( ! ZClient->IsZNodeExpired( Node[indexToPnid_[iPeer]]->GetName(), zerr ))
                             {
                                 if ( zerr == ZCONNECTIONLOSS || zerr == ZOPERATIONTIMEOUT )
                                 {
@@ -4572,10 +4891,71 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
    
                                         if ( peer->p_timeout_count < sv_epoll_retry_count )
                                         {
+                                            if (IsRealCluster)
+                                            {
+                                                if (trace_settings & TRACE_RECOVERY)
+                                                {
+                                                    trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+                                                                  " timeout count=%d,"
+                                                                  " sending=%d,"
+                                                                  " receiving=%d\n"
+                                                                , method_name, __LINE__
+                                                                , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                                                , peer->p_timeout_count
+                                                                , peer->p_sending
+                                                                , peer->p_receiving);
+                                                }
+                                                // Attempt reconnect to all peers
+                                                err = AllgatherSockReconnect( stats );
+                                                // Redrive IOs on live peer connections
+                                                nsent = 0; nrecv = 0;
+                                                for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+                                                {
+                                                    peer_t *peer = &p[indexToPnid_[i]];
+                                                    if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+                                                    {
+                                                        peer->p_sending = peer->p_receiving = false;
+                                                        nsent++;
+                                                        nrecv++;
+                                                    }
+                                                    else
+                                                    {
+                                                        peer->p_sending = peer->p_receiving = true;
+                                                        peer->p_sent = peer->p_received = 0;
+                                                        peer->p_timeout_count = 0;
+                                                        peer->p_n2recv = -1;
+                                                        peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+                                            
+                                                        struct epoll_event event;
+                                                        event.data.fd = socks_[indexToPnid_[i]];
+                                                        event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+                                                        EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event );
+                                                    }
+                                                    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                                    {
+                                                        trace_printf( "%s@%d" " - socks_[%d]=%d, "
+                                                                      "peer->p_sending=%d, "
+                                                                      "peer->p_receiving=%d\n"
+                                                                    , method_name, __LINE__
+                                                                    , indexToPnid_[i]
+                                                                    , socks_[indexToPnid_[i]]
+                                                                    , peer->p_sending
+                                                                    , peer->p_receiving );
+                                                    }
+                                                }
+                                                reconnectSeqNum_ = seqNum_;
+                                                reconnecting = true;
+                                                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                                {
+                                                    trace_printf( "%s@%d" " - Reconnecting!\n"
+                                                                , method_name, __LINE__ );
+                                                }
+                                                goto reconnected;
+                                            }
                                             continue;
                                         }
                                     }
-                                    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                    if (trace_settings & TRACE_RECOVERY)
                                     {
                                         trace_printf( "%s@%d - Znode Failed triggered\n"
                                                       "        Current Time    %ld(secs)\n"
@@ -4592,58 +4972,99 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
 
                                     if ( peer->p_timeout_count < sv_epoll_retry_count )
                                     {
+                                        if (IsRealCluster)
+                                        {
+                                            if (trace_settings & TRACE_RECOVERY)
+                                            {
+                                                trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+                                                              " timeout count=%d,"
+                                                              " sending=%d,"
+                                                              " receiving=%d\n"
+                                                            , method_name, __LINE__
+                                                            , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                                            , peer->p_timeout_count
+                                                            , peer->p_sending
+                                                            , peer->p_receiving);
+                                            }
+                                            // Attempt reconnect to all peers
+                                            err = AllgatherSockReconnect( stats );
+                                            // Redrive IOs on live peer connections
+                                            nsent = 0; nrecv = 0;
+                                            for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+                                            {
+                                                peer_t *peer = &p[indexToPnid_[i]];
+                                                if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+                                                {
+                                                    peer->p_sending = peer->p_receiving = false;
+                                                    nsent++;
+                                                    nrecv++;
+                                                }
+                                                else
+                                                {
+                                                    peer->p_sending = peer->p_receiving = true;
+                                                    peer->p_sent = peer->p_received = 0;
+                                                    peer->p_timeout_count = 0;
+                                                    peer->p_n2recv = -1;
+                                                    peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+                                        
+                                                    struct epoll_event event;
+                                                    event.data.fd = socks_[indexToPnid_[i]];
+                                                    event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+                                                    EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event );
+                                                }
+                                                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                                {
+                                                    trace_printf( "%s@%d" " - socks_[%d]=%d, "
+                                                                  "peer->p_sending=%d, "
+                                                                  "peer->p_receiving=%d\n"
+                                                                , method_name, __LINE__
+                                                                , indexToPnid_[i]
+                                                                , socks_[indexToPnid_[i]]
+                                                                , peer->p_sending
+                                                                , peer->p_receiving );
+                                                }
+                                            }
+                                            reconnectSeqNum_ = seqNum_;
+                                            reconnecting = true;
+                                            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                                            {
+                                                trace_printf( "%s@%d" " - Reconnecting!\n"
+                                                            , method_name, __LINE__ );
+                                            }
+                                            goto reconnected;
+                                        }
                                         continue;
                                     }
                                 }
                             }
                         }
 
-                        char buf[MON_STRING_BUF_SIZE];
-                        snprintf( buf, sizeof(buf)
-                                , "[%s@%d] Not heard from peer=%d (node=%s) "
-                                  "(current seq # is %lld)\n"
-                                , method_name
-                                ,  __LINE__
-                                , iPeer
-                                , Node[iPeer]->GetName()
-                                , seqNum_ );
-                        mon_log_write( MON_CLUSTER_ALLGATHERSOCK_1, SQ_LOG_CRIT, buf );
-
-                        stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
-                        err = MPI_ERR_IN_STATUS;
-                        if ( peer->p_sending )
-                        {
-                            peer->p_sending = false;
-                            nsent++;
-                        }
-                        if ( peer->p_receiving )
+                        if (trace_settings & TRACE_RECOVERY)
                         {
-                            peer->p_receiving = false;
-                            nrecv++;
+                            trace_printf( "%s@%d - err=%d, socks_[%d]=%d, stats[%d].MPI_ERROR=%s\n"
+                                        , method_name, __LINE__
+                                        , err
+                                        , indexToPnid_[iPeer]
+                                        , socks_[indexToPnid_[iPeer]]
+                                        , indexToPnid_[iPeer]
+                                        , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) );
                         }
 
-                        // setup the epoll structures 
-                        struct epoll_event event;
-                        event.data.fd = socks_[iPeer];
-                        int op = 0;
-                        if ( !peer->p_sending && !peer->p_receiving )
-                        {
-                            op = EPOLL_CTL_DEL;
-                            event.events = 0;
-                        }
-                        else if ( peer->p_sending )
-                        {
-                            op = EPOLL_CTL_MOD;
-                            event.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
-                        }
-                        else if ( peer->p_receiving )
+                        if ( err == MPI_ERR_IN_STATUS
+                          && stats[indexToPnid_[iPeer]].MPI_ERROR == MPI_ERR_EXITED)
                         {
-                            op = EPOLL_CTL_MOD;
-                            event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
-                        }
-                        if ( op == EPOLL_CTL_DEL || op == EPOLL_CTL_MOD )
-                        {
-                            EpollCtl( epollFD_, op, socks_[iPeer], &event );
+                            // At this point, this peer is not responding and
+                            // reconnects failed or its znode expired
+                            char buf[MON_STRING_BUF_SIZE];
+                            snprintf( buf, sizeof(buf)
+                                    , "[%s@%d] Not heard from peer=%d (node=%s) "
+                                      "(current seq # is %lld)\n"
+                                    , method_name
+                                    ,  __LINE__
+                                    , indexToPnid_[iPeer]
+                                    , Node[indexToPnid_[iPeer]]->GetName()
+                                    , seqNum_ );
+                            mon_log_write( MON_CLUSTER_ALLGATHERSOCK_2, SQ_LOG_CRIT, buf );
                         }
                     }
                 }
@@ -4651,35 +5072,43 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
         }
  
         if ( nw < 0 )
-        {
+        { // Got an error
             char ebuff[256];
             char buf[MON_STRING_BUF_SIZE];
             snprintf( buf, sizeof(buf), "[%s@%d] epoll_wait(%d, %d) error: %s\n",
                 method_name, __LINE__, epollFD_, maxEvents,
                 strerror_r( errno, ebuff, 256 ) );
-            mon_log_write( MON_CLUSTER_ALLGATHERSOCK_2, SQ_LOG_CRIT, buf );
+            mon_log_write( MON_CLUSTER_ALLGATHERSOCK_3, SQ_LOG_CRIT, buf );
             MPI_Abort( MPI_COMM_SELF,99 );
         }
+
+        // Process fd's which are ready to initiate an IO or completed IO
         for ( int iEvent = 0; iEvent < nw; iEvent++ )
         {
             bool stateChange = false;
             int fd = events[iEvent].data.fd;
             int iPeer;
-            for ( iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
-            {
-                if ( events[iEvent].data.fd == socks_[iPeer] ) break;
+            for ( iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
+            { // Find corresponding peer by matching socket fd
+                if ( events[iEvent].data.fd == socks_[indexToPnid_[iPeer]] ) break;
             }
-            if ( iPeer < 0 || iPeer >= GetConfigPNodesMax() || iPeer == MyPNID
-                || socks_[iPeer] == -1
-                || (!p[iPeer].p_sending && !p[iPeer].p_receiving) )
+            if ( indexToPnid_[iPeer] < 0 || indexToPnid_[iPeer] >= GetConfigPNodesMax() || indexToPnid_[iPeer] == MyPNID
+                || socks_[indexToPnid_[iPeer]] == -1
+                || (!p[indexToPnid_[iPeer]].p_sending && !p[indexToPnid_[iPeer]].p_receiving) )
             {
                 char buf[MON_STRING_BUF_SIZE];
-                snprintf( buf, sizeof(buf), "[%s@%d] invalid peer %d\n",
-                    method_name, __LINE__, iPeer );
-                mon_log_write( MON_CLUSTER_ALLGATHERSOCK_3, SQ_LOG_CRIT, buf );
+                snprintf( buf, sizeof(buf)
+                        , "[%s@%d] Invalid peer %d, "
+                          "peer.p_sending=%d, "
+                          "peer.p_receiving=%d\n"
+                        , method_name, __LINE__
+                        , indexToPnid_[iPeer]
+                        , indexToPnid_[iPeer] >= GetConfigPNodesMax()?-1:p[indexToPnid_[iPeer]].p_sending
+                        , indexToPnid_[iPeer] >= GetConfigPNodesMax()?-1:p[indexToPnid_[iPeer]].p_receiving );
+                mon_log_write( MON_CLUSTER_ALLGATHERSOCK_4, SQ_LOG_CRIT, buf );
                 MPI_Abort( MPI_COMM_SELF,99 );
             }
-            peer_t *peer = &p[iPeer];
+            peer_t *peer = &p[indexToPnid_[iPeer]];
             if ( (events[iEvent].events & EPOLLERR) ||
                  (events[iEvent].events & EPOLLHUP) ||
                  ( !(events[iEvent].events & (EPOLLIN|EPOLLOUT))) )
@@ -4690,13 +5119,13 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
                 snprintf( buf, sizeof(buf)
                         , "[%s@%d] Error: peer=%d, events[%d].data.fd=%d, event[%d]=%s\n"
                         , method_name, __LINE__
-                        , iPeer
+                        , indexToPnid_[iPeer]
                         , iEvent
                         , events[iEvent].data.fd
                         , iEvent
                         , EpollEventString(events[iEvent].events) );
-                mon_log_write( MON_CLUSTER_ALLGATHERSOCK_4, SQ_LOG_CRIT, buf );
-                stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+                mon_log_write( MON_CLUSTER_ALLGATHERSOCK_5, SQ_LOG_CRIT, buf );
+                stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
                 err = MPI_ERR_IN_STATUS;
                 if ( peer->p_sending )
                 {
@@ -4708,12 +5137,11 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
                     peer->p_receiving = false;
                     nrecv++;
                 }
-                stateChange = 1;
+                stateChange = true;
                 goto early_exit;
             }
-            if ( peer->p_receiving
-                && events[iEvent].events & EPOLLIN )
-            {
+            if ( peer->p_receiving && events[iEvent].events & EPOLLIN )
+            { // Got receive (read) completion
                 int eagain_ok = 0;
 read_again:
                 char *r = &peer->p_buff[peer->p_received];
@@ -4729,6 +5157,26 @@ read_again:
                 int nr;
                 while ( 1 )
                 {
+                    if (trace_settings & TRACE_SYNC_DETAIL)
+                    {
+                        trace_printf( "%s@%d - EPOLLIN from %s(%d),"
+                                      " sending=%d,"
+                                      " receiving=%d (%d)"
+                                      " sent=%d,"
+                                      " received=%d"
+                                      " timeout_count=%d,"
+                                      " initial_check=%d,"
+                                      " n2recv=%d\n"
+                                    , method_name, __LINE__
+                                    , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                    , peer->p_sending
+                                    , peer->p_receiving, n2get
+                                    , peer->p_sent
+                                    , peer->p_received
+                                    , peer->p_timeout_count
+                                    , peer->p_initial_check
+                                    , peer->p_n2recv );
+                    }
                     nr = recv( fd, r, n2get, 0 );
                     if ( nr >= 0 || errno == EINTR ) break;
                 }
@@ -4746,8 +5194,8 @@ read_again:
                         snprintf( buf, sizeof(buf)
                                 , "[%s@%d] recv[%d](%d) error %d (%s)\n"
                                 , method_name, __LINE__
-                                , iPeer, nr , err, strerror(err) );
-                        mon_log_write( MON_CLUSTER_ALLGATHERSOCK_5, SQ_LOG_CRIT, buf );
+                                , indexToPnid_[iPeer], nr , err, strerror(err) );
+                        mon_log_write( MON_CLUSTER_ALLGATHERSOCK_6, SQ_LOG_CRIT, buf );
                         peer->p_receiving = false;
                         nrecv++;
                         if ( peer->p_sending )
@@ -4755,7 +5203,7 @@ read_again:
                             peer->p_sending = false;
                             nsent++;
                         }
-                        stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+                        stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
                         err = MPI_ERR_IN_STATUS;
                         stateChange = true;
                     }
@@ -4792,7 +5240,7 @@ read_again:
                             snprintf( buf, sizeof(buf),
                                 "[%s@%d] error n2recv %d\n",
                                 method_name, __LINE__, peer->p_n2recv );
-                            mon_log_write( MON_CLUSTER_ALLGATHERSOCK_6, SQ_LOG_CRIT, buf );
+                            mon_log_write( MON_CLUSTER_ALLGATHERSOCK_7, SQ_LOG_CRIT, buf );
                             MPI_Abort( MPI_COMM_SELF,99 );
                         }
                         if ( peer->p_n2recv == 0 )
@@ -4800,20 +5248,59 @@ read_again:
                             // this buffer is done
                             peer->p_receiving = false;
                             nrecv++;
-                            stats[iPeer].count = peer->p_received;
+                            stats[indexToPnid_[iPeer]].count = peer->p_received;
+                            if (trace_settings & TRACE_SYNC_DETAIL)
+                            {
+                                trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+                                              " sending=%d,"
+                                              " receiving=%d (%d)"
+                                              " sent=%d,"
+                                              " received=%d"
+                                              " timeout_count=%d,"
+                                              " initial_check=%d,"
+                                              " n2recv=%d\n"
+                                            , method_name, __LINE__
+                                            , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                            , peer->p_sending
+                                            , peer->p_receiving, n2get
+                                            , peer->p_sent
+                                            , peer->p_received
+                                            , peer->p_timeout_count
+                                            , peer->p_initial_check
+                                            , peer->p_n2recv );
+                            }
                             stateChange = true;
                         }
                     }
                 }
             }
-            if ( peer->p_sending
-                && events[iEvent].events & EPOLLOUT )
-            {
+            if ( peer->p_sending  && events[iEvent].events & EPOLLOUT )
+            { // Got send (write) completion
                 char *s = &((char *)sbuf)[peer->p_sent];
                 int n2send = nbytes - peer->p_sent;
                 int ns;
                 while ( 1 )
                 {
+                    if (trace_settings & TRACE_SYNC_DETAIL)
+                    {
+                        trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+                                      " sending=%d (%d),"
+                                      " receiving=%d"
+                                      " sent=%d,"
+                                      " received=%d"
+                                      " timeout_count=%d,"
+                                      " initial_check=%d,"
+                                      " n2recv=%d\n"
+                                    , method_name, __LINE__
+                                    , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                    , peer->p_sending, n2send
+                                    , peer->p_receiving
+                                    , peer->p_sent
+                                    , peer->p_received
+                                    , peer->p_timeout_count
+                                    , peer->p_initial_check
+                                    , peer->p_n2recv );
+                    }
                     ns = send( fd, s, n2send, 0 );
                     if ( ns >= 0 || errno != EINTR ) break;
                 }
@@ -4825,8 +5312,8 @@ read_again:
                     snprintf( buf, sizeof(buf)
                             , "[%s@%d] send[%d](%d) error=%d (%s)\n"
                             , method_name, __LINE__
-                            , iPeer, ns, err, strerror(err) );
-                    mon_log_write( MON_CLUSTER_ALLGATHERSOCK_7, SQ_LOG_CRIT, buf );
+                            , indexToPnid_[iPeer], ns, err, strerror(err) );
+                    mon_log_write( MON_CLUSTER_ALLGATHERSOCK_8, SQ_LOG_CRIT, buf );
                     peer->p_sending = false;
                     nsent++;
                     if ( peer->p_receiving )
@@ -4834,7 +5321,7 @@ read_again:
                         peer->p_receiving = false;
                         nrecv++;
                     }
-                    stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+                    stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
                     err = MPI_ERR_IN_STATUS;
                     stateChange = true;
                 }
@@ -4846,6 +5333,26 @@ read_again:
                         // finished sending to this destination
                         peer->p_sending = false;
                         nsent++;
+                        if (trace_settings & TRACE_SYNC_DETAIL)
+                        {
+                            trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+                                          " sending=%d (%d),"
+                                          " receiving=%d"
+                                          " sent=%d,"
+                                          " received=%d"
+                                          " timeout_count=%d,"
+                                          " initial_check=%d,"
+                                          " n2recv=%d\n"
+                                        , method_name, __LINE__
+                                        , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+                                        , peer->p_sending, n2send
+                                        , peer->p_receiving
+                                        , peer->p_sent
+                                        , peer->p_received
+                                        , peer->p_timeout_count
+                                        , peer->p_initial_check
+                                        , peer->p_n2recv );
+                        }
                         stateChange = true;
                     }
                 }
@@ -4854,7 +5361,7 @@ early_exit:
             if ( stateChange )
             {
                 struct epoll_event event;
-                event.data.fd = socks_[iPeer];
+                event.data.fd = socks_[indexToPnid_[iPeer]];
                 int op = 0;
                 if ( !peer->p_sending && !peer->p_receiving )
                 {
@@ -4888,49 +5395,465 @@ early_exit:
     return err;
 }
 
-// When we get a communication error for a point-to-point monitor communicator
-// verify that the other nodes in the cluster also lost communications
-// with that monitor.  If all nodes concur we consider that monitor
-// down.
-void CCluster::ValidateClusterState( cluster_state_def_t nodestate[],
-                                     bool haveDivergence)
+int CCluster::AllgatherSockReconnect( MPI_Status *stats )
 {
-    const char method_name[] = "CCluster::ValidateClusterState";
-
-    exitedMons_t::iterator it;
-    upNodes_t nodeMask;
+    const char method_name[] = "CCluster::AllgatherSockReconnect";
+    TRACE_ENTRY;
 
-    for ( int i =0; i < MAX_NODE_MASKS ; i++ )
-    {
-        nodeMask.upNodes[i] = 0;
-    }
+    int err = MPI_SUCCESS;
+    int idst;
+    int reconnectSock = -1;
+    CNode *node;
 
-    for ( it = exitedMons_.begin(); it != exitedMons_.end(); )
+    // Loop on each node in the cluster
+    for ( int i = 0; i < GetConfigPNodesMax(); i++ )
     {
-        if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT))
-        {
-            trace_printf("%s@%d checking exited pnid=%d, detecting pnid=%d, seqNum=%lld"
-                         " (current seqNum_=%lld)\n", method_name, __LINE__,
-                         it->exitedPnid, it->detectingPnid, it->seqNum, seqNum_);
-        }
-
-        if ( seqNum_ >= (it->seqNum + 2) )
+        // Loop on each adjacent node in the cluster
+        for ( int j = i+1; j < GetConfigPNodesMax(); j++ )
         {
-            char buf[MON_STRING_BUF_SIZE];
-            snprintf( buf, sizeof(buf), "[%s] Validating exited node %d, "
-                      "detected by node %d at seq #%lld "
-                      "(current seq # is %lld).\n",
-                      method_name, it->exitedPnid, it->detectingPnid,
-                      it->seqNum, seqNum_);
-            mon_log_write(MON_CLUSTER_VALIDATE_STATE_1, SQ_LOG_ERR, buf);
-
-            int concurringNodes = 0;
+            if ( i == MyPNID )
+            { // Current [i] node is my node, so connect to [j] node
 
-            // Check if all active nodes see the node as down.
-            nodeMask.upNodes[it->exitedPnid/MAX_NODE_BITMASK] = 1ull << (it->exitedPnid%MAX_NODE_BITMASK);
-            string setSeesUp;
-            string setSeesDown;
-            char nodeX[10];
+                idst = j;
+                node = Nodes->GetNode( idst );
+                if (!node) continue;
+                if (node->GetState() != State_Up)
+                {
+                    if (socks_[idst] != -1)
+                    { // Peer socket is still active
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                        {
+                            trace_printf( "%s@%d - Node %s (%d) is not up, "
+                                          "removing old socket from epoll set, "
+                                          "socks_[%d]=%d\n"
+                                        , method_name, __LINE__
+                                        , node->GetName(), node->GetPNid()
+                                        , idst, socks_[idst] );
+                        }
+                        // Remove old socket from epoll set, it may not be there
+                        struct epoll_event event;
+                        event.data.fd = socks_[idst];
+                        event.events = 0;
+                        EpollCtlDelete( epollFD_, socks_[idst], &event );
+                        socks_[idst] = -1;
+                    }
+                    continue;
+                }
+                reconnectSock = ConnectSockPeer( node, idst );
+                if (reconnectSock == -1)
+                {
+                    stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+                    stats[idst].count = 0;
+                    err = MPI_ERR_IN_STATUS;
+                    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                    {
+                        trace_printf( "%s@%d - Setting Node %s (%d) status to "
+                                      "stats[%d].MPI_ERROR=%s\n"
+                                    , method_name, __LINE__
+                                    , node->GetName(), node->GetPNid()
+                                    , idst
+                                    , ErrorMsg(stats[idst].MPI_ERROR) );
+                    }
+                    if (node->GetState() != State_Up)
+                    {
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                        {
+                            trace_printf( "%s@%d - Node %s (%d) is not up, "
+                                          "removing old socket from epoll set, "
+                                          "socks_[%d]=%d\n"
+                                        , method_name, __LINE__
+                                        , node->GetName(), node->GetPNid()
+                                        , idst, socks_[idst] );
+                        }
+                        // Remove old socket from epoll set, it may not be there
+                        struct epoll_event event;
+                        event.data.fd = socks_[idst];
+                        event.events = 0;
+                        EpollCtlDelete( epollFD_, socks_[idst], &event );
+                        socks_[idst] = -1;
+                    }
+                }
+            }
+            else if ( j == MyPNID )
+            { // Current [j] is my node, accept connection from peer [i] node
+
+                idst = i;
+                node = Nodes->GetNode( idst );
+                if (!node) continue;
+                if (node->GetState() != State_Up)
+                {
+                    if (socks_[idst] != -1)
+                    { // Peer socket is still active
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                        {
+                            trace_printf( "%s@%d - Node %s (%d) is not up, "
+                                          "removing old socket from epoll set, "
+                                          "socks_[%d]=%d\n"
+                                        , method_name, __LINE__
+                                        , node->GetName(), node->GetPNid()
+                                        , idst, socks_[idst] );
+                        }
+                        // Remove old socket from epoll set, it may not be there
+                        struct epoll_event event;
+                        event.data.fd = socks_[idst];
+                        event.events = 0;
+                        EpollCtlDelete( epollFD_, socks_[idst], &event );
+                        socks_[idst] = -1;
+                    }
+                    continue;
+                }
+                if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                {
+                    trace_printf( "%s@%d - Pinging Node %s (%d) to see if it's up\n"
+                                , method_name, __LINE__
+                                , node->GetName(), node->GetPNid() );
+                }
+                if (PingSockPeer(node))
+                {
+                    reconnectSock = AcceptSockPeer( node, idst );
+                    if (reconnectSock == -1)
+                    {
+                        stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+                        stats[idst].count = 0;
+                        err = MPI_ERR_IN_STATUS;
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                        {
+                            trace_printf( "%s@%d - Setting Node %s (%d) status to "
+                                          "stats[%d].MPI_ERROR=%s\n"
+                                        , method_name, __LINE__
+                                        , node->GetName(), node->GetPNid()
+                                        , idst
+                                        , ErrorMsg(stats[idst].MPI_ERROR) );
+                        }
+                        if (node->GetState() != State_Up)
+                        {
+                            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                            {
+                                trace_printf( "%s@%d - Node %s (%d) is not up, "
+                                              "removing old socket from epoll set, "
+                                              "socks_[%d]=%d\n"
+                                            , method_name, __LINE__
+                                            , node->GetName(), node->GetPNid()
+                                            , idst, socks_[idst] );
+                            }
+                            // Remove old socket from epoll set, it may not be there
+                            struct epoll_event event;
+                            event.data.fd = socks_[idst];
+                            event.events = 0;
+                            EpollCtlDelete( epollFD_, socks_[idst], &event );
+                            socks_[idst] = -1;
+                        }
+                    }
+                }
+                else
+                {
+                    if (socks_[idst] != -1)
+                    {
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                        {
+                            trace_printf( "%s@%d - Node %s (%d) is not up, "
+                                          "removing old socket from epoll set, "
+                                          "socks_[%d]=%d\n"
+                                        , method_name, __LINE__
+                                        , node->GetName(), node->GetPNid()
+                                        , idst, socks_[idst] );
+                        }
+                        // Remove old socket from epoll set, it may not be there
+                        struct epoll_event event;
+                        event.data.fd = socks_[idst];
+                        event.events = 0;
+                        EpollCtlDelete( epollFD_, socks_[idst], &event );
+                    }
+                    reconnectSock = -1;
+                    stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+                    stats[idst].count = 0;
+                    err = MPI_ERR_IN_STATUS;
+                    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                    {
+                        trace_printf( "%s@%d - Setting Node %s (%d) status to "
+                                      "stats[%d].MPI_ERROR=%s\n"
+                                    , method_name, __LINE__
+                                    , node->GetName(), node->GetPNid()
+                                    , idst
+                                    , ErrorMsg(stats[idst].MPI_ERROR) );
+                    }
+                }
+            }
+            else
+            {
+                idst = -1;
+            }
+            if ( idst >= 0 
+              && reconnectSock != -1
+              && fcntl( socks_[idst], F_SETFL, O_NONBLOCK ) )
+            {
+                err = MPI_ERR_AMODE;
+                char ebuff[256];
+                char buf[MON_STRING_BUF_SIZE];
+                snprintf( buf, sizeof(buf), "[%s@%d] fcntl(NONBLOCK) error: %s\n",
+                    method_name, __LINE__, strerror_r( errno, ebuff, 256 ) );
+                mon_log_write( MON_CLUSTER_ALLGATHERSOCKRECONN_1, SQ_LOG_CRIT, buf );
+            }
+        }
+    }
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+        {
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d" " - socks_[%d]=%d, "
+                              "stats[%d].MPI_ERROR=%s\n"
+                            , method_name, __LINE__
+                            , indexToPnid_[i]
+                            , socks_[indexToPnid_[i]]
+                            , indexToPnid_[i]
+                            , ErrorMsg(stats[indexToPnid_[i]].MPI_ERROR) );
+            }
+        }
+        trace_printf( "%s@%d - Returning err=%d\n"
+                    , method_name, __LINE__, err );
+    }
+
+    TRACE_EXIT;
+    return( err );
+}
+
+int CCluster::AcceptSockPeer( CNode *node, int peer )
+{
+    const char method_name[] = "CCluster::AcceptSockPeer";
+    TRACE_ENTRY;
+
+    int rc = MPI_SUCCESS;
+    int reconnectSock = -1;
+    unsigned char srcaddr[4];
+    struct hostent *he;
+
+    // Get my host structure via my node name
+    he = gethostbyname( MyNode->GetName() );
+    if ( !he )
+    {
+        char ebuff[256];
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s@%d] gethostbyname(%s) error: %s\n"
+                , method_name, __LINE__
+                , MyNode->GetName()
+                , strerror_r( errno, ebuff, 256 ) );
+        mon_log_write( MON_CLUSTER_ACCEPTSOCKPEER_1, SQ_LOG_CRIT, buf );
+        abort();
+    }
+    else
+    {
+        if (trace_settings & TRACE_RECOVERY)
+        {
+            trace_printf( "%s@%d Accepting server socket: from %s(%d), src=%d.%d.%d.%d, port=%d\n"
+                        , method_name, __LINE__
+                        , node->GetName(), node->GetPNid()
+                        , (int)((unsigned char *)srcaddr)[0]
+                        , (int)((unsigned char *)srcaddr)[1]
+                        , (int)((unsigned char *)srcaddr)[2]
+                        , (int)((unsigned char *)srcaddr)[3]
+                        , MyNode->GetSyncSocketPort() );
+        }
+
+        // Accept connection from peer
+        reconnectSock = AcceptSock( syncSock_ );
+        if (reconnectSock != -1)
+        {
+            if (trace_settings & TRACE_RECOVERY)
+            {
+                trace_printf( "%s@%d Server %s(%d) accepted from client %s(%d), old socks_[%d]=%d, new socks_[%d]=%d\n"
+                            , method_name, __LINE__
+                            , MyNode->GetName(), MyPNID
+                            , node->GetName(), node->GetPNid()
+                            , peer, socks_[peer]
+                            , peer, reconnectSock);
+            }
+        }
+        else
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf), "[%s@%d] AcceptSock(%d) failed!\n",
+                method_name, __LINE__, syncSock_ );
+            mon_log_write( MON_CLUSTER_ACCEPTSOCKPEER_2, SQ_LOG_ERR, buf );
+            rc = -1;
+        }
+
+        if (socks_[peer] != -1)
+        {
+            // Remove old socket from epoll set, it may not be there
+            struct epoll_event event;
+            event.data.fd = socks_[peer];
+            event.events = 0;
+            EpollCtlDelete( epollFD_, socks_[peer], &event );
+        }
+        if (reconnectSock != -1)
+        {
+            socks_[peer] = reconnectSock;
+        }
+    }
+
+    TRACE_EXIT;
+    return rc;
+}
+
+int CCluster::ConnectSockPeer( CNode *node, int peer )
+{
+    const char method_name[] = "CCluster::ConnectSockPeer";
+    TRACE_ENTRY;
+
+    int rc = MPI_SUCCESS;
+    int reconnectSock = -1;
+    unsigned char srcaddr[4], dstaddr[4];
+    struct hostent *he;
+
+    // Get my host structure via my node name
+    he = gethostbyname( MyNode->GetName() );
+    if ( !he )
+    {
+        char ebuff[256];
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s@%d] gethostbyname(%s) error: %s\n"
+                , method_name, __LINE__
+                , MyNode->GetName()
+                , strerror_r( errno, ebuff, 256 ) );
+        mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_1, SQ_LOG_CRIT, buf );
+        abort();
+    }
+    else
+    {
+        // Initialize my source address structure
+        memcpy( srcaddr, he->h_addr, 4 );
+        // Get peer's host structure via its node name 
+        he = gethostbyname( node->GetName() );
+        if ( !he )
+        {
+            char ebuff[256];
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf),
+                "[%s@%d] gethostbyname(%s) error: %s\n",
+                method_name, __LINE__, node->GetName(),
+                strerror_r( errno, ebuff, 256 ) );
+            mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_2, SQ_LOG_CRIT, buf );
+            abort();
+        }
+        // Initialize peer's destination address structure
+        memcpy( dstaddr, he->h_addr, 4 );
+
+        if (trace_settings & TRACE_RECOVERY)
+        {
+            trace_printf( "%s@%d Creating client socket: src=%d.%d.%d.%d, "
+                          "dst(%s)=%d.%d.%d.%d, dst port=%d\n"
+                        , method_name, __LINE__
+                        , (int)((unsigned char *)srcaddr)[0]
+                        , (int)((unsigned char *)srcaddr)[1]
+                        , (int)((unsigned char *)srcaddr)[2]
+                        , (int)((unsigned char *)srcaddr)[3]
+                        ,  node->GetName()
+                        , (int)((unsigned char *)dstaddr)[0]
+                        , (int)((unsigned char *)dstaddr)[1]
+                        , (int)((unsigned char *)dstaddr)[2]
+                        , (int)((unsigned char *)dstaddr)[3]
+                        , sockPorts_[peer] );
+        }
+        // Connect to peer
+        reconnectSock = MkCltSock( srcaddr, dstaddr, sockPorts_[peer] );
+        if (reconnectSock != -1)
+        {
+            if (trace_settings & TRACE_RECOVERY)
+            {
+                trace_printf( "%s@%d Client %s(%d) connected to server %s(%d), old socks_[%d]=%d, new socks_[%d]=%d\n"
+                            , method_name, __LINE__
+                            , MyNode->GetName(), MyPNID
+                            , node->GetName(), node->GetPNid()
+                            , peer, socks_[peer]
+                            , peer, reconnectSock);
+            }
+        }
+        else
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s@%d] MkCltSock() src=%d.%d.%d.%d, "
+                      "dst(%s)=%d.%d.%d.%d failed!\n"
+                    , method_name, __LINE__
+                    , (int)((unsigned char *)srcaddr)[0]
+                    , (int)((unsigned char *)srcaddr)[1]
+                    , (int)((unsigned char *)srcaddr)[2]
+                    , (int)((unsigned char *)srcaddr)[3]
+                    ,  node->GetName()
+                    , (int)((unsigned char *)dstaddr)[0]
+                    , (int)((unsigned char *)dstaddr)[1]
+                    , (int)((unsigned char *)dstaddr)[2]
+                    , (int)((unsigned char *)dstaddr)[3] );
+            mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_3, SQ_LOG_ERR, buf );
+            rc = -1;
+        }
+
+        if (socks_[peer] != -1)
+        {
+            // Remove old socket from epoll set, it may not be there
+            struct epoll_event event;
+            event.data.fd = socks_[peer];
+            event.events = 0;
+            EpollCtlDelete( epollFD_, socks_[peer], &event );
+        }
+        if (reconnectSock != -1)
+        {
+            socks_[peer] = reconnectSock;
+        }
+    }
+
+    TRACE_EXIT;
+    return( rc );
+}
+
+// When we get a communication error for a point-to-point monitor communicator
+// verify that the other nodes in the cluster also lost communications
+// with that monitor.  If all nodes concur we consider that monitor
+// down.
+void CCluster::ValidateClusterState( cluster_state_def_t nodestate[],
+                                     bool haveDivergence)
+{
+    const char method_name[] = "CCluster::ValidateClusterState";
+
+    exitedMons_t::iterator it;
+    upNodes_t nodeMask;
+
+    for ( int i =0; i < MAX_NODE_MASKS ; i++ )
+    {
+        nodeMask.upNodes[i] = 0;
+    }
+
+    for ( it = exitedMons_.begin(); it != exitedMons_.end(); )
+    {
+        if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT))
+        {
+            trace_printf("%s@%d checking exited pnid=%d, detecting pnid=%d, seqNum=%lld"
+                         " (current seqNum_=%lld)\n", method_name, __LINE__,
+                         it->exitedPnid, it->detectingPnid, it->seqNum, seqNum_);
+        }
+
+        if ( seqNum_ >= (it->seqNum + 2) )
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf), "[%s] Validating exited node %d, "
+                      "detected by node %d at seq #%lld "
+                      "(current seq # is %lld).\n",
+                      method_name, it->exitedPnid, it->detectingPnid,
+                      it->seqNum, seqNum_);
+            mon_log_write(MON_CLUSTER_VALIDATE_STATE_1, SQ_LOG_ERR, buf);
+
+            int concurringNodes = 0;
+
+            // Check if all active nodes see the node as down.
+            nodeMask.upNodes[it->exitedPnid/MAX_NODE_BITMASK] = 1ull << (it->exitedPnid%MAX_NODE_BITMASK);
+            string setSeesUp;
+            string setSeesDown;
+            char nodeX[10];
 
             // Evaluate each active (up) node in the cluster
             int pnodesCount = 0;
@@ -5187,8 +6110,10 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
     const char method_name[] = "CCluster::ValidateSeqNum";
 
     unsigned long long seqNum;
-    unsigned long long seqNumBucket[256];
-    int seqNumCount[256];
+    unsigned long long loSeqNum = seqNum_;
+    unsigned long long hiSeqNum = seqNum_;
+    unsigned long long seqNumBucket[MAX_NODES];
+    int seqNumCount[MAX_NODES];
     int maxBucket = 0;
     bool found;
     int mostCountsIndex;
@@ -5198,10 +6123,26 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
     // Count occurrences of sequence numbers from other nodes
     for (int pnid = 0; pnid < GetConfigPNodesMax(); pnid++)
     {
+        CNode *node= Nodes->GetNode( pnid );
+        if (!node) continue;
+        if (node->GetState() != State_Up) continue;
+        
         seqNum = nodestate[pnid].seq_num;
 
+        if (trace_settings & TRACE_SYNC)
+        {
+            trace_printf( "%s@%d seqNum_=%lld, nodestate[%d].seq_num=%lld\n"
+                        , method_name, __LINE__
+                        , seqNum_
+                        , pnid
+                        , nodestate[pnid].seq_num );
+        }
+
         if (seqNum != 0)
         {
+            loSeqNum = (seqNum < loSeqNum) ? seqNum : loSeqNum;
+            hiSeqNum = (seqNum > hiSeqNum) ? seqNum : hiSeqNum;
+    
             found = false;
             for (int i=0; i<maxBucket; ++i)
             {
@@ -5239,14 +6180,22 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
         }
     }
 
+    lowSeqNum_  = loSeqNum;
+    highSeqNum_ = hiSeqNum;
+    
     if (trace_settings & TRACE_SYNC)
     {
         if ( seqNum_ != seqNumBucket[mostCountsIndex] )
         {
-            trace_printf("%s@%d Most common seq num=%lld (%d nodes), %d buckets"
-                         ", local seq num (%lld) did not match.\n",
-                         method_name, __LINE__, seqNumBucket[mostCountsIndex],
-                         seqNumCount[mostCountsIndex], maxBucket, seqNum_);
+            trace_printf( "%s@%d Most common seq num=%lld (%d nodes), "
+                          "%d buckets, low=%lld, high=%lld, local seq num (%lld) did not match.\n"
+                         , method_name, __LINE__
+                         , seqNumBucket[mostCountsIndex]
+                         , seqNumCount[mostCountsIndex]
+                         , maxBucket
+                         , lowSeqNum_
+                         , highSeqNum_
+                         , seqNum_ );
         }
     }
 
@@ -5268,7 +6217,7 @@ void CCluster::HandleDownNode( int pnid )
         trace_printf("%s@%d - Added down node to list, pnid=%d, name=(%s)\n", method_name, __LINE__, downNode->GetPNid(), downNode->GetName());
 
     // assign new TmLeader if TMLeader node is dead.
-    AssignTmLeader(pnid);
+    AssignTmLeader(pnid, false);
 
     // Build available list of spare nodes
     CNode *spareNode;
@@ -5373,6 +6322,7 @@ void CCluster::UpdateClusterState( bool &doShutdown,
     TRACE_ENTRY;
 
     struct sync_buffer_def *recvBuf;
+    struct sync_buffer_def *sendBuf = Nodes->GetSyncBuffer();
     STATE node_state;
     int change_nid;
     cluster_state_def_t nodestate[GetConfigPNodesMax()];
@@ -5430,13 +6380,14 @@ void CCluster::UpdateClusterState( bool &doShutdown,
         recvBuf = (struct sync_buffer_def *)
             (((char *) syncBuf) + index * CommBufSize);
 
-        if (trace_settings & TRACE_SYNC_DETAIL)
+        if (trace_settings & TRACE_SYNC)
         {
             int nr;
             MPI_Get_count(&status[index], MPI_CHAR, &nr);
             trace_printf("%s@%d - Received %d bytes from node %d, "
-                         "message count=%d\n",
+                         ", seq_num=%lld, message count=%d\n",
                          method_name, __LINE__, nr, index,
+                         recvBuf->nodeInfo.seq_num,
                          recvBuf->msgInfo.msg_count);
         }
 
@@ -5532,23 +6483,23 @@ void CCluster::UpdateClusterState( bool &doShutdown,
             Node[index]->SetState( State_Down );
             --CurNodes;
             // Clear bit in set of "up nodes"
-            upNodes_.upNodes[index/MAX_NODE_BITMASK] &= 
-                ~(1ull << (index%MAX_NODE_BITMASK));
+            upNodes_.upNodes[index/MAX_NODE_BITMASK] &= ~(1ull << (index%MAX_NODE_BITMASK));
         }
     }
 
-    if ( checkSeqNum_ && !ValidateSeqNum( nodestate ) && !enqueuedDown_ )
+    if ( (checkSeqNum_ || reconnectSeqNum_ != 0)
+      && !ValidateSeqNum( nodestate ) 
+      && !enqueuedDown_ )
     {
-        char buf[MON_STRING_BUF_SIZE];
-        snprintf(buf, sizeof(buf), "[%s] Sync cycle sequence number (%lld) "
-                 "incorrect.  Scheduling node down.\n", method_name, seqNum_);
-        mon_log_write(MON_CLU

<TRUNCATED>


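[4/4] incubator-trafodion git commit: Merge [TRAFODION-2651] PR-1233 Fixed monitor-to-monitor communication by adding reconnect logic

Posted by su...@apache.org.
Merge [TRAFODION-2651] PR-1233 Fixed monitor-to-monitor communication by adding
reconnect logic to better handle various communication glitches which previously
resulted in false node down situations.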


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/aeb9ef22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/aeb9ef22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/aeb9ef22

Branch: refs/heads/master
Commit: aeb9ef223e7c1d8f6341a2441abf09dd28f6d466
Parents: e2789f9 5e09a1e
Author: Suresh Subbiah <su...@apache.org>
Authored: Tue Sep 19 02:54:10 2017 +0000
Committer: Suresh Subbiah <su...@apache.org>
Committed: Tue Sep 19 02:54:10 2017 +0000

----------------------------------------------------------------------
 .../export/include/common/evl_sqlog_eventnum.h  |   28 +-
 core/sqf/monitor/linux/cluster.cxx              | 1851 +++++++++++++++---
 core/sqf/monitor/linux/cluster.h                |   38 +-
 core/sqf/monitor/linux/commaccept.cxx           |  126 +-
 core/sqf/monitor/linux/internal.h               |    3 +
 core/sqf/monitor/linux/msgdef.h                 |    2 +
 core/sqf/monitor/linux/pnode.cxx                |   20 +-
 core/sqf/monitor/linux/pnode.h                  |    3 +
 core/sqf/monitor/linux/reqtmleader.cxx          |   19 +
 core/sqf/monitor/linux/shell.cxx                |   26 +-
 core/sqf/monitor/linux/tmsync.cxx               |   14 +-
 core/sqf/monitor/linux/tmsync.h                 |    2 +-
 core/sqf/monitor/linux/zclient.cxx              |    8 +
 core/sqf/sqenvcom.sh                            |   11 +-
 14 files changed, 1805 insertions(+), 346 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-trafodion git commit: [TRAFODION-2651] Fixed monitor-to-monitor communication by adding reconnect logic to better handle various communication glitches which previously resulted in false node down situations.

Posted by su...@apache.org.
[TRAFODION-2651] Fixed monitor-to-monitor communication by adding reconnect
logic to better handle various communication glitches which previously
resulted in false node down situations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/5e09a1e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/5e09a1e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/5e09a1e3

Branch: refs/heads/master
Commit: 5e09a1e3a41d81394a0beee7eb67f7475a344daf
Parents: cd54195
Author: Zalo Correa <za...@esgyn.com>
Authored: Fri Sep 15 15:39:06 2017 -0700
Committer: Zalo Correa <za...@esgyn.com>
Committed: Fri Sep 15 15:39:06 2017 -0700

----------------------------------------------------------------------
 .../export/include/common/evl_sqlog_eventnum.h  |   28 +-
 core/sqf/monitor/linux/cluster.cxx              | 1851 +++++++++++++++---
 core/sqf/monitor/linux/cluster.h                |   38 +-
 core/sqf/monitor/linux/commaccept.cxx           |  126 +-
 core/sqf/monitor/linux/internal.h               |    3 +
 core/sqf/monitor/linux/msgdef.h                 |    2 +
 core/sqf/monitor/linux/pnode.cxx                |   20 +-
 core/sqf/monitor/linux/pnode.h                  |    3 +
 core/sqf/monitor/linux/reqtmleader.cxx          |   19 +
 core/sqf/monitor/linux/shell.cxx                |   26 +-
 core/sqf/monitor/linux/tmsync.cxx               |   14 +-
 core/sqf/monitor/linux/tmsync.h                 |    2 +-
 core/sqf/monitor/linux/zclient.cxx              |    8 +
 core/sqf/sqenvcom.sh                            |   11 +-
 14 files changed, 1805 insertions(+), 346 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/export/include/common/evl_sqlog_eventnum.h
----------------------------------------------------------------------
diff --git a/core/sqf/export/include/common/evl_sqlog_eventnum.h b/core/sqf/export/include/common/evl_sqlog_eventnum.h
index dbf8d92..c8b4d59 100644
--- a/core/sqf/export/include/common/evl_sqlog_eventnum.h
+++ b/core/sqf/export/include/common/evl_sqlog_eventnum.h
@@ -143,6 +143,7 @@
 #define MON_CLUSTER_ALLGATHERSOCK_5         101013305
 #define MON_CLUSTER_ALLGATHERSOCK_6         101013306
 #define MON_CLUSTER_ALLGATHERSOCK_7         101013307
+#define MON_CLUSTER_ALLGATHERSOCK_8         101013308
 #define MON_CLUSTER_EPOLLCTL_1              101013401
 #define MON_CLUSTER_INITCLUSTERSOCKS_1      101013501
 #define MON_CLUSTER_INITCLUSTERSOCKS_2      101013502
@@ -158,6 +159,10 @@
 #define MON_CLUSTER_MKSRVSOCK_2             101013702
 #define MON_CLUSTER_MKSRVSOCK_3             101013703
 #define MON_CLUSTER_MKSRVSOCK_4             101013704
+#define MON_CLUSTER_MKSRVSOCK_5             101013705
+#define MON_CLUSTER_MKSRVSOCK_6             101013706
+#define MON_CLUSTER_MKSRVSOCK_7             101013707
+#define MON_CLUSTER_MKSRVSOCK_8             101013708
 #define MON_CLUSTER_MKCLTSOCK_1             101013801
 #define MON_CLUSTER_MKCLTSOCK_2             101013802
 #define MON_CLUSTER_MKCLTSOCK_3             101013803
@@ -167,6 +172,9 @@
 #define MON_CLUSTER_MKCLTSOCK_7             101013807
 #define MON_CLUSTER_MKCLTSOCK_8             101013808
 #define MON_CLUSTER_MKCLTSOCK_9             101013809
+#define MON_CLUSTER_MKCLTSOCK_10            101013810
+#define MON_CLUSTER_MKCLTSOCK_11            101013811
+#define MON_CLUSTER_MKCLTSOCK_12            101013812
 #define MON_CLUSTER_CONNECT_1               101013901
 #define MON_CLUSTER_CONNECT_2               101013902
 #define MON_CLUSTER_CONNECT_3               101013903
@@ -190,8 +198,23 @@
 #define MON_CLUSTER_SETKEEPALIVESOCKOPT_2   101014502
 #define MON_CLUSTER_SETKEEPALIVESOCKOPT_3   101014503
 #define MON_CLUSTER_SETKEEPALIVESOCKOPT_4   101014504
-#define MON_CLUSTER_HARDNODEUP_1            101014601
-#define MON_CLUSTER_NO_LICENSE_VERIFIERS    101015001
+#define MON_CLUSTER_NO_LICENSE_VERIFIERS    101014601
+
+#define MON_CLUSTER_ALLGATHERSOCKRECONN_1   101014701
+
+#define MON_CLUSTER_HARDNODEUP_1            101014801
+
+#define MON_CLUSTER_ACCEPTSOCKPEER_1        101014901
+#define MON_CLUSTER_ACCEPTSOCKPEER_2        101014902
+
+#define MON_CLUSTER_CONNECTSOCKPEER_1       101015001
+#define MON_CLUSTER_CONNECTSOCKPEER_2       101015002
+#define MON_CLUSTER_CONNECTSOCKPEER_3       101015003
+
+#define MON_CLUSTER_EPOLLCTLDELETE_1        101015101
+
+#define MON_PINGSOCKPEER_1                  101015201
+#define MON_PINGSOCKPEER_2                  101015202
 
 /* Module: monitor.cxx = 02 */
 
@@ -800,6 +823,7 @@
 #define MON_COMMACCEPT_16                   101320116
 #define MON_COMMACCEPT_17                   101320117
 #define MON_COMMACCEPT_18                   101320118
+#define MON_COMMACCEPT_19                   101320119
 
 /* Module: reqnodedown.cxx = 33 */
 #define MON_EXT_NODEDOWN_REQ                101330101