You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by mu...@apache.org on 2013/02/07 08:43:58 UTC

[34/50] [abbrv] git commit: refs/heads/gslb - CS-CLOUDSTACK-606: The issue happens randomly when hosts in a cluster gets distributed across multiple MS. Host can get split in following scenarios: a. Add host – MS on which add host is executed takes o

CS-CLOUDSTACK-606:
The issue happens randomly when hosts in a cluster gets distributed across multiple MS. Host can get split in following scenarios:
    a. Add host – MS on which add host is executed takes ownership of the host. So if 2 hosts belonging to same cluster are added from 2 different MS then cluster gets split
    b. scanDirectAgentToLoad – This runs every 90 secs. and check if there are any hosts that needs to be reconnected. The current logic of host scan can also lead to a split

    The idea is to fix (b) to ensure that hosts in a cluster are managed by same MS. For (a) only the entry in the database is going to be created except in case if the host getting added is first in the cluster (in this case agent creation happens at the same time) and then (b) will take care of connection and agent creation part. Since currently addHost only creates an entry in the db there is a small window where the host state will be shown as 'Alert' till the time (b) is scheduled and picks up the host to make a connection. The MS doing add host will immediately schedule a scan task and also send notification to peers to start the scan task.


Project: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/commit/777147ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/tree/777147ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/diff/777147ce

Branch: refs/heads/gslb
Commit: 777147ce8a47238125a5439f207c225aa9db5304
Parents: e162876
Author: Koushik Das <ko...@citrix.com>
Authored: Fri Feb 1 15:34:41 2013 +0530
Committer: Nitin Mehta <ni...@citrix.com>
Committed: Fri Feb 1 15:34:41 2013 +0530

----------------------------------------------------------------------
 .../agent/manager/ClusteredAgentManagerImpl.java   |   17 ++
 .../src/com/cloud/cluster/ClusterManagerImpl.java  |   34 +++-
 server/src/com/cloud/host/dao/HostDaoImpl.java     |  147 ++++++++++++--
 .../com/cloud/resource/ResourceManagerImpl.java    |  160 ++++++++++++++-
 4 files changed, 340 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/777147ce/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
index ca0bf5c..6487b8e 100755
--- a/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
+++ b/server/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
@@ -50,6 +50,7 @@ import com.cloud.agent.api.CancelCommand;
 import com.cloud.agent.api.ChangeAgentCommand;
 import com.cloud.agent.api.Command;
 import com.cloud.agent.api.TransferAgentCommand;
+import com.cloud.agent.api.ScheduleHostScanTaskCommand;
 import com.cloud.agent.transport.Request;
 import com.cloud.agent.transport.Request.Version;
 import com.cloud.agent.transport.Response;
@@ -159,6 +160,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
         return true;
     }
 
+    public void scheduleHostScanTask() {
+        _timer.schedule(new DirectAgentScanTimerTask(), 0);
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Scheduled a direct agent scan task");
+        }
+    }
+
     private void runDirectAgentScanTimerTask() {
         scanDirectAgentToLoad();
     }
@@ -357,6 +365,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
         _clusterMgr.broadcast(attache.getId(), cmds);
     }
 
+    // notifies MS peers to schedule a host scan task immediately, triggered during addHost operation
+    public void notifyNodesInClusterToScheduleHostScanTask() {
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Notifying other MS nodes to run host scan task");
+        }
+        Command[] cmds = new Command[] { new ScheduleHostScanTaskCommand() };
+        _clusterMgr.broadcast(0, cmds);
+    }
+
     protected static void logT(byte[] bytes, final String msg) {
         s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": "
                 + (Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/777147ce/server/src/com/cloud/cluster/ClusterManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/cluster/ClusterManagerImpl.java b/server/src/com/cloud/cluster/ClusterManagerImpl.java
index e341b88..465f384 100755
--- a/server/src/com/cloud/cluster/ClusterManagerImpl.java
+++ b/server/src/com/cloud/cluster/ClusterManagerImpl.java
@@ -53,6 +53,8 @@ import com.cloud.agent.api.ChangeAgentCommand;
 import com.cloud.agent.api.Command;
 import com.cloud.agent.api.PropagateResourceEventCommand;
 import com.cloud.agent.api.TransferAgentCommand;
+import com.cloud.agent.api.ScheduleHostScanTaskCommand;
+import com.cloud.agent.manager.ClusteredAgentManagerImpl;
 import com.cloud.agent.manager.Commands;
 import com.cloud.cluster.agentlb.dao.HostTransferMapDao;
 import com.cloud.cluster.dao.ManagementServerHostDao;
@@ -348,7 +350,33 @@ public class ClusterManagerImpl implements ClusterManager {
             }
         }
     }
-    
+
+    private String handleScheduleHostScanTaskCommand(ScheduleHostScanTaskCommand cmd) {
+        if (s_logger.isDebugEnabled()) {
+            s_logger.debug("Intercepting resource manager command: " + _gson.toJson(cmd));
+        }
+
+        try {
+            // schedule a scan task immediately
+            if (_agentMgr instanceof ClusteredAgentManagerImpl) {
+                if (s_logger.isDebugEnabled()) {
+                    s_logger.debug("Received notification as part of addHost command to start a host scan task");
+                }
+                ClusteredAgentManagerImpl clusteredAgentMgr = (ClusteredAgentManagerImpl)_agentMgr;
+                clusteredAgentMgr.scheduleHostScanTask();
+            }
+        } catch (Exception e) {
+            // Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan
+            // happens at fixed intervals anyways. So handling any exceptions that may be thrown
+            s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + getSelfPeerName() + ", ignoring as regular host scan happens at fixed interval anyways", e);
+            return null;
+        }
+
+        Answer[] answers = new Answer[1];
+        answers[0] = new Answer(cmd, true, null);
+        return _gson.toJson(answers);
+    }
+
     private String dispatchClusterServicePdu(ClusterServicePdu pdu) {
 
         if(s_logger.isDebugEnabled()) {
@@ -424,6 +452,10 @@ public class ClusterManagerImpl implements ClusterManager {
         	Answer[] answers = new Answer[1];
         	answers[0] = new Answer(cmd, result, null);
         	return _gson.toJson(answers);
+        } else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) {
+            ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand) cmds[0];
+            String response = handleScheduleHostScanTaskCommand(cmd);
+            return response;
         }
 
         try {

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/777147ce/server/src/com/cloud/host/dao/HostDaoImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/host/dao/HostDaoImpl.java b/server/src/com/cloud/host/dao/HostDaoImpl.java
index 0881675..c7c014d 100755
--- a/server/src/com/cloud/host/dao/HostDaoImpl.java
+++ b/server/src/com/cloud/host/dao/HostDaoImpl.java
@@ -106,6 +106,11 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
     protected SearchBuilder<ClusterVO> ClusterManagedSearch;
     protected final SearchBuilder<HostVO> RoutingSearch;
 
+    protected final SearchBuilder<HostVO> HostsForReconnectSearch;
+    protected final GenericSearchBuilder<HostVO, Long> ClustersOwnedByMSSearch;
+    protected final GenericSearchBuilder<ClusterVO, Long> AllClustersSearch;
+    protected final SearchBuilder<HostVO> HostsInClusterSearch;
+
     protected final Attribute _statusAttr;
     protected final Attribute _resourceStateAttr;
     protected final Attribute _msIdAttr;
@@ -233,6 +238,7 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
         UnmanagedDirectConnectSearch.and("server", UnmanagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.NULL);
         UnmanagedDirectConnectSearch.and("lastPinged", UnmanagedDirectConnectSearch.entity().getLastPinged(), SearchCriteria.Op.LTEQ);
         UnmanagedDirectConnectSearch.and("resourceStates", UnmanagedDirectConnectSearch.entity().getResourceState(), SearchCriteria.Op.NIN);
+        UnmanagedDirectConnectSearch.and("cluster", UnmanagedDirectConnectSearch.entity().getClusterId(), SearchCriteria.Op.EQ);
         /*
          * UnmanagedDirectConnectSearch.op(SearchCriteria.Op.OR, "managementServerId",
          * UnmanagedDirectConnectSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
@@ -301,6 +307,33 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
         RoutingSearch.and("type", RoutingSearch.entity().getType(), SearchCriteria.Op.EQ);
         RoutingSearch.done();
 
+        HostsForReconnectSearch = createSearchBuilder();
+        HostsForReconnectSearch.and("resource", HostsForReconnectSearch.entity().getResource(), SearchCriteria.Op.NNULL);
+        HostsForReconnectSearch.and("server", HostsForReconnectSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
+        HostsForReconnectSearch.and("lastPinged", HostsForReconnectSearch.entity().getLastPinged(), SearchCriteria.Op.LTEQ);
+        HostsForReconnectSearch.and("resourceStates", HostsForReconnectSearch.entity().getResourceState(), SearchCriteria.Op.NIN);
+        HostsForReconnectSearch.and("cluster", HostsForReconnectSearch.entity().getClusterId(), SearchCriteria.Op.NNULL);
+        HostsForReconnectSearch.and("status", HostsForReconnectSearch.entity().getStatus(), SearchCriteria.Op.IN);
+        HostsForReconnectSearch.done();
+
+        ClustersOwnedByMSSearch = createSearchBuilder(Long.class);
+        ClustersOwnedByMSSearch.select(null, Func.DISTINCT, ClustersOwnedByMSSearch.entity().getClusterId());
+        ClustersOwnedByMSSearch.and("resource", ClustersOwnedByMSSearch.entity().getResource(), SearchCriteria.Op.NNULL);
+        ClustersOwnedByMSSearch.and("cluster", ClustersOwnedByMSSearch.entity().getClusterId(), SearchCriteria.Op.NNULL);
+        ClustersOwnedByMSSearch.and("server", ClustersOwnedByMSSearch.entity().getManagementServerId(), SearchCriteria.Op.EQ);
+        ClustersOwnedByMSSearch.done();
+
+        AllClustersSearch = _clusterDao.createSearchBuilder(Long.class);
+        AllClustersSearch.select(null, Func.NATIVE, AllClustersSearch.entity().getId());
+        AllClustersSearch.and("managed", AllClustersSearch.entity().getManagedState(), SearchCriteria.Op.EQ);
+        AllClustersSearch.done();
+
+        HostsInClusterSearch = createSearchBuilder();
+        HostsInClusterSearch.and("resource", HostsInClusterSearch.entity().getResource(), SearchCriteria.Op.NNULL);
+        HostsInClusterSearch.and("cluster", HostsInClusterSearch.entity().getClusterId(), SearchCriteria.Op.EQ);
+        HostsInClusterSearch.and("server", HostsInClusterSearch.entity().getManagementServerId(), SearchCriteria.Op.NNULL);
+        HostsInClusterSearch.done();
+
         _statusAttr = _allAttributes.get("status");
         _msIdAttr = _allAttributes.get("managementServerId");
         _pingTimeAttr = _allAttributes.get("lastPinged");
@@ -326,25 +359,113 @@ public class HostDaoImpl extends GenericDaoBase<HostVO, Long> implements HostDao
         SearchCriteria<HostVO> sc = GuidSearch.create("guid", guid);
         return findOneBy(sc);
     }
-    
+
+    /*
+     * Find hosts which is in Disconnected, Down, Alert and ping timeout and server is not null, set server to null
+     */
+    private void resetHosts(long managementServerId, long lastPingSecondsAfter) {
+        SearchCriteria<HostVO> sc = HostsForReconnectSearch.create();
+        sc.setParameters("server", managementServerId);
+        sc.setParameters("lastPinged", lastPingSecondsAfter);
+        sc.setParameters("status", Status.Disconnected, Status.Down, Status.Alert);
+
+        List<HostVO> hosts = lockRows(sc, null, true); // exclusive lock
+        for (HostVO host : hosts) {
+            host.setManagementServerId(null);
+            update(host.getId(), host);
+        }
+    }
+
+    /*
+     * Returns a list of cluster owned by @managementServerId
+     */
+    private List<Long> findClustersOwnedByManagementServer(long managementServerId) {
+        SearchCriteria<Long> sc = ClustersOwnedByMSSearch.create();
+        sc.setParameters("server", managementServerId);
+
+        List<Long> clusters = customSearch(sc, null);
+        return clusters;
+    }
+
+    /*
+     * Returns a list of all cluster Ids
+     */
+    private List<Long> listAllClusters() {
+        SearchCriteria<Long> sc = AllClustersSearch.create();
+        sc.setParameters("managed", Managed.ManagedState.Managed);
+
+        List<Long> clusters = _clusterDao.customSearch(sc, null);
+        return clusters;
+    }
+
+    /*
+     * This determines if hosts belonging to cluster(@clusterId) are up for grabs
+     *
+     * This is used for handling following cases:
+     * 1. First host added in cluster
+     * 2. During MS restart all hosts in a cluster are without any MS
+     */
+    private boolean canOwnCluster(long clusterId) {
+        SearchCriteria<HostVO> sc = HostsInClusterSearch.create();
+        sc.setParameters("cluster", clusterId);
+
+        List<HostVO> hosts = search(sc, null);
+        boolean ownCluster = (hosts == null || hosts.size() == 0);
+
+        return ownCluster;
+    }
+
     @Override @DB
     public List<HostVO> findAndUpdateDirectAgentToLoad(long lastPingSecondsAfter, Long limit, long managementServerId) {
         Transaction txn = Transaction.currentTxn();
-        txn.start();       
-    	SearchCriteria<HostVO> sc = UnmanagedDirectConnectSearch.create();
-    	sc.setParameters("lastPinged", lastPingSecondsAfter);
-        //sc.setParameters("resourceStates", ResourceState.ErrorInMaintenance, ResourceState.Maintenance, ResourceState.PrepareForMaintenance, ResourceState.Disabled);
+
+        // reset hosts that are suitable candidates for reconnect
+        txn.start();
+        resetHosts(managementServerId, lastPingSecondsAfter);
+        txn.commit();
+
+        List<Long> clusters = findClustersOwnedByManagementServer(managementServerId);
+        List<Long> allClusters = listAllClusters();
+
+        SearchCriteria<HostVO> sc = UnmanagedDirectConnectSearch.create();
+        sc.setParameters("lastPinged", lastPingSecondsAfter);
         sc.setJoinParameters("ClusterManagedSearch", "managed", Managed.ManagedState.Managed);
-        List<HostVO> hosts = lockRows(sc, new Filter(HostVO.class, "clusterId", true, 0L, limit), true);
-        
-        for (HostVO host : hosts) {
-            host.setManagementServerId(managementServerId);
-            update(host.getId(), host);
+        List<HostVO> assignedHosts = new ArrayList<HostVO>();
+        List<Long> remainingClusters = new ArrayList<Long>();
+
+        // handle clusters already owned by @managementServerId
+        txn.start();
+        for (Long clusterId : allClusters) {
+            if (clusters.contains(clusterId)) { // host belongs to clusters owned by @managementServerId
+                sc.setParameters("cluster", clusterId);
+                List<HostVO> unmanagedHosts = lockRows(sc, null, true);
+                for (HostVO host : unmanagedHosts) {
+                    host.setManagementServerId(managementServerId);
+                    update(host.getId(), host);
+                    assignedHosts.add(host);
+                }
+            } else {
+                remainingClusters.add(clusterId);
+            }
         }
-        
         txn.commit();
-        
-        return hosts;
+
+        // for remaining clusters check if they can be owned
+        for (Long clusterId : remainingClusters) {
+            txn.start();
+            sc.setParameters("cluster", clusterId);
+            List<HostVO> unmanagedHosts = lockRows(sc, null, true);
+            if (canOwnCluster(clusterId)) { // cluster is not owned by any other MS, so @managementServerId can own it
+                for (HostVO host : unmanagedHosts) {
+                    host.setManagementServerId(managementServerId);
+                    update(host.getId(), host);
+                    assignedHosts.add(host);
+                }
+            }
+            txn.commit();
+        }
+
+        return assignedHosts;
     }
     
     @Override @DB

http://git-wip-us.apache.org/repos/asf/incubator-cloudstack/blob/777147ce/server/src/com/cloud/resource/ResourceManagerImpl.java
----------------------------------------------------------------------
diff --git a/server/src/com/cloud/resource/ResourceManagerImpl.java b/server/src/com/cloud/resource/ResourceManagerImpl.java
index f82424a..9e9b687 100755
--- a/server/src/com/cloud/resource/ResourceManagerImpl.java
+++ b/server/src/com/cloud/resource/ResourceManagerImpl.java
@@ -54,6 +54,7 @@ import com.cloud.agent.api.StartupRoutingCommand;
 import com.cloud.agent.api.UnsupportedAnswer;
 import com.cloud.agent.api.UpdateHostPasswordCommand;
 import com.cloud.agent.manager.AgentAttache;
+import com.cloud.agent.manager.ClusteredAgentManagerImpl;
 import com.cloud.agent.manager.allocator.PodAllocator;
 import com.cloud.agent.transport.Request;
 import org.apache.cloudstack.api.ApiConstants;
@@ -136,6 +137,8 @@ import com.cloud.utils.component.Adapters;
 import com.cloud.utils.component.Inject;
 import com.cloud.utils.component.Manager;
 import com.cloud.utils.db.DB;
+import com.cloud.utils.db.GlobalLock;
+import com.cloud.utils.db.SearchBuilder;
 import com.cloud.utils.db.SearchCriteria;
 import com.cloud.utils.db.SearchCriteria.Op;
 import com.cloud.utils.db.SearchCriteria2;
@@ -227,6 +230,8 @@ public class ResourceManagerImpl implements ResourceManager, ResourceService, Ma
     protected HashMap<Integer, List<ResourceListener>> _lifeCycleListeners = new HashMap<Integer, List<ResourceListener>>();
     private HypervisorType _defaultSystemVMHypervisor;
 
+    private static final int ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION = 30; // seconds
+
     private void insertListener(Integer event, ResourceListener listener) {
         List<ResourceListener> lst = _lifeCycleListeners.get(event);
         if (lst == null) {
@@ -544,14 +549,14 @@ public class ResourceManagerImpl implements ResourceManager, ResourceService, Ma
             }
         }
 
-        return discoverHostsFull(dcId, podId, clusterId, clusterName, url, username, password, cmd.getHypervisor(), hostTags, cmd.getFullUrlParams());
+        return discoverHostsFull(dcId, podId, clusterId, clusterName, url, username, password, cmd.getHypervisor(), hostTags, cmd.getFullUrlParams(), true);
     }
 
     @Override
     public List<? extends Host> discoverHosts(AddSecondaryStorageCmd cmd) throws IllegalArgumentException, DiscoveryException, InvalidParameterValueException {
         Long dcId = cmd.getZoneId();
         String url = cmd.getUrl();
-        return discoverHostsFull(dcId, null, null, null, url, null, null, "SecondaryStorage", null, null);
+        return discoverHostsFull(dcId, null, null, null, url, null, null, "SecondaryStorage", null, null, false);
     }
 
     @Override
@@ -576,7 +581,7 @@ public class ResourceManagerImpl implements ResourceManager, ResourceService, Ma
     }
 
     private List<HostVO> discoverHostsFull(Long dcId, Long podId, Long clusterId, String clusterName, String url, String username, String password, String hypervisorType, List<String> hostTags,
-            Map<String, String> params) throws IllegalArgumentException, DiscoveryException, InvalidParameterValueException {
+            Map<String, String> params, boolean deferAgentCreation) throws IllegalArgumentException, DiscoveryException, InvalidParameterValueException {
         URI uri = null;
 
         // Check if the zone exists in the system
@@ -731,7 +736,12 @@ public class ResourceManagerImpl implements ResourceManager, ResourceService, Ma
                         return null;
                     }
 
-                    HostVO host = (HostVO)createHostAndAgent(resource, entry.getValue(), true, hostTags, false);
+                    HostVO host = null;
+                    if (deferAgentCreation) {
+                        host = (HostVO)createHostAndAgentDeferred(resource, entry.getValue(), true, hostTags, false);
+                    } else {
+                        host = (HostVO)createHostAndAgent(resource, entry.getValue(), true, hostTags, false);
+                    }
                     if (host != null) {
                         hosts.add(host);
                     }
@@ -1602,6 +1612,25 @@ public class ResourceManagerImpl implements ResourceManager, ResourceService, Ma
         return host;
     }
 
+    private boolean isFirstHostInCluster(HostVO host)
+    {
+        boolean isFirstHost = true;
+        if (host.getClusterId() != null) {
+            SearchBuilder<HostVO> sb = _hostDao.createSearchBuilder();
+            sb.and("removed", sb.entity().getRemoved(), SearchCriteria.Op.NULL);
+            sb.and("cluster", sb.entity().getClusterId(), SearchCriteria.Op.EQ);
+            sb.done();
+            SearchCriteria<HostVO> sc = sb.create();
+            sc.setParameters("cluster", host.getClusterId());
+
+            List<HostVO> hosts = _hostDao.search(sc, null);
+            if (hosts != null && hosts.size() > 1) {
+                isFirstHost = false;
+            }
+        }
+        return isFirstHost;
+    }
+
     private Host createHostAndAgent(ServerResource resource, Map<String, String> details, boolean old, List<String> hostTags,
             boolean forRebalance) {
         HostVO host = null;
@@ -1676,6 +1705,129 @@ public class ResourceManagerImpl implements ResourceManager, ResourceService, Ma
         return host;
     }
 
+    private Host createHostAndAgentDeferred(ServerResource resource, Map<String, String> details, boolean old, List<String> hostTags,
+            boolean forRebalance) {
+        HostVO host = null;
+        AgentAttache attache = null;
+        StartupCommand[] cmds = null;
+        boolean hostExists = false;
+        boolean deferAgentCreation = true;
+
+        try {
+            cmds = resource.initialize();
+            if (cmds == null) {
+                s_logger.info("Unable to fully initialize the agent because no StartupCommands are returned");
+                return null;
+            }
+
+            /* Generate a random version in a dev setup situation */
+            if ( this.getClass().getPackage().getImplementationVersion() == null ) {
+                for ( StartupCommand cmd : cmds ) {
+                    if ( cmd.getVersion() == null ) {
+                        cmd.setVersion(Long.toString(System.currentTimeMillis()));
+                    }
+                }
+            }
+
+            if (s_logger.isDebugEnabled()) {
+                new Request(-1l, -1l, cmds, true, false).logD("Startup request from directly connected host: ", true);
+            }
+
+            if (old) {
+                StartupCommand firstCmd = cmds[0];
+                host = findHostByGuid(firstCmd.getGuid());
+                if (host == null) {
+                    host = findHostByGuid(firstCmd.getGuidWithoutResource());
+                }
+                if (host != null && host.getRemoved() == null) { // host already added, no need to add again
+                    s_logger.debug("Found the host " + host.getId() + " by guid: " + firstCmd.getGuid() + ", old host reconnected as new");
+                    hostExists = true; // ensures that host status is left unchanged in case of adding same one again
+                    return null;
+                }
+            }
+
+            host = null;
+            GlobalLock addHostLock = GlobalLock.getInternLock("AddHostLock");
+            try {
+                if (addHostLock.lock(ACQUIRE_GLOBAL_LOCK_TIMEOUT_FOR_COOPERATION)) { // to safely determine first host in cluster in multi-MS scenario
+                    try {
+                        host = createHostVO(cmds, resource, details, hostTags, ResourceStateAdapter.Event.CREATE_HOST_VO_FOR_DIRECT_CONNECT);
+                        if (host != null) {
+                            deferAgentCreation = !isFirstHostInCluster(host); // if first host in cluster no need to defer agent creation
+                        }
+                    } finally {
+                        addHostLock.unlock();
+                    }
+                }
+            } finally {
+                addHostLock.releaseRef();
+            }
+
+            if (host != null) {
+                if (!deferAgentCreation) { // if first host in cluster then create agent otherwise defer it to scan task
+                    attache = _agentMgr.handleDirectConnectAgent(host, cmds, resource, forRebalance);
+                    host = _hostDao.findById(host.getId()); // reload
+                } else {
+                    host = _hostDao.findById(host.getId()); // reload
+                    // force host status to 'Alert' so that it is loaded for connection during next scan task
+                    _agentMgr.agentStatusTransitTo(host, Status.Event.AgentDisconnected, _nodeId);
+
+                    host = _hostDao.findById(host.getId()); // reload
+                    host.setLastPinged(0); // so that scan task can pick it up
+                    _hostDao.update(host.getId(), host);
+
+                    // schedule a scan task immediately
+                    if (_agentMgr instanceof ClusteredAgentManagerImpl) {
+                        ClusteredAgentManagerImpl clusteredAgentMgr = (ClusteredAgentManagerImpl)_agentMgr;
+                        if (s_logger.isDebugEnabled()) {
+                            s_logger.debug("Scheduling a host scan task");
+                        }
+                        // schedule host scan task on current MS
+                        clusteredAgentMgr.scheduleHostScanTask();
+                        if (s_logger.isDebugEnabled()) {
+                            s_logger.debug("Notifying all peer MS to schedule host scan task");
+                        }
+                        // notify peers to schedule a host scan task as well
+                        clusteredAgentMgr.notifyNodesInClusterToScheduleHostScanTask();
+                    }
+                }
+            }
+        } catch (Exception e) {
+            s_logger.warn("Unable to connect due to ", e);
+        } finally {
+            if (hostExists) {
+                if (cmds != null) {
+                    resource.disconnected();
+                }
+            } else {
+                if (!deferAgentCreation && attache == null) {
+                    if (cmds != null) {
+                        resource.disconnected();
+                    }
+
+                    // In case of some db errors, we may land with the situation that host is null. We need to reload host from db and call disconnect on it so that it will be loaded for reconnection next time
+                    HostVO tempHost = host;
+                    if (tempHost == null) {
+                        if (cmds != null) {
+                            StartupCommand firstCmd = cmds[0];
+                            tempHost = findHostByGuid(firstCmd.getGuid());
+                            if (tempHost == null) {
+                                tempHost = findHostByGuid(firstCmd.getGuidWithoutResource());
+                            }
+                        }
+                    }
+                    if (tempHost != null) {
+                        /* Change agent status to Alert */
+                        _agentMgr.agentStatusTransitTo(tempHost, Status.Event.AgentDisconnected, _nodeId);
+                        /* Don't change resource state here since HostVO is already in database, which means resource state has had an appropriate value*/
+                    }
+                }
+            }
+        }
+
+        return host;
+    }
+
     @Override
     public Host createHostAndAgent(Long hostId, ServerResource resource, Map<String, String> details, boolean old, List<String> hostTags, boolean forRebalance) {
         _agentMgr.tapLoadingAgents(hostId, TapAgentsAction.Add);