You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ja...@apache.org on 2015/06/29 09:00:59 UTC
[38/50] [abbrv] git commit: updated refs/heads/dhcpoffload to 45721ae
Formatting class ClusterManagerImpl
- Splitting format commit from fix commit.
Signed-off-by: wilderrodrigues <wr...@schubergphilis.com>
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/ea1f2eb0
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/ea1f2eb0
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/ea1f2eb0
Branch: refs/heads/dhcpoffload
Commit: ea1f2eb049f71d3bfc2fddc5d759c9685ad22513
Parents: af902fd
Author: wilderrodrigues <wr...@schubergphilis.com>
Authored: Thu Jun 25 07:59:35 2015 +0200
Committer: wilderrodrigues <wr...@schubergphilis.com>
Committed: Thu Jun 25 09:13:46 2015 +0200
----------------------------------------------------------------------
.../com/cloud/cluster/ClusterManagerImpl.java | 415 ++++++++++---------
1 file changed, 213 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/ea1f2eb0/framework/cluster/src/com/cloud/cluster/ClusterManagerImpl.java
----------------------------------------------------------------------
diff --git a/framework/cluster/src/com/cloud/cluster/ClusterManagerImpl.java b/framework/cluster/src/com/cloud/cluster/ClusterManagerImpl.java
index 8c99ff3..02597c3 100644
--- a/framework/cluster/src/com/cloud/cluster/ClusterManagerImpl.java
+++ b/framework/cluster/src/com/cloud/cluster/ClusterManagerImpl.java
@@ -41,13 +41,12 @@ import javax.ejb.Local;
import javax.inject.Inject;
import javax.naming.ConfigurationException;
-import org.apache.log4j.Logger;
-
import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.Configurable;
import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
+import org.apache.log4j.Logger;
import com.cloud.cluster.dao.ManagementServerHostDao;
import com.cloud.cluster.dao.ManagementServerHostPeerDao;
@@ -130,21 +129,21 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
setRunLevel(ComponentLifecycle.RUN_LEVEL_FRAMEWORK);
}
- private void registerRequestPdu(ClusterServiceRequestPdu pdu) {
+ private void registerRequestPdu(final ClusterServiceRequestPdu pdu) {
synchronized (_outgoingPdusWaitingForAck) {
_outgoingPdusWaitingForAck.put(pdu.getSequenceId(), pdu);
}
}
@Override
- public void registerDispatcher(Dispatcher dispatcher) {
+ public void registerDispatcher(final Dispatcher dispatcher) {
_dispatcher = dispatcher;
}
- private ClusterServiceRequestPdu popRequestPdu(long ackSequenceId) {
+ private ClusterServiceRequestPdu popRequestPdu(final long ackSequenceId) {
synchronized (_outgoingPdusWaitingForAck) {
if (_outgoingPdusWaitingForAck.get(ackSequenceId) != null) {
- ClusterServiceRequestPdu pdu = _outgoingPdusWaitingForAck.get(ackSequenceId);
+ final ClusterServiceRequestPdu pdu = _outgoingPdusWaitingForAck.get(ackSequenceId);
_outgoingPdusWaitingForAck.remove(ackSequenceId);
return pdu;
}
@@ -153,20 +152,21 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return null;
}
- private void cancelClusterRequestToPeer(String strPeer) {
- List<ClusterServiceRequestPdu> candidates = new ArrayList<ClusterServiceRequestPdu>();
+ private void cancelClusterRequestToPeer(final String strPeer) {
+ final List<ClusterServiceRequestPdu> candidates = new ArrayList<ClusterServiceRequestPdu>();
synchronized (_outgoingPdusWaitingForAck) {
- for (Map.Entry<Long, ClusterServiceRequestPdu> entry : _outgoingPdusWaitingForAck.entrySet()) {
- if (entry.getValue().getDestPeer().equalsIgnoreCase(strPeer))
+ for (final Map.Entry<Long, ClusterServiceRequestPdu> entry : _outgoingPdusWaitingForAck.entrySet()) {
+ if (entry.getValue().getDestPeer().equalsIgnoreCase(strPeer)) {
candidates.add(entry.getValue());
+ }
}
- for (ClusterServiceRequestPdu pdu : candidates) {
+ for (final ClusterServiceRequestPdu pdu : candidates) {
_outgoingPdusWaitingForAck.remove(pdu.getSequenceId());
}
}
- for (ClusterServiceRequestPdu pdu : candidates) {
+ for (final ClusterServiceRequestPdu pdu : candidates) {
s_logger.warn("Cancel cluster request PDU to peer: " + strPeer + ", pdu: " + pdu.getJsonPackage());
synchronized (pdu) {
pdu.notifyAll();
@@ -174,22 +174,22 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
}
- private void addOutgoingClusterPdu(ClusterServicePdu pdu) {
+ private void addOutgoingClusterPdu(final ClusterServicePdu pdu) {
synchronized (_clusterPduOutgoingQueue) {
_clusterPduOutgoingQueue.add(pdu);
_clusterPduOutgoingQueue.notifyAll();
}
}
- private ClusterServicePdu popOutgoingClusterPdu(long timeoutMs) {
+ private ClusterServicePdu popOutgoingClusterPdu(final long timeoutMs) {
synchronized (_clusterPduOutgoingQueue) {
try {
_clusterPduOutgoingQueue.wait(timeoutMs);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
}
if (_clusterPduOutgoingQueue.size() > 0) {
- ClusterServicePdu pdu = _clusterPduOutgoingQueue.get(0);
+ final ClusterServicePdu pdu = _clusterPduOutgoingQueue.get(0);
_clusterPduOutgoingQueue.remove(0);
return pdu;
}
@@ -197,22 +197,22 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return null;
}
- private void addIncomingClusterPdu(ClusterServicePdu pdu) {
+ private void addIncomingClusterPdu(final ClusterServicePdu pdu) {
synchronized (_clusterPduIncomingQueue) {
_clusterPduIncomingQueue.add(pdu);
_clusterPduIncomingQueue.notifyAll();
}
}
- private ClusterServicePdu popIncomingClusterPdu(long timeoutMs) {
+ private ClusterServicePdu popIncomingClusterPdu(final long timeoutMs) {
synchronized (_clusterPduIncomingQueue) {
try {
_clusterPduIncomingQueue.wait(timeoutMs);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
}
if (_clusterPduIncomingQueue.size() > 0) {
- ClusterServicePdu pdu = _clusterPduIncomingQueue.get(0);
+ final ClusterServicePdu pdu = _clusterPduIncomingQueue.get(0);
_clusterPduIncomingQueue.remove(0);
return pdu;
}
@@ -241,15 +241,16 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
private void onSendingClusterPdu() {
while (true) {
try {
- ClusterServicePdu pdu = popOutgoingClusterPdu(1000);
- if (pdu == null)
+ final ClusterServicePdu pdu = popOutgoingClusterPdu(1000);
+ if (pdu == null) {
continue;
+ }
ClusterService peerService = null;
for (int i = 0; i < 2; i++) {
try {
peerService = getPeerService(pdu.getDestPeer());
- } catch (RemoteException e) {
+ } catch (final RemoteException e) {
s_logger.error("Unable to get cluster service on peer : " + pdu.getDestPeer());
}
@@ -257,30 +258,31 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
try {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + ". agent: " + pdu.getAgentId() + ", pdu seq: " +
- pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
+ pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
}
- long startTick = System.currentTimeMillis();
- String strResult = peerService.execute(pdu);
+ final long startTick = System.currentTimeMillis();
+ final String strResult = peerService.execute(pdu);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + " completed. time: " +
- (System.currentTimeMillis() - startTick) + "ms. agent: " + pdu.getAgentId() + ", pdu seq: " + pdu.getSequenceId() +
- ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
+ (System.currentTimeMillis() - startTick) + "ms. agent: " + pdu.getAgentId() + ", pdu seq: " + pdu.getSequenceId() +
+ ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
}
- if ("true".equals(strResult))
+ if ("true".equals(strResult)) {
break;
+ }
- } catch (RemoteException e) {
+ } catch (final RemoteException e) {
invalidatePeerService(pdu.getDestPeer());
if (s_logger.isInfoEnabled()) {
s_logger.info("Exception on remote execution, peer: " + pdu.getDestPeer() + ", iteration: " + i + ", exception message :" +
- e.getMessage());
+ e.getMessage());
}
}
}
}
- } catch (Throwable e) {
+ } catch (final Throwable e) {
s_logger.error("Unexcpeted exception: ", e);
}
}
@@ -290,14 +292,15 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
while (true) {
try {
final ClusterServicePdu pdu = popIncomingClusterPdu(1000);
- if (pdu == null)
+ if (pdu == null) {
continue;
+ }
_executor.execute(new ManagedContextRunnable() {
@Override
protected void runInContext() {
if (pdu.getPduType() == ClusterServicePdu.PDU_TYPE_RESPONSE) {
- ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId());
+ final ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId());
if (requestPdu != null) {
requestPdu.setResponseResult(pdu.getJsonPackage());
synchronized (requestPdu) {
@@ -308,11 +311,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
} else {
String result = _dispatcher.dispatch(pdu);
- if (result == null)
+ if (result == null) {
result = "";
+ }
if (pdu.getPduType() == ClusterServicePdu.PDU_TYPE_REQUEST) {
- ClusterServicePdu responsePdu = new ClusterServicePdu();
+ final ClusterServicePdu responsePdu = new ClusterServicePdu();
responsePdu.setPduType(ClusterServicePdu.PDU_TYPE_RESPONSE);
responsePdu.setSourcePeer(pdu.getDestPeer());
responsePdu.setDestPeer(pdu.getSourcePeer());
@@ -324,14 +328,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
}
});
- } catch (Throwable e) {
+ } catch (final Throwable e) {
s_logger.error("Unexcpeted exception: ", e);
}
}
}
@Override
- public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
+ public void OnReceiveClusterServicePdu(final ClusterServicePdu pdu) {
addIncomingClusterPdu(pdu);
}
@@ -343,17 +347,17 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
* peer is running regardless of version.
* @return true if there are peers running and false if not.
*/
- public static final boolean arePeersRunning(String notVersion) {
+ public static final boolean arePeersRunning(final String notVersion) {
return false; // TODO: Leaving this for Kelven to take care of.
}
@Override
- public void broadcast(long agentId, String cmds) {
- Date cutTime = DateUtil.currentGMTTime();
+ public void broadcast(final long agentId, final String cmds) {
+ final Date cutTime = DateUtil.currentGMTTime();
- List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
- for (ManagementServerHostVO peer : peers) {
- String peerName = Long.toString(peer.getMsid());
+ final List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
+ for (final ManagementServerHostVO peer : peers) {
+ final String peerName = Long.toString(peer.getMsid());
if (getSelfPeerName().equals(peerName)) {
continue; // Skip myself.
}
@@ -362,14 +366,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
s_logger.debug("Forwarding " + cmds + " to " + peer.getMsid());
}
executeAsync(peerName, agentId, cmds, true);
- } catch (Exception e) {
+ } catch (final Exception e) {
s_logger.warn("Caught exception while talkign to " + peer.getMsid());
}
}
}
- public void executeAsync(String strPeer, long agentId, String cmds, boolean stopOnError) {
- ClusterServicePdu pdu = new ClusterServicePdu();
+ public void executeAsync(final String strPeer, final long agentId, final String cmds, final boolean stopOnError) {
+ final ClusterServicePdu pdu = new ClusterServicePdu();
pdu.setSourcePeer(getSelfPeerName());
pdu.setDestPeer(strPeer);
pdu.setAgentId(agentId);
@@ -379,12 +383,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
@Override
- public String execute(String strPeer, long agentId, String cmds, boolean stopOnError) {
+ public String execute(final String strPeer, final long agentId, final String cmds, final boolean stopOnError) {
if (s_logger.isDebugEnabled()) {
s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " + cmds);
}
- ClusterServiceRequestPdu pdu = new ClusterServiceRequestPdu();
+ final ClusterServiceRequestPdu pdu = new ClusterServiceRequestPdu();
pdu.setSourcePeer(getSelfPeerName());
pdu.setDestPeer(strPeer);
pdu.setAgentId(agentId);
@@ -396,7 +400,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
synchronized (pdu) {
try {
pdu.wait();
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
}
}
@@ -412,7 +416,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
@Override
- public ManagementServerHostVO getPeer(String mgmtServerId) {
+ public ManagementServerHostVO getPeer(final String mgmtServerId) {
return _mshostDao.findByMsid(Long.parseLong(mgmtServerId));
}
@@ -426,7 +430,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
@Override
- public void registerListener(ClusterManagerListener listener) {
+ public void registerListener(final ClusterManagerListener listener) {
// Note : we don't check duplicates
synchronized (_listeners) {
@@ -437,7 +441,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
@Override
- public void unregisterListener(ClusterManagerListener listener) {
+ public void unregisterListener(final ClusterManagerListener listener) {
synchronized (_listeners) {
s_logger.info("unregister cluster listener " + listener.getClass());
@@ -445,17 +449,17 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
}
- public void notifyNodeJoined(List<ManagementServerHostVO> nodeList) {
+ public void notifyNodeJoined(final List<ManagementServerHostVO> nodeList) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node join to listeners.");
- for (ManagementServerHostVO mshost : nodeList) {
+ for (final ManagementServerHostVO mshost : nodeList) {
s_logger.debug("Joining node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
}
}
synchronized (_listeners) {
- for (ClusterManagerListener listener : _listeners) {
+ for (final ClusterManagerListener listener : _listeners) {
listener.onManagementNodeJoined(nodeList, _mshostId);
}
}
@@ -463,19 +467,20 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
SubscriptionMgr.getInstance().notifySubscribers(ClusterManager.ALERT_SUBJECT, this, new ClusterNodeJoinEventArgs(_mshostId, nodeList));
}
- public void notifyNodeLeft(List<ManagementServerHostVO> nodeList) {
+ public void notifyNodeLeft(final List<ManagementServerHostVO> nodeList) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node left to listeners.");
}
- for (ManagementServerHostVO mshost : nodeList) {
- if (s_logger.isDebugEnabled())
+ for (final ManagementServerHostVO mshost : nodeList) {
+ if (s_logger.isDebugEnabled()) {
s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
+ }
cancelClusterRequestToPeer(String.valueOf(mshost.getMsid()));
}
synchronized (_listeners) {
- for (ClusterManagerListener listener : _listeners) {
+ for (final ClusterManagerListener listener : _listeners) {
listener.onManagementNodeLeft(nodeList, _mshostId);
}
}
@@ -484,24 +489,25 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
public void notifyNodeIsolated() {
- if (s_logger.isDebugEnabled())
+ if (s_logger.isDebugEnabled()) {
s_logger.debug("Notify management server node isolation to listeners");
+ }
synchronized (_listeners) {
- for (ClusterManagerListener listener : _listeners) {
+ for (final ClusterManagerListener listener : _listeners) {
listener.onManagementNodeIsolated();
}
}
}
- public ClusterService getPeerService(String strPeer) throws RemoteException {
+ public ClusterService getPeerService(final String strPeer) throws RemoteException {
synchronized (_clusterPeers) {
if (_clusterPeers.containsKey(strPeer)) {
return _clusterPeers.get(strPeer);
}
}
- ClusterService service = _currentServiceAdapter.getPeerService(strPeer);
+ final ClusterService service = _currentServiceAdapter.getPeerService(strPeer);
if (service != null) {
synchronized (_clusterPeers) {
@@ -516,7 +522,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return service;
}
- public void invalidatePeerService(String strPeer) {
+ public void invalidatePeerService(final String strPeer) {
synchronized (_clusterPeers) {
if (_clusterPeers.containsKey(strPeer)) {
_clusterPeers.remove(strPeer);
@@ -528,11 +534,11 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return new ManagedContextRunnable() {
@Override
protected void runInContext() {
- TransactionLegacy txn = TransactionLegacy.open("ClusterHeartbeat");
+ final TransactionLegacy txn = TransactionLegacy.open("ClusterHeartbeat");
try {
- Profiler profiler = new Profiler();
- Profiler profilerHeartbeatUpdate = new Profiler();
- Profiler profilerPeerScan = new Profiler();
+ final Profiler profiler = new Profiler();
+ final Profiler profilerHeartbeatUpdate = new Profiler();
+ final Profiler profilerPeerScan = new Profiler();
try {
profiler.start();
@@ -563,13 +569,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
profiler.stop();
if (profiler.getDuration() >= HeartbeatInterval.value()) {
- if (s_logger.isDebugEnabled())
+ if (s_logger.isDebugEnabled()) {
s_logger.debug("Management server heartbeat takes too long to finish. profiler: " + profiler.toString() + ", profilerHeartbeatUpdate: " +
- profilerHeartbeatUpdate.toString() + ", profilerPeerScan: " + profilerPeerScan.toString());
+ profilerHeartbeatUpdate.toString() + ", profilerPeerScan: " + profilerPeerScan.toString());
+ }
}
}
- } catch (CloudRuntimeException e) {
+ } catch (final CloudRuntimeException e) {
s_logger.error("Runtime DB exception ", e.getCause());
if (e.getCause() instanceof ClusterInvalidSessionException) {
@@ -580,9 +587,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
if (isRootCauseConnectionRelated(e.getCause())) {
invalidHeartbeatConnection();
}
- } catch (ActiveFencingException e) {
+ } catch (final ActiveFencingException e) {
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
- } catch (Throwable e) {
+ } catch (final Throwable e) {
s_logger.error("Unexpected exception in cluster heartbeat", e);
if (isRootCauseConnectionRelated(e.getCause())) {
invalidHeartbeatConnection();
@@ -598,7 +605,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
private boolean isRootCauseConnectionRelated(Throwable e) {
while (e != null) {
if (e instanceof SQLRecoverableException || e instanceof SQLNonTransientException) {
- return true;
+ return true;
}
e = e.getCause();
@@ -609,7 +616,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
private Connection getHeartbeatConnection() throws SQLException {
if (_heartbeatConnection == null) {
- Connection conn = TransactionLegacy.getStandaloneConnectionWithException();
+ final Connection conn = TransactionLegacy.getStandaloneConnectionWithException();
_heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartbeat", conn, false);
}
@@ -618,7 +625,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
private void invalidHeartbeatConnection() {
if (_heartbeatConnection != null) {
- Connection conn = TransactionLegacy.getStandaloneConnection();
+ final Connection conn = TransactionLegacy.getStandaloneConnection();
if (conn != null) {
_heartbeatConnection.reset(conn);
} else {
@@ -638,7 +645,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
synchronized (_notificationMsgs) {
try {
_notificationMsgs.wait(1000);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
}
}
@@ -646,94 +653,94 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
while ((msg = getNextNotificationMessage()) != null) {
try {
switch (msg.getMessageType()) {
- case nodeAdded:
- if (msg.getNodes() != null && msg.getNodes().size() > 0) {
- Profiler profiler = new Profiler();
- profiler.start();
-
- notifyNodeJoined(msg.getNodes());
-
- profiler.stop();
- if (profiler.getDuration() > 1000) {
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms");
- }
- } else {
- s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms");
+ case nodeAdded:
+ if (msg.getNodes() != null && msg.getNodes().size() > 0) {
+ final Profiler profiler = new Profiler();
+ profiler.start();
+
+ notifyNodeJoined(msg.getNodes());
+
+ profiler.stop();
+ if (profiler.getDuration() > 1000) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms");
}
+ } else {
+ s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms");
}
- break;
-
- case nodeRemoved:
- if (msg.getNodes() != null && msg.getNodes().size() > 0) {
- Profiler profiler = new Profiler();
- profiler.start();
-
- notifyNodeLeft(msg.getNodes());
-
- profiler.stop();
- if (profiler.getDuration() > 1000) {
- if (s_logger.isDebugEnabled()) {
- s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms");
- }
- } else {
- s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms");
+ }
+ break;
+
+ case nodeRemoved:
+ if (msg.getNodes() != null && msg.getNodes().size() > 0) {
+ final Profiler profiler = new Profiler();
+ profiler.start();
+
+ notifyNodeLeft(msg.getNodes());
+
+ profiler.stop();
+ if (profiler.getDuration() > 1000) {
+ if (s_logger.isDebugEnabled()) {
+ s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms");
}
+ } else {
+ s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms");
}
- break;
+ }
+ break;
- case nodeIsolated:
- notifyNodeIsolated();
- break;
+ case nodeIsolated:
+ notifyNodeIsolated();
+ break;
- default:
- assert (false);
- break;
+ default:
+ assert false;
+ break;
}
- } catch (Throwable e) {
+ } catch (final Throwable e) {
s_logger.warn("Unexpected exception during cluster notification. ", e);
}
}
try {
Thread.sleep(1000);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
}
}
}
};
}
- private void queueNotification(ClusterManagerMessage msg) {
+ private void queueNotification(final ClusterManagerMessage msg) {
synchronized (_notificationMsgs) {
_notificationMsgs.add(msg);
_notificationMsgs.notifyAll();
}
switch (msg.getMessageType()) {
- case nodeAdded: {
- List<ManagementServerHostVO> l = msg.getNodes();
- if (l != null && l.size() > 0) {
- for (ManagementServerHostVO mshost : l) {
- _mshostPeerDao.updatePeerInfo(_mshostId, mshost.getId(), mshost.getRunid(), ManagementServerHost.State.Up);
- }
+ case nodeAdded: {
+ final List<ManagementServerHostVO> l = msg.getNodes();
+ if (l != null && l.size() > 0) {
+ for (final ManagementServerHostVO mshost : l) {
+ _mshostPeerDao.updatePeerInfo(_mshostId, mshost.getId(), mshost.getRunid(), ManagementServerHost.State.Up);
}
}
- break;
+ }
+ break;
- case nodeRemoved: {
- List<ManagementServerHostVO> l = msg.getNodes();
- if (l != null && l.size() > 0) {
- for (ManagementServerHostVO mshost : l) {
- _mshostPeerDao.updatePeerInfo(_mshostId, mshost.getId(), mshost.getRunid(), ManagementServerHost.State.Down);
- }
+ case nodeRemoved: {
+ final List<ManagementServerHostVO> l = msg.getNodes();
+ if (l != null && l.size() > 0) {
+ for (final ManagementServerHostVO mshost : l) {
+ _mshostPeerDao.updatePeerInfo(_mshostId, mshost.getId(), mshost.getRunid(), ManagementServerHost.State.Down);
}
}
- break;
+ }
+ break;
- default:
- break;
+ default:
+ break;
}
}
@@ -750,14 +757,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
private void initPeerScan() {
// upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform
// missed cleanup
- Date cutTime = DateUtil.currentGMTTime();
- List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
+ final Date cutTime = DateUtil.currentGMTTime();
+ final List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
// We don't have foreign key constraints to enforce the mgmt_server_id integrity in host table, when user manually
// remove records from mshost table, this will leave orphan mgmt_serve_id reference in host table.
- List<Long> orphanList = _mshostDao.listOrphanMsids();
+ final List<Long> orphanList = _mshostDao.listOrphanMsids();
if (orphanList.size() > 0) {
- for (Long orphanMsid : orphanList) {
+ for (final Long orphanMsid : orphanList) {
// construct fake ManagementServerHostVO based on orphan MSID
s_logger.info("Add orphan management server msid found in host table to initial clustering notification, orphan msid: " + orphanMsid);
inactiveList.add(new ManagementServerHostVO(orphanMsid, 0, "orphan", 0, new Date()));
@@ -769,55 +776,57 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
if (inactiveList.size() > 0) {
if (s_logger.isInfoEnabled()) {
s_logger.info("Found " + inactiveList.size() + " inactive management server node based on timestamp");
- for (ManagementServerHostVO host : inactiveList)
+ for (final ManagementServerHostVO host : inactiveList) {
s_logger.info("management server node msid: " + host.getMsid() + ", name: " + host.getName() + ", service ip: " + host.getServiceIP() +
- ", version: " + host.getVersion());
+ ", version: " + host.getVersion());
+ }
}
- List<ManagementServerHostVO> downHostList = new ArrayList<ManagementServerHostVO>();
- for (ManagementServerHostVO host : inactiveList) {
+ final List<ManagementServerHostVO> downHostList = new ArrayList<ManagementServerHostVO>();
+ for (final ManagementServerHostVO host : inactiveList) {
if (!pingManagementNode(host)) {
s_logger.warn("Management node " + host.getId() + " is detected inactive by timestamp and also not pingable");
downHostList.add(host);
}
}
- if (downHostList.size() > 0)
+ if (downHostList.size() > 0) {
queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, downHostList));
+ }
} else {
s_logger.info("No inactive management server node found");
}
}
private void peerScan() throws ActiveFencingException {
- Date cutTime = DateUtil.currentGMTTime();
+ final Date cutTime = DateUtil.currentGMTTime();
- Profiler profiler = new Profiler();
+ final Profiler profiler = new Profiler();
profiler.start();
- Profiler profilerQueryActiveList = new Profiler();
+ final Profiler profilerQueryActiveList = new Profiler();
profilerQueryActiveList.start();
- List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
+ final List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
profilerQueryActiveList.stop();
- Profiler profilerSyncClusterInfo = new Profiler();
+ final Profiler profilerSyncClusterInfo = new Profiler();
profilerSyncClusterInfo.start();
- List<ManagementServerHostVO> removedNodeList = new ArrayList<ManagementServerHostVO>();
- List<ManagementServerHostVO> invalidatedNodeList = new ArrayList<ManagementServerHostVO>();
+ final List<ManagementServerHostVO> removedNodeList = new ArrayList<ManagementServerHostVO>();
+ final List<ManagementServerHostVO> invalidatedNodeList = new ArrayList<ManagementServerHostVO>();
if (_mshostId != null) {
if (_mshostPeerDao.countStateSeenInPeers(_mshostId, _runId, ManagementServerHost.State.Down) > 0) {
- String msg =
- "We have detected that at least one management server peer reports that this management server is down, perform active fencing to avoid split-brain situation";
+ final String msg =
+ "We have detected that at least one management server peer reports that this management server is down, perform active fencing to avoid split-brain situation";
s_logger.error(msg);
throw new ActiveFencingException(msg);
}
// only if we have already attached to cluster, will we start to check leaving nodes
- for (Map.Entry<Long, ManagementServerHostVO> entry : _activePeers.entrySet()) {
+ for (final Map.Entry<Long, ManagementServerHostVO> entry : _activePeers.entrySet()) {
- ManagementServerHostVO current = getInListById(entry.getKey(), currentList);
+ final ManagementServerHostVO current = getInListById(entry.getKey(), currentList);
if (current == null) {
if (entry.getKey().longValue() != _mshostId.longValue()) {
if (s_logger.isDebugEnabled()) {
@@ -830,7 +839,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
if (entry.getKey().longValue() != _mshostId.longValue()) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Detected management node left because of invalidated session, id:" + entry.getKey() + ", nodeIP:" +
- entry.getValue().getServiceIP());
+ entry.getValue().getServiceIP());
}
invalidatedNodeList.add(entry.getValue());
}
@@ -848,15 +857,15 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
profilerSyncClusterInfo.stop();
- Profiler profilerInvalidatedNodeList = new Profiler();
+ final Profiler profilerInvalidatedNodeList = new Profiler();
profilerInvalidatedNodeList.start();
// process invalidated node list
if (invalidatedNodeList.size() > 0) {
- for (ManagementServerHostVO mshost : invalidatedNodeList) {
+ for (final ManagementServerHostVO mshost : invalidatedNodeList) {
_activePeers.remove(mshost.getId());
try {
JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
- } catch (Exception e) {
+ } catch (final Exception e) {
s_logger.warn("Unable to deregiester cluster node from JMX monitoring due to exception " + e.toString());
}
}
@@ -865,18 +874,18 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
profilerInvalidatedNodeList.stop();
- Profiler profilerRemovedList = new Profiler();
+ final Profiler profilerRemovedList = new Profiler();
profilerRemovedList.start();
// process removed node list
- Iterator<ManagementServerHostVO> it = removedNodeList.iterator();
+ final Iterator<ManagementServerHostVO> it = removedNodeList.iterator();
while (it.hasNext()) {
- ManagementServerHostVO mshost = it.next();
+ final ManagementServerHostVO mshost = it.next();
if (!pingManagementNode(mshost)) {
s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable");
_activePeers.remove(mshost.getId());
try {
JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
- } catch (Exception e) {
+ } catch (final Exception e) {
s_logger.warn("Unable to deregiester cluster node from JMX monitoring due to exception " + e.toString());
}
} else {
@@ -890,8 +899,8 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
profilerRemovedList.stop();
- List<ManagementServerHostVO> newNodeList = new ArrayList<ManagementServerHostVO>();
- for (ManagementServerHostVO mshost : currentList) {
+ final List<ManagementServerHostVO> newNodeList = new ArrayList<ManagementServerHostVO>();
+ for (final ManagementServerHostVO mshost : currentList) {
if (!_activePeers.containsKey(mshost.getId())) {
_activePeers.put(mshost.getId(), mshost);
@@ -902,7 +911,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
try {
JmxUtil.registerMBean("ClusterManager", "Node " + mshost.getId(), new ClusterManagerMBeanImpl(this, mshost));
- } catch (Exception e) {
+ } catch (final Exception e) {
s_logger.warn("Unable to regiester cluster node into JMX monitoring due to exception " + ExceptionUtil.toString(e));
}
}
@@ -915,15 +924,16 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
profiler.stop();
if (profiler.getDuration() >= HeartbeatInterval.value()) {
- if (s_logger.isDebugEnabled())
+ if (s_logger.isDebugEnabled()) {
s_logger.debug("Peer scan takes too long to finish. profiler: " + profiler.toString() + ", profilerQueryActiveList: " +
- profilerQueryActiveList.toString() + ", profilerSyncClusterInfo: " + profilerSyncClusterInfo.toString() + ", profilerInvalidatedNodeList: " +
- profilerInvalidatedNodeList.toString() + ", profilerRemovedList: " + profilerRemovedList.toString());
+ profilerQueryActiveList.toString() + ", profilerSyncClusterInfo: " + profilerSyncClusterInfo.toString() + ", profilerInvalidatedNodeList: " +
+ profilerInvalidatedNodeList.toString() + ", profilerRemovedList: " + profilerRemovedList.toString());
+ }
}
}
- private static ManagementServerHostVO getInListById(Long id, List<ManagementServerHostVO> l) {
- for (ManagementServerHostVO mshost : l) {
+ private static ManagementServerHostVO getInListById(final Long id, final List<ManagementServerHostVO> l) {
+ for (final ManagementServerHostVO mshost : l) {
if (mshost.getId() == id) {
return mshost;
}
@@ -938,12 +948,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
s_logger.info("Starting Cluster manager, msid : " + _msId);
}
- ManagementServerHostVO mshost = Transaction.execute(new TransactionCallback<ManagementServerHostVO>() {
+ final ManagementServerHostVO mshost = Transaction.execute(new TransactionCallback<ManagementServerHostVO>() {
@Override
- public ManagementServerHostVO doInTransaction(TransactionStatus status) {
+ public ManagementServerHostVO doInTransaction(final TransactionStatus status) {
final Class<?> c = this.getClass();
- String version = c.getPackage().getImplementationVersion();
+ final String version = c.getPackage().getImplementationVersion();
ManagementServerHostVO mshost = _mshostDao.findByMsid(_msId);
if (mshost == null) {
@@ -964,7 +974,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
} else {
_mshostDao.update(mshost.getId(), _runId, NetUtils.getHostName(), version, _clusterNodeIP, _currentServiceAdapter.getServicePort(),
- DateUtil.currentGMTTime());
+ DateUtil.currentGMTTime());
if (s_logger.isInfoEnabled()) {
s_logger.info("Management server " + _msId + ", runId " + _runId + " is being started");
}
@@ -1000,7 +1010,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
if (_mshostId != null) {
- ManagementServerHostVO mshost = _mshostDao.findByMsid(_msId);
+ final ManagementServerHostVO mshost = _mshostDao.findByMsid(_msId);
mshost.setState(ManagementServerHost.State.Down);
_mshostDao.update(_mshostId, mshost);
}
@@ -1011,7 +1021,7 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
try {
_heartbeatScheduler.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
_executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
}
if (s_logger.isInfoEnabled()) {
@@ -1022,12 +1032,12 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
@Override
- public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
+ public boolean configure(final String name, final Map<String, Object> params) throws ConfigurationException {
if (s_logger.isInfoEnabled()) {
s_logger.info("Start configuring cluster manager : " + name);
}
- Properties dbProps = DbProperties.getDbProperties();
+ final Properties dbProps = DbProperties.getDbProperties();
_clusterNodeIP = dbProps.getProperty("cluster.node.IP");
if (_clusterNodeIP == null) {
_clusterNodeIP = "127.0.0.1";
@@ -1042,8 +1052,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration");
}
- for (int i = 0; i < DEFAULT_OUTGOING_WORKERS; i++)
+ for (int i = 0; i < DEFAULT_OUTGOING_WORKERS; i++) {
_executor.execute(getClusterPduSendingTask());
+ }
// notification task itself in turn works as a task dispatcher
_executor.execute(getClusterPduNotificationTask());
@@ -1076,16 +1087,16 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
@Override
- public long getManagementRunId(long msId) {
- ManagementServerHostVO mshost = _mshostDao.findByMsid(msId);
+ public long getManagementRunId(final long msId) {
+ final ManagementServerHostVO mshost = _mshostDao.findByMsid(msId);
if (mshost != null) {
return mshost.getRunid();
}
return -1;
}
- public boolean isManagementNodeAlive(long msid) {
- ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
+ public boolean isManagementNodeAlive(final long msid) {
+ final ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
if (mshost != null) {
if (mshost.getLastUpdateTime().getTime() >= DateUtil.currentGMTTime().getTime() - HeartbeatThreshold.value()) {
return true;
@@ -1095,8 +1106,8 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return false;
}
- public boolean pingManagementNode(long msid) {
- ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
+ public boolean pingManagementNode(final long msid) {
+ final ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
if (mshost == null) {
return false;
}
@@ -1114,9 +1125,9 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
return new ConfigKey<?>[] {HeartbeatInterval, HeartbeatThreshold};
}
- private boolean pingManagementNode(ManagementServerHostVO mshost) {
+ private boolean pingManagementNode(final ManagementServerHostVO mshost) {
- String targetIp = mshost.getServiceIP();
+ final String targetIp = mshost.getServiceIP();
if ("127.0.0.1".equals(targetIp) || "0.0.0.0".equals(targetIp)) {
s_logger.info("ping management node cluster service can not be performed on self");
return false;
@@ -1131,10 +1142,10 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
sch.configureBlocking(true);
sch.socket().setSoTimeout(5000);
- InetSocketAddress addr = new InetSocketAddress(targetIp, mshost.getServicePort());
+ final InetSocketAddress addr = new InetSocketAddress(targetIp, mshost.getServicePort());
sch.connect(addr);
return true;
- } catch (IOException e) {
+ } catch (final IOException e) {
if (e instanceof ConnectException) {
s_logger.error("Unable to ping management server at " + targetIp + ":" + mshost.getServicePort() + " due to ConnectException", e);
return false;
@@ -1143,14 +1154,14 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
if (sch != null) {
try {
sch.close();
- } catch (IOException e) {
+ } catch (final IOException e) {
}
}
}
try {
Thread.sleep(1000);
- } catch (InterruptedException ex) {
+ } catch (final InterruptedException ex) {
}
}
@@ -1163,31 +1174,31 @@ public class ClusterManagerImpl extends ManagerBase implements ClusterManager, C
}
private void checkConflicts() throws ConfigurationException {
- Date cutTime = DateUtil.currentGMTTime();
- List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
- for (ManagementServerHostVO peer : peers) {
- String peerIP = peer.getServiceIP().trim();
+ final Date cutTime = DateUtil.currentGMTTime();
+ final List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - HeartbeatThreshold.value()));
+ for (final ManagementServerHostVO peer : peers) {
+ final String peerIP = peer.getServiceIP().trim();
if (_clusterNodeIP.equals(peerIP)) {
if ("127.0.0.1".equals(_clusterNodeIP)) {
if (pingManagementNode(peer.getMsid())) {
- String msg = "Detected another management node with localhost IP is already running, please check your cluster configuration";
+ final String msg = "Detected another management node with localhost IP is already running, please check your cluster configuration";
s_logger.error(msg);
throw new ConfigurationException(msg);
} else {
- String msg =
- "Detected another management node with localhost IP is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node";
+ final String msg =
+ "Detected another management node with localhost IP is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node";
s_logger.info(msg);
}
} else {
if (pingManagementNode(peer.getMsid())) {
- String msg =
- "Detected that another management node with the same IP " + peer.getServiceIP() +
+ final String msg =
+ "Detected that another management node with the same IP " + peer.getServiceIP() +
" is already running, please check your cluster configuration";
s_logger.error(msg);
throw new ConfigurationException(msg);
} else {
- String msg =
- "Detected that another management node with the same IP " + peer.getServiceIP() +
+ final String msg =
+ "Detected that another management node with the same IP " + peer.getServiceIP() +
" is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node";
s_logger.info(msg);
}