You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by su...@apache.org on 2017/09/19 02:54:38 UTC
[1/4] incubator-trafodion git commit: [TRAFODION-2651] Fixed
monitor-to-monitor communication by adding reconnect logic to better handle
various communication glitches which previously resulted in false node down
situations.
Repository: incubator-trafodion
Updated Branches:
refs/heads/master e2789f97c -> aeb9ef223
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/cluster.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cluster.h b/core/sqf/monitor/linux/cluster.h
index c7d097b..e743341 100644
--- a/core/sqf/monitor/linux/cluster.h
+++ b/core/sqf/monitor/linux/cluster.h
@@ -57,6 +57,19 @@ enum MonXChngTags
MON_XCHNG_DATA
};
+typedef struct
+{
+ int p_sent;
+ int p_received;
+ int p_n2recv;
+ bool p_sending;
+ bool p_receiving;
+ int p_timeout_count;
+ bool p_initial_check;
+ char *p_buff;
+ struct timespec znodeFailedTime;
+} peer_t;
+
class CNode;
class CLNode;
@@ -100,7 +113,7 @@ public:
#ifndef USE_BARRIER
void ArmWakeUpSignal (void);
#endif
- void AssignTmLeader( int pnid );
+ void AssignTmLeader( int pnid, bool checkProcess );
void stats();
void CompleteSyncCycle()
{ syncCycle_.lock(); syncCycle_.wait(); syncCycle_.unlock(); }
@@ -118,7 +131,7 @@ public:
void addNewSock(int nid, int otherRank, int sockFd );
bool exchangeNodeData ( );
- void exchangeTmSyncData ( struct sync_def *sync );
+ void exchangeTmSyncData ( struct sync_def *sync, bool bumpSync );
int GetConfigPNodesCount() { return configPNodesCount_; }
int GetConfigPNodesMax() { return configPNodesMax_; }
bool ImAlive( bool needed=false, struct sync_def *sync = NULL );
@@ -191,12 +204,12 @@ protected:
CNode **Node; // array of nodes
CLNode **LNode; // array of logical nodes
-
int TmSyncPNid; // Physical Node ID of current TmSync operations master
- void AddTmsyncMsg (struct sync_def *sync,
- struct internal_msg_def *msg);
+ void AddTmsyncMsg( struct sync_buffer_def *tmSyncBuffer
+ , struct sync_def *sync
+ , struct internal_msg_def *msg);
void AddReplData (struct internal_msg_def *msg);
void AddMyNodeState ();
void TraceTMSyncState(struct sync_buffer_def *recv_buffer,
@@ -252,6 +265,10 @@ private:
int integratingPNid_; // pnid of node when re-integration in progress
MPI_Comm joinComm_; // new to creator communicator (1-to-1)
int joinSock_; // new to creator socket used at join phase
+ unsigned long long lastSeqNum_;
+ unsigned long long lowSeqNum_;
+ unsigned long long highSeqNum_;
+ unsigned long long reconnectSeqNum_;
unsigned long long seqNum_;
int cumulativeDelaySec_;
@@ -281,10 +298,6 @@ private:
bool agTimeStats(struct timespec & ts_begin,
struct timespec & ts_end);
-
-
-
- struct sync_buffer_def *tmSyncBuffer_;
// Size of send and receive buffers used for communication between monitors
enum { CommBufSize = MAX_SYNC_SIZE };
@@ -323,6 +336,9 @@ private:
int Allgather(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats);
int AllgatherIB(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats);
int AllgatherSock(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats);
+ int AllgatherSockReconnect( MPI_Status *stats );
+ int AcceptSockPeer( CNode *node, int peer );
+ int ConnectSockPeer( CNode *node, int peer );
void ValidateClusterState( cluster_state_def_t nodestate[],
bool haveDivergence );
@@ -331,6 +347,7 @@ private:
void HandleReintegrateError( int rc, int err,
int nid, nodeId_t *nodeInfo,
bool abort );
+ bool PingSockPeer(CNode *node);
void ReIntegrateMPI( int initProblem );
void ReIntegrateSock( int initProblem );
void SendReIntegrateStatus( STATE nodeState, int status );
@@ -357,8 +374,9 @@ private:
void InitClusterSocks( int worldSize, int myRank, char *nodeNames,int *rankToPnid );
void InitServerSock( void );
- int AcceptSock( int sock );
+ int AcceptSock( int sock );
void EpollCtl( int efd, int op, int fd, struct epoll_event *event );
+ void EpollCtlDelete( int efd, int fd, struct epoll_event *event );
int MkSrvSock( int *pport );
int MkCltSock( unsigned char srcip[4], unsigned char dstip[4], int port );
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/commaccept.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/commaccept.cxx b/core/sqf/monitor/linux/commaccept.cxx
index 9867e11..a9045af 100644
--- a/core/sqf/monitor/linux/commaccept.cxx
+++ b/core/sqf/monitor/linux/commaccept.cxx
@@ -195,6 +195,20 @@ bool CCommAccept::sendNodeInfoSock( int sockFd )
sizeof(nodeInfo[i].syncPort));
nodeInfo[i].pnid = node->GetPNid();
nodeInfo[i].creatorPNid = (nodeInfo[i].pnid == MyPNID) ? MyPNID : -1;
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node info for pnid=%d (%s)\n"
+ " CommPort=%s\n"
+ " SyncPort=%s\n"
+ " creatorPNid=%d\n"
+ , method_name, __LINE__
+ , nodeInfo[i].pnid
+ , nodeInfo[i].nodeName
+ , nodeInfo[i].commPort
+ , nodeInfo[i].syncPort
+ , nodeInfo[i].creatorPNid );
+ }
}
else
{
@@ -499,6 +513,7 @@ void CCommAccept::processNewSock( int joinFd )
int rc;
int integratingFd;
nodeId_t nodeId;
+ CNode *node;
mem_log_write(CMonLog::MON_CONNTONEWMON_2);
@@ -516,25 +531,17 @@ void CCommAccept::processNewSock( int joinFd )
return;
}
- if ( nodeId.creator )
- {
- // Indicate that this node is the creator monitor for the node up
- // operation.
- MyNode->SetCreator( true
- , nodeId.creatorShellPid
- , nodeId.creatorShellVerifier );
- }
-
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
trace_printf( "%s@%d - Accepted connection from pnid=%d\n"
- " myNodeInfo.nodeName=%s\n"
- " myNodeInfo.commPort=%s\n"
- " myNodeInfo.syncPort=%s\n"
- " myNodeInfo.creatorPNid=%d\n"
- " myNodeInfo.creator=%d\n"
- " myNodeInfo.creatorShellPid=%d\n"
- " myNodeInfo.creatorShellVerifier=%d\n"
+ " nodeId.nodeName=%s\n"
+ " nodeId.commPort=%s\n"
+ " nodeId.syncPort=%s\n"
+ " nodeId.creatorPNid=%d\n"
+ " nodeId.creator=%d\n"
+ " nodeId.creatorShellPid=%d\n"
+ " nodeId.creatorShellVerifier=%d\n"
+ " nodeId.ping=%d\n"
, method_name, __LINE__
, nodeId.pnid
, nodeId.nodeName
@@ -543,16 +550,93 @@ void CCommAccept::processNewSock( int joinFd )
, nodeId.creatorPNid
, nodeId.creator
, nodeId.creatorShellPid
- , nodeId.creatorShellVerifier );
+ , nodeId.creatorShellVerifier
+ , nodeId.ping );
}
- CNode *node= Nodes->GetNode( nodeId.nodeName );
+ node= Nodes->GetNode( nodeId.nodeName );
+
+ if ( nodeId.ping )
+ {
+ // Reply with my node info
+ nodeId.pnid = MyPNID;
+ strcpy(nodeId.nodeName, MyNode->GetName());
+ strcpy(nodeId.commPort, MyNode->GetCommPort());
+ strcpy(nodeId.syncPort, MyNode->GetSyncPort());
+ nodeId.ping = true;
+ nodeId.creatorPNid = -1;
+ nodeId.creator = false;
+ nodeId.creatorShellPid = -1;
+ nodeId.creatorShellVerifier = -1;
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "Sending my nodeInfo.pnid=%d\n"
+ " nodeInfo.nodeName=%s\n"
+ " nodeInfo.commPort=%s\n"
+ " nodeInfo.syncPort=%s\n"
+ " nodeInfo.ping=%d\n"
+ , nodeId.pnid
+ , nodeId.nodeName
+ , nodeId.commPort
+ , nodeId.syncPort
+ , nodeId.ping );
+ }
+
+ rc = Monitor->SendSock( (char *) &nodeId
+ , sizeof(nodeId_t)
+ , joinFd );
+ if ( rc )
+ {
+ close( joinFd );
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], Cannot send ping node info to node %s: (%s)\n"
+ , method_name, node?node->GetName():"", ErrorMsg(rc));
+ mon_log_write(MON_COMMACCEPT_19, SQ_LOG_ERR, buf);
+ }
+ return;
+ }
+
+ if ( nodeId.creator )
+ {
+ // Indicate that this node is the creator monitor for the node up
+ // operation.
+ MyNode->SetCreator( true
+ , nodeId.creatorShellPid
+ , nodeId.creatorShellVerifier );
+ }
+
int pnid = -1;
if ( node != NULL )
- { // Store port number for the node
+ { // Store port numbers for the node
+ char commPort[MPI_MAX_PORT_NAME];
+ char syncPort[MPI_MAX_PORT_NAME];
+ strncpy(commPort, nodeId.commPort, MPI_MAX_PORT_NAME);
+ strncpy(syncPort, nodeId.syncPort, MPI_MAX_PORT_NAME);
+ char *pch1;
+ char *pch2;
pnid = nodeId.pnid;
- node->SetCommPort( nodeId.commPort );
- node->SetSyncPort( nodeId.syncPort );
+
+ node->SetCommPort( commPort );
+ pch1 = strtok (commPort,":");
+ pch1 = strtok (NULL,":");
+ node->SetCommSocketPort( atoi(pch1) );
+
+ node->SetSyncPort( syncPort );
+ pch2 = strtok (syncPort,":");
+ pch2 = strtok (NULL,":");
+ node->SetSyncSocketPort( atoi(pch2) );
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n"
+ , method_name, __LINE__
+ , node->GetPNid()
+ , node->GetName()
+ , pch1, atoi(pch1)
+ , pch2, atoi(pch2) );
+ }
}
else
{
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/internal.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/internal.h b/core/sqf/monitor/linux/internal.h
index be13055..f69f424 100644
--- a/core/sqf/monitor/linux/internal.h
+++ b/core/sqf/monitor/linux/internal.h
@@ -443,6 +443,9 @@ typedef struct nodeId_s
int creatorShellPid;
Verifier_t creatorShellVerifier;
bool creator; // NEW monitor set to true to tell creator it is the CREATOR
+ bool ping; // Monitor sets to true to tell remote monitor
+ // it is just checking that it can communicate with it.
+ // Used during allgather reconnect
} nodeId_t;
typedef struct nodeStatus_s
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/msgdef.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/msgdef.h b/core/sqf/monitor/linux/msgdef.h
index 44445bf..ed0f8e8 100644
--- a/core/sqf/monitor/linux/msgdef.h
+++ b/core/sqf/monitor/linux/msgdef.h
@@ -88,6 +88,8 @@
#define MAX_PROCESS_NAME_STR 12
#define MAX_PROCESS_PATH 256
#define MAX_PROCESSOR_NAME 128
+#define MAX_RECONN_PING_WAIT_TIMEOUT 5
+#define MAX_RECONN_PING_RETRY_COUNT 3
#define MAX_REASON_TEXT 256
#define MAX_ROLEBUF_SIZE 84
#define MAX_SEARCH_PATH BUFSIZ
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/pnode.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/pnode.cxx b/core/sqf/monitor/linux/pnode.cxx
index 19d36f6..57ce0a6 100644
--- a/core/sqf/monitor/linux/pnode.cxx
+++ b/core/sqf/monitor/linux/pnode.cxx
@@ -418,7 +418,14 @@ int CNode::AssignNid(void)
CLNode *lnode = AssignLNode();
TRACE_EXIT;
- return( lnode->Nid );
+ if (lnode)
+ {
+ return( lnode->Nid );
+ }
+ else
+ {
+ return -1;
+ }
}
CLNode *CNode::AssignLNode (void)
@@ -1416,6 +1423,7 @@ CNodeContainer::CNodeContainer( void )
,head_(NULL)
,tail_(NULL)
,syncBufferFreeSpace_(MAX_SYNC_SIZE)
+ ,lastSyncBuffer_(NULL)
,SyncBuffer(NULL)
{
const char method_name[] = "CNodeContainer::CNodeContainer";
@@ -1424,6 +1432,7 @@ CNodeContainer::CNodeContainer( void )
// Add eyecatcher sequence as a debugging aid
memcpy(&eyecatcher_, "NCTR", 4);
+ lastSyncBuffer_ = new struct sync_buffer_def;
SyncBuffer = new struct sync_buffer_def;
// Load cluster configuration
@@ -1454,6 +1463,10 @@ CNodeContainer::~CNodeContainer( void )
const char method_name[] = "CNodeContainer::~CNodeContainer";
TRACE_ENTRY;
+ if (lastSyncBuffer_)
+ {
+ delete lastSyncBuffer_;
+ }
if (SyncBuffer)
{
delete SyncBuffer;
@@ -2955,7 +2968,10 @@ struct internal_msg_def *CNodeContainer::PopMsg( struct sync_buffer_def *recvBuf
return msg;
}
-
+void CNodeContainer::SaveMyLastSyncBuffer( void )
+{
+ memcpy( (void*)lastSyncBuffer_, (void*)SyncBuffer, sizeof(sync_buffer_def) );
+}
bool CNodeContainer::SpaceAvail ( int msgSize )
{
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/pnode.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/pnode.h b/core/sqf/monitor/linux/pnode.h
index 97b02db..6d07f4b 100644
--- a/core/sqf/monitor/linux/pnode.h
+++ b/core/sqf/monitor/linux/pnode.h
@@ -122,6 +122,7 @@ public:
+ SyncBuffer->msgInfo.msg_offset; };
inline int GetSyncHdrSize() { return sizeof(cluster_state_def_t)
+ sizeof(msgInfo_t); };
+ struct sync_buffer_def * GetLastSyncBuffer() { return lastSyncBuffer_; };
struct sync_buffer_def * GetSyncBuffer() { return SyncBuffer; };
bool IsShutdownActive( void );
void KillAll( CProcess *process );
@@ -131,6 +132,7 @@ public:
struct internal_msg_def *PopMsg( struct sync_buffer_def *recvBuf );
bool SpaceAvail ( int msgSize );
void AddMsg (struct internal_msg_def *&msg, int msgSize );
+ void SaveMyLastSyncBuffer( void );
void SetClusterConfig( CClusterConfig *clusterConfig ) { clusterConfig_ = clusterConfig; }
void SetupCluster( CNode ***pnode_list, CLNode ***lnode_list, int **indexToPnid );
void RemoveFromSpareNodesList( CNode *node );
@@ -156,6 +158,7 @@ private:
CNode *tail_; // tail of physical nodes linked list
size_t syncBufferFreeSpace_;
+ struct sync_buffer_def *lastSyncBuffer_;
struct sync_buffer_def *SyncBuffer;
void AddLNodes( CNode *node );
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/reqtmleader.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/reqtmleader.cxx b/core/sqf/monitor/linux/reqtmleader.cxx
index 7783d5a..9cfe6ea 100644
--- a/core/sqf/monitor/linux/reqtmleader.cxx
+++ b/core/sqf/monitor/linux/reqtmleader.cxx
@@ -123,6 +123,25 @@ void CExtTmLeaderReq::performRequest()
process = Nodes->GetLNode(tmLeaderNid)->GetProcessLByType( ProcessType_DTM );
+ if (!process)
+ {
+ int pnid = Nodes->GetLNode(tmLeaderNid)->GetNode()->GetPNid();
+
+ // If there is a Regroup in progress in the sync thread,
+ // wait until it is completed.
+ if ( !Emulate_Down )
+ {
+ Monitor->EnterSyncCycle();
+ }
+ Monitor->AssignTmLeader( pnid, true );
+ tmLeaderNid = Monitor->GetTmLeader();
+ process = Nodes->GetLNode(tmLeaderNid)->GetProcessLByType( ProcessType_DTM );
+ if ( !Emulate_Down )
+ {
+ Monitor->ExitSyncCycle();
+ }
+ }
+
assert(process);
// populate the TM leader process info
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/shell.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/shell.cxx b/core/sqf/monitor/linux/shell.cxx
index c23c20c..7bbc6e3 100644
--- a/core/sqf/monitor/linux/shell.cxx
+++ b/core/sqf/monitor/linux/shell.cxx
@@ -98,6 +98,8 @@ bool MpiInitialized = false;
bool SpareNodeColdStandby = true;
bool ElasticityEnabled = true;
+int lastDeathNid = -1;
+int lastDeathPid = -1;
bool waitDeathPending = false;
int waitDeathNid;
int waitDeathPid;
@@ -993,6 +995,7 @@ void nodePendingComplete()
void waitDeathComplete()
{
waitDeathLock.lock();
+ waitDeathPending = false;
waitDeathLock.wakeOne();
waitDeathLock.unlock();
}
@@ -1094,7 +1097,6 @@ void recv_notice_msg(struct message_def *recv_msg, int )
{
if ( recv_msg->u.request.u.down.nid == waitDeathNid )
{ // Node is down so process is down too.
- waitDeathPending = false;
waitDeathComplete();
}
}
@@ -1127,7 +1129,6 @@ void recv_notice_msg(struct message_def *recv_msg, int )
{
if ( msg->u.request.u.quiesce.nid == waitDeathNid )
{ // Node is quiesced so process is down too.
- waitDeathPending = false;
waitDeathComplete();
}
}
@@ -1179,12 +1180,13 @@ void recv_notice_msg(struct message_def *recv_msg, int )
recv_msg->u.request.u.death.nid,
recv_msg->u.request.u.death.pid);
}
+ lastDeathNid = recv_msg->u.request.u.death.nid;
+ lastDeathPid = recv_msg->u.request.u.death.pid;
if ( waitDeathPending )
{
if ( recv_msg->u.request.u.death.nid == waitDeathNid
&& recv_msg->u.request.u.death.pid == waitDeathPid )
{
- waitDeathPending = false;
waitDeathComplete();
}
}
@@ -1604,7 +1606,6 @@ char *find_end_of_token (char *cmd, int maxlen, bool isEqDelim, bool isDashDelim
&& *ptr
&& *ptr != ' '
&& *ptr != ','
- && *ptr != ';'
&& *ptr != '{'
&& *ptr != '}'
&& *ptr != ':')
@@ -2706,6 +2707,23 @@ void get_server_death (int nid, int pid)
request_notice(nid, pid, transid);
}
+ if ( lastDeathNid == nid
+ && lastDeathPid == pid )
+ {
+ if ( trace_settings & TRACE_SHELL_CMD )
+ trace_printf("%s@%d [%s] death message already received from nid=%d, "
+ "pid=%d.\n", method_name, __LINE__, MyName, nid, pid);
+
+ lastDeathNid = -1;
+ lastDeathPid = -1;
+
+ if ( trace_settings & TRACE_SHELL_CMD )
+ trace_printf("%s@%d [%s] Exiting wait for process death\n",
+ method_name, __LINE__, MyName);
+
+ return;
+ }
+
if ( trace_settings & TRACE_SHELL_CMD )
trace_printf("%s@%d [%s] waiting for death message from nid=%d, "
"pid=%d.\n", method_name, __LINE__, MyName, nid, pid);
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/tmsync.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/tmsync.cxx b/core/sqf/monitor/linux/tmsync.cxx
index b72b896..3e72241 100644
--- a/core/sqf/monitor/linux/tmsync.cxx
+++ b/core/sqf/monitor/linux/tmsync.cxx
@@ -236,7 +236,7 @@ void CTmSync_Container::CommitTmDataBlock( int return_code )
if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
trace_printf("%s@%d" " - Getting all nodes TmSyncState\n", method_name, __LINE__);
- ExchangeTmSyncState();
+ ExchangeTmSyncState( false );
state = Nodes->GetTmState( SyncState_Commit );
if (( state == SyncState_Abort ) ||
( state == SyncState_Null ) )
@@ -267,7 +267,7 @@ void CTmSync_Container::CommitTmDataBlock( int return_code )
// End the TM sync processing cycle for my node.
MyNode->SetTmSyncState( SyncState_Null );
MyNode->SetTmSyncNid( -1 );
- ExchangeTmSyncState();
+ ExchangeTmSyncState( false );
if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
trace_printf("%s@%d" " - Physical Node " "%d" " TmSyncState updated (" "%d" ")" "\n", method_name, __LINE__, MyPNID, MyNode->GetTmSyncState());
@@ -318,9 +318,9 @@ int CTmSync_Container::CoordinateTmDataBlock ( struct sync_def *sync )
trace_printf("%s@%d" " - Physical Node %d TmSyncState updated (nid=%d, state=%d)\n", method_name, __LINE__, MyPNID, MyNode->GetTmSyncNid(), MyNode->GetTmSyncState());
}
syncCycle_.lock();
- exchangeTmSyncData( sync );
+ exchangeTmSyncData( sync, false );
syncCycle_.unlock();
- ExchangeTmSyncState();
+ ExchangeTmSyncState( false );
if (( Monitor->TmSyncPNid == MyPNID ) &&
( Nodes->GetTmState( SyncState_Start ) == SyncState_Start ) )
{
@@ -391,7 +391,7 @@ int CTmSync_Container::CoordinateTmDataBlock ( struct sync_def *sync )
trace_printf("%s@%d" " - Physical Node " "%d" " TmSyncState updated (" "%d" ")" "\n", method_name, __LINE__, MyPNID, MyNode->GetTmSyncState());
}
UnPackSyncData(sync);
- ExchangeTmSyncState();
+ ExchangeTmSyncState( true );
}
else
{
@@ -700,7 +700,7 @@ void CTmSync_Container::ProcessTmSyncReply( struct message_def * msg )
TRACE_EXIT;
}
-void CTmSync_Container::ExchangeTmSyncState( void )
+void CTmSync_Container::ExchangeTmSyncState( bool bumpSync )
{
struct sync_def sync;
@@ -714,7 +714,7 @@ void CTmSync_Container::ExchangeTmSyncState( void )
sync.count = 0;
sync.length = 0;
syncCycle_.lock();
- exchangeTmSyncData( &sync );
+ exchangeTmSyncData( &sync, bumpSync );
syncCycle_.unlock();
TRACE_EXIT;
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/tmsync.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/tmsync.h b/core/sqf/monitor/linux/tmsync.h
index 75d4f47..9d0aaf4 100644
--- a/core/sqf/monitor/linux/tmsync.h
+++ b/core/sqf/monitor/linux/tmsync.h
@@ -138,7 +138,7 @@ private:
void EndTmSync( MSGTYPE type );
void EndPendingTmSync( struct sync_def *sync );
- void ExchangeTmSyncState( void );
+ void ExchangeTmSyncState( bool bumpSync );
struct sync_def *PackSyncData( void ); // return sync_def for current TmSyncReqQ entries
void SendUnsolicitedMessages( void );
void UnPackSyncData( struct sync_def *sync ); // process sync_def received from another node
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/zclient.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/zclient.cxx b/core/sqf/monitor/linux/zclient.cxx
index aca0a85..23dca8a 100644
--- a/core/sqf/monitor/linux/zclient.cxx
+++ b/core/sqf/monitor/linux/zclient.cxx
@@ -708,9 +708,17 @@ int CZClient::InitializeZClient( void )
TRACE_ENTRY;
int rc;
+ int retries = 0;
rc = MakeClusterZNodes();
+ while ( rc != ZOK && retries < ZOOKEEPER_RETRY_COUNT)
+ {
+ sleep(ZOOKEEPER_RETRY_WAIT);
+ retries++;
+ rc = MakeClusterZNodes();
+ }
+
if ( rc == ZOK )
{
rc = RegisterMyNodeZNode();
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/sqenvcom.sh
----------------------------------------------------------------------
diff --git a/core/sqf/sqenvcom.sh b/core/sqf/sqenvcom.sh
index 6321ecf..bb1638e 100644
--- a/core/sqf/sqenvcom.sh
+++ b/core/sqf/sqenvcom.sh
@@ -47,10 +47,6 @@ export PRODUCT_COPYRIGHT_HEADER="2015-2017 Apache Software Foundation"
##############################################################
export TRAFODION_ENABLE_AUTHENTICATION=${TRAFODION_ENABLE_AUTHENTICATION:-NO}
-# Uncomment Trafodion Configuration store type
-#export TRAF_CONFIG_DBSTORE=Sqlite
-#export TRAF_CONFIG_DBSTORE=Zookeeper
-
# default SQ_IC to TCP if it is not set in sqenv.sh. Values are
# IBV for infiniband, TCP for tcp
export SQ_IC=${SQ_IC:-TCP}
@@ -681,10 +677,9 @@ export SQ_MON_KEEPINTVL=6
export SQ_MON_KEEPCNT=5
# Monitor sync thread epoll wait timeout is in seconds
-# Currently set to 64 seconds (4 second timeout, 16 retries)
-export SQ_MON_EPOLL_WAIT_TIMEOUT=4
-# Note: the retry count is ignored when SQ_MON_ZCLIENT_ENABLED=1 (the default)
-export SQ_MON_EPOLL_RETRY_COUNT=16
+# Currently set to 64 seconds (16 second timeout, 4 retries)
+export SQ_MON_EPOLL_WAIT_TIMEOUT=16
+export SQ_MON_EPOLL_RETRY_COUNT=4
# Monitor Zookeeper client
# - A zero value disables the zclient logic in the monitor process.
[2/4] incubator-trafodion git commit: [TRAFODION-2651] Fixed
monitor-to-monitor communication by adding reconnect logic to better handle
various communication glitches which previously resulted in false node down
situations.
Posted by su...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/monitor/linux/cluster.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx
index 585ff0a..c65116b 100644
--- a/core/sqf/monitor/linux/cluster.cxx
+++ b/core/sqf/monitor/linux/cluster.cxx
@@ -361,13 +361,14 @@ void CCluster::NodeReady( CNode *spareNode )
// Assigns a new TMLeader if given pnid is same as TmLeaderNid
// TmLeader is a logical node num.
// pnid has gone down, so if that node was previously the TM leader, a new one needs to be chosen.
-void CCluster::AssignTmLeader(int pnid)
+void CCluster::AssignTmLeader( int pnid, bool checkProcess )
{
const char method_name[] = "CCluster::AssignTmLeader";
TRACE_ENTRY;
int i = 0;
CNode *node = NULL;
+ CProcess *process = NULL;
int TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid();
@@ -375,24 +376,50 @@ void CCluster::AssignTmLeader(int pnid)
{
node = LNode[TmLeaderNid]->GetNode();
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+ if (checkProcess)
{
- trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, isSoftNodeDown=%d\n"
- , method_name, __LINE__
- , node->GetPNid()
- , node->GetName()
- , NodePhaseString(node->GetPhase())
- , node->IsSoftNodeDown());
+ process = LNode[TmLeaderNid]->GetProcessLByType( ProcessType_DTM );
+ if (process)
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+ {
+ if (node)
+ trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, "
+ "isSoftNodeDown=%d, checkProcess=%d\n"
+ , method_name, __LINE__
+ , node->GetPNid()
+ , node->GetName()
+ , NodePhaseString(node->GetPhase())
+ , node->IsSoftNodeDown()
+ , checkProcess );
+ }
+ return;
+ }
+ }
+ else
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
+ {
+ if (node)
+ trace_printf( "%s@%d - Node pnid=%d (%s), phase=%s, "
+ "isSoftNodeDown=%d, checkProcess=%d\n"
+ , method_name, __LINE__
+ , node->GetPNid()
+ , node->GetName()
+ , NodePhaseString(node->GetPhase())
+ , node->IsSoftNodeDown()
+ , checkProcess );
+ }
+ return;
}
-
- return;
}
node = Node[TmLeaderPNid];
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
{
- trace_printf("%s@%d" " - Node " "%d" " TmLeader failed." "\n", method_name, __LINE__, TmLeaderNid);
+ trace_printf( "%s@%d" " - Node " "%d" " TmLeader failed! (checkProcess=%d)\n"
+ , method_name, __LINE__, TmLeaderNid, checkProcess );
}
for (i=0; i<GetConfigPNodesMax(); i++)
@@ -436,6 +463,15 @@ void CCluster::AssignTmLeader(int pnid)
TmLeaderNid = node->GetFirstLNode()->GetNid();
+ if (checkProcess)
+ {
+ process = LNode[TmLeaderNid]->GetProcessLByType( ProcessType_DTM );
+ if (!process)
+ {
+ continue; // skip this node: no DTM process exists
+ }
+ }
+
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
{
trace_printf("%s@%d" " - Node " "%d" " is the new TmLeader." "\n", method_name, __LINE__, TmLeaderNid);
@@ -483,7 +519,11 @@ CCluster::CCluster (void)
integratingPNid_(-1),
joinComm_(MPI_COMM_NULL),
joinSock_(-1),
- seqNum_(0),
+ lastSeqNum_(0),
+ lowSeqNum_(0),
+ highSeqNum_(0),
+ reconnectSeqNum_(0),
+ seqNum_(1),
waitForWatchdogExit_(false)
,checkSeqNum_(false)
,validateNodeDown_(false)
@@ -511,6 +551,9 @@ CCluster::CCluster (void)
method_name, __LINE__,
(checkSeqNum_ ? "enabled" : "disabled"));
+ CClusterConfig *clusterConfig = Nodes->GetClusterConfig();
+ configPNodesMax_ = clusterConfig->GetPNodesConfigMax();
+
// Compute minimum "sync cycles" per second. The minimum is 1/10
// the expected number, assuming "next_test_delay" cycles per second (where
// next_test_delay is in microseconds).
@@ -521,8 +564,6 @@ CCluster::CCluster (void)
agMinElapsed_.tv_sec = 10000;
agMinElapsed_.tv_nsec = 0;
- tmSyncBuffer_ = Nodes->GetSyncBuffer();
-
// Allocate structures for monitor point-to-point communications
//
// The current approach is to allocate to a maximum number (MAX_NODES).
@@ -547,6 +588,7 @@ CCluster::CCluster (void)
{
comms_[i] = MPI_COMM_NULL;
socks_[i] = -1;
+ sockPorts_[i] = -1;
}
env = getenv("SQ_MON_NODE_DOWN_VALIDATION");
@@ -647,7 +689,7 @@ unsigned long long CCluster::EnsureAndGetSeqNum(cluster_state_def_t nodestate[])
{
trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[i].seq_num, seqNum );
}
- if (nodestate[i].seq_num > 0)
+ if (nodestate[i].seq_num > 1)
{
if (seqNum == 0)
{
@@ -658,6 +700,10 @@ unsigned long long CCluster::EnsureAndGetSeqNum(cluster_state_def_t nodestate[])
assert(nodestate[i].seq_num == seqNum);
}
}
+ if (trace_settings & TRACE_RECOVERY)
+ {
+ trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[i].seq_num, seqNum );
+ }
}
TRACE_EXIT;
@@ -839,7 +885,7 @@ void CCluster::HardNodeDown (int pnid, bool communicate_state)
if ( Emulate_Down )
{
IAmIntegrated = false;
- AssignTmLeader(pnid);
+ AssignTmLeader(pnid, false);
}
TRACE_EXIT;
@@ -940,7 +986,7 @@ void CCluster::SoftNodeDown( int pnid )
}
IAmIntegrated = false;
- AssignTmLeader(pnid);
+ AssignTmLeader(pnid, false);
TRACE_EXIT;
}
@@ -1303,9 +1349,10 @@ int CCluster::HardNodeUp( int pnid, char *node_name )
, method_name, __LINE__, node->GetName(), node->GetPNid(), StateString( node->GetState() ) );
for ( int i =0; i < Nodes->GetPNodesCount(); i++ )
{
- trace_printf( "%s@%d socks_[indexToPnid_[%d]=%d]=%d\n"
+ trace_printf( "%s@%d socks_[indexToPnid_[%d]=%d]=%d, sockPorts_[indexToPnid_[%d]=%d]=%d\n"
, method_name, __LINE__
- , i, indexToPnid_[i], socks_[indexToPnid_[i]] );
+ , i, indexToPnid_[i], socks_[indexToPnid_[i]]
+ , i, indexToPnid_[i], sockPorts_[indexToPnid_[i]] );
}
}
if ( MyNode->IsCreator() )
@@ -1616,8 +1663,9 @@ const char *SyncStateString( SyncState state)
}
-void CCluster::AddTmsyncMsg (struct sync_def *sync,
- struct internal_msg_def *msg)
+void CCluster::AddTmsyncMsg( struct sync_buffer_def *tmSyncBuffer
+ , struct sync_def *sync
+ , struct internal_msg_def *msg)
{
const char method_name[] = "CCluster::AddTmsyncMsg";
TRACE_ENTRY;
@@ -1647,12 +1695,12 @@ void CCluster::AddTmsyncMsg (struct sync_def *sync,
// Insert the message size into the message header
msg->replSize = msgSize;
- tmSyncBuffer_->msgInfo.msg_count = 1;
- tmSyncBuffer_->msgInfo.msg_offset += msgSize;
+ tmSyncBuffer->msgInfo.msg_count = 1;
+ tmSyncBuffer->msgInfo.msg_offset += msgSize;
// Set end-of-buffer marker
msg = (struct internal_msg_def *)
- &tmSyncBuffer_->msg[tmSyncBuffer_->msgInfo.msg_offset];
+ &tmSyncBuffer->msg[tmSyncBuffer->msgInfo.msg_offset];
msg->type = InternalType_Null;
TRACE_EXIT;
@@ -2787,7 +2835,6 @@ void CCluster::InitializeConfigCluster( void )
MPI_Comm_size (MPI_COMM_WORLD, &worldSize);
int rankToPnid[worldSize];
CClusterConfig *clusterConfig = Nodes->GetClusterConfig();
- configPNodesMax_ = clusterConfig->GetPNodesConfigMax();
CurNodes = worldSize;
@@ -2931,7 +2978,7 @@ void CCluster::InitializeConfigCluster( void )
node->SetCommPort( &commPortNums[ i * MPI_MAX_PORT_NAME] );
node->SetSyncPort( &syncPortNums[ i * MPI_MAX_PORT_NAME] );
rankToPnid[i] = node->GetPNid();
- nodeStatus[i] = true;
+ nodeStatus[rankToPnid[i]] = true;
if (trace_settings & TRACE_INIT)
{
@@ -2967,7 +3014,7 @@ void CCluster::InitializeConfigCluster( void )
// Any nodes not in the initial MPI_COMM_WORLD are down.
for (int i=0; i<GetConfigPNodesCount(); ++i)
{
- if ( nodeStatus[i] == false )
+ if ( nodeStatus[indexToPnid_[i]] == false )
{
if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
trace_printf( "%s@%d - nodeStatus[%d]=%d"
@@ -2982,7 +3029,7 @@ void CCluster::InitializeConfigCluster( void )
// assign new TmLeader if TMLeader node is dead.
if (TmLeaderPNid == indexToPnid_[i])
{
- AssignTmLeader(indexToPnid_[i]);
+ AssignTmLeader(indexToPnid_[i], false);
}
}
else
@@ -3327,6 +3374,132 @@ void CCluster::SendReIntegrateStatus( STATE nodeState, int initErr )
}
}
+bool CCluster::PingSockPeer(CNode *node)
+{
+ const char method_name[] = "CCluster::PingSockPeer";
+ TRACE_ENTRY;
+
+ bool rs = true;
+ int rc;
+ int pingSock = -1;
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Pinging remote monitor %s, pnid=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid() );
+ }
+
+ // Attempt to connect with remote monitor
+ for (int i = 0; i < MAX_RECONN_PING_RETRY_COUNT; i++ )
+ {
+ pingSock = Monitor->Connect( node->GetCommPort() );
+ if ( pingSock < 0 )
+ {
+ sleep( MAX_RECONN_PING_WAIT_TIMEOUT );
+ }
+ else
+ {
+ break;
+ }
+ }
+ if ( pingSock < 0 )
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Can't connect to remote monitor %s, pnid=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid() );
+ }
+ return(false);
+ }
+
+ nodeId_t nodeInfo;
+
+ nodeInfo.pnid = MyPNID;
+ strcpy(nodeInfo.nodeName, MyNode->GetName());
+ strcpy(nodeInfo.commPort, MyNode->GetCommPort());
+ strcpy(nodeInfo.syncPort, MyNode->GetSyncPort());
+ nodeInfo.ping = true;
+ nodeInfo.creatorPNid = -1;
+ nodeInfo.creator = false;
+ nodeInfo.creatorShellPid = -1;
+ nodeInfo.creatorShellVerifier = -1;
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "Sending my nodeInfo.pnid=%d\n"
+ " nodeInfo.nodeName=%s\n"
+ " nodeInfo.commPort=%s\n"
+ " nodeInfo.syncPort=%s\n"
+ " nodeInfo.creatorPNid=%d\n"
+ " nodeInfo.creator=%d\n"
+ " nodeInfo.creatorShellPid=%d\n"
+ " nodeInfo.creatorShellVerifier=%d\n"
+ " nodeInfo.ping=%d\n"
+ , nodeInfo.pnid
+ , nodeInfo.nodeName
+ , nodeInfo.commPort
+ , nodeInfo.syncPort
+ , nodeInfo.creatorPNid
+ , nodeInfo.creator
+ , nodeInfo.creatorShellPid
+ , nodeInfo.creatorShellVerifier
+ , nodeInfo.ping );
+ }
+
+ rc = Monitor->SendSock( (char *) &nodeInfo
+ , sizeof(nodeId_t)
+ , pingSock );
+
+ if ( rc )
+ {
+ rs = false;
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], Cannot send ping node info to node %s: (%s)\n"
+ , method_name, node->GetName(), ErrorMsg(rc));
+ mon_log_write(MON_PINGSOCKPEER_1, SQ_LOG_ERR, buf);
+ }
+ else
+ {
+ // Receive the pinged monitor's node info reply
+ rc = Monitor->ReceiveSock( (char *) &nodeInfo
+ , sizeof(nodeId_t)
+ , pingSock );
+ if ( rc )
+ { // Handle error
+ rs = false;
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], Cannot receive ping node info from node %s: (%s)\n"
+ , method_name, node->GetName(), ErrorMsg(rc));
+ mon_log_write(MON_PINGSOCKPEER_2, SQ_LOG_ERR, buf);
+ }
+ else
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "Received from nodeInfo.pnid=%d\n"
+ " nodeInfo.nodeName=%s\n"
+ " nodeInfo.commPort=%s\n"
+ " nodeInfo.syncPort=%s\n"
+ " nodeInfo.ping=%d\n"
+ , nodeInfo.pnid
+ , nodeInfo.nodeName
+ , nodeInfo.commPort
+ , nodeInfo.syncPort
+ , nodeInfo.ping );
+ }
+ }
+ }
+
+ close( pingSock );
+
+ TRACE_EXIT;
+ return( rs );
+}
+
void CCluster::ReIntegrate( int initProblem )
{
const char method_name[] = "CCluster::ReIntegrate";
@@ -3607,6 +3780,10 @@ void CCluster::ReIntegrateSock( int initProblem )
int rc;
int existingCommFd;
int existingSyncFd;
+ char commPort[MPI_MAX_PORT_NAME];
+ char syncPort[MPI_MAX_PORT_NAME];
+ char *pch1;
+ char *pch2;
// Set bit indicating my node is up
upNodes_.upNodes[MyPNID/MAX_NODE_BITMASK] |= (1ull << (MyPNID%MAX_NODE_BITMASK));
@@ -3642,12 +3819,6 @@ void CCluster::ReIntegrateSock( int initProblem )
TEST_POINT( TP011_NODE_UP );
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf("%s@%d Connected to creator monitor, sending my node info\n",
- method_name, __LINE__);
- }
-
// Send this node's name and port number so creator monitor
// knows who we are, and set flag to let creator monitor it is the CREATOR.
nodeId_t myNodeInfo;
@@ -3662,21 +3833,14 @@ void CCluster::ReIntegrateSock( int initProblem )
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf( "%s@%d - Sending my node info to creator monitor\n"
- , method_name, __LINE__);
- trace_printf( "Node info for pnid=%d\n"
- " myNodeInfo.nodeName=%s\n"
- " myNodeInfo.commPort=%s\n"
- " myNodeInfo.syncPort=%s\n"
- " myNodeInfo.creatorPNid=%d\n"
- " myNodeInfo.creator=%d\n"
- " myNodeInfo.creatorShellPid=%d\n"
- " myNodeInfo.creatorShellVerifier=%d\n"
+ trace_printf( "%s@%d - Connected to creator monitor, sending my info, "
+ "node %d (%s), commPort=%s, syncPort=%s, creator=%d, "
+ "creatorShellPid=%d:%d\n"
+ , method_name, __LINE__
, myNodeInfo.pnid
, myNodeInfo.nodeName
, myNodeInfo.commPort
, myNodeInfo.syncPort
- , myNodeInfo.creatorPNid
, myNodeInfo.creator
, myNodeInfo.creatorShellPid
, myNodeInfo.creatorShellVerifier );
@@ -3774,10 +3938,33 @@ void CCluster::ReIntegrateSock( int initProblem )
otherMonRank_[nodeInfo[i].pnid] = 0;
++CurNodes;
- Node[nodeInfo[i].pnid]->SetCommPort( nodeInfo[i].commPort );
- Node[nodeInfo[i].pnid]->SetSyncPort( nodeInfo[i].syncPort );
+ // Store port numbers for the node
+ strncpy(commPort, nodeInfo[i].commPort, MPI_MAX_PORT_NAME);
+ strncpy(syncPort, nodeInfo[i].syncPort, MPI_MAX_PORT_NAME);
+
+ Node[nodeInfo[i].pnid]->SetCommPort( commPort );
+ pch1 = strtok (commPort,":");
+ pch1 = strtok (NULL,":");
+ Node[nodeInfo[i].pnid]->SetCommSocketPort( atoi(pch1) );
+
+ Node[nodeInfo[i].pnid]->SetSyncPort( syncPort );
+ pch2 = strtok (syncPort,":");
+ pch2 = strtok (NULL,":");
+ Node[nodeInfo[i].pnid]->SetSyncSocketPort( atoi(pch2) );
+ sockPorts_[nodeInfo[i].pnid] = Node[nodeInfo[i].pnid]->GetSyncSocketPort();
+
Node[nodeInfo[i].pnid]->SetState( State_Up );
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n"
+ , method_name, __LINE__
+ , Node[nodeInfo[i].pnid]->GetPNid()
+ , Node[nodeInfo[i].pnid]->GetName()
+ , pch1, atoi(pch1)
+ , pch2, atoi(pch2) );
+ }
+
// Tell creator we are ready to accept its connection
int mypnid = MyPNID;
rc = Monitor->SendSock( (char *) &mypnid
@@ -3897,10 +4084,34 @@ void CCluster::ReIntegrateSock( int initProblem )
otherMonRank_[nodeInfo[i].pnid] = 0;
++CurNodes;
- Node[nodeInfo[i].pnid]->SetCommPort( nodeInfo[i].commPort );
- Node[nodeInfo[i].pnid]->SetSyncPort( nodeInfo[i].syncPort );
+
+ // Store port numbers for the node
+ strncpy(commPort, nodeInfo[i].commPort, MPI_MAX_PORT_NAME);
+ strncpy(syncPort, nodeInfo[i].syncPort, MPI_MAX_PORT_NAME);
+
+ Node[nodeInfo[i].pnid]->SetCommPort( commPort );
+ pch1 = strtok (commPort,":");
+ pch1 = strtok (NULL,":");
+ Node[nodeInfo[i].pnid]->SetCommSocketPort( atoi(pch1) );
+
+ Node[nodeInfo[i].pnid]->SetSyncPort( syncPort );
+ pch2 = strtok (syncPort,":");
+ pch2 = strtok (NULL,":");
+ Node[nodeInfo[i].pnid]->SetSyncSocketPort( atoi(pch2) );
+ sockPorts_[nodeInfo[i].pnid] = Node[nodeInfo[i].pnid]->GetSyncSocketPort();
+
Node[nodeInfo[i].pnid]->SetState( State_Up );
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n"
+ , method_name, __LINE__
+ , Node[nodeInfo[i].pnid]->GetPNid()
+ , Node[nodeInfo[i].pnid]->GetName()
+ , pch1, atoi(pch1)
+ , pch2, atoi(pch2) );
+ }
+
// Connect to existing monitor
existingSyncFd = AcceptSyncSock();
if ( existingSyncFd < 0 )
@@ -3969,9 +4180,10 @@ void CCluster::ReIntegrateSock( int initProblem )
}
for ( int i =0; i < pnodeCount; i++ )
{
- trace_printf( "%s@%d socks_[%d]=%d\n"
+ trace_printf( "%s@%d socks_[%d]=%d, sockPorts_[%d]=%d\n"
, method_name, __LINE__
- , nodeInfo[i].pnid, socks_[nodeInfo[i].pnid]);
+ , nodeInfo[i].pnid, socks_[nodeInfo[i].pnid]
+ , nodeInfo[i].pnid, sockPorts_[nodeInfo[i].pnid]);
}
for ( int i =0; i < MAX_NODE_MASKS ; i++ )
{
@@ -4224,16 +4436,24 @@ void CCluster::setNewSock( int pnid )
--CurNodes;
}
- if (trace_settings & TRACE_RECOVERY)
- {
- trace_printf("%s@%d - setting new communicator for pnid %d, "
- "otherRank=%d\n",
- method_name, __LINE__, it->pnid, it->otherRank);
- }
-
+ CNode *node= Nodes->GetNode( it->pnid );
socks_[it->pnid] = it->socket;
+ sockPorts_[it->pnid] = node->GetSyncSocketPort();
otherMonRank_[it->pnid] = it->otherRank;
++CurNodes;
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting new communicator for %d (%s), "
+ "socks_[%d]=%d, sockPorts_[%d]=%d, otherMonRank_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetPNid()
+ , node->GetName()
+ , it->pnid, socks_[it->pnid]
+ , it->pnid, sockPorts_[it->pnid]
+ , it->pnid, otherMonRank_[it->pnid] );
+ }
+
// Set bit indicating node is up
upNodes_.upNodes[it->pnid/MAX_NODE_BITMASK] |= (1ull << (it->pnid%MAX_NODE_BITMASK));
@@ -4419,20 +4639,9 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
const char method_name[] = "CCluster::AllgatherSock";
TRACE_ENTRY;
+ bool reconnecting = false;
static int hdrSize = Nodes->GetSyncHdrSize( );
int err = MPI_SUCCESS;
- typedef struct
- {
- int p_sent;
- int p_received;
- int p_n2recv;
- bool p_sending;
- bool p_receiving;
- int p_timeout_count;
- bool p_initial_check;
- char *p_buff;
- struct timespec znodeFailedTime;
- } peer_t;
peer_t p[GetConfigPNodesMax()];
memset( p, 0, sizeof(p) );
tag = 0; // make compiler happy
@@ -4442,12 +4651,12 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
? (ZClient->GetSessionTimeout() * 2) : 120;
int nsent = 0, nrecv = 0;
- for ( int iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
+ for ( int iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
{
- peer_t *peer = &p[iPeer];
- stats[iPeer].MPI_ERROR = MPI_SUCCESS;
- stats[iPeer].count = 0;
- if ( iPeer == MyPNID || socks_[iPeer] == -1 )
+ peer_t *peer = &p[indexToPnid_[iPeer]];
+ stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_SUCCESS;
+ stats[indexToPnid_[iPeer]].count = 0;
+ if ( indexToPnid_[iPeer] == MyPNID || socks_[indexToPnid_[iPeer]] == -1 )
{
peer->p_sending = peer->p_receiving = false;
nsent++;
@@ -4460,11 +4669,31 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
peer->p_timeout_count = 0;
peer->p_initial_check = true;
peer->p_n2recv = -1;
- peer->p_buff = ((char *) rbuf) + (iPeer * CommBufSize);
+ peer->p_buff = ((char *) rbuf) + (indexToPnid_[iPeer] * CommBufSize);
+
struct epoll_event event;
- event.data.fd = socks_[iPeer];
+ event.data.fd = socks_[indexToPnid_[iPeer]];
event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
- EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[iPeer], &event );
+ EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[iPeer]], &event );
+ }
+ }
+
+ if (trace_settings & (TRACE_SYNC | TRACE_SYNC_DETAIL))
+ {
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ peer_t *peer = &p[indexToPnid_[i]];
+ trace_printf( "%s@%d" " - socks_[%d]=%d, "
+ "peer->p_sending=%d, "
+ "peer->p_receiving=%d\n"
+ , method_name, __LINE__
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , peer->p_sending
+ , peer->p_receiving );
+ }
}
}
@@ -4493,8 +4722,8 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
else
{
// default to 64 seconds
- sv_epoll_wait_timeout = 4000;
- sv_epoll_retry_count = 16;
+ sv_epoll_wait_timeout = 16000;
+ sv_epoll_retry_count = 4;
}
char buf[MON_STRING_BUF_SIZE];
@@ -4512,38 +4741,128 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
struct epoll_event events[2*GetConfigPNodesMax() + 1];
while ( 1 )
{
- int maxEvents = 2*GetConfigPNodesMax() - nsent - nrecv;
+reconnected:
+ int maxEvents = 2*GetConfigPNodesCount() - nsent - nrecv;
if ( maxEvents == 0 ) break;
int nw;
int zerr = ZOK;
+
while ( 1 )
{
nw = epoll_wait( epollFD_, events, maxEvents, sv_epoll_wait_timeout );
if ( nw >= 0 || errno != EINTR ) break;
}
+
if ( nw == 0 )
- {
- for ( int iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
+ { // Timeout, no fd's ready
+ for ( int iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
{
- peer_t *peer = &p[iPeer];
- if ( (iPeer != MyPNID) &&
- (socks_[iPeer] != -1) )
+ peer_t *peer = &p[indexToPnid_[iPeer]];
+ if ( (indexToPnid_[iPeer] != MyPNID) && (socks_[indexToPnid_[iPeer]] != -1) )
{
- if ( (peer->p_receiving) ||
- (peer->p_sending) )
+ if ( (peer->p_receiving) || (peer->p_sending) )
{
if ( ! ZClientEnabled )
{
- peer->p_timeout_count++;
+ if (peer->p_initial_check && !reconnecting)
+ {
+ peer->p_initial_check = false;
+ clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
+ peer->znodeFailedTime.tv_sec += sessionTimeout;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - Znode Fail Time %ld(secs)\n"
+ , method_name, __LINE__
+ , peer->znodeFailedTime.tv_sec);
+ }
+ }
if ( peer->p_timeout_count < sv_epoll_retry_count )
{
+ peer->p_timeout_count++;
+
+ if (IsRealCluster)
+ {
+ if (trace_settings & TRACE_RECOVERY)
+ {
+ trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+ " timeout count=%d,"
+ " sending=%d,"
+ " receiving=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), iPeer
+ , peer->p_timeout_count
+ , peer->p_sending
+ , peer->p_receiving);
+ }
+ // Attempt reconnect to all peers
+ err = AllgatherSockReconnect( stats );
+ // Redrive IOs on live peer connections
+ nsent = 0; nrecv = 0;
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+ {
+ peer_t *peer = &p[indexToPnid_[i]];
+ if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+ {
+ peer->p_sending = peer->p_receiving = false;
+ nsent++;
+ nrecv++;
+ }
+ else
+ {
+ peer->p_sending = peer->p_receiving = true;
+ peer->p_sent = peer->p_received = 0;
+ peer->p_timeout_count = 0;
+ peer->p_n2recv = -1;
+ peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+
+ struct epoll_event event;
+ event.data.fd = socks_[indexToPnid_[i]];
+ event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+ EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[i]], &event );
+ }
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - socks_[%d]=%d, "
+ "peer->p_sending=%d, "
+ "peer->p_receiving=%d\n"
+ , method_name, __LINE__
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , peer->p_sending
+ , peer->p_receiving );
+ }
+ }
+ reconnectSeqNum_ = seqNum_;
+ reconnecting = true;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - Reconnecting!\n"
+ , method_name, __LINE__ );
+ }
+ goto reconnected;
+ }
continue;
}
+ if (trace_settings & TRACE_RECOVERY)
+ {
+ trace_printf( "%s@%d - Peer timeout triggered: "
+ "peer->p_timeout_count %d, "
+ "sv_epoll_retry_count %d\n"
+ " socks_[%d]=%d\n"
+ " stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , peer->p_timeout_count
+ , sv_epoll_retry_count
+ , indexToPnid_[iPeer]
+ , socks_[indexToPnid_[iPeer]]
+ , iPeer
+ , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) );
+ }
}
else
{
- if (peer->p_initial_check)
+ if (peer->p_initial_check && !reconnecting)
{
peer->p_initial_check = false;
clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
@@ -4557,7 +4876,7 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
}
// If not expired, stay in the loop
- if ( ! ZClient->IsZNodeExpired( Node[iPeer]->GetName(), zerr ))
+ if ( ! ZClient->IsZNodeExpired( Node[indexToPnid_[iPeer]]->GetName(), zerr ))
{
if ( zerr == ZCONNECTIONLOSS || zerr == ZOPERATIONTIMEOUT )
{
@@ -4572,10 +4891,71 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
if ( peer->p_timeout_count < sv_epoll_retry_count )
{
+ if (IsRealCluster)
+ {
+ if (trace_settings & TRACE_RECOVERY)
+ {
+ trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+ " timeout count=%d,"
+ " sending=%d,"
+ " receiving=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_timeout_count
+ , peer->p_sending
+ , peer->p_receiving);
+ }
+ // Attempt reconnect to all peers
+ err = AllgatherSockReconnect( stats );
+ // Redrive IOs on live peer connections
+ nsent = 0; nrecv = 0;
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+ {
+ peer_t *peer = &p[indexToPnid_[i]];
+ if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+ {
+ peer->p_sending = peer->p_receiving = false;
+ nsent++;
+ nrecv++;
+ }
+ else
+ {
+ peer->p_sending = peer->p_receiving = true;
+ peer->p_sent = peer->p_received = 0;
+ peer->p_timeout_count = 0;
+ peer->p_n2recv = -1;
+ peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+
+ struct epoll_event event;
+ event.data.fd = socks_[indexToPnid_[i]];
+ event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+ EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event );
+ }
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - socks_[%d]=%d, "
+ "peer->p_sending=%d, "
+ "peer->p_receiving=%d\n"
+ , method_name, __LINE__
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , peer->p_sending
+ , peer->p_receiving );
+ }
+ }
+ reconnectSeqNum_ = seqNum_;
+ reconnecting = true;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - Reconnecting!\n"
+ , method_name, __LINE__ );
+ }
+ goto reconnected;
+ }
continue;
}
}
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ if (trace_settings & TRACE_RECOVERY)
{
trace_printf( "%s@%d - Znode Failed triggered\n"
" Current Time %ld(secs)\n"
@@ -4592,58 +4972,99 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
if ( peer->p_timeout_count < sv_epoll_retry_count )
{
+ if (IsRealCluster)
+ {
+ if (trace_settings & TRACE_RECOVERY)
+ {
+ trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
+ " timeout count=%d,"
+ " sending=%d,"
+ " receiving=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_timeout_count
+ , peer->p_sending
+ , peer->p_receiving);
+ }
+ // Attempt reconnect to all peers
+ err = AllgatherSockReconnect( stats );
+ // Redrive IOs on live peer connections
+ nsent = 0; nrecv = 0;
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+ {
+ peer_t *peer = &p[indexToPnid_[i]];
+ if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+ {
+ peer->p_sending = peer->p_receiving = false;
+ nsent++;
+ nrecv++;
+ }
+ else
+ {
+ peer->p_sending = peer->p_receiving = true;
+ peer->p_sent = peer->p_received = 0;
+ peer->p_timeout_count = 0;
+ peer->p_n2recv = -1;
+ peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+
+ struct epoll_event event;
+ event.data.fd = socks_[indexToPnid_[i]];
+ event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+ EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event );
+ }
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - socks_[%d]=%d, "
+ "peer->p_sending=%d, "
+ "peer->p_receiving=%d\n"
+ , method_name, __LINE__
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , peer->p_sending
+ , peer->p_receiving );
+ }
+ }
+ reconnectSeqNum_ = seqNum_;
+ reconnecting = true;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - Reconnecting!\n"
+ , method_name, __LINE__ );
+ }
+ goto reconnected;
+ }
continue;
}
}
}
}
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s@%d] Not heard from peer=%d (node=%s) "
- "(current seq # is %lld)\n"
- , method_name
- , __LINE__
- , iPeer
- , Node[iPeer]->GetName()
- , seqNum_ );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_1, SQ_LOG_CRIT, buf );
-
- stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
- err = MPI_ERR_IN_STATUS;
- if ( peer->p_sending )
- {
- peer->p_sending = false;
- nsent++;
- }
- if ( peer->p_receiving )
+ if (trace_settings & TRACE_RECOVERY)
{
- peer->p_receiving = false;
- nrecv++;
+ trace_printf( "%s@%d - err=%d, socks_[%d]=%d, stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , err
+ , indexToPnid_[iPeer]
+ , socks_[indexToPnid_[iPeer]]
+ , indexToPnid_[iPeer]
+ , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) );
}
- // setup the epoll structures
- struct epoll_event event;
- event.data.fd = socks_[iPeer];
- int op = 0;
- if ( !peer->p_sending && !peer->p_receiving )
- {
- op = EPOLL_CTL_DEL;
- event.events = 0;
- }
- else if ( peer->p_sending )
- {
- op = EPOLL_CTL_MOD;
- event.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
- }
- else if ( peer->p_receiving )
+ if ( err == MPI_ERR_IN_STATUS
+ && stats[indexToPnid_[iPeer]].MPI_ERROR == MPI_ERR_EXITED)
{
- op = EPOLL_CTL_MOD;
- event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
- }
- if ( op == EPOLL_CTL_DEL || op == EPOLL_CTL_MOD )
- {
- EpollCtl( epollFD_, op, socks_[iPeer], &event );
+ // At this point, this peer is not responding and
+ // reconnects failed or its znode expired
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s@%d] Not heard from peer=%d (node=%s) "
+ "(current seq # is %lld)\n"
+ , method_name
+ , __LINE__
+ , indexToPnid_[iPeer]
+ , Node[indexToPnid_[iPeer]]->GetName()
+ , seqNum_ );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_2, SQ_LOG_CRIT, buf );
}
}
}
@@ -4651,35 +5072,43 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
}
if ( nw < 0 )
- {
+ { // Got an error
char ebuff[256];
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf), "[%s@%d] epoll_wait(%d, %d) error: %s\n",
method_name, __LINE__, epollFD_, maxEvents,
strerror_r( errno, ebuff, 256 ) );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_2, SQ_LOG_CRIT, buf );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_3, SQ_LOG_CRIT, buf );
MPI_Abort( MPI_COMM_SELF,99 );
}
+
+ // Process fd's which are ready to initiate an IO or completed IO
for ( int iEvent = 0; iEvent < nw; iEvent++ )
{
bool stateChange = false;
int fd = events[iEvent].data.fd;
int iPeer;
- for ( iPeer = 0; iPeer < GetConfigPNodesMax(); iPeer++ )
- {
- if ( events[iEvent].data.fd == socks_[iPeer] ) break;
+ for ( iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
+ { // Find corresponding peer by matching socket fd
+ if ( events[iEvent].data.fd == socks_[indexToPnid_[iPeer]] ) break;
}
- if ( iPeer < 0 || iPeer >= GetConfigPNodesMax() || iPeer == MyPNID
- || socks_[iPeer] == -1
- || (!p[iPeer].p_sending && !p[iPeer].p_receiving) )
+ if ( indexToPnid_[iPeer] < 0 || indexToPnid_[iPeer] >= GetConfigPNodesMax() || indexToPnid_[iPeer] == MyPNID
+ || socks_[indexToPnid_[iPeer]] == -1
+ || (!p[indexToPnid_[iPeer]].p_sending && !p[indexToPnid_[iPeer]].p_receiving) )
{
char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf), "[%s@%d] invalid peer %d\n",
- method_name, __LINE__, iPeer );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_3, SQ_LOG_CRIT, buf );
+ snprintf( buf, sizeof(buf)
+ , "[%s@%d] Invalid peer %d, "
+ "peer.p_sending=%d, "
+ "peer.p_receiving=%d\n"
+ , method_name, __LINE__
+ , indexToPnid_[iPeer]
+ , indexToPnid_[iPeer] >= GetConfigPNodesMax()?-1:p[indexToPnid_[iPeer]].p_sending
+ , indexToPnid_[iPeer] >= GetConfigPNodesMax()?-1:p[indexToPnid_[iPeer]].p_receiving );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_4, SQ_LOG_CRIT, buf );
MPI_Abort( MPI_COMM_SELF,99 );
}
- peer_t *peer = &p[iPeer];
+ peer_t *peer = &p[indexToPnid_[iPeer]];
if ( (events[iEvent].events & EPOLLERR) ||
(events[iEvent].events & EPOLLHUP) ||
( !(events[iEvent].events & (EPOLLIN|EPOLLOUT))) )
@@ -4690,13 +5119,13 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
snprintf( buf, sizeof(buf)
, "[%s@%d] Error: peer=%d, events[%d].data.fd=%d, event[%d]=%s\n"
, method_name, __LINE__
- , iPeer
+ , indexToPnid_[iPeer]
, iEvent
, events[iEvent].data.fd
, iEvent
, EpollEventString(events[iEvent].events) );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_4, SQ_LOG_CRIT, buf );
- stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_5, SQ_LOG_CRIT, buf );
+ stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
err = MPI_ERR_IN_STATUS;
if ( peer->p_sending )
{
@@ -4708,12 +5137,11 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
peer->p_receiving = false;
nrecv++;
}
- stateChange = 1;
+ stateChange = true;
goto early_exit;
}
- if ( peer->p_receiving
- && events[iEvent].events & EPOLLIN )
- {
+ if ( peer->p_receiving && events[iEvent].events & EPOLLIN )
+ { // Got receive (read) completion
int eagain_ok = 0;
read_again:
char *r = &peer->p_buff[peer->p_received];
@@ -4729,6 +5157,26 @@ read_again:
int nr;
while ( 1 )
{
+ if (trace_settings & TRACE_SYNC_DETAIL)
+ {
+ trace_printf( "%s@%d - EPOLLIN from %s(%d),"
+ " sending=%d,"
+ " receiving=%d (%d)"
+ " sent=%d,"
+ " received=%d"
+ " timeout_count=%d,"
+ " initial_check=%d,"
+ " n2recv=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_sending
+ , peer->p_receiving, n2get
+ , peer->p_sent
+ , peer->p_received
+ , peer->p_timeout_count
+ , peer->p_initial_check
+ , peer->p_n2recv );
+ }
nr = recv( fd, r, n2get, 0 );
if ( nr >= 0 || errno == EINTR ) break;
}
@@ -4746,8 +5194,8 @@ read_again:
snprintf( buf, sizeof(buf)
, "[%s@%d] recv[%d](%d) error %d (%s)\n"
, method_name, __LINE__
- , iPeer, nr , err, strerror(err) );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_5, SQ_LOG_CRIT, buf );
+ , indexToPnid_[iPeer], nr , err, strerror(err) );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_6, SQ_LOG_CRIT, buf );
peer->p_receiving = false;
nrecv++;
if ( peer->p_sending )
@@ -4755,7 +5203,7 @@ read_again:
peer->p_sending = false;
nsent++;
}
- stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+ stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
err = MPI_ERR_IN_STATUS;
stateChange = true;
}
@@ -4792,7 +5240,7 @@ read_again:
snprintf( buf, sizeof(buf),
"[%s@%d] error n2recv %d\n",
method_name, __LINE__, peer->p_n2recv );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_6, SQ_LOG_CRIT, buf );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_7, SQ_LOG_CRIT, buf );
MPI_Abort( MPI_COMM_SELF,99 );
}
if ( peer->p_n2recv == 0 )
@@ -4800,20 +5248,59 @@ read_again:
// this buffer is done
peer->p_receiving = false;
nrecv++;
- stats[iPeer].count = peer->p_received;
+ stats[indexToPnid_[iPeer]].count = peer->p_received;
+ if (trace_settings & TRACE_SYNC_DETAIL)
+ {
+ trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+ " sending=%d,"
+ " receiving=%d (%d)"
+ " sent=%d,"
+ " received=%d"
+ " timeout_count=%d,"
+ " initial_check=%d,"
+ " n2recv=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_sending
+ , peer->p_receiving, n2get
+ , peer->p_sent
+ , peer->p_received
+ , peer->p_timeout_count
+ , peer->p_initial_check
+ , peer->p_n2recv );
+ }
stateChange = true;
}
}
}
}
- if ( peer->p_sending
- && events[iEvent].events & EPOLLOUT )
- {
+ if ( peer->p_sending && events[iEvent].events & EPOLLOUT )
+ { // Got send (write) completion
char *s = &((char *)sbuf)[peer->p_sent];
int n2send = nbytes - peer->p_sent;
int ns;
while ( 1 )
{
+ if (trace_settings & TRACE_SYNC_DETAIL)
+ {
+ trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+ " sending=%d (%d),"
+ " receiving=%d"
+ " sent=%d,"
+ " received=%d"
+ " timeout_count=%d,"
+ " initial_check=%d,"
+ " n2recv=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_sending, n2send
+ , peer->p_receiving
+ , peer->p_sent
+ , peer->p_received
+ , peer->p_timeout_count
+ , peer->p_initial_check
+ , peer->p_n2recv );
+ }
ns = send( fd, s, n2send, 0 );
if ( ns >= 0 || errno != EINTR ) break;
}
@@ -4825,8 +5312,8 @@ read_again:
snprintf( buf, sizeof(buf)
, "[%s@%d] send[%d](%d) error=%d (%s)\n"
, method_name, __LINE__
- , iPeer, ns, err, strerror(err) );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_7, SQ_LOG_CRIT, buf );
+ , indexToPnid_[iPeer], ns, err, strerror(err) );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_8, SQ_LOG_CRIT, buf );
peer->p_sending = false;
nsent++;
if ( peer->p_receiving )
@@ -4834,7 +5321,7 @@ read_again:
peer->p_receiving = false;
nrecv++;
}
- stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
+ stats[indexToPnid_[iPeer]].MPI_ERROR = MPI_ERR_EXITED;
err = MPI_ERR_IN_STATUS;
stateChange = true;
}
@@ -4846,6 +5333,26 @@ read_again:
// finished sending to this destination
peer->p_sending = false;
nsent++;
+ if (trace_settings & TRACE_SYNC_DETAIL)
+ {
+ trace_printf( "%s@%d - EPOLLOUT to %s(%d),"
+ " sending=%d (%d),"
+ " receiving=%d"
+ " sent=%d,"
+ " received=%d"
+ " timeout_count=%d,"
+ " initial_check=%d,"
+ " n2recv=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , peer->p_sending, n2send
+ , peer->p_receiving
+ , peer->p_sent
+ , peer->p_received
+ , peer->p_timeout_count
+ , peer->p_initial_check
+ , peer->p_n2recv );
+ }
stateChange = true;
}
}
@@ -4854,7 +5361,7 @@ early_exit:
if ( stateChange )
{
struct epoll_event event;
- event.data.fd = socks_[iPeer];
+ event.data.fd = socks_[indexToPnid_[iPeer]];
int op = 0;
if ( !peer->p_sending && !peer->p_receiving )
{
@@ -4888,49 +5395,465 @@ early_exit:
return err;
}
-// When we get a communication error for a point-to-point monitor communicator
-// verify that the other nodes in the cluster also lost communications
-// with that monitor. If all nodes concur we consider that monitor
-// down.
-void CCluster::ValidateClusterState( cluster_state_def_t nodestate[],
- bool haveDivergence)
+int CCluster::AllgatherSockReconnect( MPI_Status *stats ) /*
+ * Attempts to re-establish the socket connection between this monitor
+ * (MyPNID) and each peer pnid for which Nodes->GetNode() returns a node.
+ *
+ * All pnid pairs (i, j) with i < j below GetConfigPNodesMax() are visited;
+ * only pairs that contain MyPNID do any work:
+ *   - i == MyPNID: this node calls ConnectSockPeer() for peer j.
+ *   - j == MyPNID: this node calls PingSockPeer() for peer i and, when the
+ *     ping succeeds, AcceptSockPeer().
+ * A peer whose state is not State_Up is skipped; if its socks_[] entry is
+ * not -1 the socket is removed from the epoll set and the entry set to -1.
+ *
+ * stats   - status array indexed by pnid; for a peer that could not be
+ *           reconnected MPI_ERROR is set to MPI_ERR_EXITED and count to 0.
+ * returns - MPI_SUCCESS, MPI_ERR_IN_STATUS when a peer could not be
+ *           reconnected, or MPI_ERR_AMODE when fcntl() on a reconnected
+ *           socket failed (err is overwritten, so the last failure wins).
+ */
{
- const char method_name[] = "CCluster::ValidateClusterState";
-
- exitedMons_t::iterator it;
- upNodes_t nodeMask;
+ const char method_name[] = "CCluster::AllgatherSockReconnect";
+ TRACE_ENTRY;
- for ( int i =0; i < MAX_NODE_MASKS ; i++ )
- {
- nodeMask.upNodes[i] = 0;
- }
+ int err = MPI_SUCCESS; // overall result returned to the caller
+ int idst; // pnid of the peer handled for the current (i,j) pair, -1 when the pair does not involve this node
+ int reconnectSock = -1; // result of the latest ConnectSockPeer()/AcceptSockPeer() call; declared outside the loops
+ CNode *node;
- for ( it = exitedMons_.begin(); it != exitedMons_.end(); )
+ // Loop on each node in the cluster
+ for ( int i = 0; i < GetConfigPNodesMax(); i++ )
{
- if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT))
- {
- trace_printf("%s@%d checking exited pnid=%d, detecting pnid=%d, seqNum=%lld"
- " (current seqNum_=%lld)\n", method_name, __LINE__,
- it->exitedPnid, it->detectingPnid, it->seqNum, seqNum_);
- }
-
- if ( seqNum_ >= (it->seqNum + 2) )
+ // Loop on each adjacent node in the cluster
+ for ( int j = i+1; j < GetConfigPNodesMax(); j++ )
{
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf), "[%s] Validating exited node %d, "
- "detected by node %d at seq #%lld "
- "(current seq # is %lld).\n",
- method_name, it->exitedPnid, it->detectingPnid,
- it->seqNum, seqNum_);
- mon_log_write(MON_CLUSTER_VALIDATE_STATE_1, SQ_LOG_ERR, buf);
-
- int concurringNodes = 0;
+ if ( i == MyPNID )
+ { // Current [i] node is my node, so connect to [j] node
- // Check if all active nodes see the node as down.
- nodeMask.upNodes[it->exitedPnid/MAX_NODE_BITMASK] = 1ull << (it->exitedPnid%MAX_NODE_BITMASK);
- string setSeesUp;
- string setSeesDown;
- char nodeX[10];
+ idst = j;
+ node = Nodes->GetNode( idst );
+ if (!node) continue; // no node object for this pnid, nothing to reconnect
+ if (node->GetState() != State_Up) // peer not marked up: no connect attempt, only drop its socket
+ {
+ if (socks_[idst] != -1)
+ { // Peer socket is still active
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst, socks_[idst] );
+ }
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[idst];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[idst], &event );
+ socks_[idst] = -1;
+ }
+ continue;
+ }
+ reconnectSock = ConnectSockPeer( node, idst ); // NOTE(review): presumably also stores the new fd in socks_[idst] (used by the fcntl below) - confirm in ConnectSockPeer()
+ if (reconnectSock == -1) // connect failed: report this peer as exited to the caller
+ {
+ stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+ stats[idst].count = 0;
+ err = MPI_ERR_IN_STATUS;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting Node %s (%d) status to "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst
+ , ErrorMsg(stats[idst].MPI_ERROR) );
+ }
+ if (node->GetState() != State_Up) // NOTE(review): unlike the check before the connect, socks_[idst] is not tested against -1 before EpollCtlDelete() - confirm intended
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst, socks_[idst] );
+ }
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[idst];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[idst], &event );
+ socks_[idst] = -1;
+ }
+ }
+ }
+ else if ( j == MyPNID )
+ { // Current [j] is my node, accept connection from peer [i] node
+
+ idst = i;
+ node = Nodes->GetNode( idst );
+ if (!node) continue; // no node object for this pnid, nothing to reconnect
+ if (node->GetState() != State_Up) // peer not marked up: no ping/accept, only drop its socket
+ {
+ if (socks_[idst] != -1)
+ { // Peer socket is still active
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst, socks_[idst] );
+ }
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[idst];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[idst], &event );
+ socks_[idst] = -1;
+ }
+ continue;
+ }
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Pinging Node %s (%d) to see if it's up\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid() );
+ }
+ if (PingSockPeer(node)) // AcceptSockPeer() is only called when the ping succeeded
+ {
+ reconnectSock = AcceptSockPeer( node, idst ); // NOTE(review): presumably also stores the new fd in socks_[idst] - confirm in AcceptSockPeer()
+ if (reconnectSock == -1) // accept failed: report this peer as exited to the caller
+ {
+ stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+ stats[idst].count = 0;
+ err = MPI_ERR_IN_STATUS;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting Node %s (%d) status to "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst
+ , ErrorMsg(stats[idst].MPI_ERROR) );
+ }
+ if (node->GetState() != State_Up) // NOTE(review): socks_[idst] is not tested against -1 before EpollCtlDelete() here - confirm intended
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst, socks_[idst] );
+ }
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[idst];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[idst], &event );
+ socks_[idst] = -1;
+ }
+ }
+ }
+ else
+ {
+ if (socks_[idst] != -1) // NOTE(review): ping failed; this branch removes the socket from the epoll set but leaves socks_[idst] unchanged, and its trace text says "is not up" although the node state is State_Up here - confirm both are intended
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst, socks_[idst] );
+ }
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[idst];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[idst], &event );
+ }
+ reconnectSock = -1; // keeps the fcntl below from running for this peer
+ stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+ stats[idst].count = 0;
+ err = MPI_ERR_IN_STATUS;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting Node %s (%d) status to "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst
+ , ErrorMsg(stats[idst].MPI_ERROR) );
+ }
+ }
+ }
+ else
+ {
+ idst = -1; // pair (i,j) does not involve this node; the fcntl below is skipped
+ }
+ if ( idst >= 0
+ && reconnectSock != -1
+ && fcntl( socks_[idst], F_SETFL, O_NONBLOCK ) ) // a non-zero fcntl() result is treated as failure
+ {
+ err = MPI_ERR_AMODE; // overwrites any MPI_ERR_IN_STATUS recorded earlier
+ char ebuff[256];
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf), "[%s@%d] fcntl(NONBLOCK) error: %s\n",
+ method_name, __LINE__, strerror_r( errno, ebuff, 256 ) );
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCKRECONN_1, SQ_LOG_CRIT, buf );
+ }
+ }
+ }
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ ) // trace socket and status of every indexToPnid_[] entry
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) // same test as the enclosing if
+ {
+ trace_printf( "%s@%d" " - socks_[%d]=%d, "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , indexToPnid_[i]
+ , ErrorMsg(stats[indexToPnid_[i]].MPI_ERROR) );
+ }
+ }
+ trace_printf( "%s@%d - Returning err=%d\n"
+ , method_name, __LINE__, err );
+ }
+
+ TRACE_EXIT;
+ return( err );
+}
+
+// Accepts a reconnect from a peer monitor on the sync listening socket
+// (syncSock_) and, on success, stores the new descriptor in socks_[peer].
+//
+//   node - peer node; used in this function only for its name and pnid in
+//          trace output
+//   peer - index into socks_[] for that peer
+//
+// Returns MPI_SUCCESS when AcceptSock() yields a descriptor, -1 otherwise.
+// Calls abort() if the local node name cannot be resolved.
+int CCluster::AcceptSockPeer( CNode *node, int peer )
+{
+    const char method_name[] = "CCluster::AcceptSockPeer";
+    TRACE_ENTRY;
+
+    int rc = MPI_SUCCESS;
+    int reconnectSock = -1;
+    unsigned char srcaddr[4];
+    struct hostent *he;
+
+    // Get my host structure via my node name
+    he = gethostbyname( MyNode->GetName() );
+    if ( !he )
+    {
+        // gethostbyname() reports its failure reason through h_errno,
+        // not errno, so format the message from h_errno.
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s@%d] gethostbyname(%s) error: %s\n"
+                , method_name, __LINE__
+                , MyNode->GetName()
+                , hstrerror( h_errno ) );
+        mon_log_write( MON_CLUSTER_ACCEPTSOCKPEER_1, SQ_LOG_CRIT, buf );
+        abort();
+    }
+
+    // Initialize my source address so the trace below prints real data
+    // instead of uninitialized stack bytes.
+    // NOTE(review): copies 4 bytes, i.e. presumes an IPv4 address - confirm.
+    memcpy( srcaddr, he->h_addr, 4 );
+
+    if (trace_settings & TRACE_RECOVERY)
+    {
+        trace_printf( "%s@%d Accepting server socket: from %s(%d), src=%d.%d.%d.%d, port=%d\n"
+                    , method_name, __LINE__
+                    , node->GetName(), node->GetPNid()
+                    , (int)srcaddr[0]
+                    , (int)srcaddr[1]
+                    , (int)srcaddr[2]
+                    , (int)srcaddr[3]
+                    , MyNode->GetSyncSocketPort() );
+    }
+
+    // Accept connection from peer
+    reconnectSock = AcceptSock( syncSock_ );
+    if (reconnectSock != -1)
+    {
+        if (trace_settings & TRACE_RECOVERY)
+        {
+            trace_printf( "%s@%d Server %s(%d) accepted from client %s(%d), old socks_[%d]=%d, new socks_[%d]=%d\n"
+                        , method_name, __LINE__
+                        , MyNode->GetName(), MyPNID
+                        , node->GetName(), node->GetPNid()
+                        , peer, socks_[peer]
+                        , peer, reconnectSock);
+        }
+    }
+    else
+    {
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf), "[%s@%d] AcceptSock(%d) failed!\n",
+                  method_name, __LINE__, syncSock_ );
+        mon_log_write( MON_CLUSTER_ACCEPTSOCKPEER_2, SQ_LOG_ERR, buf );
+        rc = -1;
+    }
+
+    // The old socket is dropped from the epoll set whether or not the
+    // accept succeeded.
+    // NOTE(review): the old descriptor is not closed in this function -
+    // confirm it is closed elsewhere.
+    if (socks_[peer] != -1)
+    {
+        // Remove old socket from epoll set, it may not be there
+        struct epoll_event event;
+        event.data.fd = socks_[peer];
+        event.events = 0;
+        EpollCtlDelete( epollFD_, socks_[peer], &event );
+    }
+    // socks_[peer] keeps its previous value when the accept failed
+    if (reconnectSock != -1)
+    {
+        socks_[peer] = reconnectSock;
+    }
+
+    TRACE_EXIT;
+    return rc;
+}
+
+// Opens a client connection to a peer monitor and, on success, stores the
+// new descriptor in socks_[peer].
+//
+//   node - peer node; its name is resolved to obtain the destination address
+//   peer - index into socks_[] and sockPorts_[] for that peer
+//
+// Returns MPI_SUCCESS when MkCltSock() yields a descriptor, -1 otherwise.
+// Calls abort() if either the local or the peer node name cannot be resolved.
+int CCluster::ConnectSockPeer( CNode *node, int peer )
+{
+    const char method_name[] = "CCluster::ConnectSockPeer";
+    TRACE_ENTRY;
+
+    int rc = MPI_SUCCESS;
+    int reconnectSock = -1;
+    unsigned char srcaddr[4], dstaddr[4];
+    struct hostent *he;
+
+    // Get my host structure via my node name
+    he = gethostbyname( MyNode->GetName() );
+    if ( !he )
+    {
+        // gethostbyname() reports its failure reason through h_errno,
+        // not errno, so format the message from h_errno.
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s@%d] gethostbyname(%s) error: %s\n"
+                , method_name, __LINE__
+                , MyNode->GetName()
+                , hstrerror( h_errno ) );
+        mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_1, SQ_LOG_CRIT, buf );
+        abort();
+    }
+
+    // Initialize my source address structure. It is copied out before the
+    // next gethostbyname() call because that call may return a pointer to
+    // the same static storage.
+    // NOTE(review): copies 4 bytes, i.e. presumes an IPv4 address - confirm.
+    memcpy( srcaddr, he->h_addr, 4 );
+
+    // Get peer's host structure via its node name
+    he = gethostbyname( node->GetName() );
+    if ( !he )
+    {
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf),
+                  "[%s@%d] gethostbyname(%s) error: %s\n",
+                  method_name, __LINE__, node->GetName(),
+                  hstrerror( h_errno ) );
+        mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_2, SQ_LOG_CRIT, buf );
+        abort();
+    }
+
+    // Initialize peer's destination address structure
+    memcpy( dstaddr, he->h_addr, 4 );
+
+    if (trace_settings & TRACE_RECOVERY)
+    {
+        trace_printf( "%s@%d Creating client socket: src=%d.%d.%d.%d, "
+                      "dst(%s)=%d.%d.%d.%d, dst port=%d\n"
+                    , method_name, __LINE__
+                    , (int)srcaddr[0]
+                    , (int)srcaddr[1]
+                    , (int)srcaddr[2]
+                    , (int)srcaddr[3]
+                    , node->GetName()
+                    , (int)dstaddr[0]
+                    , (int)dstaddr[1]
+                    , (int)dstaddr[2]
+                    , (int)dstaddr[3]
+                    , sockPorts_[peer] );
+    }
+
+    // Connect to peer
+    reconnectSock = MkCltSock( srcaddr, dstaddr, sockPorts_[peer] );
+    if (reconnectSock != -1)
+    {
+        if (trace_settings & TRACE_RECOVERY)
+        {
+            trace_printf( "%s@%d Client %s(%d) connected to server %s(%d), old socks_[%d]=%d, new socks_[%d]=%d\n"
+                        , method_name, __LINE__
+                        , MyNode->GetName(), MyPNID
+                        , node->GetName(), node->GetPNid()
+                        , peer, socks_[peer]
+                        , peer, reconnectSock);
+        }
+    }
+    else
+    {
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s@%d] MkCltSock() src=%d.%d.%d.%d, "
+                  "dst(%s)=%d.%d.%d.%d failed!\n"
+                , method_name, __LINE__
+                , (int)srcaddr[0]
+                , (int)srcaddr[1]
+                , (int)srcaddr[2]
+                , (int)srcaddr[3]
+                , node->GetName()
+                , (int)dstaddr[0]
+                , (int)dstaddr[1]
+                , (int)dstaddr[2]
+                , (int)dstaddr[3] );
+        mon_log_write( MON_CLUSTER_CONNECTSOCKPEER_3, SQ_LOG_ERR, buf );
+        rc = -1;
+    }
+
+    // The old socket is dropped from the epoll set whether or not the
+    // connect succeeded.
+    // NOTE(review): the old descriptor is not closed in this function -
+    // confirm it is closed elsewhere.
+    if (socks_[peer] != -1)
+    {
+        // Remove old socket from epoll set, it may not be there
+        struct epoll_event event;
+        event.data.fd = socks_[peer];
+        event.events = 0;
+        EpollCtlDelete( epollFD_, socks_[peer], &event );
+    }
+    // socks_[peer] keeps its previous value when the connect failed
+    if (reconnectSock != -1)
+    {
+        socks_[peer] = reconnectSock;
+    }
+
+    TRACE_EXIT;
+    return( rc );
+}
+
+// When we get a communication error for a point-to-point monitor communicator,
+// verify that the other nodes in the cluster also lost communications
+// with that monitor. If all nodes concur, we consider that monitor
+// down.
+void CCluster::ValidateClusterState( cluster_state_def_t nodestate[],
+ bool haveDivergence)
+{
+ const char method_name[] = "CCluster::ValidateClusterState";
+
+ exitedMons_t::iterator it;
+ upNodes_t nodeMask;
+
+ for ( int i =0; i < MAX_NODE_MASKS ; i++ )
+ {
+ nodeMask.upNodes[i] = 0;
+ }
+
+ for ( it = exitedMons_.begin(); it != exitedMons_.end(); )
+ {
+ if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT))
+ {
+ trace_printf("%s@%d checking exited pnid=%d, detecting pnid=%d, seqNum=%lld"
+ " (current seqNum_=%lld)\n", method_name, __LINE__,
+ it->exitedPnid, it->detectingPnid, it->seqNum, seqNum_);
+ }
+
+ if ( seqNum_ >= (it->seqNum + 2) )
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf), "[%s] Validating exited node %d, "
+ "detected by node %d at seq #%lld "
+ "(current seq # is %lld).\n",
+ method_name, it->exitedPnid, it->detectingPnid,
+ it->seqNum, seqNum_);
+ mon_log_write(MON_CLUSTER_VALIDATE_STATE_1, SQ_LOG_ERR, buf);
+
+ int concurringNodes = 0;
+
+ // Check if all active nodes see the node as down.
+ nodeMask.upNodes[it->exitedPnid/MAX_NODE_BITMASK] = 1ull << (it->exitedPnid%MAX_NODE_BITMASK);
+ string setSeesUp;
+ string setSeesDown;
+ char nodeX[10];
// Evaluate each active (up) node in the cluster
int pnodesCount = 0;
@@ -5187,8 +6110,10 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
const char method_name[] = "CCluster::ValidateSeqNum";
unsigned long long seqNum;
- unsigned long long seqNumBucket[256];
- int seqNumCount[256];
+ unsigned long long loSeqNum = seqNum_;
+ unsigned long long hiSeqNum = seqNum_;
+ unsigned long long seqNumBucket[MAX_NODES];
+ int seqNumCount[MAX_NODES];
int maxBucket = 0;
bool found;
int mostCountsIndex;
@@ -5198,10 +6123,26 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
// Count occurrences of sequence numbers from other nodes
for (int pnid = 0; pnid < GetConfigPNodesMax(); pnid++)
{
+ CNode *node= Nodes->GetNode( pnid );
+ if (!node) continue;
+ if (node->GetState() != State_Up) continue;
+
seqNum = nodestate[pnid].seq_num;
+ if (trace_settings & TRACE_SYNC)
+ {
+ trace_printf( "%s@%d seqNum_=%lld, nodestate[%d].seq_num=%lld\n"
+ , method_name, __LINE__
+ , seqNum_
+ , pnid
+ , nodestate[pnid].seq_num );
+ }
+
if (seqNum != 0)
{
+ loSeqNum = (seqNum < loSeqNum) ? seqNum : loSeqNum;
+ hiSeqNum = (seqNum > hiSeqNum) ? seqNum : hiSeqNum;
+
found = false;
for (int i=0; i<maxBucket; ++i)
{
@@ -5239,14 +6180,22 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
}
}
+ lowSeqNum_ = loSeqNum;
+ highSeqNum_ = hiSeqNum;
+
if (trace_settings & TRACE_SYNC)
{
if ( seqNum_ != seqNumBucket[mostCountsIndex] )
{
- trace_printf("%s@%d Most common seq num=%lld (%d nodes), %d buckets"
- ", local seq num (%lld) did not match.\n",
- method_name, __LINE__, seqNumBucket[mostCountsIndex],
- seqNumCount[mostCountsIndex], maxBucket, seqNum_);
+ trace_printf( "%s@%d Most common seq num=%lld (%d nodes), "
+ "%d buckets, low=%lld, high=%lld, local seq num (%lld) did not match.\n"
+ , method_name, __LINE__
+ , seqNumBucket[mostCountsIndex]
+ , seqNumCount[mostCountsIndex]
+ , maxBucket
+ , lowSeqNum_
+ , highSeqNum_
+ , seqNum_ );
}
}
@@ -5268,7 +6217,7 @@ void CCluster::HandleDownNode( int pnid )
trace_printf("%s@%d - Added down node to list, pnid=%d, name=(%s)\n", method_name, __LINE__, downNode->GetPNid(), downNode->GetName());
// assign new TmLeader if TMLeader node is dead.
- AssignTmLeader(pnid);
+ AssignTmLeader(pnid, false);
// Build available list of spare nodes
CNode *spareNode;
@@ -5373,6 +6322,7 @@ void CCluster::UpdateClusterState( bool &doShutdown,
TRACE_ENTRY;
struct sync_buffer_def *recvBuf;
+ struct sync_buffer_def *sendBuf = Nodes->GetSyncBuffer();
STATE node_state;
int change_nid;
cluster_state_def_t nodestate[GetConfigPNodesMax()];
@@ -5430,13 +6380,14 @@ void CCluster::UpdateClusterState( bool &doShutdown,
recvBuf = (struct sync_buffer_def *)
(((char *) syncBuf) + index * CommBufSize);
- if (trace_settings & TRACE_SYNC_DETAIL)
+ if (trace_settings & TRACE_SYNC)
{
int nr;
MPI_Get_count(&status[index], MPI_CHAR, &nr);
trace_printf("%s@%d - Received %d bytes from node %d, "
- "message count=%d\n",
+ ", seq_num=%lld, message count=%d\n",
method_name, __LINE__, nr, index,
+ recvBuf->nodeInfo.seq_num,
recvBuf->msgInfo.msg_count);
}
@@ -5532,23 +6483,23 @@ void CCluster::UpdateClusterState( bool &doShutdown,
Node[index]->SetState( State_Down );
--CurNodes;
// Clear bit in set of "up nodes"
- upNodes_.upNodes[index/MAX_NODE_BITMASK] &=
- ~(1ull << (index%MAX_NODE_BITMASK));
+ upNodes_.upNodes[index/MAX_NODE_BITMASK] &= ~(1ull << (index%MAX_NODE_BITMASK));
}
}
- if ( checkSeqNum_ && !ValidateSeqNum( nodestate ) && !enqueuedDown_ )
+ if ( (checkSeqNum_ || reconnectSeqNum_ != 0)
+ && !ValidateSeqNum( nodestate )
+ && !enqueuedDown_ )
{
- char buf[MON_STRING_BUF_SIZE];
- snprintf(buf, sizeof(buf), "[%s] Sync cycle sequence number (%lld) "
- "incorrect. Scheduling node down.\n", method_name, seqNum_);
- mon_log_write(MON_CLU
<TRUNCATED>
[4/4] incubator-trafodion git commit: Merge [TRAFODION-2651] PR-1233
Fixed monitor-to-monitor communication by adding
Posted by su...@apache.org.
Merge [TRAFODION-2651] PR-1233 Fixed monitor-to-monitor communication by adding
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/aeb9ef22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/aeb9ef22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/aeb9ef22
Branch: refs/heads/master
Commit: aeb9ef223e7c1d8f6341a2441abf09dd28f6d466
Parents: e2789f9 5e09a1e
Author: Suresh Subbiah <su...@apache.org>
Authored: Tue Sep 19 02:54:10 2017 +0000
Committer: Suresh Subbiah <su...@apache.org>
Committed: Tue Sep 19 02:54:10 2017 +0000
----------------------------------------------------------------------
.../export/include/common/evl_sqlog_eventnum.h | 28 +-
core/sqf/monitor/linux/cluster.cxx | 1851 +++++++++++++++---
core/sqf/monitor/linux/cluster.h | 38 +-
core/sqf/monitor/linux/commaccept.cxx | 126 +-
core/sqf/monitor/linux/internal.h | 3 +
core/sqf/monitor/linux/msgdef.h | 2 +
core/sqf/monitor/linux/pnode.cxx | 20 +-
core/sqf/monitor/linux/pnode.h | 3 +
core/sqf/monitor/linux/reqtmleader.cxx | 19 +
core/sqf/monitor/linux/shell.cxx | 26 +-
core/sqf/monitor/linux/tmsync.cxx | 14 +-
core/sqf/monitor/linux/tmsync.h | 2 +-
core/sqf/monitor/linux/zclient.cxx | 8 +
core/sqf/sqenvcom.sh | 11 +-
14 files changed, 1805 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
[3/4] incubator-trafodion git commit: [TRAFODION-2651] Fixed
monitor-to-monitor communication by adding reconnect logic to better handle
various communication glitches which previously resulted in false node down
situations.
Posted by su...@apache.org.
[TRAFODION-2651] Fixed monitor-to-monitor communication by adding reconnect
logic to better handle various communication glitches which previously
resulted in false node down situations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/5e09a1e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/5e09a1e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/5e09a1e3
Branch: refs/heads/master
Commit: 5e09a1e3a41d81394a0beee7eb67f7475a344daf
Parents: cd54195
Author: Zalo Correa <za...@esgyn.com>
Authored: Fri Sep 15 15:39:06 2017 -0700
Committer: Zalo Correa <za...@esgyn.com>
Committed: Fri Sep 15 15:39:06 2017 -0700
----------------------------------------------------------------------
.../export/include/common/evl_sqlog_eventnum.h | 28 +-
core/sqf/monitor/linux/cluster.cxx | 1851 +++++++++++++++---
core/sqf/monitor/linux/cluster.h | 38 +-
core/sqf/monitor/linux/commaccept.cxx | 126 +-
core/sqf/monitor/linux/internal.h | 3 +
core/sqf/monitor/linux/msgdef.h | 2 +
core/sqf/monitor/linux/pnode.cxx | 20 +-
core/sqf/monitor/linux/pnode.h | 3 +
core/sqf/monitor/linux/reqtmleader.cxx | 19 +
core/sqf/monitor/linux/shell.cxx | 26 +-
core/sqf/monitor/linux/tmsync.cxx | 14 +-
core/sqf/monitor/linux/tmsync.h | 2 +-
core/sqf/monitor/linux/zclient.cxx | 8 +
core/sqf/sqenvcom.sh | 11 +-
14 files changed, 1805 insertions(+), 346 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/5e09a1e3/core/sqf/export/include/common/evl_sqlog_eventnum.h
----------------------------------------------------------------------
diff --git a/core/sqf/export/include/common/evl_sqlog_eventnum.h b/core/sqf/export/include/common/evl_sqlog_eventnum.h
index dbf8d92..c8b4d59 100644
--- a/core/sqf/export/include/common/evl_sqlog_eventnum.h
+++ b/core/sqf/export/include/common/evl_sqlog_eventnum.h
@@ -143,6 +143,7 @@
#define MON_CLUSTER_ALLGATHERSOCK_5 101013305
#define MON_CLUSTER_ALLGATHERSOCK_6 101013306
#define MON_CLUSTER_ALLGATHERSOCK_7 101013307
+#define MON_CLUSTER_ALLGATHERSOCK_8 101013308
#define MON_CLUSTER_EPOLLCTL_1 101013401
#define MON_CLUSTER_INITCLUSTERSOCKS_1 101013501
#define MON_CLUSTER_INITCLUSTERSOCKS_2 101013502
@@ -158,6 +159,10 @@
#define MON_CLUSTER_MKSRVSOCK_2 101013702
#define MON_CLUSTER_MKSRVSOCK_3 101013703
#define MON_CLUSTER_MKSRVSOCK_4 101013704
+#define MON_CLUSTER_MKSRVSOCK_5 101013705
+#define MON_CLUSTER_MKSRVSOCK_6 101013706
+#define MON_CLUSTER_MKSRVSOCK_7 101013707
+#define MON_CLUSTER_MKSRVSOCK_8 101013708
#define MON_CLUSTER_MKCLTSOCK_1 101013801
#define MON_CLUSTER_MKCLTSOCK_2 101013802
#define MON_CLUSTER_MKCLTSOCK_3 101013803
@@ -167,6 +172,9 @@
#define MON_CLUSTER_MKCLTSOCK_7 101013807
#define MON_CLUSTER_MKCLTSOCK_8 101013808
#define MON_CLUSTER_MKCLTSOCK_9 101013809
+#define MON_CLUSTER_MKCLTSOCK_10 101013810
+#define MON_CLUSTER_MKCLTSOCK_11 101013811
+#define MON_CLUSTER_MKCLTSOCK_12 101013812
#define MON_CLUSTER_CONNECT_1 101013901
#define MON_CLUSTER_CONNECT_2 101013902
#define MON_CLUSTER_CONNECT_3 101013903
@@ -190,8 +198,23 @@
#define MON_CLUSTER_SETKEEPALIVESOCKOPT_2 101014502
#define MON_CLUSTER_SETKEEPALIVESOCKOPT_3 101014503
#define MON_CLUSTER_SETKEEPALIVESOCKOPT_4 101014504
-#define MON_CLUSTER_HARDNODEUP_1 101014601
-#define MON_CLUSTER_NO_LICENSE_VERIFIERS 101015001
+#define MON_CLUSTER_NO_LICENSE_VERIFIERS 101014601
+
+#define MON_CLUSTER_ALLGATHERSOCKRECONN_1 101014701
+
+#define MON_CLUSTER_HARDNODEUP_1 101014801
+
+#define MON_CLUSTER_ACCEPTSOCKPEER_1 101014901
+#define MON_CLUSTER_ACCEPTSOCKPEER_2 101014902
+
+#define MON_CLUSTER_CONNECTSOCKPEER_1 101015001
+#define MON_CLUSTER_CONNECTSOCKPEER_2 101015002
+#define MON_CLUSTER_CONNECTSOCKPEER_3 101015003
+
+#define MON_CLUSTER_EPOLLCTLDELETE_1 101015101
+
+#define MON_PINGSOCKPEER_1 101015201
+#define MON_PINGSOCKPEER_2 101015202
/* Module: monitor.cxx = 02 */
@@ -800,6 +823,7 @@
#define MON_COMMACCEPT_16 101320116
#define MON_COMMACCEPT_17 101320117
#define MON_COMMACCEPT_18 101320118
+#define MON_COMMACCEPT_19 101320119
/* Module: reqnodedown.cxx = 33 */
#define MON_EXT_NODEDOWN_REQ 101330101