You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ah...@apache.org on 2013/07/26 05:11:06 UTC

[3/8] Moved ClusterManager into it's own package. Removed the agent load balancing code.

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9aaa378b/server/src/com/cloud/cluster/ClusterManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java
deleted file mode 100755
index 5a08f87..0000000
--- a/server/src/com/cloud/cluster/ClusterManagerImpl.java
+++ /dev/null
@@ -1,1517 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SocketChannel;
-import java.rmi.RemoteException;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.sql.SQLRecoverableException;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import javax.ejb.Local;
-import javax.inject.Inject;
-import javax.naming.ConfigurationException;
-
-import org.apache.log4j.Logger;
-
-import com.cloud.agent.AgentManager;
-import com.cloud.agent.AgentManager.OnError;
-import com.cloud.agent.api.Answer;
-import com.cloud.agent.api.ChangeAgentAnswer;
-import com.cloud.agent.api.ChangeAgentCommand;
-import com.cloud.agent.api.Command;
-import com.cloud.agent.api.PropagateResourceEventCommand;
-import com.cloud.agent.api.TransferAgentCommand;
-import com.cloud.agent.api.ScheduleHostScanTaskCommand;
-import com.cloud.agent.manager.ClusteredAgentManagerImpl;
-import com.cloud.agent.manager.Commands;
-import com.cloud.cluster.agentlb.dao.HostTransferMapDao;
-import com.cloud.cluster.dao.ManagementServerHostDao;
-import com.cloud.cluster.dao.ManagementServerHostPeerDao;
-import com.cloud.configuration.Config;
-import com.cloud.configuration.dao.ConfigurationDao;
-import com.cloud.exception.AgentUnavailableException;
-import com.cloud.exception.OperationTimedoutException;
-import com.cloud.host.Host;
-import com.cloud.host.HostVO;
-import com.cloud.host.Status.Event;
-import com.cloud.host.dao.HostDao;
-import com.cloud.resource.ResourceManager;
-import com.cloud.resource.ResourceState;
-import com.cloud.serializer.GsonHelper;
-import com.cloud.utils.DateUtil;
-import com.cloud.utils.NumbersUtil;
-import com.cloud.utils.Profiler;
-import com.cloud.utils.PropertiesUtil;
-import com.cloud.utils.component.ComponentContext;
-import com.cloud.utils.component.ComponentLifecycle;
-import com.cloud.utils.component.ManagerBase;
-import com.cloud.utils.concurrency.NamedThreadFactory;
-import com.cloud.utils.db.ConnectionConcierge;
-import com.cloud.utils.db.DB;
-import com.cloud.utils.db.SearchCriteria.Op;
-import com.cloud.utils.db.SearchCriteria2;
-import com.cloud.utils.db.SearchCriteriaService;
-import com.cloud.utils.db.Transaction;
-import com.cloud.utils.events.SubscriptionMgr;
-import com.cloud.utils.exception.CloudRuntimeException;
-import com.cloud.utils.exception.ExceptionUtil;
-import com.cloud.utils.mgmt.JmxUtil;
-import com.cloud.utils.net.NetUtils;
-
-import com.google.gson.Gson;
-
-import org.apache.cloudstack.utils.identity.ManagementServerNode;
-
-@Local(value = { ClusterManager.class })
-public class ClusterManagerImpl extends ManagerBase implements ClusterManager {
-    private static final Logger s_logger = Logger.getLogger(ClusterManagerImpl.class);
-
-    private static final int EXECUTOR_SHUTDOWN_TIMEOUT = 1000; // 1 second
-    private static final int DEFAULT_OUTGOING_WORKERS = 5; 
-
-    private final List<ClusterManagerListener> _listeners = new ArrayList<ClusterManagerListener>();
-    private final Map<Long, ManagementServerHostVO> _activePeers = new HashMap<Long, ManagementServerHostVO>();
-    private int _heartbeatInterval = ClusterManager.DEFAULT_HEARTBEAT_INTERVAL;
-    private int _heartbeatThreshold = ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD;
-
-    private final Map<String, ClusterService> _clusterPeers;
-    private final Gson _gson;
-
-    @Inject
-    private AgentManager _agentMgr;
-    @Inject
-    private ClusteredAgentRebalanceService _rebalanceService;
-    @Inject
-    private ResourceManager _resourceMgr;
-
-    private final ScheduledExecutorService _heartbeatScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Cluster-Heartbeat"));
-    private final ExecutorService _notificationExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("Cluster-Notification"));
-    private final List<ClusterManagerMessage> _notificationMsgs = new ArrayList<ClusterManagerMessage>();
-    private ConnectionConcierge _heartbeatConnection = null;
-
-    private final ExecutorService _executor;
-
-    private ClusterServiceAdapter _currentServiceAdapter;
-
-    @Inject
-    private List<ClusterServiceAdapter> _serviceAdapters;
-
-    @Inject private ManagementServerHostDao _mshostDao;
-    @Inject private ManagementServerHostPeerDao _mshostPeerDao;
-    @Inject private HostDao _hostDao;
-    @Inject private HostTransferMapDao _hostTransferDao;
-    @Inject private ConfigurationDao _configDao;
-
-    //
-    // pay attention to _mshostId and _msid
-    // _mshostId is the primary key of management host table
-    // _msid is the unique persistent identifier that peer name is based upon
-    //
-    private Long _mshostId = null;
-    protected long _msId = ManagementServerNode.getManagementServerId();
-    protected long _runId = System.currentTimeMillis();
-
-    private boolean _peerScanInited = false;
-
-    private String _clusterNodeIP = "127.0.0.1";
-    private boolean _agentLBEnabled = false;
-    private double _connectedAgentsThreshold = 0.7;
-    private static boolean _agentLbHappened = false;
-    
-    private final List<ClusterServicePdu> _clusterPduOutgoingQueue = new ArrayList<ClusterServicePdu>();
-    private final List<ClusterServicePdu> _clusterPduIncomingQueue = new ArrayList<ClusterServicePdu>();
-    private final Map<Long, ClusterServiceRequestPdu> _outgoingPdusWaitingForAck = new HashMap<Long, ClusterServiceRequestPdu>();
-    
-    public ClusterManagerImpl() {
-        _clusterPeers = new HashMap<String, ClusterService>();
-
-        _gson = GsonHelper.getGson();
-
-        // executor to perform remote-calls in another thread context, to avoid potential
-        // recursive remote calls between nodes
-        //
-        _executor = Executors.newCachedThreadPool(new NamedThreadFactory("Cluster-Worker"));
-        setRunLevel(ComponentLifecycle.RUN_LEVEL_FRAMEWORK);
-    }
-    
-    private void registerRequestPdu(ClusterServiceRequestPdu pdu) {
-        synchronized(_outgoingPdusWaitingForAck) {
-            _outgoingPdusWaitingForAck.put(pdu.getSequenceId(), pdu);
-        }
-    }
-    
-    private ClusterServiceRequestPdu popRequestPdu(long ackSequenceId) {
-        synchronized(_outgoingPdusWaitingForAck) {
-            if(_outgoingPdusWaitingForAck.get(ackSequenceId) != null) {
-                ClusterServiceRequestPdu pdu = _outgoingPdusWaitingForAck.get(ackSequenceId);
-                _outgoingPdusWaitingForAck.remove(ackSequenceId);
-                return pdu;
-            }
-        }
-        
-        return null;
-    }
-    
-    private void cancelClusterRequestToPeer(String strPeer) {
-        List<ClusterServiceRequestPdu> candidates = new ArrayList<ClusterServiceRequestPdu>();
-        synchronized(_outgoingPdusWaitingForAck) {
-            for(Map.Entry<Long, ClusterServiceRequestPdu> entry : _outgoingPdusWaitingForAck.entrySet()) {
-                if(entry.getValue().getDestPeer().equalsIgnoreCase(strPeer))
-                    candidates.add(entry.getValue());
-            }
-
-            for(ClusterServiceRequestPdu pdu : candidates) {
-                _outgoingPdusWaitingForAck.remove(pdu.getSequenceId());
-            }
-        }
-        
-        for(ClusterServiceRequestPdu pdu : candidates) {
-            s_logger.warn("Cancel cluster request PDU to peer: " + strPeer + ", pdu: " + _gson.toJson(pdu));
-            synchronized(pdu) {
-                pdu.notifyAll();
-            }
-        }
-    }
-    
-    private void addOutgoingClusterPdu(ClusterServicePdu pdu) {
-    	synchronized(_clusterPduOutgoingQueue) {
-    		_clusterPduOutgoingQueue.add(pdu);
-    		_clusterPduOutgoingQueue.notifyAll();
-    	}
-    }
-    
-    private ClusterServicePdu popOutgoingClusterPdu(long timeoutMs) {
-    	synchronized(_clusterPduOutgoingQueue) {
-    		try {
-				_clusterPduOutgoingQueue.wait(timeoutMs);
-			} catch (InterruptedException e) {
-			}
-			
-			if(_clusterPduOutgoingQueue.size() > 0) {
-				ClusterServicePdu pdu = _clusterPduOutgoingQueue.get(0);
-				_clusterPduOutgoingQueue.remove(0);
-				return pdu;
-			}
-    	}
-    	return null;
-    }
-
-    private void addIncomingClusterPdu(ClusterServicePdu pdu) {
-    	synchronized(_clusterPduIncomingQueue) {
-    		_clusterPduIncomingQueue.add(pdu);
-    		_clusterPduIncomingQueue.notifyAll();
-    	}
-    }
-    
-    private ClusterServicePdu popIncomingClusterPdu(long timeoutMs) {
-    	synchronized(_clusterPduIncomingQueue) {
-    		try {
-    			_clusterPduIncomingQueue.wait(timeoutMs);
-			} catch (InterruptedException e) {
-			}
-			
-			if(_clusterPduIncomingQueue.size() > 0) {
-				ClusterServicePdu pdu = _clusterPduIncomingQueue.get(0);
-				_clusterPduIncomingQueue.remove(0);
-				return pdu;
-			}
-    	}
-    	return null;
-    }
-    
-    private Runnable getClusterPduSendingTask() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                onSendingClusterPdu();
-            }
-        };
-    }
-    
-    private Runnable getClusterPduNotificationTask() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                onNotifyingClusterPdu();
-            }
-        };
-    }
-    
-    private void onSendingClusterPdu() {
-        while(true) {
-            try {
-                ClusterServicePdu pdu = popOutgoingClusterPdu(1000);
-                if(pdu == null)
-                	continue;
-                	
-                ClusterService peerService =  null;
-                for(int i = 0; i < 2; i++) {
-                    try {
-                        peerService = getPeerService(pdu.getDestPeer());
-                    } catch (RemoteException e) {
-                        s_logger.error("Unable to get cluster service on peer : " + pdu.getDestPeer());
-                    }
-
-                    if(peerService != null) {
-                        try {
-                            if(s_logger.isDebugEnabled()) {
-                                s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + ". agent: " + pdu.getAgentId() 
-                                    + ", pdu seq: " + pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
-                            }
-
-                            long startTick = System.currentTimeMillis();
-                            String strResult = peerService.execute(pdu);
-                            if(s_logger.isDebugEnabled()) {
-                                s_logger.debug("Cluster PDU " + getSelfPeerName() + " -> " + pdu.getDestPeer() + " completed. time: " +
-                                    (System.currentTimeMillis() - startTick) + "ms. agent: " + pdu.getAgentId() 
-                                     + ", pdu seq: " + pdu.getSequenceId() + ", pdu ack seq: " + pdu.getAckSequenceId() + ", json: " + pdu.getJsonPackage());
-                            }
-                            
-                            if("true".equals(strResult))
-                                break;
-                            
-                        } catch (RemoteException e) {
-                            invalidatePeerService(pdu.getDestPeer());
-                            if(s_logger.isInfoEnabled()) {
-                                s_logger.info("Exception on remote execution, peer: " + pdu.getDestPeer() + ", iteration: "
-                                        + i + ", exception message :" + e.getMessage());
-                            }
-                        }
-                    }
-                }
-            } catch(Throwable e) {
-                s_logger.error("Unexcpeted exception: ", e);
-            }
-        }
-    }
-    
-    private void onNotifyingClusterPdu() {
-        while(true) {
-            try {
-                final ClusterServicePdu pdu = popIncomingClusterPdu(1000);
-                if(pdu == null)
-                	continue;
-
-                _executor.execute(new Runnable() {
-                    @Override
-                	public void run() {
-		                if(pdu.getPduType() == ClusterServicePdu.PDU_TYPE_RESPONSE) {
-		                    ClusterServiceRequestPdu requestPdu = popRequestPdu(pdu.getAckSequenceId());
-		                    if(requestPdu != null) {
-		                        requestPdu.setResponseResult(pdu.getJsonPackage());
-		                        synchronized(requestPdu) {
-		                            requestPdu.notifyAll();
-		                        }
-		                    } else {
-		                        s_logger.warn("Original request has already been cancelled. pdu: " + _gson.toJson(pdu));
-		                    }
-		                } else {
-		                    String result = dispatchClusterServicePdu(pdu);
-		                    if(result == null)
-		                        result = "";
-		                    
-		                    if(pdu.getPduType() == ClusterServicePdu.PDU_TYPE_REQUEST) {
-			                    ClusterServicePdu responsePdu = new ClusterServicePdu();
-			                    responsePdu.setPduType(ClusterServicePdu.PDU_TYPE_RESPONSE);
-			                    responsePdu.setSourcePeer(pdu.getDestPeer());
-			                    responsePdu.setDestPeer(pdu.getSourcePeer());
-			                    responsePdu.setAckSequenceId(pdu.getSequenceId());
-			                    responsePdu.setJsonPackage(result);
-			                    
-			                    addOutgoingClusterPdu(responsePdu);
-		                    }
-		                }
-                	}
-                });
-            } catch(Throwable e) {
-                s_logger.error("Unexcpeted exception: ", e);
-            }
-        }
-    }
-
-    private String handleScheduleHostScanTaskCommand(ScheduleHostScanTaskCommand cmd) {
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Intercepting resource manager command: " + _gson.toJson(cmd));
-        }
-
-        try {
-            // schedule a scan task immediately
-            if (_agentMgr instanceof ClusteredAgentManagerImpl) {
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("Received notification as part of addHost command to start a host scan task");
-                }
-                ClusteredAgentManagerImpl clusteredAgentMgr = (ClusteredAgentManagerImpl)_agentMgr;
-                clusteredAgentMgr.scheduleHostScanTask();
-            }
-        } catch (Exception e) {
-            // Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan
-            // happens at fixed intervals anyways. So handling any exceptions that may be thrown
-            s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + getSelfPeerName() + ", ignoring as regular host scan happens at fixed interval anyways", e);
-            return null;
-        }
-
-        Answer[] answers = new Answer[1];
-        answers[0] = new Answer(cmd, true, null);
-        return _gson.toJson(answers);
-    }
-
-    private String dispatchClusterServicePdu(ClusterServicePdu pdu) {
-
-        if(s_logger.isDebugEnabled()) {
-            s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
-        }
-
-        Command [] cmds = null;
-        try {
-            cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class);
-        } catch(Throwable e) {
-            assert(false);
-            s_logger.error("Excection in gson decoding : ", e);
-        }
-        
-        if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) {  //intercepted
-            ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
-
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
-            }
-            boolean result = false;
-            try {
-                result = executeAgentUserRequest(cmd.getAgentId(), cmd.getEvent());
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("Result is " + result);
-                }
-
-            } catch (AgentUnavailableException e) {
-                s_logger.warn("Agent is unavailable", e);
-                return null;
-            }
-
-            Answer[] answers = new Answer[1];
-            answers[0] = new ChangeAgentAnswer(cmd, result);
-            return _gson.toJson(answers);
-        } else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
-            TransferAgentCommand cmd = (TransferAgentCommand) cmds[0];
-
-            if (s_logger.isDebugEnabled()) {
-                s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
-            }
-            boolean result = false;
-            try {
-                result = rebalanceAgent(cmd.getAgentId(), cmd.getEvent(), cmd.getCurrentOwner(), cmd.getFutureOwner());
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("Result is " + result);
-                }
-
-            } catch (AgentUnavailableException e) {
-                s_logger.warn("Agent is unavailable", e);
-                return null;
-            } catch (OperationTimedoutException e) {
-                s_logger.warn("Operation timed out", e);
-                return null;
-            }
-            Answer[] answers = new Answer[1];
-            answers[0] = new Answer(cmd, result, null);
-            return _gson.toJson(answers);
-        } else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand ) {
-        	PropagateResourceEventCommand cmd = (PropagateResourceEventCommand) cmds[0];
-        	
-        	s_logger.debug("Intercepting command to propagate event " + cmd.getEvent().name() + " for host " + cmd.getHostId());
-        	
-        	boolean result = false;
-        	try {
-        		result = executeResourceUserRequest(cmd.getHostId(), cmd.getEvent());
-        		s_logger.debug("Result is " + result);
-        	} catch (AgentUnavailableException ex) {
-        		s_logger.warn("Agent is unavailable", ex);
-        		return null;
-        	}
-        	
-        	Answer[] answers = new Answer[1];
-        	answers[0] = new Answer(cmd, result, null);
-        	return _gson.toJson(answers);
-        } else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) {
-            ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand) cmds[0];
-            String response = handleScheduleHostScanTaskCommand(cmd);
-            return response;
-        }
-
-        try {
-            long startTick = System.currentTimeMillis();
-            if(s_logger.isDebugEnabled()) {
-                s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
-            }
-
-            Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError());
-            if(answers != null) {
-                String jsonReturn =  _gson.toJson(answers);
-
-                if(s_logger.isDebugEnabled()) {
-                    s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
-                            " in " + (System.currentTimeMillis() - startTick) + " ms, return result: " + jsonReturn);
-                }
-
-                return jsonReturn;
-            } else {
-                if(s_logger.isDebugEnabled()) {
-                    s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() +
-                            " in " + (System.currentTimeMillis() - startTick) + " ms, return null result");
-                }
-            }
-        } catch(AgentUnavailableException e) {
-            s_logger.warn("Agent is unavailable", e);
-        } catch (OperationTimedoutException e) {
-            s_logger.warn("Timed Out", e);
-        }
-        
-        return null;
-    }
-
-    @Override
-    public void OnReceiveClusterServicePdu(ClusterServicePdu pdu) {
-    	addIncomingClusterPdu(pdu);
-    }
-    
-    @Override
-    public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
-        Commands commands = new Commands(stopOnError ? OnError.Stop : OnError.Continue);
-        for (Command cmd : cmds) {
-            commands.addCommand(cmd);
-        }
-        return _agentMgr.send(hostId, commands);
-    }
-
-    @Override
-    public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
-        return _agentMgr.executeUserRequest(agentId, event);
-    }
-
-    @Override
-    public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException {
-        final String msPeer = getPeerName(agentId);
-        if (msPeer == null) {
-            return null;
-        }
-
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId);
-        }
-        Command[] cmds = new Command[1];
-        cmds[0] = new ChangeAgentCommand(agentId, event);
-
-        Answer[] answers = execute(msPeer, agentId, cmds, true);
-        if (answers == null) {
-            throw new AgentUnavailableException(agentId);
-        }
-
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Result for agent change is " + answers[0].getResult());
-        }
-
-        return answers[0].getResult();
-    }
-
-    /**
-     * called by DatabaseUpgradeChecker to see if there are other peers running.
-     * 
-     * @param notVersion
-     *            If version is passed in, the peers CANNOT be running at this version. If version is null, return true if any
-     *            peer is running regardless of version.
-     * @return true if there are peers running and false if not.
-     */
-    public static final boolean arePeersRunning(String notVersion) {
-        return false; // TODO: Leaving this for Kelven to take care of.
-    }
-
-    @Override
-    public void broadcast(long agentId, Command[] cmds) {
-        Date cutTime = DateUtil.currentGMTTime();
-
-        List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
-        for (ManagementServerHostVO peer : peers) {
-            String peerName = Long.toString(peer.getMsid());
-            if (getSelfPeerName().equals(peerName)) {
-                continue; // Skip myself.
-            }
-            try {
-                if (s_logger.isDebugEnabled()) {
-                    s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer.getMsid());
-                }
-                executeAsync(peerName, agentId, cmds, true);
-            } catch (Exception e) {
-                s_logger.warn("Caught exception while talkign to " + peer.getMsid());
-            }
-        }
-    }
-
-    @Override
-    public void executeAsync(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
-        ClusterServicePdu pdu = new ClusterServicePdu();
-        pdu.setSourcePeer(getSelfPeerName());
-        pdu.setDestPeer(strPeer);
-        pdu.setAgentId(agentId);
-        pdu.setJsonPackage(_gson.toJson(cmds, Command[].class));
-        pdu.setStopOnError(true);
-        addOutgoingClusterPdu(pdu);
-    }
-
-    @Override
-    public Answer[] execute(String strPeer, long agentId, Command [] cmds, boolean stopOnError) {
-        if(s_logger.isDebugEnabled()) {
-            s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " " +
-                    _gson.toJson(cmds, Command[].class));
-        }
-        
-        ClusterServiceRequestPdu pdu = new ClusterServiceRequestPdu();
-        pdu.setSourcePeer(getSelfPeerName());
-        pdu.setDestPeer(strPeer);
-        pdu.setAgentId(agentId);
-        pdu.setJsonPackage(_gson.toJson(cmds, Command[].class));
-        pdu.setStopOnError(stopOnError);
-        registerRequestPdu(pdu);
-        addOutgoingClusterPdu(pdu);
-        
-        synchronized(pdu) {
-            try {
-                pdu.wait();
-            } catch (InterruptedException e) {
-            }
-        }
-
-        if(s_logger.isDebugEnabled()) {
-            s_logger.debug(getSelfPeerName() + " -> " + strPeer + "." + agentId + " completed. result: " +
-                pdu.getResponseResult());
-        }
-        
-        if(pdu.getResponseResult() != null && pdu.getResponseResult().length() > 0) {
-            try {
-                return _gson.fromJson(pdu.getResponseResult(), Answer[].class);
-            } catch(Throwable e) {
-                s_logger.error("Exception on parsing gson package from remote call to " + strPeer);
-            }
-        }
-
-        return null;
-    }
-    
-    @Override
-    public String getPeerName(long agentHostId) {
-
-        HostVO host = _hostDao.findById(agentHostId);
-        if(host != null && host.getManagementServerId() != null) {
-            if(getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) {
-                return null;
-            }
-
-            return Long.toString(host.getManagementServerId());
-        }
-        return null;
-    }
-
-    @Override
-    public ManagementServerHostVO getPeer(String mgmtServerId) {
-        return _mshostDao.findByMsid(Long.valueOf(mgmtServerId));
-    }
-
-    @Override
-    public String getSelfPeerName() {
-        return Long.toString(_msId);
-    }
-
-    @Override
-    public String getSelfNodeIP() {
-        return _clusterNodeIP;
-    }
-
-    @Override
-    public void registerListener(ClusterManagerListener listener) {
-        // Note : we don't check duplicates
-        synchronized (_listeners) {
-
-    		s_logger.info("register cluster listener " + listener.getClass());
-    		
-        	_listeners.add(listener);
-        }
-    }
-
-    @Override
-    public void unregisterListener(ClusterManagerListener listener) {
-        synchronized(_listeners) {
-    		s_logger.info("unregister cluster listener " + listener.getClass());
-        	
-        	_listeners.remove(listener);
-        }
-    }
-
-    public void notifyNodeJoined(List<ManagementServerHostVO> nodeList) {
-        if(s_logger.isDebugEnabled()) {
-            s_logger.debug("Notify management server node join to listeners.");
-
-            for(ManagementServerHostVO mshost : nodeList) {
-                s_logger.debug("Joining node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
-            }
-        }
-
-        synchronized(_listeners) {
-            for(ClusterManagerListener listener : _listeners) {
-                listener.onManagementNodeJoined(nodeList, _mshostId);
-            }
-        }
-
-        SubscriptionMgr.getInstance().notifySubscribers(ClusterManager.ALERT_SUBJECT, this,
-                new ClusterNodeJoinEventArgs(_mshostId, nodeList));
-    }
-
-    public void notifyNodeLeft(List<ManagementServerHostVO> nodeList) {
-        if(s_logger.isDebugEnabled()) {
-            s_logger.debug("Notify management server node left to listeners.");
-        }
-        
-        for(ManagementServerHostVO mshost : nodeList) {
-            if(s_logger.isDebugEnabled())
-                s_logger.debug("Leaving node, IP: " + mshost.getServiceIP() + ", msid: " + mshost.getMsid());
-            cancelClusterRequestToPeer(String.valueOf(mshost.getMsid()));
-        }
-
-        synchronized(_listeners) {
-            for(ClusterManagerListener listener : _listeners) {
-                listener.onManagementNodeLeft(nodeList, _mshostId);
-            }
-        }
-
-        SubscriptionMgr.getInstance().notifySubscribers(ClusterManager.ALERT_SUBJECT, this,
-                new ClusterNodeLeftEventArgs(_mshostId, nodeList));
-    }
-
-    public void notifyNodeIsolated() {
-        if(s_logger.isDebugEnabled())
-            s_logger.debug("Notify management server node isolation to listeners");
-
-        synchronized(_listeners) {
-            for(ClusterManagerListener listener : _listeners) {
-                listener.onManagementNodeIsolated();
-            }
-        }
-    }
-
-    public ClusterService getPeerService(String strPeer) throws RemoteException {
-        synchronized(_clusterPeers) {
-            if(_clusterPeers.containsKey(strPeer)) {
-                return _clusterPeers.get(strPeer);
-            }
-        }
-
-        ClusterService service = _currentServiceAdapter.getPeerService(strPeer);
-
-        if(service != null) {
-            synchronized(_clusterPeers) {
-                // re-check the peer map again to deal with the
-                // race conditions
-                if(!_clusterPeers.containsKey(strPeer)) {
-                    _clusterPeers.put(strPeer, service);
-                }
-            }
-        }
-
-        return service;
-    }
-
-    public void invalidatePeerService(String strPeer) {
-        synchronized(_clusterPeers) {
-            if(_clusterPeers.containsKey(strPeer)) {
-                _clusterPeers.remove(strPeer);
-            }
-        }
-    }
-
-    private Runnable getHeartbeatTask() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                Transaction txn = Transaction.open("ClusterHeartBeat");
-                try {
-                    Profiler profiler = new Profiler();
-                    Profiler profilerHeartbeatUpdate = new Profiler();
-                    Profiler profilerPeerScan = new Profiler();
-                    Profiler profilerAgentLB = new Profiler();
-                    
-                    try {
-                        profiler.start();
-                        
-                        profilerHeartbeatUpdate.start();
-                        txn.transitToUserManagedConnection(getHeartbeatConnection());
-                        if(s_logger.isTraceEnabled()) {
-                            s_logger.trace("Cluster manager heartbeat update, id:" + _mshostId);
-                        }
-    
-                        _mshostDao.update(_mshostId, getCurrentRunId(), DateUtil.currentGMTTime());
-                        profilerHeartbeatUpdate.stop();
-    
-                        profilerPeerScan.start();
-                        if (s_logger.isTraceEnabled()) {
-                            s_logger.trace("Cluster manager peer-scan, id:" + _mshostId);
-                        }
-    
-                        if (!_peerScanInited) {
-                            _peerScanInited = true;
-                            initPeerScan();
-                        }
-                        
-                        peerScan();
-                        profilerPeerScan.stop();
-                        
-                        profilerAgentLB.start();
-                        //initiate agent lb task will be scheduled and executed only once, and only when number of agents loaded exceeds _connectedAgentsThreshold
-                        if (_agentLBEnabled && !_agentLbHappened) {
-                            SearchCriteriaService<HostVO, HostVO> sc = SearchCriteria2.create(HostVO.class);
-                            sc.addAnd(sc.getEntity().getManagementServerId(), Op.NNULL);
-                            sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
-                            List<HostVO> allManagedRoutingAgents = sc.list();
-                            
-                            sc = SearchCriteria2.create(HostVO.class);
-                            sc.addAnd(sc.getEntity().getType(), Op.EQ, Host.Type.Routing);
-                            List<HostVO> allAgents = sc.list();
-                            double allHostsCount = allAgents.size();
-                            double managedHostsCount = allManagedRoutingAgents.size();
-                            if (allHostsCount > 0.0) {
-                                double load = managedHostsCount/allHostsCount;
-                                if (load >= _connectedAgentsThreshold) {
-                                    s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " + _connectedAgentsThreshold);
-                                    _rebalanceService.scheduleRebalanceAgents();
-                                    _agentLbHappened = true;
-                                } else {
-                                    s_logger.trace("Not scheduling agent rebalancing task as the averages load " + load + " is less than the threshold " + _connectedAgentsThreshold);
-                                }
-                            } 
-                        }
-                        profilerAgentLB.stop();
-                    } finally {
-                        profiler.stop();
-                        
-                        if(profiler.getDuration() >= _heartbeatInterval) {
-                            if(s_logger.isDebugEnabled())
-                                s_logger.debug("Management server heartbeat takes too long to finish. profiler: " + profiler.toString() + 
-                                    ", profilerHeartbeatUpdate: " + profilerHeartbeatUpdate.toString() +
-                                    ", profilerPeerScan: " + profilerPeerScan.toString() +
-                                    ", profilerAgentLB: " + profilerAgentLB.toString());
-                        }
-                    }
-                    
-                } catch(CloudRuntimeException e) {
-                    s_logger.error("Runtime DB exception ", e.getCause());
-
-                    if(e.getCause() instanceof ClusterInvalidSessionException) {
-                        s_logger.error("Invalid cluster session found, fence it");
-                        queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
-                    }
-
-                    if(isRootCauseConnectionRelated(e.getCause())) {
-                        s_logger.error("DB communication problem detected, fence it");
-                        queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
-                    }
-
-                    invalidHeartbeatConnection();
-                } catch(ActiveFencingException e) {
-                    queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
-                } catch (Throwable e) {
-                    s_logger.error("Unexpected exception in cluster heartbeat", e);
-                    if(isRootCauseConnectionRelated(e.getCause())) {
-                        s_logger.error("DB communication problem detected, fence it");
-                        queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeIsolated));
-                    }
-
-                    invalidHeartbeatConnection();
-                } finally {
-                    txn.transitToAutoManagedConnection(Transaction.CLOUD_DB);                         
-                    txn.close("ClusterHeartBeat");           	
-                }
-            }
-        };
-    }
-
-    private boolean isRootCauseConnectionRelated(Throwable e) {
-        while (e != null) {
-            if (e instanceof SQLRecoverableException) {
-                return true;
-            }
-
-            e = e.getCause();
-        }
-
-        return false;
-    }
-
-    private Connection getHeartbeatConnection() throws SQLException {
-        if(_heartbeatConnection == null) {
-            Connection conn = Transaction.getStandaloneConnectionWithException();
-            _heartbeatConnection = new ConnectionConcierge("ClusterManagerHeartBeat", conn, false);
-        }
-
-        return _heartbeatConnection.conn();
-    }
-
-    private void invalidHeartbeatConnection() {
-        if(_heartbeatConnection != null) {
-            Connection conn = Transaction.getStandaloneConnection();
-            if (conn != null) {
-                _heartbeatConnection.reset(Transaction.getStandaloneConnection());
-            }
-        }
-    }
-
-    private Runnable getNotificationTask() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                while(true) {
-                    synchronized(_notificationMsgs) {
-                        try {
-                            _notificationMsgs.wait(1000);
-                        } catch (InterruptedException e) {
-                        }
-                    }
-
-                    ClusterManagerMessage msg = null;
-                    while((msg = getNextNotificationMessage()) != null) {
-                        try {
-                            switch(msg.getMessageType()) {
-                            case nodeAdded:
-                                if(msg.getNodes() != null && msg.getNodes().size() > 0) {
-                                    Profiler profiler = new Profiler();
-                                    profiler.start();
-
-                                    notifyNodeJoined(msg.getNodes());
-
-                                    profiler.stop();
-                                    if(profiler.getDuration() > 1000) {
-                                        if(s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Notifying management server join event took " + profiler.getDuration() + " ms");
-                                        }
-                                    } else {
-                                        s_logger.warn("Notifying management server join event took " + profiler.getDuration() + " ms");
-                                    }
-                                }
-                                break;
-
-                            case nodeRemoved:
-                                if(msg.getNodes() != null && msg.getNodes().size() > 0) {
-                                    Profiler profiler = new Profiler();
-                                    profiler.start();
-
-                                    notifyNodeLeft(msg.getNodes());
-
-                                    profiler.stop();
-                                    if(profiler.getDuration() > 1000) {
-                                        if(s_logger.isDebugEnabled()) {
-                                            s_logger.debug("Notifying management server leave event took " + profiler.getDuration() + " ms");
-                                        }
-                                    } else {
-                                        s_logger.warn("Notifying management server leave event took " + profiler.getDuration() + " ms");
-                                    }
-                                }
-                                break;
-
-                            case nodeIsolated:
-                                notifyNodeIsolated();
-                                break;
-
-                            default :
-                                assert(false);
-                                break;
-                            }
-
-                        } catch (Throwable e) {
-                            s_logger.warn("Unexpected exception during cluster notification. ", e);
-                        }
-                    }
-
-                    try { Thread.sleep(1000); } catch (InterruptedException e) {}
-                }
-            }
-        };
-    }
-
-    private void queueNotification(ClusterManagerMessage msg) {
-        synchronized(this._notificationMsgs) {
-            this._notificationMsgs.add(msg);
-            this._notificationMsgs.notifyAll();
-        }
-        
-        switch(msg.getMessageType()) {
-        case nodeAdded:
-            {
-                List<ManagementServerHostVO> l = msg.getNodes();
-                if(l != null && l.size() > 0) {
-                    for(ManagementServerHostVO mshost: l) {
-                        _mshostPeerDao.updatePeerInfo(_mshostId, mshost.getId(), mshost.getRunid(), ManagementServerHost.State.Up);
-                    }
-                }
-            }
-            break;
-            
-        case nodeRemoved:
-            {
-                List<ManagementServerHostVO> l = msg.getNodes();
-                if(l != null && l.size() > 0) {
-                    for(ManagementServerHostVO mshost: l) {
-                        _mshostPeerDao.updatePeerInfo(_mshostId, mshost.getId(), mshost.getRunid(), ManagementServerHost.State.Down);
-                    }
-                }
-            }
-            break;
-            
-        default :
-            break;
-        
-        }
-    }
-
-    private ClusterManagerMessage getNextNotificationMessage() {
-        synchronized(this._notificationMsgs) {
-            if(this._notificationMsgs.size() > 0) {
-                return this._notificationMsgs.remove(0);
-            }
-        }
-
-        return null;
-    }
-
-    private void initPeerScan() {
-        // upon startup, for all inactive management server nodes that we see at startup time, we will send notification also to help upper layer perform
-        // missed cleanup
-        Date cutTime = DateUtil.currentGMTTime();
-        List<ManagementServerHostVO> inactiveList = _mshostDao.getInactiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
-       
-        // We don't have foreign key constraints to enforce the mgmt_server_id integrity in host table, when user manually 
-        // remove records from mshost table, this will leave orphan mgmt_serve_id reference in host table.
-        List<Long> orphanList = _mshostDao.listOrphanMsids();
-        if(orphanList.size() > 0) {
-	        for(Long orphanMsid : orphanList) {
-	        	// construct fake ManagementServerHostVO based on orphan MSID
-	        	s_logger.info("Add orphan management server msid found in host table to initial clustering notification, orphan msid: " + orphanMsid);
-	        	inactiveList.add(new ManagementServerHostVO(orphanMsid, 0, "orphan", 0, new Date()));
-	        }
-        } else {
-        	s_logger.info("We are good, no orphan management server msid in host table is found");
-        }
-        
-        if(inactiveList.size() > 0) {
-        	if(s_logger.isInfoEnabled()) {
-        		s_logger.info("Found " + inactiveList.size() + " inactive management server node based on timestamp");
-        		for(ManagementServerHostVO host : inactiveList)
-        			s_logger.info("management server node msid: " + host.getMsid() + ", name: " + host.getName() + ", service ip: " + host.getServiceIP() + ", version: " + host.getVersion());
-        	}
-
-        	List<ManagementServerHostVO> downHostList = new ArrayList<ManagementServerHostVO>();
-            for(ManagementServerHostVO host : inactiveList) {
-	            if(!pingManagementNode(host)) {
-	                s_logger.warn("Management node " + host.getId() + " is detected inactive by timestamp and also not pingable");
-	                downHostList.add(host);	
-	            }
-            }
-            
-            if(downHostList.size() > 0)
-            	this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, downHostList));
-        } else {
-        	s_logger.info("No inactive management server node found");
-        }
-    }
-
-    private void peerScan() throws ActiveFencingException {
-        Date cutTime = DateUtil.currentGMTTime();
-
-        Profiler profiler = new Profiler();
-        profiler.start();
-        
-        Profiler profilerQueryActiveList = new Profiler();
-        profilerQueryActiveList.start();
-        List<ManagementServerHostVO> currentList = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
-        profilerQueryActiveList.stop();
-
-        Profiler profilerSyncClusterInfo = new Profiler();
-        profilerSyncClusterInfo.start();
-        List<ManagementServerHostVO> removedNodeList = new ArrayList<ManagementServerHostVO>();
-        List<ManagementServerHostVO> invalidatedNodeList = new ArrayList<ManagementServerHostVO>();
-
-        if(_mshostId != null) {
-            
-            if(_mshostPeerDao.countStateSeenInPeers(_mshostId, _runId, ManagementServerHost.State.Down) > 0) {
-                String msg = "We have detected that at least one management server peer reports that this management server is down, perform active fencing to avoid split-brain situation";
-                s_logger.error(msg);
-                throw new ActiveFencingException(msg);
-            }
-            
-            // only if we have already attached to cluster, will we start to check leaving nodes
-            for(Map.Entry<Long, ManagementServerHostVO>  entry : _activePeers.entrySet()) {
-
-                ManagementServerHostVO current = getInListById(entry.getKey(), currentList);
-                if(current == null) {
-                    if(entry.getKey().longValue() != _mshostId.longValue()) {
-                        if(s_logger.isDebugEnabled()) {
-                            s_logger.debug("Detected management node left, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP());
-                        }
-                        removedNodeList.add(entry.getValue());
-                    }
-                } else {
-                    if(current.getRunid() == 0) {
-                        if(entry.getKey().longValue() != _mshostId.longValue()) {
-                            if(s_logger.isDebugEnabled()) {
-                                s_logger.debug("Detected management node left because of invalidated session, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP());
-                            }
-                            invalidatedNodeList.add(entry.getValue());
-                        }
-                    } else {
-                        if(entry.getValue().getRunid() != current.getRunid()) {
-                            if(s_logger.isDebugEnabled()) {
-                                s_logger.debug("Detected management node left and rejoined quickly, id:" + entry.getKey() + ", nodeIP:" + entry.getValue().getServiceIP());
-                            }
-
-                            entry.getValue().setRunid(current.getRunid());
-                        }
-                    }
-                }
-            }
-        }
-        profilerSyncClusterInfo.stop();
-        
-        Profiler profilerInvalidatedNodeList = new Profiler();
-        profilerInvalidatedNodeList.start();
-        // process invalidated node list
-        if(invalidatedNodeList.size() > 0) {
-            for(ManagementServerHostVO mshost : invalidatedNodeList) {
-                _activePeers.remove(mshost.getId());
-                try {
-                    JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
-                } catch(Exception e) {
-                    s_logger.warn("Unable to deregiester cluster node from JMX monitoring due to exception " + e.toString());
-                }
-            }
-
-            this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, invalidatedNodeList));
-        }
-        profilerInvalidatedNodeList.stop();
-
-        Profiler profilerRemovedList = new Profiler();
-        profilerRemovedList.start();
-        // process removed node list
-        Iterator<ManagementServerHostVO> it = removedNodeList.iterator();
-        while(it.hasNext()) {
-            ManagementServerHostVO mshost = it.next();
-            if(!pingManagementNode(mshost)) {
-                s_logger.warn("Management node " + mshost.getId() + " is detected inactive by timestamp and also not pingable");
-                _activePeers.remove(mshost.getId());
-                try {
-                    JmxUtil.unregisterMBean("ClusterManager", "Node " + mshost.getId());
-                } catch(Exception e) {
-                    s_logger.warn("Unable to deregiester cluster node from JMX monitoring due to exception " + e.toString());
-                }
-            } else {
-                s_logger.info("Management node " + mshost.getId() + " is detected inactive by timestamp but is pingable");
-                it.remove();
-            }
-        }
-
-        if(removedNodeList.size() > 0) {
-            this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeRemoved, removedNodeList));
-        }
-        profilerRemovedList.stop();
-
-        List<ManagementServerHostVO> newNodeList = new ArrayList<ManagementServerHostVO>();
-        for(ManagementServerHostVO mshost : currentList) {
-            if(!_activePeers.containsKey(mshost.getId())) {
-                _activePeers.put(mshost.getId(), mshost);
-
-                if(s_logger.isDebugEnabled()) {
-                    s_logger.debug("Detected management node joined, id:" + mshost.getId() + ", nodeIP:" + mshost.getServiceIP());
-                }
-                newNodeList.add(mshost);
-
-                try {
-                    JmxUtil.registerMBean("ClusterManager", "Node " + mshost.getId(), new ClusterManagerMBeanImpl(this, mshost));
-                } catch(Exception e) {
-                    s_logger.warn("Unable to regiester cluster node into JMX monitoring due to exception " + ExceptionUtil.toString(e));
-                }
-            }
-        }
-
-        if(newNodeList.size() > 0) {
-            this.queueNotification(new ClusterManagerMessage(ClusterManagerMessage.MessageType.nodeAdded, newNodeList));
-        }
-        
-        profiler.stop();
-        
-        if(profiler.getDuration() >= this._heartbeatInterval) {
-            if(s_logger.isDebugEnabled())
-                s_logger.debug("Peer scan takes too long to finish. profiler: " + profiler.toString()
-                  + ", profilerQueryActiveList: " + profilerQueryActiveList.toString()
-                  + ", profilerSyncClusterInfo: " + profilerSyncClusterInfo.toString()
-                  + ", profilerInvalidatedNodeList: " + profilerInvalidatedNodeList.toString()
-                  + ", profilerRemovedList: " + profilerRemovedList.toString());
-        }
-    }
-
-    private static ManagementServerHostVO getInListById(Long id, List<ManagementServerHostVO> l) {
-        for(ManagementServerHostVO mshost : l) {
-            if(mshost.getId() == id) {
-                return mshost;
-            }
-        }
-        return null;
-    }
-
-    @Override @DB
-    public boolean start() {
-        if(s_logger.isInfoEnabled()) {
-            s_logger.info("Starting cluster manager, msid : " + _msId);
-        }
-
-        Transaction txn = Transaction.currentTxn();
-        try {
-            txn.start();
-
-            final Class<?> c = this.getClass();
-            String version = c.getPackage().getImplementationVersion();
-
-            ManagementServerHostVO mshost = _mshostDao.findByMsid(_msId);
-            if (mshost == null) {
-                mshost = new ManagementServerHostVO();
-                mshost.setMsid(_msId);
-                mshost.setRunid(this.getCurrentRunId());
-                mshost.setName(NetUtils.getHostName());
-                mshost.setVersion(version);
-                mshost.setServiceIP(_clusterNodeIP);
-                mshost.setServicePort(_currentServiceAdapter.getServicePort());
-                mshost.setLastUpdateTime(DateUtil.currentGMTTime());
-                mshost.setRemoved(null);
-                mshost.setAlertCount(0);
-                mshost.setState(ManagementServerHost.State.Up);
-                _mshostDao.persist(mshost);
-
-                if (s_logger.isInfoEnabled()) {
-                    s_logger.info("New instance of management server msid " + _msId + " is being started");
-                }
-            } else {
-                if (s_logger.isInfoEnabled()) {
-                    s_logger.info("Management server " + _msId + " is being started");
-                }
-
-                _mshostDao.update(mshost.getId(), getCurrentRunId(), NetUtils.getHostName(), version, _clusterNodeIP, _currentServiceAdapter.getServicePort(), DateUtil.currentGMTTime());
-            }
-
-            txn.commit();
-
-            _mshostId = mshost.getId();
-            if (s_logger.isInfoEnabled()) {
-                s_logger.info("Management server (host id : " + _mshostId + ") is being started at " + _clusterNodeIP + ":" + _currentServiceAdapter.getServicePort());
-            }
-            
-            _mshostPeerDao.clearPeerInfo(_mshostId);
-
-            // use seperate thread for heartbeat updates
-            _heartbeatScheduler.scheduleAtFixedRate(getHeartbeatTask(), _heartbeatInterval, _heartbeatInterval, TimeUnit.MILLISECONDS);
-            _notificationExecutor.submit(getNotificationTask());
-
-        } catch (Throwable e) {
-            s_logger.error("Unexpected exception : ", e);
-            txn.rollback();
-
-            throw new CloudRuntimeException("Unable to initialize cluster info into database");
-        }
-
-        if (s_logger.isInfoEnabled()) {
-            s_logger.info("Cluster manager was started successfully");
-        }
-
-        return true;
-    }
-
-    @Override @DB
-    public boolean stop() {
-        if(_mshostId != null) {
-            ManagementServerHostVO mshost = _mshostDao.findByMsid(_msId);
-            mshost.setState(ManagementServerHost.State.Down);
-            _mshostDao.update(_mshostId, mshost);
-        }
-
-        _heartbeatScheduler.shutdownNow();
-        _executor.shutdownNow();
-
-        try {
-            _heartbeatScheduler.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
-            _executor.awaitTermination(EXECUTOR_SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException e) {
-        }
-
-        if(s_logger.isInfoEnabled()) {
-            s_logger.info("Cluster manager is stopped");
-        }
-
-        return true;
-    }
-
-    @Override
-    public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
-        if(s_logger.isInfoEnabled()) {
-            s_logger.info("Start configuring cluster manager : " + name);
-        }
-
-        Map<String, String> configs = _configDao.getConfiguration("management-server", params);
-
-        String value = configs.get("cluster.heartbeat.interval");
-        if (value != null) {
-            _heartbeatInterval = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_INTERVAL);
-        }
-
-        value = configs.get("cluster.heartbeat.threshold");
-        if (value != null) {
-            _heartbeatThreshold = NumbersUtil.parseInt(value, ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD);
-        }
-
-        File dbPropsFile = PropertiesUtil.findConfigFile("db.properties");
-        Properties dbProps = new Properties();
-        try {
-            dbProps.load(new FileInputStream(dbPropsFile));
-        } catch (FileNotFoundException e) {
-            throw new ConfigurationException("Unable to find db.properties");
-        } catch (IOException e) {
-            throw new ConfigurationException("Unable to load db.properties content");
-        }
-        _clusterNodeIP = dbProps.getProperty("cluster.node.IP");
-        if (_clusterNodeIP == null) {
-            _clusterNodeIP = "127.0.0.1";
-        }
-        _clusterNodeIP = _clusterNodeIP.trim();
-
-        if(s_logger.isInfoEnabled()) {
-            s_logger.info("Cluster node IP : " + _clusterNodeIP);
-        }
-
-        if(!NetUtils.isLocalAddress(_clusterNodeIP)) {
-            throw new ConfigurationException("cluster node IP should be valid local address where the server is running, please check your configuration");
-        }
-
-        for(int i = 0; i < DEFAULT_OUTGOING_WORKERS; i++)
-        	_executor.execute(getClusterPduSendingTask());
-        
-        // notification task itself in turn works as a task dispatcher
-        _executor.execute(getClusterPduNotificationTask());
-
-        if (_serviceAdapters == null) {
-            throw new ConfigurationException("Unable to get cluster service adapters");
-        }
-        _currentServiceAdapter = _serviceAdapters.get(0);
-
-        if(_currentServiceAdapter == null) {
-            throw new ConfigurationException("Unable to set current cluster service adapter");
-        }
-
-        _agentLBEnabled = Boolean.valueOf(_configDao.getValue(Config.AgentLbEnable.key()));
-        
-        String connectedAgentsThreshold = configs.get("agent.load.threshold");
-        
-        if (connectedAgentsThreshold != null) {
-            _connectedAgentsThreshold = Double.parseDouble(connectedAgentsThreshold);
-        }
-
-        this.registerListener(new LockMasterListener(_msId));
-
-        checkConflicts();
-
-        if(s_logger.isInfoEnabled()) {
-            s_logger.info("Cluster manager is configured.");
-        }
-        return true;
-    }
-
-    @Override
-    public long getManagementNodeId() {
-        return _msId;
-    }
-
-    @Override
-    public long getCurrentRunId() {
-        return _runId;
-    }
-
-    @Override
-    public boolean isManagementNodeAlive(long msid) {
-        ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
-        if(mshost != null) {
-            if(mshost.getLastUpdateTime().getTime() >=  DateUtil.currentGMTTime().getTime() - _heartbeatThreshold) {
-                return true;
-            }
-        }
-
-        return false;
-    }
-
-    @Override
-    public boolean pingManagementNode(long msid) {
-        ManagementServerHostVO mshost = _mshostDao.findByMsid(msid);
-        if(mshost == null) {
-            return false;
-        }
-
-        return pingManagementNode(mshost);
-    }
-
-    private boolean pingManagementNode(ManagementServerHostVO mshost) {
-
-        String targetIp = mshost.getServiceIP();
-        if("127.0.0.1".equals(targetIp) || "0.0.0.0".equals(targetIp)) {
-            s_logger.info("ping management node cluster service can not be performed on self");
-            return false;
-        }
-     
-        int retry = 10;
-        while (--retry > 0) {
-            SocketChannel sch = null;
-            try {
-                s_logger.info("Trying to connect to " + targetIp);
-                sch = SocketChannel.open();
-                sch.configureBlocking(true);
-                sch.socket().setSoTimeout(5000);
-
-                InetSocketAddress addr = new InetSocketAddress(targetIp, mshost.getServicePort());
-                sch.connect(addr);
-                return true;
-            } catch (IOException e) {
-                if (e instanceof ConnectException) {
-                    s_logger.error("Unable to ping management server at " + targetIp + ":" + mshost.getServicePort() + " due to ConnectException", e);
-                	return false;
-                }
-            } finally {
-                if (sch != null) {
-                    try {
-                        sch.close();
-                    } catch (IOException e) {
-                    }
-                }
-            }
-
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException ex) {
-            }
-        }
-        
-        s_logger.error("Unable to ping management server at " + targetIp + ":" + mshost.getServicePort() + " after retries");
-        return false;
-    }
-
-
-    @Override
-    public int getHeartbeatThreshold() {
-        return this._heartbeatThreshold;
-    }
-
-    public int getHeartbeatInterval() {
-        return this._heartbeatInterval;
-    }
-
-    public void setHeartbeatThreshold(int threshold) {
-        _heartbeatThreshold = threshold;
-    }
-
-    private void checkConflicts() throws ConfigurationException {
-        Date cutTime = DateUtil.currentGMTTime();
-        List<ManagementServerHostVO> peers = _mshostDao.getActiveList(new Date(cutTime.getTime() - _heartbeatThreshold));
-        for(ManagementServerHostVO peer : peers) {
-            String peerIP = peer.getServiceIP().trim();
-            if(_clusterNodeIP.equals(peerIP)) {
-                if("127.0.0.1".equals(_clusterNodeIP)) {
-                    if(pingManagementNode(peer.getMsid())) {
-                        String msg = "Detected another management node with localhost IP is already running, please check your cluster configuration";
-                        s_logger.error(msg);
-                        throw new ConfigurationException(msg);
-                    } else {
-                        String msg = "Detected another management node with localhost IP is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node";
-                        s_logger.info(msg);
-                    }
-                } else {
-                    if(pingManagementNode(peer.getMsid())) {
-                        String msg = "Detected that another management node with the same IP " + peer.getServiceIP() + " is already running, please check your cluster configuration";
-                        s_logger.error(msg);
-                        throw new ConfigurationException(msg);
-                    } else {
-                        String msg = "Detected that another management node with the same IP " + peer.getServiceIP()
-                                + " is considered as running in DB, however it is not pingable, we will continue cluster initialization with this management server node";
-                        s_logger.info(msg);
-                    }
-                }
-            }
-        }
-    }
-
-    @Override
-    public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
-        return _rebalanceService.executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
-    }
-
-    @Override
-    public  boolean isAgentRebalanceEnabled() {
-        return _agentLBEnabled;
-    }
-    
-    @Override
-    public Boolean propagateResourceEvent(long agentId, ResourceState.Event event) throws AgentUnavailableException {
-        final String msPeer = getPeerName(agentId);
-        if (msPeer == null) {
-            return null;
-        }
-
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId);
-        }
-        Command[] cmds = new Command[1];
-        cmds[0] = new PropagateResourceEventCommand(agentId, event);
-
-        Answer[] answers = execute(msPeer, agentId, cmds, true);
-        if (answers == null) {
-            throw new AgentUnavailableException(agentId);
-        }
-
-        if (s_logger.isDebugEnabled()) {
-            s_logger.debug("Result for agent change is " + answers[0].getResult());
-        }
-
-        return answers[0].getResult();
-    }
-    
-    @Override
-    public boolean executeResourceUserRequest(long hostId, ResourceState.Event event) throws AgentUnavailableException {
-        return _resourceMgr.executeUserRequest(hostId, event);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9aaa378b/server/src/com/cloud/cluster/ClusterManagerListener.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterManagerListener.java b/server/src/com/cloud/cluster/ClusterManagerListener.java
deleted file mode 100644
index bcb1736..0000000
--- a/server/src/com/cloud/cluster/ClusterManagerListener.java
+++ /dev/null
@@ -1,25 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-import java.util.List;
-
-public interface ClusterManagerListener {
-	void onManagementNodeJoined(List<ManagementServerHostVO> nodeList, long selfNodeId);
-	void onManagementNodeLeft(List<ManagementServerHostVO> nodeList, long selfNodeId);
-	void onManagementNodeIsolated();
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9aaa378b/server/src/com/cloud/cluster/ClusterManagerMBean.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterManagerMBean.java b/server/src/com/cloud/cluster/ClusterManagerMBean.java
deleted file mode 100644
index 9804f23..0000000
--- a/server/src/com/cloud/cluster/ClusterManagerMBean.java
+++ /dev/null
@@ -1,27 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-public interface ClusterManagerMBean {
-	public long getMsid();
-	public String getLastUpdateTime();
-	public String getClusterNodeIP();
-	public String getVersion();
-	public int getHeartbeatInterval();
-	public int getHeartbeatThreshold();
-	public void setHeartbeatThreshold(int threshold);
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9aaa378b/server/src/com/cloud/cluster/ClusterManagerMBeanImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterManagerMBeanImpl.java b/server/src/com/cloud/cluster/ClusterManagerMBeanImpl.java
deleted file mode 100644
index 51b3b42..0000000
--- a/server/src/com/cloud/cluster/ClusterManagerMBeanImpl.java
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-import java.util.Date;
-import java.util.TimeZone;
-
-import javax.management.StandardMBean;
-
-import com.cloud.utils.DateUtil;
-
-public class ClusterManagerMBeanImpl extends StandardMBean implements ClusterManagerMBean {
-	private ClusterManagerImpl _clusterMgr;
-	private ManagementServerHostVO _mshostVo;
-	
-	public ClusterManagerMBeanImpl(ClusterManagerImpl clusterMgr, ManagementServerHostVO mshostVo) {
-		super(ClusterManagerMBean.class, false);
-		
-		_clusterMgr = clusterMgr;
-		_mshostVo = mshostVo;
-	}
-	
-	public long getMsid() {
-		return _mshostVo.getMsid();
-	}
-	
-	public String getLastUpdateTime() {
-		Date date = _mshostVo.getLastUpdateTime();
-		return DateUtil.getDateDisplayString(TimeZone.getDefault(), date);
-	}
-	
-	public String getClusterNodeIP() {
-		return _mshostVo.getServiceIP();
-	}
-	
-	public String getVersion() {
-		return _mshostVo.getVersion();
-	}
-	
-	public int getHeartbeatInterval() {
-		return _clusterMgr.getHeartbeatInterval();
-	}
-	
-	public int getHeartbeatThreshold() {
-		return _clusterMgr.getHeartbeatThreshold();
-	}
-	
-	public void setHeartbeatThreshold(int threshold) {
-		// to avoid accidentally screwing up cluster manager, we put some guarding logic here
-    	if(threshold >= ClusterManager.DEFAULT_HEARTBEAT_THRESHOLD)
-    		_clusterMgr.setHeartbeatThreshold(threshold);
-	}
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9aaa378b/server/src/com/cloud/cluster/ClusterManagerMessage.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterManagerMessage.java b/server/src/com/cloud/cluster/ClusterManagerMessage.java
deleted file mode 100644
index 7f3e596..0000000
--- a/server/src/com/cloud/cluster/ClusterManagerMessage.java
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-import java.util.List;
-
-public class ClusterManagerMessage {
-	public static enum MessageType { nodeAdded, nodeRemoved, nodeIsolated };
-
-	MessageType _type;
-	List<ManagementServerHostVO> _nodes;
-	
-	public ClusterManagerMessage(MessageType type) {
-		_type = type;
-	}
-	
-	public ClusterManagerMessage(MessageType type, List<ManagementServerHostVO> nodes) {
-		_type = type;
-		_nodes = nodes;
-	}
-	
-	public MessageType getMessageType() {
-		return _type;
-	}
-	
-	public List<ManagementServerHostVO> getNodes() {
-		return _nodes;
-	}
-    
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9aaa378b/server/src/com/cloud/cluster/ClusterNodeJoinEventArgs.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterNodeJoinEventArgs.java b/server/src/com/cloud/cluster/ClusterNodeJoinEventArgs.java
deleted file mode 100644
index 594862d..0000000
--- a/server/src/com/cloud/cluster/ClusterNodeJoinEventArgs.java
+++ /dev/null
@@ -1,43 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-import java.util.List;
-
-import com.cloud.utils.events.EventArgs;
-
-public class ClusterNodeJoinEventArgs extends EventArgs {
-	private static final long serialVersionUID = 6284545402661799476L;
-	
-	private List<ManagementServerHostVO> joinedNodes;
-	private Long self;
-	
-	public ClusterNodeJoinEventArgs(Long self, List<ManagementServerHostVO> joinedNodes) {
-		super(ClusterManager.ALERT_SUBJECT);
-		
-		this.self = self;
-		this.joinedNodes = joinedNodes;
-	}
-	
-	public List<ManagementServerHostVO> getJoinedNodes() {
-		return joinedNodes;
-	}
-
-	public Long getSelf() {
-		return self;
-	}
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9aaa378b/server/src/com/cloud/cluster/ClusterNodeLeftEventArgs.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterNodeLeftEventArgs.java b/server/src/com/cloud/cluster/ClusterNodeLeftEventArgs.java
deleted file mode 100644
index be54b9f..0000000
--- a/server/src/com/cloud/cluster/ClusterNodeLeftEventArgs.java
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-import java.util.List;
-
-import com.cloud.utils.events.EventArgs;
-
-public class ClusterNodeLeftEventArgs extends EventArgs {
-	private static final long serialVersionUID = 7236743316223611935L;
-	
-	private List<ManagementServerHostVO> leftNodes;
-	private Long self;
-	
-	public ClusterNodeLeftEventArgs(Long self, List<ManagementServerHostVO> leftNodes) {
-		super(ClusterManager.ALERT_SUBJECT);
-		
-		this.self = self;
-		this.leftNodes = leftNodes;
-	}
-	
-	public List<ManagementServerHostVO> getLeftNodes() {
-		return leftNodes;
-	}
-
-	public Long getSelf() {
-		return self;
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9aaa378b/server/src/com/cloud/cluster/ClusterService.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterService.java b/server/src/com/cloud/cluster/ClusterService.java
deleted file mode 100644
index 295c3b2..0000000
--- a/server/src/com/cloud/cluster/ClusterService.java
+++ /dev/null
@@ -1,25 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-import java.rmi.Remote;
-import java.rmi.RemoteException;
-
-public interface ClusterService extends Remote {
-    String execute(ClusterServicePdu pdu) throws RemoteException;
-    boolean ping(String callingPeer) throws RemoteException;
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9aaa378b/server/src/com/cloud/cluster/ClusterServiceAdapter.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterServiceAdapter.java b/server/src/com/cloud/cluster/ClusterServiceAdapter.java
deleted file mode 100644
index 28e2cab..0000000
--- a/server/src/com/cloud/cluster/ClusterServiceAdapter.java
+++ /dev/null
@@ -1,28 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-import java.rmi.RemoteException;
-
-import com.cloud.cluster.ClusterService;
-import com.cloud.utils.component.Adapter;
-
-public interface ClusterServiceAdapter extends Adapter {
-	public ClusterService getPeerService(String strPeer) throws RemoteException;
-	public String getServiceEndpointName(String strPeer);
-	public int getServicePort();
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9aaa378b/server/src/com/cloud/cluster/ClusterServicePdu.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterServicePdu.java b/server/src/com/cloud/cluster/ClusterServicePdu.java
deleted file mode 100644
index 81ff5d8..0000000
--- a/server/src/com/cloud/cluster/ClusterServicePdu.java
+++ /dev/null
@@ -1,112 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-public class ClusterServicePdu {
-	public final static int PDU_TYPE_MESSAGE = 0;
-	public final static int PDU_TYPE_REQUEST = 1;
-	public final static int PDU_TYPE_RESPONSE = 2;
-	
-    private long sequenceId;
-    private long ackSequenceId;
-    
-    private String sourcePeer;
-    private String destPeer;
-    
-    private long agentId;
-    private boolean stopOnError;
-    private String jsonPackage;
-    
-    private int pduType = PDU_TYPE_MESSAGE;
-    
-    private static long s_nextPduSequenceId = 1;
-    
-    public ClusterServicePdu() {
-        sequenceId = getNextPduSequenceId();
-        ackSequenceId = 0;
-        agentId = 0;
-        stopOnError = false;
-    }
-    
-    public synchronized long getNextPduSequenceId() {
-        return s_nextPduSequenceId++;
-    }
-
-    public long getSequenceId() {
-        return sequenceId;
-    }
-
-    public void setSequenceId(long sequenceId) {
-        this.sequenceId = sequenceId;
-    }
-
-    public long getAckSequenceId() {
-        return ackSequenceId;
-    }
-
-    public void setAckSequenceId(long ackSequenceId) {
-        this.ackSequenceId = ackSequenceId;
-    }
-
-    public String getSourcePeer() {
-        return sourcePeer;
-    }
-
-    public void setSourcePeer(String sourcePeer) {
-        this.sourcePeer = sourcePeer;
-    }
-
-    public String getDestPeer() {
-        return destPeer;
-    }
-
-    public void setDestPeer(String destPeer) {
-        this.destPeer = destPeer;
-    }
-
-    public long getAgentId() {
-        return agentId;
-    }
-
-    public void setAgentId(long agentId) {
-        this.agentId = agentId;
-    }
-
-    public boolean isStopOnError() {
-        return stopOnError;
-    }
-
-    public void setStopOnError(boolean stopOnError) {
-        this.stopOnError = stopOnError;
-    }
-
-    public String getJsonPackage() {
-        return jsonPackage;
-    }
-
-    public void setJsonPackage(String jsonPackage) {
-        this.jsonPackage = jsonPackage;
-    }
-    
-    public int getPduType() {
-    	return pduType;
-    }
-    
-    public void setPduType(int pduType) {
-    	this.pduType = pduType;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cloudstack/blob/9aaa378b/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java b/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java
deleted file mode 100644
index 09bc22e..0000000
--- a/server/src/com/cloud/cluster/ClusterServiceRequestPdu.java
+++ /dev/null
@@ -1,54 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-package com.cloud.cluster;
-
-public class ClusterServiceRequestPdu extends ClusterServicePdu {
-
-    private String responseResult;
-    private long startTick;
-    private long timeout;
-    
-    public ClusterServiceRequestPdu() {
-        startTick = System.currentTimeMillis();
-        timeout = -1;
-        setPduType(PDU_TYPE_REQUEST);
-    }
-
-    public String getResponseResult() {
-        return responseResult;
-    }
-
-    public void setResponseResult(String responseResult) {
-        this.responseResult = responseResult;
-    }
-
-    public long getStartTick() {
-        return startTick;
-    }
-
-    public void setStartTick(long startTick) {
-        this.startTick = startTick;
-    }
-
-    public long getTimeout() {
-        return timeout;
-    }
-
-    public void setTimeout(long timeout) {
-        this.timeout = timeout;
-    }
-}