You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by su...@apache.org on 2017/09/19 02:54:39 UTC
[2/4] incubator-trafodion git commit: [TRAFODION-2651] Fixed
monitor-to-monitor communication by adding reconnect logic to better handle
various communication glitches which previously resulted in false node down
situations.
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/cluster.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx
index 585ff0a..c65116b 100644
--- a/core/sqf/monitor/linux/cluster.cxx
+++ b/core/sqf/monitor/linux/cluster.cxx
@@ -361,13 +361,14 @@ void CCluster::NodeReady( CNode *spareNode )
// Assigns a new TMLeader if given pnid is same as TmLeaderNid
// TmLeader is a logical node num.
// pnid has gone down, so if that node was previously the TM leader, a new one needs to be chosen.
-void CCluster::AssignTmLeader(int pnid)
+void CCluster::AssignTmLeader( int pnid, bool checkProcess )
{
const char method_name[] = "CCluster::AssignTmLeader";
TRACE_ENTRY;
int i = 0;
CNode *node = NULL;
+ CProcess *process = NULL;
int TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid();
@@ -375,24 +376,50 @@ void CCluster::AssignTmLeader(int pnid)
{
node = LNode[TmLeaderNid]->GetNode();
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+ if (checkProcess)
{
- trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, isSoftNodeDown=%d\n"
- , method_name, __LINE__
- , node->GetPNid()
- , node->GetName()
- , NodePhaseString(node->GetPhase())
- , node->IsSoftNodeDown());
+ process = LNode[TmLeaderNid]->GetProcessLByType( ProcessType_DTM );
+ if (process)
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+ {
+ if (node)
+ trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, "
+ "isSoftNodeDown=%d, checkProcess=%d\n"
+ , method_name, __LINE__
+ , node->GetPNid()
+ , node->GetName()
+ , NodePhaseString(node->GetPhase())
+ , node->IsSoftNodeDown()
+ , checkProcess );
+ }
+ return;
+ }
+ }
+ else
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+ {
+ if (node)
+ trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, "
+ "isSoftNodeDown=%d, checkProcess=%d\n"
+ , method_name, __LINE__
+ , node->GetPNid()
+ , node->GetName()
+ , NodePhaseString(node->GetPhase())
+ , node->IsSoftNodeDown()
+ , checkProcess );
+ }
+ return;
}
-
- return;
}
node = Node[TmLeaderPNid];
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
{
- trace_printf("%s@%d" " - Node " "%d" " TmLeader failed." "\n", method_name, __LINE__, TmLeaderNid);
+ trace_printf( "%s@%d" " - Node " "%d" " TmLeader failed! (checkProcess=%d)\n"
+ , method_name, __LINE__, TmLeaderNid, checkProcess );
}
for (i=0; i<GetConfigPNodesMax(); i++)
@@ -436,6 +463,15 @@ void CCluster::AssignTmLeader(int pnid)
TmLeaderNid = node->GetFirstLNode()->GetNid();
+ if (checkProcess)
+ {
+ process = LNode[TmLeaderNid]->GetProcessLByType( ProcessType_DTM );
+ if (!process)
+ {
+ continue; // skip this node no DTM process exists
+ }
+ }
+
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
{
trace_printf("%s@%d" " - Node " "%d" " is the new TmLeader." "\n", method_name, __LINE__, TmLeaderNid);
@@ -483,7 +519,11 @@ CCluster::CCluster (void)
integratingPNid_(-1),
joinComm_(MPI_COMM_NULL),
joinSock_(-1),
- seqNum_(0),
+ lastSeqNum_(0),
+ lowSeqNum_(0),
+ highSeqNum_(0),
+ reconnectSeqNum_(0),
+ seqNum_(1),
waitForWatchdogExit_(false)
,checkSeqNum_(false)
,validateNodeDown_(false)
@@ -511,6 +551,9 @@ CCluster::CCluster (void)
method_name, __LINE__,
(checkSeqNum_ ? "enabled" : "disabled"));
+ CClusterConfig *clusterConfig = Nodes->GetClusterConfig();
+ configPNodesMax_ = clusterConfig->GetPNodesConfigMax();
+
// Compute minimum "sync cycles" per second. The minimum is 1/10
// the expected number, assuming "next_test_delay" cycles per second (where
// next_test_delay is in microseconds).
@@ -521,8 +564,6 @@ CCluster::CCluster (void)
agMinElapsed_.tv_sec = 10000;
agMinElapsed_.tv_nsec = 0;
- tmSyncBuffer_ = Nodes->GetSyncBuffer();
-
// Allocate structures for monitor point-to-point communications
//
// The current approach is to allocate to a maximum number (MAX_NODES).
@@ -547,6 +588,7 @@ CCluster::CCluster (void)
{
comms_[i] = MPI_COMM_NULL;
socks_[i] = -1;
+ sockPorts_[i] = -1;
}
env = getenv("SQ_MON_NODE_DOWN_VALIDATION");
@@ -647,7 +689,7 @@ unsigned long long CCluster::EnsureAndGetSeqNum(cluster_state_def_t nodestate[])
{
trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[i].seq_num, seqNum );
}
- if (nodestate[i].seq_num > 0)
+ if (nodestate[i].seq_num > 1)
{
if (seqNum == 0)
{
@@ -658,6 +700,10 @@ unsigned long long CCluster::EnsureAndGetSeqNum(cluster_state_def_t nodestate[])
assert(nodestate[i].seq_num == seqNum);
}
}
+ if (trace_settings & TRACE_RECOVERY)
+ {
+ trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[i].seq_num, seqNum );
+ }
}
TRACE_EXIT;
@@ -839,7 +885,7 @@ void CCluster::HardNodeDown (int pnid, bool communicate_state)
if ( Emulate_Down )
{
IAmIntegrated = false;
- AssignTmLeader(pnid);
+ AssignTmLeader(pnid, false);
}
TRACE_EXIT;
@@ -940,7 +986,7 @@ void CCluster::SoftNodeDown( int pnid )
}
IAmIntegrated = false;
- AssignTmLeader(pnid);
+ AssignTmLeader(pnid, false);
TRACE_EXIT;
}
@@ -1303,9 +1349,10 @@ int CCluster::HardNodeUp( int pnid, char *node_name )
, method_name, __LINE__, node->GetName(), node->GetPNid(), StateString( node->GetState() ) );
for ( int i =0; i < Nodes->GetPNodesCount(); i++ )
{
- trace_printf( "%s@%d socks_[indexToPnid_[%d]=%d]=%d\n"
+ trace_printf( "%s@%d socks_[indexToPnid_[%d]=%d]=%d, sockPorts_[indexToPnid_[%d]=%d]=%d\n"
, method_name, __LINE__
- , i, indexToPnid_[i], socks_[indexToPnid_[i]] );
+ , i, indexToPnid_[i], socks_[indexToPnid_[i]]
+ , i, indexToPnid_[i], sockPorts_[indexToPnid_[i]] );
}
}
if ( MyNode->IsCreator() )
@@ -1616,8 +1663,9 @@ const char *SyncStateString( SyncState state)
}
-void CCluster::AddTmsyncMsg (struct sync_def *sync,
- struct internal_msg_def *msg)
+void CCluster::AddTmsyncMsg( struct sync_buffer_def *tmSyncBuffer
+ , struct sync_def *sync
+ , struct internal_msg_def *msg)
{
const char method_name[] = "CCluster::AddTmsyncMsg";
TRACE_ENTRY;
@@ -1647,12 +1695,12 @@ void CCluster::AddTmsyncMsg (struct sync_def *sync,
// Insert the message size into the message header
msg->replSize = msgSize;
- tmSyncBuffer_->msgInfo.msg_count = 1;
- tmSyncBuffer_->msgInfo.msg_offset += msgSize;
+ tmSyncBuffer->msgInfo.msg_count = 1;
+ tmSyncBuffer->msgInfo.msg_offset += msgSize;
// Set end-of-buffer marker
msg = (struct internal_msg_def *)
- &tmSyncBuffer_->msg[tmSyncBuffer_->msgInfo.msg_offset];
+ &tmSyncBuffer->msg[tmSyncBuffer->msgInfo.msg_offset];
msg->type = InternalType_Null;
TRACE_EXIT;
@@ -2787,7 +2835,6 @@ void CCluster::InitializeConfigCluster( void )
MPI_Comm_size (MPI_COMM_WORLD, &worldSize);
int rankToPnid[worldSize];
CClusterConfig *clusterConfig = Nodes->GetClusterConfig();
- configPNodesMax_ = clusterConfig->GetPNodesConfigMax();
CurNodes = worldSize;
@@ -2931,7 +2978,7 @@ void CCluster::InitializeConfigCluster( void )
node->SetCommPort( &commPortNums[ i * MPI_MAX_PORT_NAME] );
node->SetSyncPort( &syncPortNums[ i * MPI_MAX_PORT_NAME] );
rankToPnid[i] = node->GetPNid();
- nodeStatus[i] = true;
+ nodeStatus[rankToPnid[i]] = true;
if (trace_settings & TRACE_INIT)
{
@@ -2967,7 +3014,7 @@ void CCluster::InitializeConfigCluster( void )
// Any nodes not in the initial MPI_COMM_WORLD are down.
for (int i=0; i<GetConfigPNodesCount(); ++i)
{
- if ( nodeStatus[i] == false )
+ if ( nodeStatus[indexToPnid_[i]] == false )
{
if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
trace_printf( "%s@%d - nodeStatus[%d]=%d"
@@ -2982,7 +3029,7 @@ void CCluster::InitializeConfigCluster( void )
// assign new TmLeader if TMLeader node is dead.
if (TmLeaderPNid == indexToPnid_[i])
{
- AssignTmLeader(indexToPnid_[i]);
+ AssignTmLeader(indexToPnid_[i], false);
}
}
else
@@ -3327,6 +3374,132 @@ void CCluster::SendReIntegrateStatus( STATE nodeState, int initErr )
}
}
+bool CCluster::PingSockPeer(CNode *node)
+{ // Ping the monitor on 'node': connect to its comm port, send my nodeId_t with ping=true, then read back a nodeId_t reply
+ const char method_name[] = "CCluster::PingSockPeer";
+ TRACE_ENTRY;
+ // Returns true only when the connect, the send and the receive all succeed; false otherwise
+ bool rs = true;
+ int rc;
+ int pingSock = -1;
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Pinging remote monitor %s, pnid=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid() );
+ }
+
+ // Attempt to connect with remote monitor, trying at most MAX_RECONN_PING_RETRY_COUNT times
+ for (int i = 0; i < MAX_RECONN_PING_RETRY_COUNT; i++ )
+ {
+ pingSock = Monitor->Connect( node->GetCommPort() );
+ if ( pingSock < 0 )
+ {
+ sleep( MAX_RECONN_PING_WAIT_TIMEOUT ); // pause before the next attempt (also runs after the final failed attempt)
+ }
+ else
+ {
+ break;
+ }
+ }
+ if ( pingSock < 0 )
+ { // Every connect attempt failed: there is no socket to close, report failure
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Can't connect to remote monitor %s, pnid=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid() );
+ }
+ return(false); // NOTE(review): this early return skips the TRACE_EXIT at the end of the method
+ }
+
+ nodeId_t nodeInfo;
+ // Fill in this monitor's identity (MyPNID / MyNode); the creator fields are set to "none" values
+ nodeInfo.pnid = MyPNID;
+ strcpy(nodeInfo.nodeName, MyNode->GetName()); // NOTE(review): unbounded strcpy here and on the next two lines; assumes the nodeId_t fields are large enough - confirm
+ strcpy(nodeInfo.commPort, MyNode->GetCommPort());
+ strcpy(nodeInfo.syncPort, MyNode->GetSyncPort());
+ nodeInfo.ping = true; // presumably tells the receiving monitor this is a ping and not a join request - confirm against the receiver
+ nodeInfo.creatorPNid = -1;
+ nodeInfo.creator = false;
+ nodeInfo.creatorShellPid = -1;
+ nodeInfo.creatorShellVerifier = -1;
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "Sending my nodeInfo.pnid=%d\n"
+ " nodeInfo.nodeName=%s\n"
+ " nodeInfo.commPort=%s\n"
+ " nodeInfo.syncPort=%s\n"
+ " nodeInfo.creatorPNid=%d\n"
+ " nodeInfo.creator=%d\n"
+ " nodeInfo.creatorShellPid=%d\n"
+ " nodeInfo.creatorShellVerifier=%d\n"
+ " nodeInfo.ping=%d\n"
+ , nodeInfo.pnid
+ , nodeInfo.nodeName
+ , nodeInfo.commPort
+ , nodeInfo.syncPort
+ , nodeInfo.creatorPNid
+ , nodeInfo.creator
+ , nodeInfo.creatorShellPid
+ , nodeInfo.creatorShellVerifier
+ , nodeInfo.ping );
+ }
+ // Send the whole nodeId_t structure to the remote monitor over the ping socket
+ rc = Monitor->SendSock( (char *) &nodeInfo
+ , sizeof(nodeId_t)
+ , pingSock );
+
+ if ( rc )
+ { // Send failed (non-zero rc): log it and report failure
+ rs = false;
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], Cannot send ping node info to node %s: (%s)\n"
+ , method_name, node->GetName(), ErrorMsg(rc));
+ mon_log_write(MON_PINGSOCKPEER_1, SQ_LOG_ERR, buf);
+ }
+ else
+ {
+ // Receive the remote monitor's nodeId_t reply on the same socket (overwrites nodeInfo)
+ rc = Monitor->ReceiveSock( (char *) &nodeInfo
+ , sizeof(nodeId_t)
+ , pingSock );
+ if ( rc )
+ { // Receive failed (non-zero rc): log it and report failure
+ rs = false;
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], Cannot receive ping node info from node %s: (%s)\n"
+ , method_name, node->GetName(), ErrorMsg(rc));
+ mon_log_write(MON_PINGSOCKPEER_2, SQ_LOG_ERR, buf);
+ }
+ else
+ { // Reply received: it is only traced here, its contents are not validated
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "Received from nodeInfo.pnid=%d\n"
+ " nodeInfo.nodeName=%s\n"
+ " nodeInfo.commPort=%s\n"
+ " nodeInfo.syncPort=%s\n"
+ " nodeInfo.ping=%d\n"
+ , nodeInfo.pnid
+ , nodeInfo.nodeName
+ , nodeInfo.commPort
+ , nodeInfo.syncPort
+ , nodeInfo.ping );
+ }
+ }
+ }
+ // The ping socket is used only for this exchange; it is closed on both the success and the failure path
+ close( pingSock );
+
+ TRACE_EXIT;
+ return( rs );
+}
+
void CCluster::ReIntegrate( int initProblem )
{
const char method_name[] = "CCluster::ReIntegrate";
@@ -3607,6 +3780,10 @@ void CCluster::ReIntegrateSock( int initProblem )
int rc;
int existingCommFd;
int existingSyncFd;
+ char commPort[MPI_MAX_PORT_NAME];
+ char syncPort[MPI_MAX_PORT_NAME];
+ char *pch1;
+ char *pch2;
// Set bit indicating my node is up
upNodes_.upNodes[MyPNID/MAX_NODE_BITMASK] |= (1ull << (MyPNID%MAX_NODE_BITMASK));
@@ -3642,12 +3819,6 @@ void CCluster::ReIntegrateSock( int initProblem )
TEST_POINT( TP011_NODE_UP );
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf("%s@%d Connected to creator monitor, sending my node info\n",
- method_name, __LINE__);
- }
-
// Send this node's name and port number so creator monitor
// knows who we are, and set flag to let creator monitor know it is the CREATOR.
nodeId_t myNodeInfo;
@@ -3662,21 +3833,14 @@ void CCluster::ReIntegrateSock( int initProblem )
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf( "%s@%d - Sending my node info to creator monitor\n"
- , method_name, __LINE__);
- trace_printf( "Node info for pnid=%d\n"
- " myNodeInfo.nodeName=%s\n"
- " myNodeInfo.commPort=%s\n"
- " myNodeInfo.syncPort=%s\n"
- " myNodeInfo.creatorPNid=%d\n"
- " myNodeInfo.creator=%d\n"
- " myNodeInfo.creatorShellPid=%d\n"
- " myNodeInfo.creatorShellVerifier=%d\n"
+ trace_printf( "%s@%d - Connected to creator monitor, sending my info, "
+ "node %d (%s), commPort=%s, syncPort=%s, creator=%d, "
+ "creatorShellPid=%d:%d\n"
+ , method_name, __LINE__
, myNodeInfo.pnid
, myNodeInfo.nodeName
, myNodeInfo.commPort
, myNodeInfo.syncPort
- , myNodeInfo.creatorPNid
, myNodeInfo.creator
, myNodeInfo.creatorShellPid
, myNodeInfo.creatorShellVerifier );
@@ -3774,10 +3938,33 @@ void CCluster::ReIntegrateSock( int initProblem )
otherMonRank_[nodeInfo[i].pnid] = 0;
++CurNodes;
- Node[nodeInfo[i].pnid]->SetCommPort( nodeInfo[i].commPort );
- Node[nodeInfo[i].pnid]->SetSyncPort( nodeInfo[i].syncPort );
+ // Store port numbers for the node
+ strncpy(commPort, nodeInfo[i].commPort, MPI_MAX_PORT_NAME);
+ strncpy(syncPort, nodeInfo[i].syncPort, MPI_MAX_PORT_NAME);
+
+ Node[nodeInfo[i].pnid]->SetCommPort( commPort );
+ pch1 = strtok (commPort,":");
+ pch1 = strtok (NULL,":");
+ Node[nodeInfo[i].pnid]->SetCommSocketPort( atoi(pch1) );
+
+ Node[nodeInfo[i].pnid]->SetSyncPort( syncPort );
+ pch2 = strtok (syncPort,":");
+ pch2 = strtok (NULL,":");
+ Node[nodeInfo[i].pnid]->SetSyncSocketPort( atoi(pch2) );
+ sockPorts_[nodeInfo[i].pnid] = Node[nodeInfo[i].pnid]->GetSyncSocketPort();
+
Node[nodeInfo[i].pnid]->SetState( State_Up );
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n"
+ , method_name, __LINE__
+ , Node[nodeInfo[i].pnid]->GetPNid()
+ , Node[nodeInfo[i].pnid]->GetName()
+ , pch1, atoi(pch1)
+ , pch2, atoi(pch2) );
+ }
+
// Tell creator we are ready to accept its connection
int mypnid = MyPNID;
rc = Monitor->SendSock( (char *) &mypnid
@@ -3897,10 +4084,34 @@ void CCluster::ReIntegrateSock( int initProblem )
otherMonRank_[nodeInfo[i].pnid] = 0;
++CurNodes;
- Node[nodeInfo[i].pnid]->SetCommPort( nodeInfo[i].commPort );
- Node[nodeInfo[i].pnid]->SetSyncPort( nodeInfo[i].syncPort );
+
+ // Store port numbers for the node
+ strncpy(commPort, nodeInfo[i].commPort, MPI_MAX_PORT_NAME);
+ strncpy(syncPort, nodeInfo[i].syncPort, MPI_MAX_PORT_NAME);
+
+ Node[nodeInfo[i].pnid]->SetCommPort( commPort );
+ pch1 = strtok (commPort,":");
+ pch1 = strtok (NULL,":");
+ Node[nodeInfo[i].pnid]->SetCommSocketPort( atoi(pch1) );
+
+ Node[nodeInfo[i].pnid]->SetSyncPort( syncPort );
+ pch2 = strtok (syncPort,":");
+ pch2 = strtok (NULL,":");
+ Node[nodeInfo[i].pnid]->SetSyncSocketPort( atoi(pch2) );
+ sockPorts_[nodeInfo[i].pnid] = Node[nodeInfo[i].pnid]->GetSyncSocketPort();
+
Node[nodeInfo[i].pnid]->SetState( State_Up );
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n"
+ , method_name, __LINE__
+ , Node[nodeInfo[i].pnid]->GetPNid()
+ , Node[nodeInfo[i].pnid]->GetName()
+ , pch1, atoi(pch1)
+ , pch2, atoi(pch2) );
+ }
+
// Connect to existing monitor
existingSyncFd = AcceptSyncSock();
if ( existingSyncFd < 0 )
@@ -3969,9 +4180,10 @@ void CCluster::ReIntegrateSock( int initProblem )
}
for ( int i =0; i < pnodeCount; i++ )
{
- trace_printf( "%s@%d socks_[%d]=%d\n"
+ trace_printf( "%s@%d socks_[%d]=%d, sockPorts_[%d]=%d\n"
, method_name, __LINE__
- , nodeInfo[i].pnid, socks_[nodeInfo[i].pnid]);
+ , nodeInfo[i].pnid, socks_[nodeInfo[i].pnid]
+ , nodeInfo[i].pnid, sockPorts_[nodeInfo[i].pnid]);
}
for ( int i =0; i < MAX_NODE_MASKS ; i++ )
{
@@ -4224,16 +4436,24 @@ void CCluster::setNewSock( int pnid )
--CurNodes;
}
- if (trace_settings & TRACE_RECOVERY)
- {
- trace_printf("%s@%d - setting new communicator for pnid %d, "
- "otherRank=%d\n",
- method_name, __LINE__, it->pnid, it->otherRank);
- }
-
+ CNode *node= Nodes->GetNode( it->pnid );
socks_[it->pnid] = it->socket;
+ sockPorts_[it->pnid] = node->GetSyncSocketPort();
otherMonRank_[it->pnid] = it->otherRank;
++CurNodes;
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting new communicator for %d (%s), "
+ "socks_[%d]=%d, sockPorts_[%d]=%d, otherMonRank_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetPNid()
+ , node->GetName()
+ , it->pnid, socks_[it->pnid]
+ , it->pnid, sockPorts_[it->pnid]
+ , it->pnid, otherMonRank_[it->pnid] );
+ }
+
// Set bit indicating node is up
upNodes_.upNodes[it->pnid/MAX_NODE_BITMASK] |= (1ull << (it->pnid%MAX_NODE_BITMASK));
@@ -4419,20 +4639,9 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
const char method_name[] = "CCluster::AllgatherSock";
TRACE_ENTRY;
+ bool reconnecting = false;
static int hdrSize = Nodes->GetSyncHdrSize( );
int err = MPI_SUCCESS;
- typedef struct
- {
- int p_sent;
- int p_received;
- int p_n2recv;
- bool p_sending;
- bool p_receiving;
- int p_timeout_count;
- bool p_initial_check;
- char *p_buff;
- struct timespec znodeFailedTime;
- } peer_t;
peer_t p[GetConfigPNodesMax()];
memset( p, 0, sizeof(p) );
tag = 0; // make compiler happy
@@ -4442,12 +4651,12 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
? (ZClient->GetSessionTimeout() * 2) : 120;
int nsent = 0, nrecv = 0;
- for ( int iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
+ for ( int iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
{
- peer_t *peer = &p[iPeer];
- stats[iPeer].MPI_ERROR = MPI_SUCCESS;
- stats[iPeer].count = 0;
- if ( iPeer == MyPNID || socks_[iPeer] == -1 )
+ peer_t *peer = &p[indexToPnid_[iPeer]];
+ stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_SUCCESS;
+ stats[indexToPnid_[iPeer]].count = 0;
+ if ( indexToPnid_[iPeer] == MyPNID || socks_[indexToPnid_[iPeer]] == -1 )
{
peer->p_sending = peer->p_receiving = false;
nsent++;
@@ -4460,11 +4669,31 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
peer->p_timeout_count = 0;
peer->p_initial_check = true;
peer->p_n2recv = -1;
- peer->p_buff = ((char *) rbuf) + (iPeer * CommBufSize);
+ peer->p_buff = ((char *) rbuf) + (indexToPnid_[iPeer] * CommBufSize);
+
struct epoll_event event;
- event.data.fd = socks_[iPeer];
+ event.data.fd = socks_[indexToPnid_[iPeer]];
event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
- EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[iPeer], &event );
+ EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[iPeer]], &event );
+ }
+ }
+
+ if (trace_settings & (TRACE_SYNC | TRACE_SYNC_DETAIL))
+ {
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ peer_t *peer = &p[indexToPnid_[i]];
+ trace_printf( "%s@%d" " - socks_[%d]=%d, "
+ "peer->p_sending=%d, "
+ "peer->p_receiving=%d\n"
+ , method_name, __LINE__
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , peer->p_sending
+ , peer->p_receiving );
+ }
}
}
@@ -4493,8 +4722,8 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
else
{
// default to 64 seconds
- sv_epoll_wait_timeout = 4000;
- sv_epoll_retry_count = 16;
+ sv_epoll_wait_timeout = 16000;
+ sv_epoll_retry_count = 4;
}
char buf[MON_STRING_BUF_SIZE];
@@ -4512,38 +4741,128 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
struct epoll_event events[2*GetConfigPNodesMax() + 1];
while ( 1 )
{
- int maxEvents = 2*GetConfigPNodesMax() - nsent - nrecv;
+reconnected:
+ int maxEvents = 2*GetConfigPNodesCount() - nsent - nrecv;
if ( maxEvents == 0 ) break;
int nw;
int zerr = ZOK;
+
while ( 1 )
{
nw = epoll_wait( epollFD_, events, maxEvents, sv_epoll_wait_timeout );
if ( nw >= 0 || errno != EINTR ) break;
}
+
if ( nw == 0 )
- {
- for ( int iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
+ { // Timeout, no fd's ready
+ for ( int iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
{
- peer_t *peer = &p[iPeer];
- if ( (iPeer != MyPNID) &&
- (socks_[iPeer] != -1) )
+ peer_t *peer = &p[indexToPnid_[iPeer]];
+ if ( (indexToPnid_[iPeer] != MyPNID) && (socks_[indexToPnid_[iPeer]] != -1) )
{
- if ( (peer->p_receiving) ||
- (peer->p_sending) )
+ if ( (peer->p_receiving) || (peer->p_sending) )
{
if ( ! ZClientEnabled )
{
- peer->p_timeout_count++;
+ if (peer->p_initial_check && !reconnecting)
+ {
+ peer->p_initial_check = false;
+ clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
+ peer->znodeFailedTime.tv_sec += sessionTimeout;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - Znode Fail Time %ld(secs)\n"
+ , method_name, __LINE__
+ , peer->znodeFailedTime.tv_sec);
+ }
+ }
if ( peer->p_timeout_count < sv_epoll_retry_count )
{
+ peer->p_timeout_count++;
+
+ if (IsRealCluster)
+ {
+ if (trace_settings & TRACE_RECOVERY)
+ {
+ trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+ " timeout count=%d,"
+ " sending=%d,"
+ " receiving=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), iPeer
+ , peer->p_timeout_count
+ , peer->p_sending
+ , peer->p_receiving);
+ }
+ // Attempt reconnect to all peers
+ err = AllgatherSockReconnect( stats );
+ // Redrive IOs on live peer connections
+ nsent = 0; nrecv = 0;
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+ {
+ peer_t *peer = &p[indexToPnid_[i]];
+ if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+ {
+ peer->p_sending = peer->p_receiving = false;
+ nsent++;
+ nrecv++;
+ }
+ else
+ {
+ peer->p_sending = peer->p_receiving = true;
+ peer->p_sent = peer->p_received = 0;
+ peer->p_timeout_count = 0;
+ peer->p_n2recv = -1;
+ peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+
+ struct epoll_event event;
+ event.data.fd = socks_[indexToPnid_[i]];
+ event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+ EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[i]], &event );
+ }
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - socks_[%d]=%d, "
+ "peer->p_sending=%d, "
+ "peer->p_receiving=%d\n"
+ , method_name, __LINE__
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , peer->p_sending
+ , peer->p_receiving );
+ }
+ }
+ reconnectSeqNum_ = seqNum_;
+ reconnecting = true;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - Reconnecting!\n"
+ , method_name, __LINE__ );
+ }
+ goto reconnected;
+ }
continue;
}
+ if (trace_settings & TRACE_RECOVERY)
+ {
+ trace_printf( "%s@%d - Peer timeout triggered: "
+ "peer->p_timeout_count %d, "
+ "sv_epoll_retry_count %d\n"
+ " socks_[%d]=%d\n"
+ " stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , peer->p_timeout_count
+ , sv_epoll_retry_count
+ , indexToPnid_[iPeer]
+ , socks_[indexToPnid_[iPeer]]
+ , iPeer
+ , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) );
+ }
}
else
{
- if (peer->p_initial_check)
+ if (peer->p_initial_check && !reconnecting)
{
peer->p_initial_check = false;
clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
@@ -4557,7 +4876,7 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
}
// If not expired, stay in the loop
- if ( ! ZClient->IsZNodeExpired( Node[iPeer]->GetName(), zerr ))
+ if ( ! ZClient->IsZNodeExpired( Node[indexToPnid_[iPeer]]->GetName(), zerr ))
{
if ( zerr == ZCONNECTIONLOSS || zerr == ZOPERATIONTIMEOUT )
{
@@ -4572,10 +4891,71 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
if ( peer->p_timeout_count < sv_epoll_retry_count )
{
+ if (IsRealCluster)
+ {
+ if (trace_settings & TRACE_RECOVERY)
+ {
+ trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+ " timeout count=%d,"
+ " sending=%d,"
+ " receiving=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_timeout_count
+ , peer->p_sending
+ , peer->p_receiving);
+ }
+ // Attempt reconnect to all peers
+ err = AllgatherSockReconnect( stats );
+ // Redrive IOs on live peer connections
+ nsent = 0; nrecv = 0;
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+ {
+ peer_t *peer = &p[indexToPnid_[i]];
+ if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+ {
+ peer->p_sending = peer->p_receiving = false;
+ nsent++;
+ nrecv++;
+ }
+ else
+ {
+ peer->p_sending = peer->p_receiving = true;
+ peer->p_sent = peer->p_received = 0;
+ peer->p_timeout_count = 0;
+ peer->p_n2recv = -1;
+ peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+
+ struct epoll_event event;
+ event.data.fd = socks_[indexToPnid_[i]];
+ event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+ EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event );
+ }
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - socks_[%d]=%d, "
+ "peer->p_sending=%d, "
+ "peer->p_receiving=%d\n"
+ , method_name, __LINE__
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , peer->p_sending
+ , peer->p_receiving );
+ }
+ }
+ reconnectSeqNum_ = seqNum_;
+ reconnecting = true;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - Reconnecting!\n"
+ , method_name, __LINE__ );
+ }
+ goto reconnected;
+ }
continue;
}
}
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ if (trace_settings & TRACE_RECOVERY)
{
trace_printf( "%s@%d - Znode Failed triggered\n"
" Current Time %ld(secs)\n"
@@ -4592,58 +4972,99 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
if ( peer->p_timeout_count < sv_epoll_retry_count )
{
+ if (IsRealCluster)
+ {
+ if (trace_settings & TRACE_RECOVERY)
+ {
+ trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+ " timeout count=%d,"
+ " sending=%d,"
+ " receiving=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_timeout_count
+ , peer->p_sending
+ , peer->p_receiving);
+ }
+ // Attempt reconnect to all peers
+ err = AllgatherSockReconnect( stats );
+ // Redrive IOs on live peer connections
+ nsent = 0; nrecv = 0;
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+ {
+ peer_t *peer = &p[indexToPnid_[i]];
+ if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+ {
+ peer->p_sending = peer->p_receiving = false;
+ nsent++;
+ nrecv++;
+ }
+ else
+ {
+ peer->p_sending = peer->p_receiving = true;
+ peer->p_sent = peer->p_received = 0;
+ peer->p_timeout_count = 0;
+ peer->p_n2recv = -1;
+ peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+
+ struct epoll_event event;
+ event.data.fd = socks_[indexToPnid_[i]];
+ event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+ EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event );
+ }
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - socks_[%d]=%d, "
+ "peer->p_sending=%d, "
+ "peer->p_receiving=%d\n"
+ , method_name, __LINE__
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , peer->p_sending
+ , peer->p_receiving );
+ }
+ }
+ reconnectSeqNum_ = seqNum_;
+ reconnecting = true;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - Reconnecting!\n"
+ , method_name, __LINE__ );
+ }
+ goto reconnected;
+ }
continue;
}
}
}
}
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s@%d] Not heard from peer=%d (node=%s) "
- "(current seq # is %lld)\n"
- , method_name
- , __LINE__
- , iPeer
- , Node[iPeer]->GetName()
- , seqNum_ );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_1, SQ_LOG_CRIT, buf );
-
- stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
- err = MPI_ERR_IN_STATUS;
- if ( peer->p_sending )
- {
- peer->p_sending = false;
- nsent++;
- }
- if ( peer->p_receiving )
+ if (trace_settings & TRACE_RECOVERY)
{
- peer->p_receiving = false;
- nrecv++;
+ trace_printf( "%s@%d - err=%d, socks_[%d]=%d, stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , err
+ , indexToPnid_[iPeer]
+ , socks_[indexToPnid_[iPeer]]
+ , indexToPnid_[iPeer]
+ , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) );
}
- // setup the epoll structures
- struct epoll_event event;
- event.data.fd = socks_[iPeer];
- int op = 0;
- if ( !peer->p_sending && !peer->p_receiving )
- {
- op = EPOLL_CTL_DEL;
- event.events = 0;
- }
- else if ( peer->p_sending )
- {
- op = EPOLL_CTL_MOD;
- event.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
- }
- else if ( peer->p_receiving )
+ if ( err == MPI_ERR_IN_STATUS
+ && stats[indexToPnid_[iPeer]].MPI_ERROR == MPI_ERR_EXITED)
{
- op = EPOLL_CTL_MOD;
- event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
- }
- if ( op == EPOLL_CTL_DEL || op == EPOLL_CTL_MOD )
- {
- EpollCtl( epollFD_, op, socks_[iPeer], &event );
+ // At this point, this peer is not responding and
+ // reconnects failed or its znode expired
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s@%d] Not heard from peer=%d (node=%s) "
+ "(current seq # is %lld)\n"
+ , method_name
+ , __LINE__
+ , indexToPnid_[iPeer]
+ , Node[indexToPnid_[iPeer]]->GetName()
+ , seqNum_ );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_2, SQ_LOG_CRIT, buf );
}
}
}
@@ -4651,35 +5072,43 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
}
if ( nw < 0 )
- {
+ { // Got an error
char ebuff[256];
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf), "[%s@%d] epoll_wait(%d, %d) error: %s\n",
method_name, __LINE__, epollFD_, maxEvents,
strerror_r( errno, ebuff, 256 ) );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_2, SQ_LOG_CRIT, buf );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_3, SQ_LOG_CRIT, buf );
MPI_Abort( MPI_COMM_SELF,99 );
}
+
+ // Process fd's which are ready to initiate an IO or completed IO
for ( int iEvent = 0; iEvent < nw; iEvent++ )
{
bool stateChange = false;
int fd = events[iEvent].data.fd;
int iPeer;
- for ( iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
- {
- if ( events[iEvent].data.fd == socks_[iPeer] ) break;
+ for ( iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
+ { // Find corresponding peer by matching socket fd
+ if ( events[iEvent].data.fd == socks_[indexToPnid_[iPeer]] ) break;
}
- if ( iPeer < 0 || iPeer >= GetConfigPNodesMax() || iPeer == MyPNID
- || socks_[iPeer] == -1
- || (!p[iPeer].p_sending && !p[iPeer].p_receiving) )
+ if ( indexToPnid_[iPeer] < 0 || indexToPnid_[iPeer] >= GetConfigPNodesMax() || indexToPnid_[iPeer] == MyPNID
+ || socks_[indexToPnid_[iPeer]] == -1
+ || (!p[indexToPnid_[iPeer]].p_sending && !p[indexToPnid_[iPeer]].p_receiving) )
{
char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf), "[%s@%d] invalid peer %d\n",
- method_name, __LINE__, iPeer );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_3, SQ_LOG_CRIT, buf );
+ snprintf( buf, sizeof(buf)
+ , "[%s@%d] Invalid peer %d, "
+ "peer.p_sending=%d, "
+ "peer.p_receiving=%d\n"
+ , method_name, __LINE__
+ , indexToPnid_[iPeer]
+ , indexToPnid_[iPeer] >= GetConfigPNodesMax()?-1:p[indexToPnid_[iPeer]].p_sending
+ , indexToPnid_[iPeer] >= GetConfigPNodesMax()?-1:p[indexToPnid_[iPeer]].p_receiving );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_4, SQ_LOG_CRIT, buf );
MPI_Abort( MPI_COMM_SELF,99 );
}
- peer_t *peer = &p[iPeer];
+ peer_t *peer = &p[indexToPnid_[iPeer]];
if ( (events[iEvent].events & EPOLLERR) ||
(events[iEvent].events & EPOLLHUP) ||
( !(events[iEvent].events & (EPOLLIN|EPOLLOUT))) )
@@ -4690,13 +5119,13 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
snprintf( buf, sizeof(buf)
, "[%s@%d] Error: peer=%d, events[%d].data.fd=%d, event[%d]=%s\n"
, method_name, __LINE__
- , iPeer
+ , indexToPnid_[iPeer]
, iEvent
, events[iEvent].data.fd
, iEvent
, EpollEventString(events[iEvent].events) );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_4, SQ_LOG_CRIT, buf );
- stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_5, SQ_LOG_CRIT, buf );
+ stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
err = MPI_ERR_IN_STATUS;
if ( peer->p_sending )
{
@@ -4708,12 +5137,11 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
peer->p_receiving = false;
nrecv++;
}
- stateChange = 1;
+ stateChange = true;
goto early_exit;
}
- if ( peer->p_receiving
- && events[iEvent].events & EPOLLIN )
- {
+ if ( peer->p_receiving && events[iEvent].events & EPOLLIN )
+ { // Got receive (read) completion
int eagain_ok = 0;
read_again:
char *r = &peer->p_buff[peer->p_received];
@@ -4729,6 +5157,26 @@ read_again:
int nr;
while ( 1 )
{
+ if (trace_settings & TRACE_SYNC_DETAIL)
+ {
+ trace_printf( "%s@%d - EPOLLIN from %s(%d),"
+ " sending=%d,"
+ " receiving=%d (%d)"
+ " sent=%d,"
+ " received=%d"
+ " timeout_count=%d,"
+ " initial_check=%d,"
+ " n2recv=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_sending
+ , peer->p_receiving, n2get
+ , peer->p_sent
+ , peer->p_received
+ , peer->p_timeout_count
+ , peer->p_initial_check
+ , peer->p_n2recv );
+ }
nr = recv( fd, r, n2get, 0 );
if ( nr >= 0 || errno == EINTR ) break;
}
@@ -4746,8 +5194,8 @@ read_again:
snprintf( buf, sizeof(buf)
, "[%s@%d] recv[%d](%d) error %d (%s)\n"
, method_name, __LINE__
- , iPeer, nr , err, strerror(err) );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_5, SQ_LOG_CRIT, buf );
+ , indexToPnid_[iPeer], nr , err, strerror(err) );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_6, SQ_LOG_CRIT, buf );
peer->p_receiving = false;
nrecv++;
if ( peer->p_sending )
@@ -4755,7 +5203,7 @@ read_again:
peer->p_sending = false;
nsent++;
}
- stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+ stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
err = MPI_ERR_IN_STATUS;
stateChange = true;
}
@@ -4792,7 +5240,7 @@ read_again:
snprintf( buf, sizeof(buf),
"[%s@%d] error n2recv %d\n",
method_name, __LINE__, peer->p_n2recv );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_6, SQ_LOG_CRIT, buf );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_7, SQ_LOG_CRIT, buf );
MPI_Abort( MPI_COMM_SELF,99 );
}
if ( peer->p_n2recv == 0 )
@@ -4800,20 +5248,59 @@ read_again:
// this buffer is done
peer->p_receiving = false;
nrecv++;
- stats[iPeer].count = peer->p_received;
+ stats[indexToPnid_[iPeer]].count = peer->p_received;
+ if (trace_settings & TRACE_SYNC_DETAIL)
+ {
+ trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+ " sending=%d,"
+ " receiving=%d (%d)"
+ " sent=%d,"
+ " received=%d"
+ " timeout_count=%d,"
+ " initial_check=%d,"
+ " n2recv=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_sending
+ , peer->p_receiving, n2get
+ , peer->p_sent
+ , peer->p_received
+ , peer->p_timeout_count
+ , peer->p_initial_check
+ , peer->p_n2recv );
+ }
stateChange = true;
}
}
}
}
- if ( peer->p_sending
- && events[iEvent].events & EPOLLOUT )
- {
+ if ( peer->p_sending && events[iEvent].events & EPOLLOUT )
+ { // Got send (write) completion
char *s = &((char *)sbuf)[peer->p_sent];
int n2send = nbytes - peer->p_sent;
int ns;
while ( 1 )
{
+ if (trace_settings & TRACE_SYNC_DETAIL)
+ {
+ trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+ " sending=%d (%d),"
+ " receiving=%d"
+ " sent=%d,"
+ " received=%d"
+ " timeout_count=%d,"
+ " initial_check=%d,"
+ " n2recv=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_sending, n2send
+ , peer->p_receiving
+ , peer->p_sent
+ , peer->p_received
+ , peer->p_timeout_count
+ , peer->p_initial_check
+ , peer->p_n2recv );
+ }
ns = send( fd, s, n2send, 0 );
if ( ns >= 0 || errno != EINTR ) break;
}
@@ -4825,8 +5312,8 @@ read_again:
snprintf( buf, sizeof(buf)
, "[%s@%d] send[%d](%d) error=%d (%s)\n"
, method_name, __LINE__
- , iPeer, ns, err, strerror(err) );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_7, SQ_LOG_CRIT, buf );
+ , indexToPnid_[iPeer], ns, err, strerror(err) );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_8, SQ_LOG_CRIT, buf );
peer->p_sending = false;
nsent++;
if ( peer->p_receiving )
@@ -4834,7 +5321,7 @@ read_again:
peer->p_receiving = false;
nrecv++;
}
- stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+ stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
err = MPI_ERR_IN_STATUS;
stateChange = true;
}
@@ -4846,6 +5333,26 @@ read_again:
// finished sending to this destination
peer->p_sending = false;
nsent++;
+ if (trace_settings & TRACE_SYNC_DETAIL)
+ {
+ trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+ " sending=%d (%d),"
+ " receiving=%d"
+ " sent=%d,"
+ " received=%d"
+ " timeout_count=%d,"
+ " initial_check=%d,"
+ " n2recv=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_sending, n2send
+ , peer->p_receiving
+ , peer->p_sent
+ , peer->p_received
+ , peer->p_timeout_count
+ , peer->p_initial_check
+ , peer->p_n2recv );
+ }
stateChange = true;
}
}
@@ -4854,7 +5361,7 @@ early_exit:
if ( stateChange )
{
struct epoll_event event;
- event.data.fd = socks_[iPeer];
+ event.data.fd = socks_[indexToPnid_[iPeer]];
int op = 0;
if ( !peer->p_sending && !peer->p_receiving )
{
@@ -4888,49 +5395,465 @@ early_exit:
return err;
}
-// When we get a communication error for a point-to-point monitor communicator
-// verify that the other nodes in the cluster also lost communications
-// with that monitor. If all nodes concur we consider that monitor
-// down.
-void CCluster::ValidateClusterState( cluster_state_def_t nodestate[],
- bool haveDivergence)
+int CCluster::AllgatherSockReconnect( MPI_Status *stats ) // Re-establish the sync socket with each peer node; per-peer failures go to stats[], overall result is returned
{
- const char method_name[] = "CCluster::ValidateClusterState";
-
- exitedMons_t::iterator it;
- upNodes_t nodeMask;
+ const char method_name[] = "CCluster::AllgatherSockReconnect";
+ TRACE_ENTRY;
- for ( int i =0; i < MAX_NODE_MASKS ; i++ )
- {
- nodeMask.upNodes[i] = 0;
- }
+ int err = MPI_SUCCESS; // overall result; becomes MPI_ERR_IN_STATUS or MPI_ERR_AMODE on failure
+ int idst; // pnid of the peer handled for the current pair, -1 when the pair does not involve MyPNID
+ int reconnectSock = -1; // status returned by ConnectSockPeer/AcceptSockPeer (-1 on failure), not a descriptor
+ CNode *node;
- for ( it = exitedMons_.begin(); it != exitedMons_.end(); )
+ // Visit every (i, j) pair with i < j; only pairs that include MyPNID do any work
+ for ( int i = 0; i < GetConfigPNodesMax(); i++ )
{
- if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT))
- {
- trace_printf("%s@%d checking exited pnid=%d, detecting pnid=%d, seqNum=%lld"
- " (current seqNum_=%lld)\n", method_name, __LINE__,
- it->exitedPnid, it->detectingPnid, it->seqNum, seqNum_);
- }
-
- if ( seqNum_ >= (it->seqNum + 2) )
+ // Loop on each higher-numbered node [j] paired with node [i]
+ for ( int j = i+1; j < GetConfigPNodesMax(); j++ )
{
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf), "[%s] Validating exited node %d, "
- "detected by node %d at seq #%lld "
- "(current seq # is %lld).\n",
- method_name, it->exitedPnid, it->detectingPnid,
- it->seqNum, seqNum_);
- mon_log_write(MON_CLUSTER_VALIDATE_STATE_1, SQ_LOG_ERR, buf);
-
- int concurringNodes = 0;
+ if ( i == MyPNID )
+ { // Current [i] node is my node, so this side connects to the [j] node
- // Check if all active nodes see the node as down.
- nodeMask.upNodes[it->exitedPnid/MAX_NODE_BITMASK] = 1ull << (it->exitedPnid%MAX_NODE_BITMASK);
- string setSeesUp;
- string setSeesDown;
- char nodeX[10];
+ idst = j;
+ node = Nodes->GetNode( idst );
+ if (!node) continue; // Nodes->GetNode() returned no node for this pnid
+ if (node->GetState() != State_Up)
+ {
+ if (socks_[idst] != -1)
+ { // Peer socket is still recorded for a node that is not up
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst, socks_[idst] );
+ }
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[idst];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[idst], &event );
+ socks_[idst] = -1;
+ }
+ continue; // no reconnect attempt for a node that is not up
+ }
+ reconnectSock = ConnectSockPeer( node, idst ); // returns -1 on failure
+ if (reconnectSock == -1)
+ {
+ stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+ stats[idst].count = 0;
+ err = MPI_ERR_IN_STATUS; // tells the caller to inspect stats[] for the failing peer
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting Node %s (%d) status to "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst
+ , ErrorMsg(stats[idst].MPI_ERROR) );
+ }
+ if (node->GetState() != State_Up) // state is re-read after the failed connect
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst, socks_[idst] );
+ }
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[idst];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[idst], &event );
+ socks_[idst] = -1;
+ }
+ }
+ }
+ else if ( j == MyPNID )
+ { // Current [j] is my node, so this side accepts a connection from the peer [i] node
+
+ idst = i;
+ node = Nodes->GetNode( idst );
+ if (!node) continue; // Nodes->GetNode() returned no node for this pnid
+ if (node->GetState() != State_Up)
+ {
+ if (socks_[idst] != -1)
+ { // Peer socket is still recorded for a node that is not up
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst, socks_[idst] );
+ }
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[idst];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[idst], &event );
+ socks_[idst] = -1;
+ }
+ continue; // no accept attempt for a node that is not up
+ }
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Pinging Node %s (%d) to see if it's up\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid() );
+ }
+ if (PingSockPeer(node)) // accept only when PingSockPeer reports success
+ {
+ reconnectSock = AcceptSockPeer( node, idst ); // returns -1 on failure
+ if (reconnectSock == -1)
+ {
+ stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+ stats[idst].count = 0;
+ err = MPI_ERR_IN_STATUS;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting Node %s (%d) status to "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst
+ , ErrorMsg(stats[idst].MPI_ERROR) );
+ }
+ if (node->GetState() != State_Up) // state is re-read after the failed accept
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst, socks_[idst] );
+ }
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[idst];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[idst], &event );
+ socks_[idst] = -1;
+ }
+ }
+ }
+ else // ping failed: the peer is reported as exited without attempting an accept
+ {
+ if (socks_[idst] != -1)
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst, socks_[idst] );
+ }
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[idst];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[idst], &event ); // NOTE(review): unlike the other branches, socks_[idst] is not reset to -1 here - confirm this is intended
+ }
+ reconnectSock = -1;
+ stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+ stats[idst].count = 0;
+ err = MPI_ERR_IN_STATUS;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting Node %s (%d) status to "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst
+ , ErrorMsg(stats[idst].MPI_ERROR) );
+ }
+ }
+ }
+ else // neither [i] nor [j] is this node
+ {
+ idst = -1;
+ }
+ if ( idst >= 0
+ && reconnectSock != -1
+ && fcntl( socks_[idst], F_SETFL, O_NONBLOCK ) ) // after a successful reconnect, switch the peer socket to non-blocking; non-zero means fcntl failed
+ {
+ err = MPI_ERR_AMODE;
+ char ebuff[256];
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf), "[%s@%d] fcntl(NONBLOCK) error: %s\n",
+ method_name, __LINE__, strerror_r( errno, ebuff, 256 ) );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCKRECONN_1, SQ_LOG_CRIT, buf );
+ }
+ }
+ }
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) // when tracing, dump the resulting socket and status for every configured node
+ {
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - socks_[%d]=%d, "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , indexToPnid_[i]
+ , ErrorMsg(stats[indexToPnid_[i]].MPI_ERROR) );
+ }
+ }
+ trace_printf( "%s@%d - Returning err=%d\n"
+ , method_name, __LINE__, err );
+ }
+
+ TRACE_EXIT;
+ return( err );
+}
+
+// Accepts a replacement sync connection from a peer monitor.
+//
+// Parameters:
+//   node - peer node expected to connect (used for tracing only)
+//   peer - index into socks_[] for that peer (callers pass the peer's pnid)
+//
+// When the accept succeeds the new socket is stored in socks_[peer].
+// Any previously recorded socks_[peer] is removed from the epoll set
+// whether or not the accept succeeded.
+//
+// Returns MPI_SUCCESS when a connection was accepted, -1 otherwise.
+// Aborts the process if the local node name cannot be resolved.
+int CCluster::AcceptSockPeer( CNode *node, int peer )
+{
+    const char method_name[] = "CCluster::AcceptSockPeer";
+    TRACE_ENTRY;
+
+    int rc = MPI_SUCCESS;
+    int reconnectSock = -1;
+    unsigned char srcaddr[4];
+    struct hostent *he;
+
+    // Get my host structure via my node name
+    he = gethostbyname( MyNode->GetName() );
+    if ( !he )
+    {
+        // gethostbyname() reports its failure reason through h_errno,
+        // not errno, so the message is built with hstrerror()
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s@%d] gethostbyname(%s) error: %s\n"
+                , method_name, __LINE__
+                , MyNode->GetName()
+                , hstrerror( h_errno ) );
+        mon_log_write( MON_CLUSTER_ACCEPTSOCKPEER_1, SQ_LOG_CRIT, buf );
+        abort();
+    }
+    else
+    {
+        // Initialize my source address so the trace below prints the
+        // resolved address instead of uninitialized stack bytes
+        // (assumes a 4-byte IPv4 address, as ConnectSockPeer does)
+        memcpy( srcaddr, he->h_addr, 4 );
+
+        if (trace_settings & TRACE_RECOVERY)
+        {
+            trace_printf( "%s@%d Accepting server socket: from %s(%d), src=%d.%d.%d.%d, port=%d\n"
+                        , method_name, __LINE__
+                        , node->GetName(), node->GetPNid()
+                        , (int)((unsigned char *)srcaddr)[0]
+                        , (int)((unsigned char *)srcaddr)[1]
+                        , (int)((unsigned char *)srcaddr)[2]
+                        , (int)((unsigned char *)srcaddr)[3]
+                        , MyNode->GetSyncSocketPort() );
+        }
+
+        // Accept connection from peer
+        reconnectSock = AcceptSock( syncSock_ );
+        if (reconnectSock != -1)
+        {
+            if (trace_settings & TRACE_RECOVERY)
+            {
+                trace_printf( "%s@%d Server %s(%d) accepted from client %s(%d), old socks_[%d]=%d, new socks_[%d]=%d\n"
+                            , method_name, __LINE__
+                            , MyNode->GetName(), MyPNID
+                            , node->GetName(), node->GetPNid()
+                            , peer, socks_[peer]
+                            , peer, reconnectSock);
+            }
+        }
+        else
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf), "[%s@%d] AcceptSock(%d) failed!\n",
+                      method_name, __LINE__, syncSock_ );
+            mon_log_write( MON_CLUSTER_ACCEPTSOCKPEER_2, SQ_LOG_ERR, buf );
+            rc = -1;
+        }
+
+        if (socks_[peer] != -1)
+        {
+            // Remove old socket from epoll set, it may not be there
+            // NOTE(review): the old descriptor is not closed here; confirm
+            // whether EpollCtlDelete() or the caller releases it
+            struct epoll_event event;
+            event.data.fd = socks_[peer];
+            event.events = 0;
+            EpollCtlDelete( epollFD_, socks_[peer], &event );
+        }
+        if (reconnectSock != -1)
+        {
+            // Only a successful accept replaces the recorded peer socket
+            socks_[peer] = reconnectSock;
+        }
+    }
+
+    TRACE_EXIT;
+    return rc;
+}
+
+// Re-creates the client side of the sync connection to a peer monitor.
+//
+// Parameters:
+//   node - peer node to connect to; its name is resolved to obtain the
+//          destination address
+//   peer - index into socks_[] and sockPorts_[] for that peer (callers
+//          pass the peer's pnid)
+//
+// When the connect succeeds the new socket is stored in socks_[peer].
+// Any previously recorded socks_[peer] is removed from the epoll set
+// whether or not the connect succeeded.
+//
+// Returns MPI_SUCCESS when connected, -1 when MkCltSock() failed.
+// Aborts the process if either the local or the peer node name cannot
+// be resolved.
+int CCluster::ConnectSockPeer( CNode *node, int peer )
+{
+    const char method_name[] = "CCluster::ConnectSockPeer";
+    TRACE_ENTRY;
+
+    int rc = MPI_SUCCESS;
+    int reconnectSock = -1;
+    unsigned char srcaddr[4], dstaddr[4];
+    struct hostent *he;
+
+    // Get my host structure via my node name
+    he = gethostbyname( MyNode->GetName() );
+    if ( !he )
+    {
+        // gethostbyname() reports its failure reason through h_errno,
+        // not errno, so the message is built with hstrerror()
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s@%d] gethostbyname(%s) error: %s\n"
+                , method_name, __LINE__
+                , MyNode->GetName()
+                , hstrerror( h_errno ) );
+        mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_1, SQ_LOG_CRIT, buf );
+        abort();
+    }
+    else
+    {
+        // Initialize my source address structure
+        // (assumes a 4-byte IPv4 address - TODO confirm against h_length)
+        memcpy( srcaddr, he->h_addr, 4 );
+        // Get peer's host structure via its node name
+        he = gethostbyname( node->GetName() );
+        if ( !he )
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf),
+                      "[%s@%d] gethostbyname(%s) error: %s\n",
+                      method_name, __LINE__, node->GetName(),
+                      hstrerror( h_errno ) );
+            mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_2, SQ_LOG_CRIT, buf );
+            abort();
+        }
+        // Initialize peer's destination address structure
+        memcpy( dstaddr, he->h_addr, 4 );
+
+        if (trace_settings & TRACE_RECOVERY)
+        {
+            trace_printf( "%s@%d Creating client socket: src=%d.%d.%d.%d, "
+                          "dst(%s)=%d.%d.%d.%d, dst port=%d\n"
+                        , method_name, __LINE__
+                        , (int)((unsigned char *)srcaddr)[0]
+                        , (int)((unsigned char *)srcaddr)[1]
+                        , (int)((unsigned char *)srcaddr)[2]
+                        , (int)((unsigned char *)srcaddr)[3]
+                        , node->GetName()
+                        , (int)((unsigned char *)dstaddr)[0]
+                        , (int)((unsigned char *)dstaddr)[1]
+                        , (int)((unsigned char *)dstaddr)[2]
+                        , (int)((unsigned char *)dstaddr)[3]
+                        , sockPorts_[peer] );
+        }
+        // Connect to peer
+        reconnectSock = MkCltSock( srcaddr, dstaddr, sockPorts_[peer] );
+        if (reconnectSock != -1)
+        {
+            if (trace_settings & TRACE_RECOVERY)
+            {
+                trace_printf( "%s@%d Client %s(%d) connected to server %s(%d), old socks_[%d]=%d, new socks_[%d]=%d\n"
+                            , method_name, __LINE__
+                            , MyNode->GetName(), MyPNID
+                            , node->GetName(), node->GetPNid()
+                            , peer, socks_[peer]
+                            , peer, reconnectSock);
+            }
+        }
+        else
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s@%d] MkCltSock() src=%d.%d.%d.%d, "
+                      "dst(%s)=%d.%d.%d.%d failed!\n"
+                    , method_name, __LINE__
+                    , (int)((unsigned char *)srcaddr)[0]
+                    , (int)((unsigned char *)srcaddr)[1]
+                    , (int)((unsigned char *)srcaddr)[2]
+                    , (int)((unsigned char *)srcaddr)[3]
+                    , node->GetName()
+                    , (int)((unsigned char *)dstaddr)[0]
+                    , (int)((unsigned char *)dstaddr)[1]
+                    , (int)((unsigned char *)dstaddr)[2]
+                    , (int)((unsigned char *)dstaddr)[3] );
+            mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_3, SQ_LOG_ERR, buf );
+            rc = -1;
+        }
+
+        if (socks_[peer] != -1)
+        {
+            // Remove old socket from epoll set, it may not be there
+            // NOTE(review): the old descriptor is not closed here; confirm
+            // whether EpollCtlDelete() or the caller releases it
+            struct epoll_event event;
+            event.data.fd = socks_[peer];
+            event.events = 0;
+            EpollCtlDelete( epollFD_, socks_[peer], &event );
+        }
+        if (reconnectSock != -1)
+        {
+            // Only a successful connect replaces the recorded peer socket
+            socks_[peer] = reconnectSock;
+        }
+    }
+
+    TRACE_EXIT;
+    return( rc );
+}
+
+// When we get a communication error for a point-to-point monitor communicator
+// verify that the other nodes in the cluster also lost communications
+// with that monitor. If all nodes concur we consider that monitor
+// down.
+void CCluster::ValidateClusterState( cluster_state_def_t nodestate[],
+ bool haveDivergence)
+{
+ const char method_name[] = "CCluster::ValidateClusterState";
+
+ exitedMons_t::iterator it;
+ upNodes_t nodeMask;
+
+ for ( int i =0; i < MAX_NODE_MASKS ; i++ )
+ {
+ nodeMask.upNodes[i] = 0;
+ }
+
+ for ( it = exitedMons_.begin(); it != exitedMons_.end(); )
+ {
+ if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT))
+ {
+ trace_printf("%s@%d checking exited pnid=%d, detecting pnid=%d, seqNum=%lld"
+ " (current seqNum_=%lld)\n", method_name, __LINE__,
+ it->exitedPnid, it->detectingPnid, it->seqNum, seqNum_);
+ }
+
+ if ( seqNum_ >= (it->seqNum + 2) )
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf), "[%s] Validating exited node %d, "
+ "detected by node %d at seq #%lld "
+ "(current seq # is %lld).\n",
+ method_name, it->exitedPnid, it->detectingPnid,
+ it->seqNum, seqNum_);
+ mon_log_write(MON_CLUSTER_VALIDATE_STATE_1, SQ_LOG_ERR, buf);
+
+ int concurringNodes = 0;
+
+ // Check if all active nodes see the node as down.
+ nodeMask.upNodes[it->exitedPnid/MAX_NODE_BITMASK] = 1ull << (it->exitedPnid%MAX_NODE_BITMASK);
+ string setSeesUp;
+ string setSeesDown;
+ char nodeX[10];
// Evaluate each active (up) node in the cluster
int pnodesCount = 0;
@@ -5187,8 +6110,10 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
const char method_name[] = "CCluster::ValidateSeqNum";
unsigned long long seqNum;
- unsigned long long seqNumBucket[256];
- int seqNumCount[256];
+ unsigned long long loSeqNum = seqNum_;
+ unsigned long long hiSeqNum = seqNum_;
+ unsigned long long seqNumBucket[MAX_NODES];
+ int seqNumCount[MAX_NODES];
int maxBucket = 0;
bool found;
int mostCountsIndex;
@@ -5198,10 +6123,26 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
// Count occurrences of sequence numbers from other nodes
for (int pnid = 0; pnid < GetConfigPNodesMax(); pnid++)
{
+ CNode *node= Nodes->GetNode( pnid );
+ if (!node) continue;
+ if (node->GetState() != State_Up) continue;
+
seqNum = nodestate[pnid].seq_num;
+ if (trace_settings & TRACE_SYNC)
+ {
+ trace_printf( "%s@%d seqNum_=%lld, nodestate[%d].seq_num=%lld\n"
+ , method_name, __LINE__
+ , seqNum_
+ , pnid
+ , nodestate[pnid].seq_num );
+ }
+
if (seqNum != 0)
{
+ loSeqNum = (seqNum < loSeqNum) ? seqNum : loSeqNum;
+ hiSeqNum = (seqNum > hiSeqNum) ? seqNum : hiSeqNum;
+
found = false;
for (int i=0; i<maxBucket; ++i)
{
@@ -5239,14 +6180,22 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
}
}
+ lowSeqNum_ = loSeqNum;
+ highSeqNum_ = hiSeqNum;
+
if (trace_settings & TRACE_SYNC)
{
if ( seqNum_ != seqNumBucket[mostCountsIndex] )
{
- trace_printf("%s@%d Most common seq num=%lld (%d nodes), %d buckets"
- ", local seq num (%lld) did not match.\n",
- method_name, __LINE__, seqNumBucket[mostCountsIndex],
- seqNumCount[mostCountsIndex], maxBucket, seqNum_);
+ trace_printf( "%s@%d Most common seq num=%lld (%d nodes), "
+ "%d buckets, low=%lld, high=%lld, local seq num (%lld) did not match.\n"
+ , method_name, __LINE__
+ , seqNumBucket[mostCountsIndex]
+ , seqNumCount[mostCountsIndex]
+ , maxBucket
+ , lowSeqNum_
+ , highSeqNum_
+ , seqNum_ );
}
}
@@ -5268,7 +6217,7 @@ void CCluster::HandleDownNode( int pnid )
trace_printf("%s@%d - Added down node to list, pnid=%d, name=(%s)\n", method_name, __LINE__, downNode->GetPNid(), downNode->GetName());
// assign new TmLeader if TMLeader node is dead.
- AssignTmLeader(pnid);
+ AssignTmLeader(pnid, false);
// Build available list of spare nodes
CNode *spareNode;
@@ -5373,6 +6322,7 @@ void CCluster::UpdateClusterState( bool &doShutdown,
TRACE_ENTRY;
struct sync_buffer_def *recvBuf;
+ struct sync_buffer_def *sendBuf = Nodes->GetSyncBuffer();
STATE node_state;
int change_nid;
cluster_state_def_t nodestate[GetConfigPNodesMax()];
@@ -5430,13 +6380,14 @@ void CCluster::UpdateClusterState( bool &doShutdown,
recvBuf = (struct sync_buffer_def *)
(((char *) syncBuf) + index * CommBufSize);
- if (trace_settings & TRACE_SYNC_DETAIL)
+ if (trace_settings & TRACE_SYNC)
{
int nr;
MPI_Get_count(&status[index], MPI_CHAR, &nr);
trace_printf("%s@%d - Received %d bytes from node %d, "
- "message count=%d\n",
+ ", seq_num=%lld, message count=%d\n",
method_name, __LINE__, nr, index,
+ recvBuf->nodeInfo.seq_num,
recvBuf->msgInfo.msg_count);
}
@@ -5532,23 +6483,23 @@ void CCluster::UpdateClusterState( bool &doShutdown,
Node[index]->SetState( State_Down );
--CurNodes;
// Clear bit in set of "up nodes"
- upNodes_.upNodes[index/MAX_NODE_BITMASK] &=
- ~(1ull << (index%MAX_NODE_BITMASK));
+ upNodes_.upNodes[index/MAX_NODE_BITMASK] &= ~(1ull << (index%MAX_NODE_BITMASK));
}
}
- if ( checkSeqNum_ && !ValidateSeqNum( nodestate ) && !enqueuedDown_ )
+ if ( (checkSeqNum_ || reconnectSeqNum_ != 0)
+ && !ValidateSeqNum( nodestate )
+ && !enqueuedDown_ )
{
- char buf[MON_STRING_BUF_SIZE];
- snprintf(buf, sizeof(buf), "[%s] Sync cycle sequence number (%lld) "
- "incorrect. Scheduling node down.\n", method_name, seqNum_);
- mon_log_write(MON_CLU
<TRUNCATED>