You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by su...@apache.org on 2018/01/12 19:35:59 UTC
[2/3] trafodion git commit: [TRAFODION-2881] HA fixes Fixed multiple
problems in monitor Allgather() socket reconnect logic. - Separated node down
detection logic from communication errors and timeouts to better handle
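multiple failure scenarios - Better handling network resets - Additional trace information - Fixed 'node up' hang in monitor shell due to TmSync race condition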
[TRAFODION-2881] HA fixes
Fixed multiple problems in monitor Allgather() socket reconnect logic.
- Separated node down detection logic from communication errors and timeouts
to better handle multiple failure scenarios
- Better handling network resets
- Additional trace information
- Fixed 'node up' hang in monitor shell due to TmSync race condition
Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/e832d827
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/e832d827
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/e832d827
Branch: refs/heads/master
Commit: e832d827507521998567d4cc5d92e4239007d19a
Parents: db319b1
Author: Zalo Correa <za...@esgyn.com>
Authored: Thu Jan 11 09:32:11 2018 -0800
Committer: Zalo Correa <za...@esgyn.com>
Committed: Thu Jan 11 09:32:11 2018 -0800
----------------------------------------------------------------------
.../export/include/common/evl_sqlog_eventnum.h | 26 +
core/sqf/monitor/linux/cluster.cxx | 967 ++++++++++---------
core/sqf/monitor/linux/cluster.h | 6 +-
core/sqf/monitor/linux/cmsh.cxx | 146 ++-
core/sqf/monitor/linux/cmsh.h | 2 +
core/sqf/monitor/linux/commaccept.cxx | 37 +-
core/sqf/monitor/linux/commaccept.h | 3 +-
core/sqf/monitor/linux/internal.h | 2 +-
core/sqf/monitor/linux/macros.gmk | 4 +-
core/sqf/monitor/linux/montest_run.virtual | 10 +-
core/sqf/monitor/linux/pnode.cxx | 8 +-
core/sqf/monitor/linux/pnodeconfig.cxx | 33 +-
core/sqf/monitor/linux/pnodeconfig.h | 2 +
core/sqf/monitor/linux/process.cxx | 161 +--
core/sqf/monitor/linux/redirector.cxx | 11 +-
core/sqf/monitor/linux/redirector.h | 2 +-
core/sqf/monitor/linux/reqexit.cxx | 7 +-
core/sqf/monitor/linux/reqnewproc.cxx | 30 +-
core/sqf/monitor/linux/reqopen.cxx | 8 +-
core/sqf/monitor/linux/reqqueue.cxx | 5 +
core/sqf/monitor/linux/shell.cxx | 101 +-
core/sqf/monitor/linux/tcdbsqlite.cxx | 8 -
core/sqf/monitor/linux/tmsync.cxx | 46 +-
core/sqf/monitor/linux/zclient.cxx | 6 +-
core/sqf/sql/scripts/sqnodestatus | 23 +-
core/sqf/src/seabed/src/msmon.cpp | 2 +-
26 files changed, 1010 insertions(+), 646 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/export/include/common/evl_sqlog_eventnum.h
----------------------------------------------------------------------
diff --git a/core/sqf/export/include/common/evl_sqlog_eventnum.h b/core/sqf/export/include/common/evl_sqlog_eventnum.h
index c8b4d59..96c3df9 100644
--- a/core/sqf/export/include/common/evl_sqlog_eventnum.h
+++ b/core/sqf/export/include/common/evl_sqlog_eventnum.h
@@ -54,6 +54,7 @@
#define MON_CLUSTER_CLUSTER_1 101010401
#define MON_CLUSTER_CLUSTER_2 101010402
#define MON_CLUSTER_UCLUSTER 101010501
+
#define MON_CLUSTER_HANDLEOTHERNODE_1 101010601
#define MON_CLUSTER_HANDLEOTHERNODE_2 101010602
#define MON_CLUSTER_HANDLEOTHERNODE_3 101010603
@@ -65,17 +66,21 @@
#define MON_CLUSTER_HANDLEOTHERNODE_9 101010609
#define MON_CLUSTER_HANDLEOTHERNODE_10 101010610
#define MON_CLUSTER_HANDLEOTHERNODE_11 101010611
+
#define MON_CLUSTER_HANDLEMYNODE_1 101010701
#define MON_CLUSTER_HANDLEMYNODE_2 101010702
#define MON_CLUSTER_HANDLEMYNODE_3 101010703
#define MON_CLUSTER_HANDLEMYNODE_4 101010704
#define MON_CLUSTER_HANDLEMYNODE_5 101010705
#define MON_CLUSTER_HANDLEMYNODE_6 101010706
+
#define MON_CLUSTER_CHECKWHODIED_1 101010801
#define MON_CLUSTER_CHECKWHODIED_2 101010802
+
#define MON_CLUSTER_REGROUP_1 101010901
#define MON_CLUSTER_REGROUP_2 101010902
#define MON_CLUSTER_INITCLUSTER 101011001
+
#define MON_CLUSTER_MARKDOWN_1 101011101
#define MON_CLUSTER_MARKDOWN_2 101011102
#define MON_CLUSTER_MARKDOWN_3 101011103
@@ -84,15 +89,18 @@
#define MON_CLUSTER_FORCEDOWN_1 101011401
#define MON_CLUSTER_FORCEDOWN_2 101011402
#define MON_CLUSTER_CLUSTER_MANAGER 101011501
+
#define MON_CLUSTER_EXPEDITEDOWN_1 101011601
#define MON_CLUSTER_EXPEDITEDOWN_2 101011602
#define MON_CLUSTER_RESPONSIVE_1 101011701
#define MON_CLUSTER_RESPONSIVE_2 101011702
#define MON_CLUSTER_RESPONSIVE_3 101011703
+
#define MON_CLUSTER_CONNTONEWMON_1 101011801
#define MON_CLUSTER_CONNTONEWMON_2 101011802
#define MON_CLUSTER_CONNTONEWMON_10 101011810
#define MON_CLUSTER_CONNTONEWMON_11 101011811
+
#define MON_CLUSTER_MERGETONEWMON_1 101011901
#define MON_CLUSTER_MERGETONEWMON_2 101011902
#define MON_CLUSTER_MERGETONEWMON_3 101011903
@@ -100,6 +108,7 @@
#define MON_CLUSTER_MERGETONEWMON_11 101011911
#define MON_CLUSTER_MERGETONEWMON_12 101011912
#define MON_CLUSTER_MERGETONEWMON_13 101011913
+
#define MON_CLUSTER_REINTEGRATE_1 101012001
#define MON_CLUSTER_REINTEGRATE_2 101012002
#define MON_CLUSTER_REINTEGRATE_3 101012003
@@ -107,12 +116,15 @@
#define MON_CLUSTER_REINTEGRATE_11 101012011
#define MON_CLUSTER_REINTEGRATE_12 101012012
#define MON_CLUSTER_REINTEGRATE_13 101012013
+
#define MON_CLUSTER_REINITCLUSTER_1 101012101
#define MON_CLUSTER_REINITCLUSTER_2 101012102
#define MON_CLUSTER_REINITCLUSTER_3 101012103
#define MON_CLUSTER_REINITCLUSTER_4 101012104
#define MON_CLUSTER_REINITCLUSTER_5 101012105
+
#define MON_CLUSTER_CHECKWHOJOINED_1 101012201
+
#define MON_CLUSTER_UPDDATECLUSTER_1 101012301
#define MON_CLUSTER_UPDDATECLUSTER_2 101012302
#define MON_CLUSTER_UPDTCLUSTERSTATE_1 101012401
@@ -131,11 +143,14 @@
#define MON_CLUSTER_INITCONFIGCLUSTER_2 101013002
#define MON_CLUSTER_INITCONFIGCLUSTER_3 101013003
#define MON_CLUSTER_INITCONFIGCLUSTER_4 101013004
+
#define MON_CLUSTER_SETNEWCOMM_1 101013101
#define MON_CLUSTER_SETNEWCOMM_2 101013102
+
#define MON_CLUSTER_SETNEWSOCK_1 101013201
#define MON_CLUSTER_SETNEWSOCK_2 101013202
#define MON_CLUSTER_SETNEWSOCK_3 101013203
+
#define MON_CLUSTER_ALLGATHERSOCK_1 101013301
#define MON_CLUSTER_ALLGATHERSOCK_2 101013302
#define MON_CLUSTER_ALLGATHERSOCK_3 101013303
@@ -145,6 +160,7 @@
#define MON_CLUSTER_ALLGATHERSOCK_7 101013307
#define MON_CLUSTER_ALLGATHERSOCK_8 101013308
#define MON_CLUSTER_EPOLLCTL_1 101013401
+
#define MON_CLUSTER_INITCLUSTERSOCKS_1 101013501
#define MON_CLUSTER_INITCLUSTERSOCKS_2 101013502
#define MON_CLUSTER_INITCLUSTERSOCKS_3 101013503
@@ -155,6 +171,7 @@
#define MON_CLUSTER_INITCLUSTERSOCKS_8 101013508
#define MON_CLUSTER_INITACCEPTSOCK_1 101013601
#define MON_CLUSTER_INITACCEPTSOCK_2 101013602
+
#define MON_CLUSTER_MKSRVSOCK_1 101013701
#define MON_CLUSTER_MKSRVSOCK_2 101013702
#define MON_CLUSTER_MKSRVSOCK_3 101013703
@@ -163,6 +180,7 @@
#define MON_CLUSTER_MKSRVSOCK_6 101013706
#define MON_CLUSTER_MKSRVSOCK_7 101013707
#define MON_CLUSTER_MKSRVSOCK_8 101013708
+
#define MON_CLUSTER_MKCLTSOCK_1 101013801
#define MON_CLUSTER_MKCLTSOCK_2 101013802
#define MON_CLUSTER_MKCLTSOCK_3 101013803
@@ -175,29 +193,37 @@
#define MON_CLUSTER_MKCLTSOCK_10 101013810
#define MON_CLUSTER_MKCLTSOCK_11 101013811
#define MON_CLUSTER_MKCLTSOCK_12 101013812
+
#define MON_CLUSTER_CONNECT_1 101013901
#define MON_CLUSTER_CONNECT_2 101013902
#define MON_CLUSTER_CONNECT_3 101013903
#define MON_CLUSTER_CONNECT_4 101013904
#define MON_CLUSTER_CONNECT_5 101013905
+
#define MON_CLUSTER_CONNECTTOSELF_1 101014001
#define MON_CLUSTER_CONNECTTOSELF_2 101014002
#define MON_CLUSTER_CONNECTTOSELF_3 101014003
+
#define MON_CLUSTER_ACCEPTSOCK_1 101014101
#define MON_CLUSTER_ACCEPTSOCK_2 101014102
#define MON_CLUSTER_ACCEPTSOCK_3 101014103
+
#define MON_CLUSTER_INITSERVERSOCK_1 101014201
#define MON_CLUSTER_INITSERVERSOCK_2 101014202
#define MON_CLUSTER_INITSERVERSOCK_3 101014203
#define MON_CLUSTER_INITSERVERSOCK_4 101014204
+
#define MON_CLUSTER_SOFTNODEDOWN_1 101014301
#define MON_CLUSTER_SOFTNODEDOWN_2 101014302
#define MON_CLUSTER_SOFTNODEDOWN_3 101014303
+
#define MON_CLUSTER_SOFTNODEUP_1 101014401
+
#define MON_CLUSTER_SETKEEPALIVESOCKOPT_1 101014501
#define MON_CLUSTER_SETKEEPALIVESOCKOPT_2 101014502
#define MON_CLUSTER_SETKEEPALIVESOCKOPT_3 101014503
#define MON_CLUSTER_SETKEEPALIVESOCKOPT_4 101014504
+
#define MON_CLUSTER_NO_LICENSE_VERIFIERS 101014601
#define MON_CLUSTER_ALLGATHERSOCKRECONN_1 101014701
http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/cluster.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx
index 66f515c..c22e8ea 100644
--- a/core/sqf/monitor/linux/cluster.cxx
+++ b/core/sqf/monitor/linux/cluster.cxx
@@ -347,10 +347,6 @@ void CCluster::NodeReady( CNode *spareNode )
}
spareNode->SetActivatingSpare( false );
- if ( MyNode->IsCreator() )
- {
- MyNode->SetCreator( false, -1, -1 );
- }
ResetIntegratingPNid();
TRACE_EXIT;
@@ -848,10 +844,6 @@ void CCluster::HardNodeDown (int pnid, bool communicate_state)
{
if ( node->GetPNid() == integratingPNid_ )
{
- if ( MyNode->IsCreator() )
- {
- MyNode->SetCreator( false, -1, -1 );
- }
ResetIntegratingPNid();
}
node->KillAllDown();
@@ -1470,10 +1462,6 @@ int CCluster::HardNodeUp( int pnid, char *node_name )
}
}
- if ( MyNode->IsCreator() )
- {
- MyNode->SetCreator( false, -1, -1 );
- }
ResetIntegratingPNid();
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
@@ -3394,6 +3382,18 @@ bool CCluster::PingSockPeer(CNode *node)
pingSock = Monitor->Connect( node->GetCommPort() );
if ( pingSock < 0 )
{
+ if (node->GetState() != State_Up)
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , node->GetPNid(), socks_[node->GetPNid()] );
+ }
+ break;
+ }
sleep( MAX_RECONN_PING_WAIT_TIMEOUT );
}
else
@@ -3828,12 +3828,13 @@ void CCluster::ReIntegrateSock( int initProblem )
myNodeInfo.creator = true;
myNodeInfo.creatorShellPid = CreatorShellPid;
myNodeInfo.creatorShellVerifier = CreatorShellVerifier;
+ myNodeInfo.ping = false;
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
trace_printf( "%s@%d - Connected to creator monitor, sending my info, "
"node %d (%s), commPort=%s, syncPort=%s, creator=%d, "
- "creatorShellPid=%d:%d\n"
+ "creatorShellPid=%d:%d, ping=%d\n"
, method_name, __LINE__
, myNodeInfo.pnid
, myNodeInfo.nodeName
@@ -3841,7 +3842,8 @@ void CCluster::ReIntegrateSock( int initProblem )
, myNodeInfo.syncPort
, myNodeInfo.creator
, myNodeInfo.creatorShellPid
- , myNodeInfo.creatorShellVerifier );
+ , myNodeInfo.creatorShellVerifier
+ , myNodeInfo.ping );
}
rc = Monitor->SendSock( (char *) &myNodeInfo
@@ -3907,6 +3909,7 @@ void CCluster::ReIntegrateSock( int initProblem )
myNodeInfo.creator = false;
myNodeInfo.creatorShellPid = -1;
myNodeInfo.creatorShellVerifier = -1;
+ myNodeInfo.ping = false;
for (int i=0; i<pnodeCount; i++)
{
if ( nodeInfo[i].creatorPNid != -1 &&
@@ -4237,9 +4240,36 @@ void CCluster::ResetIntegratingPNid( void )
abort();
}
+ if ( MyNode->IsCreator() )
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Resetting creator pnid=%d\n",
+ method_name, __LINE__, MyPNID );
+ }
+
+ MyNode->SetCreator( false, -1, -1 );
+ }
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Resetting integratingPNid_=%d\n",
+ method_name, __LINE__, integratingPNid_ );
+ }
+
integratingPNid_ = -1;
- // Indicate to the commAcceptor thread to begin accepting connections
- CommAccept.setAccepting( true );
+
+ if (!CommAccept.isAccepting())
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Triggering commAcceptor thread to begin accepting connections\n",
+ method_name, __LINE__ );
+ }
+
+ // Indicate to the commAcceptor thread to begin accepting connections
+ CommAccept.startAccepting();
+ }
TRACE_EXIT;
}
@@ -4251,7 +4281,7 @@ void CCluster::SetIntegratingPNid( int pnid )
integratingPNid_ = pnid;
// Indicate to the commAcceptor thread to stop accepting connections
- CommAccept.setAccepting( false );
+ CommAccept.stopAccepting();
TRACE_EXIT;
}
@@ -4643,7 +4673,6 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
peer_t p[GetConfigPNodesMax()];
memset( p, 0, sizeof(p) );
tag = 0; // make compiler happy
- struct timespec currentTime;
// Set to twice the ZClient session timeout
static int sessionTimeout = ZClientEnabled
? (ZClient->GetSessionTimeout() * 2) : 120;
@@ -4740,10 +4769,14 @@ int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_St
while ( 1 )
{
reconnected:
+ bool checkConnections = false;
+ bool doReconnect = false;
+ bool resetConnections = false;
+ int peerTimedoutCount = 0;
int maxEvents = 2*GetConfigPNodesCount() - nsent - nrecv;
if ( maxEvents == 0 ) break;
int nw;
- int zerr = ZOK;
+ peer_t *peer;
while ( 1 )
{
@@ -4754,321 +4787,180 @@ reconnected:
if ( nw == 0 )
{ // Timeout, no fd's ready
for ( int iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
+ { // Check no IO completion on peers
+ peer = &p[indexToPnid_[iPeer]];
+ if ( (peer->p_receiving) || (peer->p_sending) )
+ {
+ peerTimedoutCount++;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - EPOLL timeout (%d) on: %s(%d), "
+ "socks_[%d]=%d, "
+ "peer->p_sending=%d, "
+ "peer->p_receiving=%d\n"
+ , method_name, __LINE__
+ , peerTimedoutCount
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , indexToPnid_[iPeer]
+ , socks_[indexToPnid_[iPeer]]
+ , peer->p_sending
+ , peer->p_receiving );
+ }
+
+ if (peer->p_initial_check && !reconnecting)
+ { // Set the session timeout relative to now
+ peer->p_initial_check = false;
+ clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
+ peer->znodeFailedTime.tv_sec += sessionTimeout;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - Znode Fail Time %ld(secs)\n"
+ , method_name, __LINE__
+ , peer->znodeFailedTime.tv_sec);
+ }
+ }
+
+ if ( IsRealCluster && peer->p_timeout_count < sv_epoll_retry_count )
+ {
+ peer->p_timeout_count++;
+ checkConnections = true;
+ if (peer->p_timeout_count == sv_epoll_retry_count)
+ {
+ resetConnections = true;
+ }
+ }
+ else
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - Peer timed out: %s(%d), "
+ "socks_[%d]=%d, "
+ "peer->p_timeout_count=%d\n"
+ , method_name, __LINE__
+ , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
+ , indexToPnid_[iPeer]
+ , socks_[indexToPnid_[iPeer]]
+ , peer->p_timeout_count );
+ }
+ }
+ }
+ } // Check no IO completion on peers
+
+ if (checkConnections)
{
- peer_t *peer = &p[indexToPnid_[iPeer]];
- if ( (indexToPnid_[iPeer] != MyPNID) && (socks_[indexToPnid_[iPeer]] != -1) )
+ checkConnections = false;
+ if (trace_settings & TRACE_RECOVERY)
{
- if ( (peer->p_receiving) || (peer->p_sending) )
- {
- if ( ! ZClientEnabled )
+ trace_printf( "%s@%d - Initianing AllgatherSockReconnect(),"
+ " peerTimedoutCount=%d\n"
+ , method_name, __LINE__
+ , peerTimedoutCount );
+ }
+ // First, check ability to connect to all peers
+ // An err returned will mean that connect failed with
+ // at least one peer. No err implies that possible network
+ // reset occurred and there is probably one dead connection
+ // to a peer where no IOs will complete ever, so connections
+ // to all peers must be reestablished.
+ err = AllgatherSockReconnect( stats, false );
+ if (err == MPI_SUCCESS)
+ { // Connections to all peers are good
+ if (resetConnections)
+ { // Establish new connections on all peers
+ resetConnections = false;
+ err = AllgatherSockReconnect( stats, true );
+ // Redrive IOs on new peer connections
+ nsent = 0; nrecv = 0;
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
{
- if (peer->p_initial_check && !reconnecting)
- {
- peer->p_initial_check = false;
- clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
- peer->znodeFailedTime.tv_sec += sessionTimeout;
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d" " - Znode Fail Time %ld(secs)\n"
- , method_name, __LINE__
- , peer->znodeFailedTime.tv_sec);
- }
+ peer = &p[indexToPnid_[i]];
+ if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
+ { // peer is me or not available
+ peer->p_sending = peer->p_receiving = false;
+ nsent++;
+ nrecv++;
}
-
- if ( peer->p_timeout_count < sv_epoll_retry_count )
+ else
{
- peer->p_timeout_count++;
-
- if (IsRealCluster)
- {
- if (trace_settings & TRACE_RECOVERY)
- {
- trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
- " timeout count=%d,"
- " sending=%d,"
- " receiving=%d\n"
- , method_name, __LINE__
- , Node[indexToPnid_[iPeer]]->GetName(), iPeer
- , peer->p_timeout_count
- , peer->p_sending
- , peer->p_receiving);
- }
- // Attempt reconnect to all peers
- err = AllgatherSockReconnect( stats );
- // Redrive IOs on live peer connections
- nsent = 0; nrecv = 0;
- for ( int i = 0; i < GetConfigPNodesCount(); i++ )
- {
- peer_t *peer = &p[indexToPnid_[i]];
- if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
- {
- peer->p_sending = peer->p_receiving = false;
- nsent++;
- nrecv++;
- }
- else
- {
- peer->p_sending = peer->p_receiving = true;
- peer->p_sent = peer->p_received = 0;
- peer->p_timeout_count = 0;
- peer->p_n2recv = -1;
- peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
-
- struct epoll_event event;
- event.data.fd = socks_[indexToPnid_[i]];
- event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
- EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[i]], &event );
- }
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d" " - socks_[%d]=%d, "
- "peer->p_sending=%d, "
- "peer->p_receiving=%d\n"
- , method_name, __LINE__
- , indexToPnid_[i]
- , socks_[indexToPnid_[i]]
- , peer->p_sending
- , peer->p_receiving );
- }
- }
- reconnectSeqNum_ = seqNum_;
- reconnecting = true;
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d" " - Reconnecting!\n"
- , method_name, __LINE__ );
- }
- goto reconnected;
- }
- continue;
+ peer->p_sending = peer->p_receiving = true;
+ peer->p_sent = peer->p_received = 0;
+ peer->p_n2recv = -1;
+ peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
+ struct epoll_event event;
+ event.data.fd = socks_[indexToPnid_[i]];
+ event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+ EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[i]], &event );
}
- if (trace_settings & TRACE_RECOVERY)
+ }
+ } // (resetConnections)
+ } // (err == MPI_SUCCESS)
+ else
+ {
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+ {
+ peer = &p[indexToPnid_[i]];
+ if ( indexToPnid_[i] != MyPNID && socks_[indexToPnid_[i]] == -1 )
+ { // peer is me or no longer available
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY) &&
+ (peer->p_sending || peer->p_receiving) )
{
- trace_printf( "%s@%d - Peer timeout triggered: "
- "peer->p_timeout_count %d, "
- "sv_epoll_retry_count %d\n"
- " socks_[%d]=%d\n"
- " stats[%d].MPI_ERROR=%s\n"
+ trace_printf( "%s@%d No IO completion on %s(%d):socks_[%d]=%d, "
+ "peer->p_sending=%d, "
+ "peer->p_receiving=%d\n"
, method_name, __LINE__
- , peer->p_timeout_count
- , sv_epoll_retry_count
- , indexToPnid_[iPeer]
- , socks_[indexToPnid_[iPeer]]
- , iPeer
- , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) );
+ , Node[indexToPnid_[i]]->GetName(), indexToPnid_[i]
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , peer->p_sending
+ , peer->p_receiving );
}
- }
- else
- {
- if (peer->p_initial_check && !reconnecting)
+ if (peer->p_sending)
{
- peer->p_initial_check = false;
- clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
- peer->znodeFailedTime.tv_sec += sessionTimeout;
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d" " - Znode Fail Time %ld(secs)\n"
- , method_name, __LINE__
- , peer->znodeFailedTime.tv_sec);
- }
-
+ nsent++;
+ peer->p_sending = false;
}
- // If not expired, stay in the loop
- if ( ! ZClient->IsZNodeExpired( Node[indexToPnid_[iPeer]]->GetName(), zerr ))
+ if (peer->p_receiving)
{
- if ( zerr == ZCONNECTIONLOSS || zerr == ZOPERATIONTIMEOUT )
- {
- // Ignore transient errors with the quorum.
- // However, if longer than the session
- // timeout, handle it as a hard error.
- clock_gettime(CLOCK_REALTIME, &currentTime);
- if (currentTime.tv_sec < peer->znodeFailedTime.tv_sec)
- {
- // Failsafe
- peer->p_timeout_count++;
-
- if ( peer->p_timeout_count < sv_epoll_retry_count )
- {
- if (IsRealCluster)
- {
- if (trace_settings & TRACE_RECOVERY)
- {
- trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
- " timeout count=%d,"
- " sending=%d,"
- " receiving=%d\n"
- , method_name, __LINE__
- , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
- , peer->p_timeout_count
- , peer->p_sending
- , peer->p_receiving);
- }
- // Attempt reconnect to all peers
- err = AllgatherSockReconnect( stats );
- // Redrive IOs on live peer connections
- nsent = 0; nrecv = 0;
- for ( int i = 0; i < GetConfigPNodesCount(); i++ )
- {
- peer_t *peer = &p[indexToPnid_[i]];
- if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
- {
- peer->p_sending = peer->p_receiving = false;
- nsent++;
- nrecv++;
- }
- else
- {
- peer->p_sending = peer->p_receiving = true;
- peer->p_sent = peer->p_received = 0;
- peer->p_timeout_count = 0;
- peer->p_n2recv = -1;
- peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
-
- struct epoll_event event;
- event.data.fd = socks_[indexToPnid_[i]];
- event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
- EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event );
- }
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d" " - socks_[%d]=%d, "
- "peer->p_sending=%d, "
- "peer->p_receiving=%d\n"
- , method_name, __LINE__
- , indexToPnid_[i]
- , socks_[indexToPnid_[i]]
- , peer->p_sending
- , peer->p_receiving );
- }
- }
- reconnectSeqNum_ = seqNum_;
- reconnecting = true;
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d" " - Reconnecting!\n"
- , method_name, __LINE__ );
- }
- goto reconnected;
- }
- continue;
- }
- }
- if (trace_settings & TRACE_RECOVERY)
- {
- trace_printf( "%s@%d - Znode Failed triggered\n"
- " Current Time %ld(secs)\n"
- " Znode Fail Time %ld(secs)\n"
- , method_name, __LINE__
- , currentTime.tv_sec
- , peer->znodeFailedTime.tv_sec);
- }
- }
- else
- {
- // Failsafe
- peer->p_timeout_count++;
-
- if ( peer->p_timeout_count < sv_epoll_retry_count )
- {
- if (IsRealCluster)
- {
- if (trace_settings & TRACE_RECOVERY)
- {
- trace_printf( "%s@%d - Initianing AllgatherSockReconnect(), trigger: %s(%d),"
- " timeout count=%d,"
- " sending=%d,"
- " receiving=%d\n"
- , method_name, __LINE__
- , Node[indexToPnid_[iPeer]]->GetName(), indexToPnid_[iPeer]
- , peer->p_timeout_count
- , peer->p_sending
- , peer->p_receiving);
- }
- // Attempt reconnect to all peers
- err = AllgatherSockReconnect( stats );
- // Redrive IOs on live peer connections
- nsent = 0; nrecv = 0;
- for ( int i = 0; i < GetConfigPNodesCount(); i++ )
- {
- peer_t *peer = &p[indexToPnid_[i]];
- if ( indexToPnid_[i] == MyPNID || socks_[indexToPnid_[i]] == -1 )
- {
- peer->p_sending = peer->p_receiving = false;
- nsent++;
- nrecv++;
- }
- else
- {
- peer->p_sending = peer->p_receiving = true;
- peer->p_sent = peer->p_received = 0;
- peer->p_timeout_count = 0;
- peer->p_n2recv = -1;
- peer->p_buff = ((char *) rbuf) + (indexToPnid_[i] * CommBufSize);
-
- struct epoll_event event;
- event.data.fd = socks_[indexToPnid_[i]];
- event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
- EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[i], &event );
- }
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d" " - socks_[%d]=%d, "
- "peer->p_sending=%d, "
- "peer->p_receiving=%d\n"
- , method_name, __LINE__
- , indexToPnid_[i]
- , socks_[indexToPnid_[i]]
- , peer->p_sending
- , peer->p_receiving );
- }
- }
- reconnectSeqNum_ = seqNum_;
- reconnecting = true;
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d" " - Reconnecting!\n"
- , method_name, __LINE__ );
- }
- goto reconnected;
- }
- continue;
- }
- }
+ peer->p_receiving = false;
+ nrecv++;
}
}
+ }
+ }
+ doReconnect = true;
+ } // (checkConnections)
- if (trace_settings & TRACE_RECOVERY)
- {
- trace_printf( "%s@%d - err=%d, socks_[%d]=%d, stats[%d].MPI_ERROR=%s\n"
- , method_name, __LINE__
- , err
- , indexToPnid_[iPeer]
- , socks_[indexToPnid_[iPeer]]
- , indexToPnid_[iPeer]
- , ErrorMsg(stats[indexToPnid_[iPeer]].MPI_ERROR) );
- }
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ for ( int i = 0; i < GetConfigPNodesCount(); i++ )
+ {
+ peer = &p[indexToPnid_[i]];
+ trace_printf( "%s@%d doReconnect=%d, %s(%d):socks_[%d]=%d, "
+ "peer->p_sending=%d, "
+ "peer->p_receiving=%d\n"
+ , method_name, __LINE__
+ , doReconnect
+ , Node[indexToPnid_[i]]->GetName(), indexToPnid_[i]
+ , indexToPnid_[i]
+ , socks_[indexToPnid_[i]]
+ , peer->p_sending
+ , peer->p_receiving );
+ }
+ }
- if ( err == MPI_ERR_IN_STATUS
- && stats[indexToPnid_[iPeer]].MPI_ERROR == MPI_ERR_EXITED)
- {
- // At this point, this peer is not responding and
- // reconnects failed or its znode expired
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s@%d] Not heard from peer=%d (node=%s) "
- "(current seq # is %lld)\n"
- , method_name
- , __LINE__
- , indexToPnid_[iPeer]
- , Node[indexToPnid_[iPeer]]->GetName()
- , seqNum_ );
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_2, SQ_LOG_CRIT, buf );
- }
- }
+ if (doReconnect)
+ {
+ reconnectSeqNum_ = seqNum_;
+ reconnecting = true;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - Reconnecting! (reconnectSeqNum_=%lld)\n"
+ , method_name, __LINE__, reconnectSeqNum_ );
}
+ goto reconnected;
}
- }
-
+ } // ( nw == 0 )
+
if ( nw < 0 )
{ // Got an error
char ebuff[256];
@@ -5393,7 +5285,7 @@ early_exit:
return err;
}
-int CCluster::AllgatherSockReconnect( MPI_Status *stats )
+int CCluster::AllgatherSockReconnect( MPI_Status *stats, bool reestablishConnections )
{
const char method_name[] = "CCluster::AllgatherSockReconnect";
TRACE_ENTRY;
@@ -5428,6 +5320,18 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats )
, node->GetName(), node->GetPNid()
, idst, socks_[idst] );
}
+ stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+ stats[idst].count = 0;
+ err = MPI_ERR_IN_STATUS;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting Node %s (%d) status to "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst
+ , ErrorMsg(stats[idst].MPI_ERROR) );
+ }
// Remove old socket from epoll set, it may not be there
struct epoll_event event;
event.data.fd = socks_[idst];
@@ -5437,22 +5341,28 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats )
}
continue;
}
- reconnectSock = ConnectSockPeer( node, idst );
- if (reconnectSock == -1)
+ if (PingSockPeer(node))
{
- stats[idst].MPI_ERROR = MPI_ERR_EXITED;
- stats[idst].count = 0;
- err = MPI_ERR_IN_STATUS;
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ reconnectSock = ConnectSockPeer( node, idst, reestablishConnections );
+ if (reconnectSock == -1)
{
- trace_printf( "%s@%d - Setting Node %s (%d) status to "
- "stats[%d].MPI_ERROR=%s\n"
- , method_name, __LINE__
- , node->GetName(), node->GetPNid()
- , idst
- , ErrorMsg(stats[idst].MPI_ERROR) );
+ stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+ stats[idst].count = 0;
+ err = MPI_ERR_IN_STATUS;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting Node %s (%d) status to "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst
+ , ErrorMsg(stats[idst].MPI_ERROR) );
+ }
}
- if (node->GetState() != State_Up)
+ }
+ else
+ {
+ if (socks_[idst] != -1)
{
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
@@ -5470,6 +5380,19 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats )
EpollCtlDelete( epollFD_, socks_[idst], &event );
socks_[idst] = -1;
}
+ reconnectSock = -1;
+ stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+ stats[idst].count = 0;
+ err = MPI_ERR_IN_STATUS;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting Node %s (%d) status to "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst
+ , ErrorMsg(stats[idst].MPI_ERROR) );
+ }
}
}
else if ( j == MyPNID )
@@ -5491,6 +5414,18 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats )
, node->GetName(), node->GetPNid()
, idst, socks_[idst] );
}
+ stats[idst].MPI_ERROR = MPI_ERR_EXITED;
+ stats[idst].count = 0;
+ err = MPI_ERR_IN_STATUS;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Setting Node %s (%d) status to "
+ "stats[%d].MPI_ERROR=%s\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , idst
+ , ErrorMsg(stats[idst].MPI_ERROR) );
+ }
// Remove old socket from epoll set, it may not be there
struct epoll_event event;
event.data.fd = socks_[idst];
@@ -5508,7 +5443,7 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats )
}
if (PingSockPeer(node))
{
- reconnectSock = AcceptSockPeer( node, idst );
+ reconnectSock = AcceptSockPeer( node, idst, reestablishConnections );
if (reconnectSock == -1)
{
stats[idst].MPI_ERROR = MPI_ERR_EXITED;
@@ -5523,24 +5458,6 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats )
, idst
, ErrorMsg(stats[idst].MPI_ERROR) );
}
- if (node->GetState() != State_Up)
- {
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Node %s (%d) is not up, "
- "removing old socket from epoll set, "
- "socks_[%d]=%d\n"
- , method_name, __LINE__
- , node->GetName(), node->GetPNid()
- , idst, socks_[idst] );
- }
- // Remove old socket from epoll set, it may not be there
- struct epoll_event event;
- event.data.fd = socks_[idst];
- event.events = 0;
- EpollCtlDelete( epollFD_, socks_[idst], &event );
- socks_[idst] = -1;
- }
}
}
else
@@ -5561,6 +5478,7 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats )
event.data.fd = socks_[idst];
event.events = 0;
EpollCtlDelete( epollFD_, socks_[idst], &event );
+ socks_[idst] = -1;
}
reconnectSock = -1;
stats[idst].MPI_ERROR = MPI_ERR_EXITED;
@@ -5618,14 +5536,13 @@ int CCluster::AllgatherSockReconnect( MPI_Status *stats )
return( err );
}
-int CCluster::AcceptSockPeer( CNode *node, int peer )
+int CCluster::AcceptSockPeer( CNode *node, int peer, bool reestablishConnections )
{
const char method_name[] = "CCluster::AcceptSockPeer";
TRACE_ENTRY;
int rc = MPI_SUCCESS;
int reconnectSock = -1;
- unsigned char srcaddr[4];
struct hostent *he;
// Get my host structure via my node name
@@ -5646,13 +5563,9 @@ int CCluster::AcceptSockPeer( CNode *node, int peer )
{
if (trace_settings & TRACE_RECOVERY)
{
- trace_printf( "%s@%d Accepting server socket: from %s(%d), src=%d.%d.%d.%d, port=%d\n"
+ trace_printf( "%s@%d Accepting server socket: from %s(%d), port=%d\n"
, method_name, __LINE__
, node->GetName(), node->GetPNid()
- , (int)((unsigned char *)srcaddr)[0]
- , (int)((unsigned char *)srcaddr)[1]
- , (int)((unsigned char *)srcaddr)[2]
- , (int)((unsigned char *)srcaddr)[3]
, MyNode->GetSyncSocketPort() );
}
@@ -5679,17 +5592,40 @@ int CCluster::AcceptSockPeer( CNode *node, int peer )
rc = -1;
}
- if (socks_[peer] != -1)
+ if (reestablishConnections)
{
- // Remove old socket from epoll set, it may not be there
- struct epoll_event event;
- event.data.fd = socks_[peer];
- event.events = 0;
- EpollCtlDelete( epollFD_, socks_[peer], &event );
+ if (socks_[peer] != -1)
+ {
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[peer];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[peer], &event );
+ if (node->GetState() != State_Up)
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , peer, socks_[peer] );
+ }
+ socks_[peer] = -1;
+ }
+ }
+ if (reconnectSock != -1)
+ {
+ socks_[peer] = reconnectSock;
+ }
}
- if (reconnectSock != -1)
+ else
{
- socks_[peer] = reconnectSock;
+ if (reconnectSock != -1)
+ {
+ close( (int)reconnectSock );
+ }
}
}
@@ -5697,7 +5633,7 @@ int CCluster::AcceptSockPeer( CNode *node, int peer )
return rc;
}
-int CCluster::ConnectSockPeer( CNode *node, int peer )
+int CCluster::ConnectSockPeer( CNode *node, int peer, bool reestablishConnections )
{
const char method_name[] = "CCluster::ConnectSockPeer";
TRACE_ENTRY;
@@ -5791,17 +5727,40 @@ int CCluster::ConnectSockPeer( CNode *node, int peer )
rc = -1;
}
- if (socks_[peer] != -1)
+ if (reestablishConnections)
{
- // Remove old socket from epoll set, it may not be there
- struct epoll_event event;
- event.data.fd = socks_[peer];
- event.events = 0;
- EpollCtlDelete( epollFD_, socks_[peer], &event );
+ if (socks_[peer] != -1)
+ {
+ // Remove old socket from epoll set, it may not be there
+ struct epoll_event event;
+ event.data.fd = socks_[peer];
+ event.events = 0;
+ EpollCtlDelete( epollFD_, socks_[peer], &event );
+ if (node->GetState() != State_Up)
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Node %s (%d) is not up, "
+ "removing old socket from epoll set, "
+ "socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , peer, socks_[peer] );
+ }
+ socks_[peer] = -1;
+ }
+ }
+ if (reconnectSock != -1)
+ {
+ socks_[peer] = reconnectSock;
+ }
}
- if (reconnectSock != -1)
+ else
{
- socks_[peer] = reconnectSock;
+ if (reconnectSock != -1)
+ {
+ close( (int)reconnectSock );
+ }
}
}
@@ -6118,14 +6077,21 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
if ( GetConfigPNodesCount() == 1 ) return true;
- // Count occurrences of sequence numbers from other nodes
+ // Count occurrences of sequence numbers
for (int pnid = 0; pnid < GetConfigPNodesMax(); pnid++)
{
CNode *node= Nodes->GetNode( pnid );
if (!node) continue;
if (node->GetState() != State_Up) continue;
- seqNum = nodestate[pnid].seq_num;
+ if ( pnid == MyPNID )
+ {
+ seqNum = nodestate[pnid].seq_num = seqNum_;
+ }
+ else
+ {
+ seqNum = nodestate[pnid].seq_num;
+ }
if (trace_settings & TRACE_SYNC)
{
@@ -6183,7 +6149,7 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
if (trace_settings & TRACE_SYNC)
{
- if ( seqNum_ != seqNumBucket[mostCountsIndex] )
+ if ( lowSeqNum_ != highSeqNum_ )
{
trace_printf( "%s@%d Most common seq num=%lld (%d nodes), "
"%d buckets, low=%lld, high=%lld, local seq num (%lld) did not match.\n"
@@ -6197,8 +6163,8 @@ bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] )
}
}
- // Fail if my seqnum does not match majority
- return seqNum_ == seqNumBucket[mostCountsIndex];
+ // Fail if any sequence number does not match
+ return( lowSeqNum_ == highSeqNum_ );
}
void CCluster::HandleDownNode( int pnid )
@@ -6353,13 +6319,9 @@ void CCluster::UpdateClusterState( bool &doShutdown,
{
if (!noComm)
{
- trace_printf( "%s@%d - Communication error from node %d:\n"
- " node_state=%d\n"
- " change_nid=%d\n"
- " seq_num=#%lld\n"
+ trace_printf( "%s@%d - Communication error from node %d, "
+ " seq_num=#%lld\n"
, method_name, __LINE__, index
- , recvBuf->nodeInfo.node_state
- , recvBuf->nodeInfo.change_nid
, seqNum_ );
}
}
@@ -6803,7 +6765,18 @@ bool CCluster::ProcessClusterData( struct sync_buffer_def * syncBuf,
}
// if we have already processed buffer, skip it
- if (lastSeqNum_ == msgBuf->nodeInfo.seq_num) continue;
+ if (lastSeqNum_ >= msgBuf->nodeInfo.seq_num) continue;
+
+ if (trace_settings & TRACE_SYNC)
+ {
+ trace_printf("%s@%d - Processing buffer for node %d, swpRecCount_=%d, seq_num=%lld, "
+ "lastSeqNum_=%lld, msg_count=%d, msg_offset=%d\n",
+ method_name, __LINE__, i, swpRecCount_,
+ msgBuf->nodeInfo.seq_num,
+ lastSeqNum_,
+ msgBuf->msgInfo.msg_count,
+ msgBuf->msgInfo.msg_offset);
+ }
// reset msg length to zero to initialize for PopMsg()
msgBuf->msgInfo.msg_offset = 0;
@@ -6814,7 +6787,7 @@ bool CCluster::ProcessClusterData( struct sync_buffer_def * syncBuf,
if ( deferredTmSync )
{ // This node has sent a TmSync message. Process it now.
if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
- trace_printf("%s@%d - Handling deferred TmSync message for "
+ trace_printf("%s@%d - Handling deferred TmSync messages for "
"node %d\n", method_name, __LINE__, i);
struct internal_msg_def *msg;
@@ -6840,16 +6813,9 @@ bool CCluster::ProcessClusterData( struct sync_buffer_def * syncBuf,
}
else if ( !deferredTmSync )
{
- // temp trace
- if (trace_settings & TRACE_SYNC)
- {
- trace_printf("%s@%d - For node %d, swpRecCount_=%d, "
- "seq_num=%lld,msg_count=%d, msg_offset=%d\n",
- method_name, __LINE__, i, swpRecCount_,
- msgBuf->nodeInfo.seq_num,
- msgBuf->msgInfo.msg_count,
- msgBuf->msgInfo.msg_offset);
- }
+ if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
+ trace_printf("%s@%d - Handling messages for "
+ "node %d\n", method_name, __LINE__, i);
do
{
// Get the next sync msg for the node
@@ -7103,15 +7069,20 @@ bool CCluster::exchangeNodeData ( )
reconnected:
if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
- trace_printf( "%s@%d - doing Allgather size=%d, swpRecCount_=%d, message count=%d "
- "message seq_num=%lld, seqNum_=%lld, lastSeqNum_=%lld\n"
+ trace_printf( "%s@%d - doing Allgather size=%d, swpRecCount_=%d, "
+ "message count=%d, message seq_num=%lld, "
+ "seqNum_=%lld, lastSeqNum_=%lld, lowSeqNum_=%lld, "
+ "highSeqNum_=%lld, reconnectSeqNum_=%lld\n"
, method_name, __LINE__
, Nodes->GetSyncSize()
, swpRecCount_
, send_buffer->msgInfo.msg_count
, send_buffer->nodeInfo.seq_num
, seqNum_
- , lastSeqNum_);
+ , lastSeqNum_
+ , lowSeqNum_
+ , highSeqNum_
+ , reconnectSeqNum_);
struct timespec ts_ag_begin;
clock_gettime(CLOCK_REALTIME, &ts_ag_begin);
@@ -7191,24 +7162,36 @@ reconnected:
, recv_buffer
, status
, send_buffer->nodeInfo.change_nid);
- }
- if ( ProcessClusterData( recv_buffer, send_buffer, false ) )
- { // There is a TmSync message remaining to be handled
- ProcessClusterData( recv_buffer, send_buffer, true );
- }
- else
- {
if ( lastAllgatherWithLastSyncBuffer )
{
seqNum_ = savedSeqNum;
lastAllgatherWithLastSyncBuffer = false;
send_buffer = Nodes->GetSyncBuffer();
+
+ if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
+ trace_printf( "%s@%d - Resetting lastAllgatherWithLastSyncBuffer=%d\n"
+ , method_name, __LINE__
+ , lastAllgatherWithLastSyncBuffer);
+
goto reconnected;
}
if ( reconnectSeqNum_ != 0 )
{
+
+ if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
+ trace_printf( "%s@%d - Allgather IO retry, swpRecCount_=%d, "
+ "seqNum_=%lld, lastSeqNum_=%lld, lowSeqNum_=%lld, "
+ "highSeqNum_=%lld, reconnectSeqNum_=%lld\n"
+ , method_name, __LINE__
+ , swpRecCount_
+ , seqNum_
+ , lastSeqNum_
+ , lowSeqNum_
+ , highSeqNum_
+ , reconnectSeqNum_);
+
// The Allgather() has executed a reconnect at reconnectSeqNum_.
// The UpdateClusterState has set the lowSeqNum_and highSeqNum_
// in the current IO exchange which will indicate whether there is
@@ -7224,6 +7207,12 @@ reconnected:
// Indicate to follow up the next exchange with current SyncBuffer
lastAllgatherWithLastSyncBuffer = true;
lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0;
+
+ if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
+ trace_printf( "%s@%d - Setting lastAllgatherWithLastSyncBuffer=%d\n"
+ , method_name, __LINE__
+ , lastAllgatherWithLastSyncBuffer);
+
goto reconnected;
}
else if (seqNum_ < highSeqNum_)
@@ -7231,10 +7220,25 @@ reconnected:
// Redo exchange with the current SyncBuffer
send_buffer = Nodes->GetSyncBuffer();
lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0;
+
+ if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
+ trace_printf( "%s@%d - lastAllgatherWithLastSyncBuffer=%d\n"
+ , method_name, __LINE__
+ , lastAllgatherWithLastSyncBuffer);
+
goto reconnected;
}
+ lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0;
}
+ }
+
+ if ( ProcessClusterData( recv_buffer, send_buffer, false ) )
+ { // There is a TmSync message remaining to be handled
+ ProcessClusterData( recv_buffer, send_buffer, true );
+ }
+ if (swpRecCount_ == 1)
+ {
// Save the sync buffer and corresponding sequence number we just processed
// On reconnect we must resend the last buffer and the current buffer
// to ensure dropped buffers are processed by all monitor processe in the
@@ -7246,22 +7250,21 @@ reconnected:
if ( ++seqNum_ == 0) seqNum_ = 1;
}
- // ?? Need the following? Possibly not since maybe all sync cycle
- // dependent code was removed -- need to check.
// Wake up any threads waiting on the completion of a sync cycle
syncCycle_.wakeAll();
if (doShutdown) result = checkIfDone( );
- --swpRecCount_;
-
if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
trace_printf( "%s@%d - node data exchange completed, swpRecCount_=%d, "
- "seqNum_=%lld, lastSeqNum_=%lld\n"
+ "seqNum_=%lld, lastSeqNum_=%lld, reconnectSeqNum_=%lld\n"
, method_name, __LINE__
, swpRecCount_
, seqNum_
- , lastSeqNum_);
+ , lastSeqNum_
+ , reconnectSeqNum_);
+
+ --swpRecCount_;
TRACE_EXIT;
@@ -7329,16 +7332,21 @@ void CCluster::exchangeTmSyncData ( struct sync_def *sync, bool bumpSync )
reconnected:
- if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_TMSYNC))
- trace_printf( "%s@%d - doing Allgather size=%d, swpRecCount_=%d, message count=%d "
- "message seq_num=%lld, seqNum_=%lld, lastSeqNum_=%lld\n"
+ if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
+ trace_printf( "%s@%d - doing Allgather size=%d, swpRecCount_=%d, "
+ "message count=%d, message seq_num=%lld, "
+ "seqNum_=%lld, lastSeqNum_=%lld, lowSeqNum_=%lld, "
+ "highSeqNum_=%lld, reconnectSeqNum_=%lld\n"
, method_name, __LINE__
, Nodes->GetSyncSize()
, swpRecCount_
, send_buffer->msgInfo.msg_count
, send_buffer->nodeInfo.seq_num
, seqNum_
- , lastSeqNum_);
+ , lastSeqNum_
+ , lowSeqNum_
+ , highSeqNum_
+ , reconnectSeqNum_);
struct timespec ts_ag_begin;
clock_gettime(CLOCK_REALTIME, &ts_ag_begin);
@@ -7418,68 +7426,103 @@ reconnected:
, recv_buffer
, status
, send_buffer->nodeInfo.change_nid);
- }
- if ( ProcessClusterData( recv_buffer, send_buffer, false ) )
- { // There is a TmSync message remaining to be handled
- ProcessClusterData( recv_buffer, send_buffer, true );
- }
+ if ( lastAllgatherWithLastSyncBuffer )
+ {
+ seqNum_ = savedSeqNum;
+ lastAllgatherWithLastSyncBuffer = false;
+ send_buffer = Nodes->GetSyncBuffer();
- if ( lastAllgatherWithLastSyncBuffer )
- {
- seqNum_ = savedSeqNum;
- lastAllgatherWithLastSyncBuffer = false;
- send_buffer = Nodes->GetSyncBuffer();
- goto reconnected;
- }
+ if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
+ trace_printf( "%s@%d - Resetting lastAllgatherWithLastSyncBuffer=%d\n"
+ , method_name, __LINE__
+ , lastAllgatherWithLastSyncBuffer);
- if ( reconnectSeqNum_ != 0 )
- {
- // The Allgather() has executed a reconnect at reconnectSeqNum_.
- // The UpdateClusterState has set the lowSeqNum_and highSeqNum_
- // in the current IO exchange which will indicate whether there is
- // a mismatch in IOs between monitor processes. If there is a mismatch,
- // the lowSeqNum_and highSeqNum_ relative to our current seqNum_
- // will determine how to redrive the exchange of node data.
- if (seqNum_ > lowSeqNum_)
- { // A remote monitor did not receive our last SyncBuffer
- // Redo exchange with the previous SyncBuffer
- send_buffer = Nodes->GetLastSyncBuffer();
- savedSeqNum = seqNum_;
- seqNum_ = lastSeqNum_;
- // Indicate to follow up the next exchange with current SyncBuffer
- lastAllgatherWithLastSyncBuffer = true;
- lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0;
goto reconnected;
}
- else if (seqNum_ < highSeqNum_)
- { // The local monitor did not receive the last remote SyncBuffer
- // Redo exchange with the current SyncBuffer
- send_buffer = Nodes->GetSyncBuffer();
+
+ if ( reconnectSeqNum_ != 0 )
+ {
+ if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
+ trace_printf( "%s@%d - Allgather IO retry, swpRecCount_=%d, "
+ "seqNum_=%lld, lastSeqNum_=%lld, lowSeqNum_=%lld, "
+ "highSeqNum_=%lld, reconnectSeqNum_=%lld\n"
+ , method_name, __LINE__
+ , swpRecCount_
+ , seqNum_
+ , lastSeqNum_
+ , lowSeqNum_
+ , highSeqNum_
+ , reconnectSeqNum_);
+
+ // The Allgather() has executed a reconnect at reconnectSeqNum_.
+ // The UpdateClusterState has set the lowSeqNum_and highSeqNum_
+ // in the current IO exchange which will indicate whether there is
+ // a mismatch in IOs between monitor processes. If there is a mismatch,
+ // the lowSeqNum_and highSeqNum_ relative to our current seqNum_
+ // will determine how to redrive the exchange of node data.
+ if (seqNum_ > lowSeqNum_)
+ { // A remote monitor did not receive our last SyncBuffer
+ // Redo exchange with the previous SyncBuffer
+ send_buffer = Nodes->GetLastSyncBuffer();
+ savedSeqNum = seqNum_;
+ seqNum_ = lastSeqNum_;
+ // Indicate to follow up the next exchange with current SyncBuffer
+ lastAllgatherWithLastSyncBuffer = true;
+ lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0;
+
+ if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
+ trace_printf( "%s@%d - Setting lastAllgatherWithLastSyncBuffer=%d\n"
+ , method_name, __LINE__
+ , lastAllgatherWithLastSyncBuffer);
+
+ goto reconnected;
+ }
+ else if (seqNum_ < highSeqNum_)
+ { // The local monitor did not receive the last remote SyncBuffer
+ // Redo exchange with the current SyncBuffer
+ send_buffer = Nodes->GetSyncBuffer();
+ lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0;
+
+ if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
+ trace_printf( "%s@%d - lastAllgatherWithLastSyncBuffer=%d\n"
+ , method_name, __LINE__
+ , lastAllgatherWithLastSyncBuffer);
+
+ goto reconnected;
+ }
lowSeqNum_ = highSeqNum_ = reconnectSeqNum_ = 0;
- goto reconnected;
}
}
- // Save the sync buffer and corresponding sequence number we just processed
- // On reconnect we must resend the last buffer and the current buffer
- // to ensure dropped buffers are processed by all monitor processe in the
- // correct order
- Nodes->SaveMyLastSyncBuffer();
- lastSeqNum_ = seqNum_;
-
- // Increment count of "Allgather" calls. If wrap-around, start again at 1.
- if ( ++seqNum_ == 0) seqNum_ = 1;
+ if ( ProcessClusterData( recv_buffer, send_buffer, false ) )
+ { // There is a TmSync message remaining to be handled
+ ProcessClusterData( recv_buffer, send_buffer, true );
+ }
- --swpRecCount_;
+ if (swpRecCount_ == 1)
+ {
+ // Save the sync buffer and corresponding sequence number we just processed
+ // On reconnect we must resend the last buffer and the current buffer
+ // to ensure dropped buffers are processed by all monitor processe in the
+ // correct order
+ Nodes->SaveMyLastSyncBuffer();
+ lastSeqNum_ = seqNum_;
+
+ // Increment count of "Allgather" calls. If wrap-around, start again at 1.
+ if ( ++seqNum_ == 0) seqNum_ = 1;
+ }
if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC))
trace_printf( "%s@%d - node data exchange completed, swpRecCount_=%d, "
- "seqNum_=%lld, lastSeqNum_=%lld\n"
+ "seqNum_=%lld, lastSeqNum_=%lld, reconnectSeqNum_=%lld\n"
, method_name, __LINE__
, swpRecCount_
, seqNum_
- , lastSeqNum_);
+ , lastSeqNum_
+ , reconnectSeqNum_);
+
+ --swpRecCount_;
TRACE_EXIT;
}
http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/cluster.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cluster.h b/core/sqf/monitor/linux/cluster.h
index e743341..58d3540 100644
--- a/core/sqf/monitor/linux/cluster.h
+++ b/core/sqf/monitor/linux/cluster.h
@@ -336,9 +336,9 @@ private:
int Allgather(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats);
int AllgatherIB(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats);
int AllgatherSock(int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats);
- int AllgatherSockReconnect( MPI_Status *stats );
- int AcceptSockPeer( CNode *node, int peer );
- int ConnectSockPeer( CNode *node, int peer );
+ int AllgatherSockReconnect( MPI_Status *stats, bool reestablishConnections = false );
+ int AcceptSockPeer( CNode *node, int peer, bool reestablishConnections = false );
+ int ConnectSockPeer( CNode *node, int peer, bool reestablishConnections = false );
void ValidateClusterState( cluster_state_def_t nodestate[],
bool haveDivergence );
http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/cmsh.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cmsh.cxx b/core/sqf/monitor/linux/cmsh.cxx
index c8c4975..709b293 100644
--- a/core/sqf/monitor/linux/cmsh.cxx
+++ b/core/sqf/monitor/linux/cmsh.cxx
@@ -91,6 +91,46 @@ int CCmsh::PopulateClusterState( void )
///////////////////////////////////////////////////////////////////////////////
//
+// Function/Method: CCmsh::PopulateNodeState
+//
+// Description: Executes the command string passed in the constructor with
+// the arguments "-n <nodeName>" and populates the internal node state list
+// (nodeStateList_) with the result, i.e. the state of that one node.
+//
+// Return:
+// 0 - success
+// -1 - failure
+//
+///////////////////////////////////////////////////////////////////////////////
+int CCmsh::PopulateNodeState( const char *nodeName )
+{
+    const char method_name[] = "CCmsh::PopulateNodeState";
+    TRACE_ENTRY;
+
+    int rc;
+
+    // The caller should save and close stdin before calling this proc
+    // and restore it when done. This is to prevent ssh from consuming
+    // caller's stdin contents when executing the command.
+    string commandArgs = "-n ";
+    commandArgs += nodeName;
+
+    rc = ExecuteCommand( commandArgs.c_str(), nodeStateList_ );
+    if ( rc == -1 )
+    {
+        // Bounded write: command_ has no fixed length, la_buf does.
+        char la_buf[MON_STRING_BUF_SIZE];
+        snprintf( la_buf, sizeof(la_buf), "[%s] Error: While executing '%s' command\n"
+                , method_name, command_.data() );
+        mon_log_write(MON_CMSH_GET_CLUSTER_STATE_1, SQ_LOG_ERR, la_buf);
+    }
+
+    TRACE_EXIT;
+    return( rc );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+//
// Function/Method: CCmsh::GetClusterState
//
// Description: Updates the state of the nodes in the physicalNodeMap passed in
@@ -128,31 +168,97 @@ int CCmsh::GetClusterState( PhysicalNodeNameMap_t &physicalNodeMap )
if (it != physicalNodeMap.end())
{
// TEST_POINT and Exclude List : to force state down on node name
- const char *downNodeName = getenv( TP001_NODE_DOWN );
- const char *downNodeList = getenv( TRAF_EXCLUDE_LIST );
- string downNodeString = " ";
- if (downNodeList)
- {
- downNodeString += downNodeList;
- downNodeString += " ";
- }
- string downNodeToFind = " ";
- downNodeToFind += nodeName.c_str();
- downNodeToFind += " ";
- if (((downNodeList != NULL) &&
- strstr(downNodeString.c_str(),downNodeToFind.c_str())) ||
- ( (downNodeName != NULL) &&
- !strcmp( downNodeName, nodeName.c_str()) ))
- {
- nodeState = StateDown;
- }
-
+ const char *downNodeName = getenv( TP001_NODE_DOWN );
+ const char *downNodeList = getenv( TRAF_EXCLUDE_LIST );
+ string downNodeString = " ";
+ if (downNodeList)
+ {
+ downNodeString += downNodeList;
+ downNodeString += " ";
+ }
+ string downNodeToFind = " ";
+ downNodeToFind += nodeName.c_str();
+ downNodeToFind += " ";
+ if (((downNodeList != NULL) &&
+ strstr(downNodeString.c_str(),downNodeToFind.c_str())) ||
+ ((downNodeName != NULL) &&
+ !strcmp(downNodeName, nodeName.c_str())))
+ {
+ nodeState = StateDown;
+ }
+
// Set physical node state
physicalNode = it->second;
physicalNode->SetState( nodeState );
}
}
- }
+ }
+
+ TRACE_EXIT;
+ return( rc );
+}
+
+///////////////////////////////////////////////////////////////////////////////
+//
+// Function/Method: CCmsh::GetNodeState
+//
+// Description: Queries the state of the single node 'name' and stores it in
+// the physicalNode passed in as a parameter. Caller must pass a valid
+// (non-NULL) physicalNode; it is dereferenced whenever 'name' is matched.
+//
+// Return:
+// 0 - success
+// -1 - failure
+//
+///////////////////////////////////////////////////////////////////////////////
+int CCmsh::GetNodeState( char *name ,CPhysicalNode *physicalNode )
+{
+ const char method_name[] = "CCmsh::GetNodeState";
+ TRACE_ENTRY;
+
+ int rc;
+
+ rc = PopulateNodeState( name );
+
+ if ( rc != -1 )
+ {
+ // Parse each line extracting name and state
+ string nodeName;
+ NodeState_t nodeState;
+ PhysicalNodeNameMap_t::iterator it;
+
+ StringList_t::iterator alit;
+ for ( alit = nodeStateList_.begin(); alit != nodeStateList_.end() ; alit++ )
+ {
+ ParseNodeStatus( *alit, nodeName, nodeState );
+
+ // TEST_POINT and Exclude List : to force state down on node name
+ const char *downNodeName = getenv( TP001_NODE_DOWN );
+ const char *downNodeList = getenv( TRAF_EXCLUDE_LIST );
+ string downNodeString = " ";
+ if (downNodeList)
+ {
+ downNodeString += downNodeList;
+ downNodeString += " ";
+ }
+ string downNodeToFind = " ";
+ downNodeToFind += nodeName.c_str();
+ downNodeToFind += " ";
+ if (((downNodeList != NULL) &&
+ strstr(downNodeString.c_str(),downNodeToFind.c_str())) ||
+ ((downNodeName != NULL) &&
+ !strcmp(downNodeName, nodeName.c_str())))
+ {
+ nodeState = StateDown;
+ }
+
+ if (!strcmp(name, nodeName.c_str()))
+ {
+ // Set physical node state
+ physicalNode->SetState( nodeState );
+ }
+ }
+ }
TRACE_EXIT;
return( rc );
http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/cmsh.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/cmsh.h b/core/sqf/monitor/linux/cmsh.h
index dce2e79..f1226bd 100644
--- a/core/sqf/monitor/linux/cmsh.h
+++ b/core/sqf/monitor/linux/cmsh.h
@@ -44,6 +44,7 @@ public:
int PopulateClusterState( void );
int GetClusterState( PhysicalNodeNameMap_t &physicalNodeMap );
+ int GetNodeState( char *name ,CPhysicalNode *physicalNode );
bool IsInitialized( void );
void ClearClusterState( void ) { nodeStateList_.clear(); }
NodeState_t GetNodeState( char nodeName[] );
@@ -52,6 +53,7 @@ private:
NodeStateList_t nodeStateList_;
void ParseNodeStatus( string &nodeStatus, string &nodeName, NodeState_t &state );
+ int PopulateNodeState( const char *nodeName );
};
#endif /*CMSH_H_*/
http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/commaccept.cxx
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/commaccept.cxx b/core/sqf/monitor/linux/commaccept.cxx
index a9045af..21b30a6 100644
--- a/core/sqf/monitor/linux/commaccept.cxx
+++ b/core/sqf/monitor/linux/commaccept.cxx
@@ -942,7 +942,7 @@ void CCommAccept::commAcceptorIB()
{
char buf[MON_STRING_BUF_SIZE];
MPI_Error_class( rc, &errClass );
- snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n",
+ snprintf(buf, sizeof(buf), "[%s], cannot accept remote monitor: %s.\n",
method_name, ErrorMsg(rc));
mon_log_write(MON_COMMACCEPT_15, SQ_LOG_ERR, buf);
@@ -1101,13 +1101,44 @@ void CCommAccept::start()
TRACE_EXIT;
}
-void CCommAccept::setAccepting( bool accepting )
+void CCommAccept::startAccepting( void )
{
+ const char method_name[] = "CCommAccept::startAccepting";
+ TRACE_ENTRY;
+
+ CAutoLock lock( getLocker( ) );
+
+ if ( !accepting_ )
+ {
+ accepting_ = true;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Enabling accepting_=%d\n"
+ , method_name, __LINE__, accepting_ );
+ }
+ CLock::wakeOne();
+ }
+
+ TRACE_EXIT;
+}
+
+void CCommAccept::stopAccepting( void )
+{
+ const char method_name[] = "CCommAccept::stopAccepting";
+ TRACE_ENTRY;
+
CAutoLock lock( getLocker( ) );
- accepting_ = accepting;
if ( accepting_ )
{
+ accepting_ = false;
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Disabling accepting_=%d\n"
+ , method_name, __LINE__, accepting_ );
+ }
CLock::wakeOne();
}
+
+ TRACE_EXIT;
}
http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/commaccept.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/commaccept.h b/core/sqf/monitor/linux/commaccept.h
index ac85efb..c32d975 100644
--- a/core/sqf/monitor/linux/commaccept.h
+++ b/core/sqf/monitor/linux/commaccept.h
@@ -41,7 +41,8 @@ public:
bool isAccepting( void ) { CAutoLock lock(getLocker()); return( accepting_ ); }
void processNewComm( MPI_Comm interComm );
void processNewSock( int sockFd );
- void setAccepting( bool accepting );
+ void startAccepting( void );
+ void stopAccepting( void );
void start( void );
void shutdownWork( void );
http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/internal.h
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/internal.h b/core/sqf/monitor/linux/internal.h
index f69f424..b0f118c 100644
--- a/core/sqf/monitor/linux/internal.h
+++ b/core/sqf/monitor/linux/internal.h
@@ -442,7 +442,7 @@ typedef struct nodeId_s
int creatorPNid;
int creatorShellPid;
Verifier_t creatorShellVerifier;
- bool creator; // NEW monitor set to true to tell creator it is the CREATOR
+ bool creator; // NEW monitor sets to true to tell creator it is the CREATOR
bool ping; // Monitor sets to true to tell remote monitor
// it is just checking that it can communicate with it.
// Used during allgather reconnect
http://git-wip-us.apache.org/repos/asf/trafodion/blob/e832d827/core/sqf/monitor/linux/macros.gmk
----------------------------------------------------------------------
diff --git a/core/sqf/monitor/linux/macros.gmk b/core/sqf/monitor/linux/macros.gmk
index e5324f1..5bb24ec 100644
--- a/core/sqf/monitor/linux/macros.gmk
+++ b/core/sqf/monitor/linux/macros.gmk
@@ -53,8 +53,8 @@ endif
MPI_CC := $(CC)
MPI_CXX := $(CXX)
-CC = mpicc
-CXX = $(MPICH_ROOT)/bin/mpicxx
+CC = mpicc $(ARCH_SPECIFIC_OPTION)
+CXX = $(MPICH_ROOT)/bin/mpicxx $(ARCH_SPECIFIC_OPTION)
ifeq ($(SQ_MTYPE),32)
CC += -mpi32