You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cloudstack.apache.org by ek...@apache.org on 2015/09/11 14:56:26 UTC
[2/5] git commit: updated refs/heads/master to a04b8f6
CLOUDSTACK-8822 - Replacing Runnable with Callable in the Task and NioConnection classes
- All the sub-classes were also updated according to the changes in the super-classes
- There were also code formatting changes
Project: http://git-wip-us.apache.org/repos/asf/cloudstack/repo
Commit: http://git-wip-us.apache.org/repos/asf/cloudstack/commit/79a3f8c5
Tree: http://git-wip-us.apache.org/repos/asf/cloudstack/tree/79a3f8c5
Diff: http://git-wip-us.apache.org/repos/asf/cloudstack/diff/79a3f8c5
Branch: refs/heads/master
Commit: 79a3f8c5774c50dc128968c29da5096dc3dde39e
Parents: 2d90f18
Author: wilderrodrigues <wr...@schubergphilis.com>
Authored: Tue Sep 8 12:12:55 2015 +0200
Committer: wilderrodrigues <wr...@schubergphilis.com>
Committed: Fri Sep 11 11:28:40 2015 +0200
----------------------------------------------------------------------
agent/src/com/cloud/agent/Agent.java | 125 ++++---
.../cloud/agent/manager/AgentManagerImpl.java | 18 +-
.../manager/ClusteredAgentManagerImpl.java | 363 ++++++++++---------
.../java/com/cloud/utils/SerialVersionUID.java | 2 +
.../utils/exception/NioConnectionException.java | 48 +++
.../utils/exception/TaskExecutionException.java | 48 +++
.../java/com/cloud/utils/nio/NioClient.java | 32 +-
.../java/com/cloud/utils/nio/NioConnection.java | 304 +++++++++-------
.../java/com/cloud/utils/nio/NioServer.java | 10 +-
.../src/main/java/com/cloud/utils/nio/Task.java | 25 +-
.../java/com/cloud/utils/testcase/NioTest.java | 35 +-
11 files changed, 582 insertions(+), 428 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/agent/src/com/cloud/agent/Agent.java
----------------------------------------------------------------------
diff --git a/agent/src/com/cloud/agent/Agent.java b/agent/src/com/cloud/agent/Agent.java
index ac2d9ba..e3510c4 100644
--- a/agent/src/com/cloud/agent/Agent.java
+++ b/agent/src/com/cloud/agent/Agent.java
@@ -35,9 +35,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.naming.ConfigurationException;
-import org.apache.log4j.Logger;
-
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
+import org.apache.log4j.Logger;
import com.cloud.agent.api.AgentControlAnswer;
import com.cloud.agent.api.AgentControlCommand;
@@ -59,6 +58,8 @@ import com.cloud.utils.PropertiesUtil;
import com.cloud.utils.backoff.BackoffAlgorithm;
import com.cloud.utils.concurrency.NamedThreadFactory;
import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.exception.NioConnectionException;
+import com.cloud.utils.exception.TaskExecutionException;
import com.cloud.utils.nio.HandlerFactory;
import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.NioClient;
@@ -121,11 +122,11 @@ public class Agent implements HandlerFactory, IAgentControl {
long _startupWait = _startupWaitDefault;
boolean _reconnectAllowed = true;
//For time sentitive task, e.g. PingTask
- private ThreadPoolExecutor _ugentTaskPool;
+ private final ThreadPoolExecutor _ugentTaskPool;
ExecutorService _executor;
// for simulator use only
- public Agent(IAgentShell shell) {
+ public Agent(final IAgentShell shell) {
_shell = shell;
_link = null;
@@ -134,29 +135,29 @@ public class Agent implements HandlerFactory, IAgentControl {
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
_ugentTaskPool =
- new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory(
- "UgentTask"));
+ new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory(
+ "UgentTask"));
_executor =
- new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
- "agentRequest-Handler"));
+ new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
+ "agentRequest-Handler"));
}
- public Agent(IAgentShell shell, int localAgentId, ServerResource resource) throws ConfigurationException {
+ public Agent(final IAgentShell shell, final int localAgentId, final ServerResource resource) throws ConfigurationException {
_shell = shell;
_resource = resource;
_link = null;
resource.setAgentControl(this);
- String value = _shell.getPersistentProperty(getResourceName(), "id");
+ final String value = _shell.getPersistentProperty(getResourceName(), "id");
_id = value != null ? Long.parseLong(value) : null;
- s_logger.info("id is " + ((_id != null) ? _id : ""));
+ s_logger.info("id is " + (_id != null ? _id : ""));
final Map<String, Object> params = PropertiesUtil.toMap(_shell.getProperties());
// merge with properties from command line to let resource access command line parameters
- for (Map.Entry<String, Object> cmdLineProp : _shell.getCmdLineProperties().entrySet()) {
+ for (final Map.Entry<String, Object> cmdLineProp : _shell.getCmdLineProperties().entrySet()) {
params.put(cmdLineProp.getKey(), cmdLineProp.getValue());
}
@@ -172,15 +173,15 @@ public class Agent implements HandlerFactory, IAgentControl {
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
_ugentTaskPool =
- new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory(
- "UgentTask"));
+ new ThreadPoolExecutor(shell.getPingRetries(), 2 * shell.getPingRetries(), 10, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), new NamedThreadFactory(
+ "UgentTask"));
_executor =
- new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
- "agentRequest-Handler"));
+ new ThreadPoolExecutor(_shell.getWorkers(), 5 * _shell.getWorkers(), 1, TimeUnit.DAYS, new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory(
+ "agentRequest-Handler"));
s_logger.info("Agent [id = " + (_id != null ? _id : "new") + " : type = " + getResourceName() + " : zone = " + _shell.getZone() + " : pod = " + _shell.getPod() +
- " : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort());
+ " : workers = " + _shell.getWorkers() + " : host = " + _shell.getHost() + " : port = " + _shell.getPort());
}
public String getVersion() {
@@ -188,7 +189,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
public String getResourceGuid() {
- String guid = _shell.getGuid();
+ final String guid = _shell.getGuid();
return guid + "-" + getResourceName();
}
@@ -222,11 +223,19 @@ public class Agent implements HandlerFactory, IAgentControl {
throw new CloudRuntimeException("Unable to start the resource: " + _resource.getName());
}
- _connection.start();
+ try {
+ _connection.start();
+ } catch (final NioConnectionException e) {
+ throw new CloudRuntimeException("Unable to start the connection!", e);
+ }
while (!_connection.isStartup()) {
_shell.getBackoffAlgorithm().waitBeforeRetry();
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
- _connection.start();
+ try {
+ _connection.start();
+ } catch (final NioConnectionException e) {
+ throw new CloudRuntimeException("Unable to start the connection!", e);
+ }
}
}
@@ -236,12 +245,12 @@ public class Agent implements HandlerFactory, IAgentControl {
final ShutdownCommand cmd = new ShutdownCommand(reason, detail);
try {
if (_link != null) {
- Request req = new Request((_id != null ? _id : -1), -1, cmd, false);
+ final Request req = new Request(_id != null ? _id : -1, -1, cmd, false);
_link.send(req.toBytes());
}
} catch (final ClosedChannelException e) {
s_logger.warn("Unable to send: " + cmd.toString());
- } catch (Exception e) {
+ } catch (final Exception e) {
s_logger.warn("Unable to send: " + cmd.toString() + " due to exception: ", e);
}
s_logger.debug("Sending shutdown to management server");
@@ -294,13 +303,13 @@ public class Agent implements HandlerFactory, IAgentControl {
_watchList.clear();
}
}
- public synchronized void lockStartupTask(Link link)
+ public synchronized void lockStartupTask(final Link link)
{
_startup = new StartupTask(link);
_timer.schedule(_startup, _startupWait);
}
- public void sendStartup(Link link) {
+ public void sendStartup(final Link link) {
final StartupCommand[] startup = _resource.initialize();
if (startup != null) {
final Command[] commands = new Command[startup.length];
@@ -323,7 +332,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
}
- protected void setupStartupCommand(StartupCommand startup) {
+ protected void setupStartupCommand(final StartupCommand startup) {
InetAddress addr;
try {
addr = InetAddress.getLocalHost();
@@ -349,7 +358,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
@Override
- public Task create(Task.Type type, Link link, byte[] data) {
+ public Task create(final Task.Type type, final Link link, final byte[] data) {
return new ServerHandler(type, link, data);
}
@@ -391,19 +400,23 @@ public class Agent implements HandlerFactory, IAgentControl {
try {
_connection.cleanUp();
- } catch (IOException e) {
+ } catch (final IOException e) {
s_logger.warn("Fail to clean up old connection. " + e);
}
_connection = new NioClient("Agent", _shell.getHost(), _shell.getPort(), _shell.getWorkers(), this);
do {
s_logger.info("Reconnecting...");
- _connection.start();
+ try {
+ _connection.start();
+ } catch (final NioConnectionException e) {
+ throw new CloudRuntimeException("Unable to start the connection!", e);
+ }
_shell.getBackoffAlgorithm().waitBeforeRetry();
} while (!_connection.isStartup());
s_logger.info("Connected to the server");
}
- public void processStartupAnswer(Answer answer, Response response, Link link) {
+ public void processStartupAnswer(final Answer answer, final Response response, final Link link) {
boolean cancelled = false;
synchronized (this) {
if (_startup != null) {
@@ -450,7 +463,7 @@ public class Agent implements HandlerFactory, IAgentControl {
if (s_logger.isDebugEnabled()) {
if (!requestLogged) // ensures request is logged only once per method call
{
- String requestMsg = request.toString();
+ final String requestMsg = request.toString();
if (requestMsg != null) {
s_logger.debug("Request:" + requestMsg);
}
@@ -464,7 +477,7 @@ public class Agent implements HandlerFactory, IAgentControl {
scheduleWatch(link, request, (long)watch.getInterval() * 1000, watch.getInterval() * 1000);
answer = new Answer(cmd, true, null);
} else if (cmd instanceof ShutdownCommand) {
- ShutdownCommand shutdown = (ShutdownCommand)cmd;
+ final ShutdownCommand shutdown = (ShutdownCommand)cmd;
s_logger.debug("Received shutdownCommand, due to: " + shutdown.getReason());
cancelTasks();
_reconnectAllowed = false;
@@ -481,7 +494,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} else if (cmd instanceof AgentControlCommand) {
answer = null;
synchronized (_controlListeners) {
- for (IAgentControlListener listener : _controlListeners) {
+ for (final IAgentControlListener listener : _controlListeners) {
answer = listener.processControlRequest(request, (AgentControlCommand)cmd);
if (answer != null) {
break;
@@ -527,7 +540,7 @@ public class Agent implements HandlerFactory, IAgentControl {
response = new Response(request, answers);
} finally {
if (s_logger.isDebugEnabled()) {
- String responseMsg = response.toString();
+ final String responseMsg = response.toString();
if (responseMsg != null) {
s_logger.debug(response.toString());
}
@@ -553,7 +566,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} else if (answer instanceof AgentControlAnswer) {
// Notice, we are doing callback while holding a lock!
synchronized (_controlListeners) {
- for (IAgentControlListener listener : _controlListeners) {
+ for (final IAgentControlListener listener : _controlListeners) {
listener.processControlResponse(response, (AgentControlAnswer)answer);
}
}
@@ -562,7 +575,7 @@ public class Agent implements HandlerFactory, IAgentControl {
}
}
- public void processReadyCommand(Command cmd) {
+ public void processReadyCommand(final Command cmd) {
final ReadyCommand ready = (ReadyCommand)cmd;
@@ -574,10 +587,10 @@ public class Agent implements HandlerFactory, IAgentControl {
}
- public void processOtherTask(Task task) {
+ public void processOtherTask(final Task task) {
final Object obj = task.get();
if (obj instanceof Response) {
- if ((System.currentTimeMillis() - _lastPingResponseTime) > _pingInterval * _shell.getPingRetries()) {
+ if (System.currentTimeMillis() - _lastPingResponseTime > _pingInterval * _shell.getPingRetries()) {
s_logger.error("Ping Interval has gone past " + _pingInterval * _shell.getPingRetries() + ". Won't reconnect to mgt server, as connection is still alive");
return;
}
@@ -633,25 +646,25 @@ public class Agent implements HandlerFactory, IAgentControl {
}
@Override
- public void registerControlListener(IAgentControlListener listener) {
+ public void registerControlListener(final IAgentControlListener listener) {
synchronized (_controlListeners) {
_controlListeners.add(listener);
}
}
@Override
- public void unregisterControlListener(IAgentControlListener listener) {
+ public void unregisterControlListener(final IAgentControlListener listener) {
synchronized (_controlListeners) {
_controlListeners.remove(listener);
}
}
@Override
- public AgentControlAnswer sendRequest(AgentControlCommand cmd, int timeoutInMilliseconds) throws AgentControlChannelException {
- Request request = new Request(this.getId(), -1, new Command[] {cmd}, true, false);
+ public AgentControlAnswer sendRequest(final AgentControlCommand cmd, final int timeoutInMilliseconds) throws AgentControlChannelException {
+ final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false);
request.setSequence(getNextSequence());
- AgentControlListener listener = new AgentControlListener(request);
+ final AgentControlListener listener = new AgentControlListener(request);
registerControlListener(listener);
try {
@@ -659,7 +672,7 @@ public class Agent implements HandlerFactory, IAgentControl {
synchronized (listener) {
try {
listener.wait(timeoutInMilliseconds);
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
s_logger.warn("sendRequest is interrupted, exit waiting");
}
}
@@ -671,13 +684,13 @@ public class Agent implements HandlerFactory, IAgentControl {
}
@Override
- public void postRequest(AgentControlCommand cmd) throws AgentControlChannelException {
- Request request = new Request(this.getId(), -1, new Command[] {cmd}, true, false);
+ public void postRequest(final AgentControlCommand cmd) throws AgentControlChannelException {
+ final Request request = new Request(getId(), -1, new Command[] {cmd}, true, false);
request.setSequence(getNextSequence());
postRequest(request);
}
- private void postRequest(Request request) throws AgentControlChannelException {
+ private void postRequest(final Request request) throws AgentControlChannelException {
if (_link != null) {
try {
_link.send(request.toBytes());
@@ -694,7 +707,7 @@ public class Agent implements HandlerFactory, IAgentControl {
private AgentControlAnswer _answer;
private final Request _request;
- public AgentControlListener(Request request) {
+ public AgentControlListener(final Request request) {
_request = request;
}
@@ -703,12 +716,12 @@ public class Agent implements HandlerFactory, IAgentControl {
}
@Override
- public Answer processControlRequest(Request request, AgentControlCommand cmd) {
+ public Answer processControlRequest(final Request request, final AgentControlCommand cmd) {
return null;
}
@Override
- public void processControlResponse(Response response, AgentControlAnswer answer) {
+ public void processControlResponse(final Response response, final AgentControlAnswer answer) {
if (_request.getSequence() == response.getSequence()) {
_answer = answer;
synchronized (this) {
@@ -797,13 +810,13 @@ public class Agent implements HandlerFactory, IAgentControl {
}
public class AgentRequestHandler extends Task {
- public AgentRequestHandler(Task.Type type, Link link, Request req) {
+ public AgentRequestHandler(final Task.Type type, final Link link, final Request req) {
super(type, link, req);
}
@Override
- protected void doTask(Task task) throws Exception {
- Request req = (Request)this.get();
+ protected void doTask(final Task task) throws TaskExecutionException {
+ final Request req = (Request)get();
if (!(req instanceof Response)) {
processRequest(req, task.getLink());
}
@@ -811,16 +824,16 @@ public class Agent implements HandlerFactory, IAgentControl {
}
public class ServerHandler extends Task {
- public ServerHandler(Task.Type type, Link link, byte[] data) {
+ public ServerHandler(final Task.Type type, final Link link, final byte[] data) {
super(type, link, data);
}
- public ServerHandler(Task.Type type, Link link, Request req) {
+ public ServerHandler(final Task.Type type, final Link link, final Request req) {
super(type, link, req);
}
@Override
- public void doTask(final Task task) {
+ public void doTask(final Task task) throws TaskExecutionException {
if (task.getType() == Task.Type.CONNECT) {
_shell.getBackoffAlgorithm().reset();
setLink(task.getLink());
@@ -835,7 +848,7 @@ public class Agent implements HandlerFactory, IAgentControl {
} else {
//put the requests from mgt server into another thread pool, as the request may take a longer time to finish. Don't block the NIO main thread pool
//processRequest(request, task.getLink());
- _executor.execute(new AgentRequestHandler(this.getType(), this.getLink(), request));
+ _executor.submit(new AgentRequestHandler(getType(), getLink(), request));
}
} catch (final ClassNotFoundException e) {
s_logger.error("Unable to find this request ");
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
index a38fd08..ef8a373 100644
--- a/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/AgentManagerImpl.java
@@ -103,6 +103,8 @@ import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.TransactionLegacy;
import com.cloud.utils.exception.CloudRuntimeException;
import com.cloud.utils.exception.HypervisorVersionChangedException;
+import com.cloud.utils.exception.NioConnectionException;
+import com.cloud.utils.exception.TaskExecutionException;
import com.cloud.utils.fsm.NoTransitionException;
import com.cloud.utils.fsm.StateMachine2;
import com.cloud.utils.nio.HandlerFactory;
@@ -593,7 +595,11 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
startDirectlyConnectedHosts();
if (_connection != null) {
- _connection.start();
+ try {
+ _connection.start();
+ } catch (final NioConnectionException e) {
+ s_logger.error("Error when connecting to the NioServer!", e);
+ }
}
_monitorExecutor.scheduleWithFixedDelay(new MonitorTask(), PingInterval.value(), PingInterval.value(), TimeUnit.SECONDS);
@@ -827,7 +833,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
Status determinedState = investigate(attache);
// if state cannot be determined do nothing and bail out
if (determinedState == null) {
- if (((System.currentTimeMillis() >> 10) - host.getLastPinged()) > AlertWait.value()) {
+ if ((System.currentTimeMillis() >> 10) - host.getLastPinged() > AlertWait.value()) {
s_logger.warn("Agent " + hostId + " state cannot be determined for more than " + AlertWait + "(" + AlertWait.value() + ") seconds, will go to Alert state");
determinedState = Status.Alert;
} else {
@@ -840,7 +846,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
s_logger.info("The agent " + hostId + " state determined is " + determinedState);
if (determinedState == Status.Down) {
- String message = "Host is down: " + host.getId() + "-" + host.getName() + ". Starting HA on the VMs";
+ final String message = "Host is down: " + host.getId() + "-" + host.getName() + ". Starting HA on the VMs";
s_logger.error(message);
if (host.getType() != Host.Type.SecondaryStorage && host.getType() != Host.Type.ConsoleProxy) {
_alertMgr.sendAlert(AlertManager.AlertType.ALERT_TYPE_HOST, host.getDataCenterId(), host.getPodId(), "Host down, " + host.getId(), message);
@@ -1299,7 +1305,7 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
}
@Override
- protected void doTask(final Task task) throws Exception {
+ protected void doTask(final Task task) throws TaskExecutionException {
final TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
try {
final Type type = task.getType();
@@ -1315,6 +1321,10 @@ public class AgentManagerImpl extends ManagerBase implements AgentManager, Handl
} catch (final UnsupportedVersionException e) {
s_logger.warn(e.getMessage());
// upgradeAgent(task.getLink(), data, e.getReason());
+ } catch (final ClassNotFoundException e) {
+ final String message = String.format("Exception occured when executing taks! Error '%s'", e.getMessage());
+ s_logger.error(message);
+ throw new TaskExecutionException(message, e);
}
} else if (type == Task.Type.CONNECT) {
} else if (type == Task.Type.DISCONNECT) {
http://git-wip-us.apache.org/repos/asf/cloudstack/blob/79a3f8c5/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
----------------------------------------------------------------------
diff --git a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
index 2bc4f68..04be1ab 100644
--- a/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
+++ b/engine/orchestration/src/com/cloud/agent/manager/ClusteredAgentManagerImpl.java
@@ -43,10 +43,6 @@ import javax.naming.ConfigurationException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
-import org.apache.log4j.Logger;
-
-import com.google.gson.Gson;
-
import org.apache.cloudstack.framework.config.ConfigDepot;
import org.apache.cloudstack.framework.config.ConfigKey;
import org.apache.cloudstack.framework.config.dao.ConfigurationDao;
@@ -54,6 +50,7 @@ import org.apache.cloudstack.managed.context.ManagedContextRunnable;
import org.apache.cloudstack.managed.context.ManagedContextTimerTask;
import org.apache.cloudstack.utils.identity.ManagementServerNode;
import org.apache.cloudstack.utils.security.SSLUtils;
+import org.apache.log4j.Logger;
import com.cloud.agent.AgentManager;
import com.cloud.agent.api.Answer;
@@ -93,8 +90,10 @@ import com.cloud.utils.db.QueryBuilder;
import com.cloud.utils.db.SearchCriteria.Op;
import com.cloud.utils.db.TransactionLegacy;
import com.cloud.utils.exception.CloudRuntimeException;
+import com.cloud.utils.exception.TaskExecutionException;
import com.cloud.utils.nio.Link;
import com.cloud.utils.nio.Task;
+import com.google.gson.Gson;
@Local(value = {AgentManager.class, ClusteredAgentRebalanceService.class})
public class ClusteredAgentManagerImpl extends AgentManagerImpl implements ClusterManagerListener, ClusteredAgentRebalanceService {
@@ -139,7 +138,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
"Interval between scans to load agents", false, ConfigKey.Scope.Global, 1000);
@Override
- public boolean configure(String name, Map<String, Object> xmlParams) throws ConfigurationException {
+ public boolean configure(final String name, final Map<String, Object> xmlParams) throws ConfigurationException {
_peers = new HashMap<String, SocketChannel>(7);
_sslEngines = new HashMap<String, SSLEngine>(7);
_nodeId = ManagementServerNode.getManagementServerId();
@@ -192,17 +191,17 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
// for agents that are self-managed, threshold to be considered as disconnected after pingtimeout
- long cutSeconds = (System.currentTimeMillis() >> 10) - getTimeout();
- List<HostVO> hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, LoadSize.value().longValue(), _nodeId);
- List<HostVO> appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId);
+ final long cutSeconds = (System.currentTimeMillis() >> 10) - getTimeout();
+ final List<HostVO> hosts = _hostDao.findAndUpdateDirectAgentToLoad(cutSeconds, LoadSize.value().longValue(), _nodeId);
+ final List<HostVO> appliances = _hostDao.findAndUpdateApplianceToLoad(cutSeconds, _nodeId);
- if (hosts != null) {
+ if (hosts != null) {
hosts.addAll(appliances);
if (hosts.size() > 0) {
s_logger.debug("Found " + hosts.size() + " unmanaged direct hosts, processing connect for them...");
- for (HostVO host : hosts) {
+ for (final HostVO host : hosts) {
try {
- AgentAttache agentattache = findAttache(host.getId());
+ final AgentAttache agentattache = findAttache(host.getId());
if (agentattache != null) {
// already loaded, skip
if (agentattache.forForward()) {
@@ -219,7 +218,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.debug("Loading directly connected host " + host.getId() + "(" + host.getName() + ")");
}
loadDirectlyConnectedHost(host, false);
- } catch (Throwable e) {
+ } catch (final Throwable e) {
s_logger.warn(" can not load directly connected host " + host.getId() + "(" + host.getName() + ") due to ", e);
}
}
@@ -235,20 +234,20 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
protected void runInContext() {
try {
runDirectAgentScanTimerTask();
- } catch (Throwable e) {
+ } catch (final Throwable e) {
s_logger.error("Unexpected exception " + e.getMessage(), e);
}
}
}
@Override
- public Task create(Task.Type type, Link link, byte[] data) {
+ public Task create(final Task.Type type, final Link link, final byte[] data) {
return new ClusteredAgentHandler(type, link, data);
}
- protected AgentAttache createAttache(long id) {
+ protected AgentAttache createAttache(final long id) {
s_logger.debug("create forwarding ClusteredAgentAttache for " + id);
- HostVO host = _hostDao.findById(id);
+ final HostVO host = _hostDao.findById(id);
final AgentAttache attache = new ClusteredAgentAttache(this, id, host.getName());
AgentAttache old = null;
synchronized (_agents) {
@@ -265,7 +264,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
@Override
- protected AgentAttache createAttacheForConnect(HostVO host, Link link) {
+ protected AgentAttache createAttacheForConnect(final HostVO host, final Link link) {
s_logger.debug("create ClusteredAgentAttache for " + host.getId());
final AgentAttache attache = new ClusteredAgentAttache(this, host.getId(), host.getName(), link, host.isInMaintenanceStates());
link.attach(attache);
@@ -281,7 +280,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
@Override
- protected AgentAttache createAttacheForDirectConnect(Host host, ServerResource resource) {
+ protected AgentAttache createAttacheForDirectConnect(final Host host, final ServerResource resource) {
s_logger.debug("create ClusteredDirectAgentAttache for " + host.getId());
final DirectAgentAttache attache = new ClusteredDirectAgentAttache(this, host.getId(), host.getName(), _nodeId, resource, host.isInMaintenanceStates());
AgentAttache old = null;
@@ -296,16 +295,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
@Override
- protected boolean handleDisconnectWithoutInvestigation(AgentAttache attache, Status.Event event, boolean transitState, boolean removeAgent) {
+ protected boolean handleDisconnectWithoutInvestigation(final AgentAttache attache, final Status.Event event, final boolean transitState, final boolean removeAgent) {
return handleDisconnect(attache, event, false, true, removeAgent);
}
@Override
- protected boolean handleDisconnectWithInvestigation(AgentAttache attache, Status.Event event) {
+ protected boolean handleDisconnectWithInvestigation(final AgentAttache attache, final Status.Event event) {
return handleDisconnect(attache, event, true, true, true);
}
- protected boolean handleDisconnect(AgentAttache agent, Status.Event event, boolean investigate, boolean broadcast, boolean removeAgent) {
+ protected boolean handleDisconnect(final AgentAttache agent, final Status.Event event, final boolean investigate, final boolean broadcast, final boolean removeAgent) {
boolean res;
if (!investigate) {
res = super.handleDisconnectWithoutInvestigation(agent, event, true, removeAgent);
@@ -324,16 +323,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
@Override
- public boolean executeUserRequest(long hostId, Event event) throws AgentUnavailableException {
+ public boolean executeUserRequest(final long hostId, final Event event) throws AgentUnavailableException {
if (event == Event.AgentDisconnected) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Received agent disconnect event for host " + hostId);
}
- AgentAttache attache = findAttache(hostId);
+ final AgentAttache attache = findAttache(hostId);
if (attache != null) {
// don't process disconnect if the host is being rebalanced
if (isAgentRebalanceEnabled()) {
- HostTransferMapVO transferVO = _hostTransferDao.findById(hostId);
+ final HostTransferMapVO transferVO = _hostTransferDao.findById(hostId);
if (transferVO != null) {
if (transferVO.getFutureOwner() == _nodeId && transferVO.getState() == HostTransferState.TransferStarted) {
s_logger.debug("Not processing " + Event.AgentDisconnected + " event for the host id=" + hostId + " as the host is being connected to " +
@@ -368,7 +367,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (result != null) {
return result;
}
- } catch (AgentUnavailableException e) {
+ } catch (final AgentUnavailableException e) {
s_logger.debug("cannot propagate agent reconnect because agent is not available", e);
return false;
}
@@ -376,9 +375,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return super.reconnect(hostId);
}
- public void notifyNodesInCluster(AgentAttache attache) {
+ public void notifyNodesInCluster(final AgentAttache attache) {
s_logger.debug("Notifying other nodes of to disconnect");
- Command[] cmds = new Command[] {new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)};
+ final Command[] cmds = new Command[] {new ChangeAgentCommand(attache.getId(), Event.AgentDisconnected)};
_clusterMgr.broadcast(attache.getId(), _gson.toJson(cmds));
}
@@ -387,26 +386,26 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (s_logger.isDebugEnabled()) {
s_logger.debug("Notifying other MS nodes to run host scan task");
}
- Command[] cmds = new Command[] {new ScheduleHostScanTaskCommand()};
+ final Command[] cmds = new Command[] {new ScheduleHostScanTaskCommand()};
_clusterMgr.broadcast(0, _gson.toJson(cmds));
}
- protected static void logT(byte[] bytes, final String msg) {
+ protected static void logT(final byte[] bytes, final String msg) {
s_logger.trace("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
}
- protected static void logD(byte[] bytes, final String msg) {
+ protected static void logD(final byte[] bytes, final String msg) {
s_logger.debug("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
}
- protected static void logI(byte[] bytes, final String msg) {
+ protected static void logI(final byte[] bytes, final String msg) {
s_logger.info("Seq " + Request.getAgentId(bytes) + "-" + Request.getSequence(bytes) + ": MgmtId " + Request.getManagementServerId(bytes) + ": " +
(Request.isRequest(bytes) ? "Req: " : "Resp: ") + msg);
}
- public boolean routeToPeer(String peer, byte[] bytes) {
+ public boolean routeToPeer(final String peer, final byte[] bytes) {
int i = 0;
SocketChannel ch = null;
SSLEngine sslEngine = null;
@@ -432,7 +431,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
Link.write(ch, new ByteBuffer[] {ByteBuffer.wrap(bytes)}, sslEngine);
return true;
- } catch (IOException e) {
+ } catch (final IOException e) {
try {
logI(bytes, "Unable to route to peer: " + Request.parse(bytes).toString() + " due to " + e.getMessage());
} catch (ClassNotFoundException | UnsupportedVersionException ex) {
@@ -445,28 +444,28 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return false;
}
- public String findPeer(long hostId) {
+ public String findPeer(final long hostId) {
return getPeerName(hostId);
}
- public SSLEngine getSSLEngine(String peerName) {
+ public SSLEngine getSSLEngine(final String peerName) {
return _sslEngines.get(peerName);
}
- public void cancel(String peerName, long hostId, long sequence, String reason) {
- CancelCommand cancel = new CancelCommand(sequence, reason);
- Request req = new Request(hostId, _nodeId, cancel, true);
+ public void cancel(final String peerName, final long hostId, final long sequence, final String reason) {
+ final CancelCommand cancel = new CancelCommand(sequence, reason);
+ final Request req = new Request(hostId, _nodeId, cancel, true);
req.setControl(true);
routeToPeer(peerName, req.getBytes());
}
- public void closePeer(String peerName) {
+ public void closePeer(final String peerName) {
synchronized (_peers) {
- SocketChannel ch = _peers.get(peerName);
+ final SocketChannel ch = _peers.get(peerName);
if (ch != null) {
try {
ch.close();
- } catch (IOException e) {
+ } catch (final IOException e) {
s_logger.warn("Unable to close peer socket connection to " + peerName);
}
}
@@ -475,29 +474,29 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
- public SocketChannel connectToPeer(String peerName, SocketChannel prevCh) {
+ public SocketChannel connectToPeer(final String peerName, final SocketChannel prevCh) {
synchronized (_peers) {
- SocketChannel ch = _peers.get(peerName);
+ final SocketChannel ch = _peers.get(peerName);
SSLEngine sslEngine = null;
if (prevCh != null) {
try {
prevCh.close();
- } catch (Exception e) {
+ } catch (final Exception e) {
s_logger.info("[ignored]"
+ "failed to get close resource for previous channel Socket: " + e.getLocalizedMessage());
}
}
if (ch == null || ch == prevCh) {
- ManagementServerHost ms = _clusterMgr.getPeer(peerName);
+ final ManagementServerHost ms = _clusterMgr.getPeer(peerName);
if (ms == null) {
s_logger.info("Unable to find peer: " + peerName);
return null;
}
- String ip = ms.getServiceIP();
+ final String ip = ms.getServiceIP();
InetAddress addr;
try {
addr = InetAddress.getByName(ip);
- } catch (UnknownHostException e) {
+ } catch (final UnknownHostException e) {
throw new CloudRuntimeException("Unable to resolve " + ip);
}
SocketChannel ch1 = null;
@@ -507,14 +506,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
ch1.socket().setKeepAlive(true);
ch1.socket().setSoTimeout(60 * 1000);
try {
- SSLContext sslContext = Link.initSSLContext(true);
+ final SSLContext sslContext = Link.initSSLContext(true);
sslEngine = sslContext.createSSLEngine(ip, Port.value());
sslEngine.setUseClientMode(true);
sslEngine.setEnabledProtocols(SSLUtils.getSupportedProtocols(sslEngine.getEnabledProtocols()));
Link.doHandshake(ch1, sslEngine, true);
s_logger.info("SSL: Handshake done");
- } catch (Exception e) {
+ } catch (final Exception e) {
ch1.close();
throw new IOException("SSL: Fail to init SSL! " + e);
}
@@ -524,10 +523,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
_peers.put(peerName, ch1);
_sslEngines.put(peerName, sslEngine);
return ch1;
- } catch (IOException e) {
+ } catch (final IOException e) {
try {
ch1.close();
- } catch (IOException ex) {
+ } catch (final IOException ex) {
s_logger.error("failed to close failed peer socket: " + ex);
}
s_logger.warn("Unable to connect to peer management server: " + peerName + ", ip: " + ip + " due to " + e.getMessage(), e);
@@ -542,8 +541,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
- public SocketChannel connectToPeer(long hostId, SocketChannel prevCh) {
- String peerName = getPeerName(hostId);
+ public SocketChannel connectToPeer(final long hostId, final SocketChannel prevCh) {
+ final String peerName = getPeerName(hostId);
if (peerName == null) {
return null;
}
@@ -553,8 +552,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
protected AgentAttache getAttache(final Long hostId) throws AgentUnavailableException {
- assert (hostId != null) : "Who didn't check their id value?";
- HostVO host = _hostDao.findById(hostId);
+ assert hostId != null : "Who didn't check their id value?";
+ final HostVO host = _hostDao.findById(hostId);
if (host == null) {
throw new AgentUnavailableException("Can't find the host ", hostId);
}
@@ -569,7 +568,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
if (agent == null) {
- AgentUnavailableException ex = new AgentUnavailableException("Host with specified id is not in the right state: " + host.getStatus(), hostId);
+ final AgentUnavailableException ex = new AgentUnavailableException("Host with specified id is not in the right state: " + host.getStatus(), hostId);
ex.addProxyObject(_entityMgr.findById(Host.class, hostId).getUuid());
throw ex;
}
@@ -580,11 +579,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
public boolean stop() {
if (_peers != null) {
- for (SocketChannel ch : _peers.values()) {
+ for (final SocketChannel ch : _peers.values()) {
try {
s_logger.info("Closing: " + ch.toString());
ch.close();
- } catch (IOException e) {
+ } catch (final IOException e) {
s_logger.info("[ignored] error on closing channel: " +ch.toString(), e);
}
}
@@ -606,13 +605,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
public class ClusteredAgentHandler extends AgentHandler {
- public ClusteredAgentHandler(Task.Type type, Link link, byte[] data) {
+ public ClusteredAgentHandler(final Task.Type type, final Link link, final byte[] data) {
super(type, link, data);
}
@Override
- protected void doTask(final Task task) throws Exception {
- TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
+ protected void doTask(final Task task) throws TaskExecutionException {
+ final TransactionLegacy txn = TransactionLegacy.open(TransactionLegacy.CLOUD_DB);
try {
if (task.getType() != Task.Type.DATA) {
super.doTask(task);
@@ -620,37 +619,37 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
final byte[] data = task.getData();
- Version ver = Request.getVersion(data);
+ final Version ver = Request.getVersion(data);
if (ver.ordinal() != Version.v1.ordinal() && ver.ordinal() != Version.v3.ordinal()) {
s_logger.warn("Wrong version for clustered agent request");
super.doTask(task);
return;
}
- long hostId = Request.getAgentId(data);
- Link link = task.getLink();
+ final long hostId = Request.getAgentId(data);
+ final Link link = task.getLink();
if (Request.fromServer(data)) {
- AgentAttache agent = findAttache(hostId);
+ final AgentAttache agent = findAttache(hostId);
if (Request.isControl(data)) {
if (agent == null) {
logD(data, "No attache to process cancellation");
return;
}
- Request req = Request.parse(data);
- Command[] cmds = req.getCommands();
- CancelCommand cancel = (CancelCommand)cmds[0];
+ final Request req = Request.parse(data);
+ final Command[] cmds = req.getCommands();
+ final CancelCommand cancel = (CancelCommand)cmds[0];
if (s_logger.isDebugEnabled()) {
logD(data, "Cancel request received");
}
agent.cancel(cancel.getSequence());
final Long current = agent._currentSequence;
// if the request is the current request, always have to trigger sending next request in
-// sequence,
+ // sequence,
// otherwise the agent queue will be blocked
- if (req.executeInSequence() && (current != null && current == Request.getSequence(data))) {
+ if (req.executeInSequence() && current != null && current == Request.getSequence(data)) {
agent.sendNext(Request.getSequence(data));
}
return;
@@ -665,29 +664,29 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
// route it to the agent.
// But we have the serialize the control commands here so we have
// to deserialize this and send it through the agent attache.
- Request req = Request.parse(data);
+ final Request req = Request.parse(data);
agent.send(req, null);
return;
} else {
if (agent instanceof Routable) {
- Routable cluster = (Routable)agent;
+ final Routable cluster = (Routable)agent;
cluster.routeToAgent(data);
} else {
agent.send(Request.parse(data));
}
return;
}
- } catch (AgentUnavailableException e) {
+ } catch (final AgentUnavailableException e) {
logD(data, e.getMessage());
cancel(Long.toString(Request.getManagementServerId(data)), hostId, Request.getSequence(data), e.getMessage());
}
} else {
- long mgmtId = Request.getManagementServerId(data);
+ final long mgmtId = Request.getManagementServerId(data);
if (mgmtId != -1 && mgmtId != _nodeId) {
routeToPeer(Long.toString(mgmtId), data);
if (Request.requiresSequentialExecution(data)) {
- AgentAttache attache = (AgentAttache)link.attachment();
+ final AgentAttache attache = (AgentAttache)link.attachment();
if (attache != null) {
attache.sendNext(Request.getSequence(data));
} else if (s_logger.isDebugEnabled()) {
@@ -701,7 +700,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
} else {
// received an answer.
final Response response = Response.parse(data);
- AgentAttache attache = findAttache(response.getAgentId());
+ final AgentAttache attache = findAttache(response.getAgentId());
if (attache == null) {
s_logger.info("SeqA " + response.getAgentId() + "-" + response.getSequence() + "Unable to find attache to forward " + response.toString());
return;
@@ -713,6 +712,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return;
}
}
+ } catch (final ClassNotFoundException e) {
+ final String message = String.format("ClassNotFoundException occured when executing taks! Error '%s'", e.getMessage());
+ s_logger.error(message);
+ throw new TaskExecutionException(message, e);
+ } catch (final UnsupportedVersionException e) {
+ final String message = String.format("UnsupportedVersionException occured when executing taks! Error '%s'", e.getMessage());
+ s_logger.error(message);
+ throw new TaskExecutionException(message, e);
} finally {
txn.close();
}
@@ -720,14 +727,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
@Override
- public void onManagementNodeJoined(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
+ public void onManagementNodeJoined(final List<? extends ManagementServerHost> nodeList, final long selfNodeId) {
}
@Override
- public void onManagementNodeLeft(List<? extends ManagementServerHost> nodeList, long selfNodeId) {
- for (ManagementServerHost vo : nodeList) {
+ public void onManagementNodeLeft(final List<? extends ManagementServerHost> nodeList, final long selfNodeId) {
+ for (final ManagementServerHost vo : nodeList) {
s_logger.info("Marking hosts as disconnected on Management server" + vo.getMsid());
- long lastPing = (System.currentTimeMillis() >> 10) - getTimeout();
+ final long lastPing = (System.currentTimeMillis() >> 10) - getTimeout();
_hostDao.markHostsAsDisconnected(vo.getMsid(), lastPing);
s_logger.info("Deleting entries from op_host_transfer table for Management server " + vo.getMsid());
cleanupTransferMap(vo.getMsid());
@@ -739,7 +746,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
@Override
- public void removeAgent(AgentAttache attache, Status nextState) {
+ public void removeAgent(final AgentAttache attache, final Status nextState) {
if (attache == null) {
return;
}
@@ -748,7 +755,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
@Override
- public boolean executeRebalanceRequest(long agentId, long currentOwnerId, long futureOwnerId, Event event) throws AgentUnavailableException,
+ public boolean executeRebalanceRequest(final long agentId, final long currentOwnerId, final long futureOwnerId, final Event event) throws AgentUnavailableException,
OperationTimedoutException {
boolean result = false;
if (event == Event.RequestAgentRebalance) {
@@ -756,7 +763,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
} else if (event == Event.StartAgentRebalance) {
try {
result = rebalanceHost(agentId, currentOwnerId, futureOwnerId);
- } catch (Exception e) {
+ } catch (final Exception e) {
s_logger.warn("Unable to rebalance host id=" + agentId, e);
}
}
@@ -795,7 +802,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
cancelled = true;
}
- } catch (Throwable e) {
+ } catch (final Throwable e) {
s_logger.error("Unexpected exception " + e.toString(), e);
}
}
@@ -803,11 +810,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
public void startRebalanceAgents() {
s_logger.debug("Management server " + _nodeId + " is asking other peers to rebalance their agents");
- List<ManagementServerHostVO> allMS = _mshostDao.listBy(ManagementServerHost.State.Up);
- QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class);
+ final List<ManagementServerHostVO> allMS = _mshostDao.listBy(ManagementServerHost.State.Up);
+ final QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class);
sc.and(sc.entity().getManagementServerId(), Op.NNULL);
sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
- List<HostVO> allManagedAgents = sc.list();
+ final List<HostVO> allManagedAgents = sc.list();
int avLoad = 0;
@@ -828,11 +835,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
avLoad = 1;
}
- for (ManagementServerHostVO node : allMS) {
+ for (final ManagementServerHostVO node : allMS) {
if (node.getMsid() != _nodeId) {
List<HostVO> hostsToRebalance = new ArrayList<HostVO>();
- for (AgentLoadBalancerPlanner lbPlanner : _lbPlanners) {
+ for (final AgentLoadBalancerPlanner lbPlanner : _lbPlanners) {
hostsToRebalance = lbPlanner.getHostsToRebalance(node.getMsid(), avLoad);
if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) {
break;
@@ -843,8 +850,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (hostsToRebalance != null && !hostsToRebalance.isEmpty()) {
s_logger.debug("Found " + hostsToRebalance.size() + " hosts to rebalance from management server " + node.getMsid());
- for (HostVO host : hostsToRebalance) {
- long hostId = host.getId();
+ for (final HostVO host : hostsToRebalance) {
+ final long hostId = host.getId();
s_logger.debug("Asking management server " + node.getMsid() + " to give away host id=" + hostId);
boolean result = true;
@@ -856,23 +863,23 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
HostTransferMapVO transfer = null;
try {
transfer = _hostTransferDao.startAgentTransfering(hostId, node.getMsid(), _nodeId);
- Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance);
+ final Answer[] answer = sendRebalanceCommand(node.getMsid(), hostId, node.getMsid(), _nodeId, Event.RequestAgentRebalance);
if (answer == null) {
s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid());
result = false;
}
- } catch (Exception ex) {
+ } catch (final Exception ex) {
s_logger.warn("Failed to get host id=" + hostId + " from management server " + node.getMsid(), ex);
result = false;
} finally {
if (transfer != null) {
- HostTransferMapVO transferState = _hostTransferDao.findByIdAndFutureOwnerId(transfer.getId(), _nodeId);
+ final HostTransferMapVO transferState = _hostTransferDao.findByIdAndFutureOwnerId(transfer.getId(), _nodeId);
if (!result && transferState != null && transferState.getState() == HostTransferState.TransferRequested) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Removing mapping from op_host_transfer as it failed to be set to transfer mode");
}
// just remove the mapping (if exists) as nothing was done on the peer management
-// server yet
+ // server yet
_hostTransferDao.remove(transfer.getId());
}
}
@@ -885,31 +892,31 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
- private Answer[] sendRebalanceCommand(long peer, long agentId, long currentOwnerId, long futureOwnerId, Event event) {
- TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event);
- Commands commands = new Commands(Command.OnError.Stop);
+ private Answer[] sendRebalanceCommand(final long peer, final long agentId, final long currentOwnerId, final long futureOwnerId, final Event event) {
+ final TransferAgentCommand transfer = new TransferAgentCommand(agentId, currentOwnerId, futureOwnerId, event);
+ final Commands commands = new Commands(Command.OnError.Stop);
commands.addCommand(transfer);
- Command[] cmds = commands.toCommands();
+ final Command[] cmds = commands.toCommands();
try {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Forwarding " + cmds[0].toString() + " to " + peer);
}
- String peerName = Long.toString(peer);
- String cmdStr = _gson.toJson(cmds);
- String ansStr = _clusterMgr.execute(peerName, agentId, cmdStr, true);
- Answer[] answers = _gson.fromJson(ansStr, Answer[].class);
+ final String peerName = Long.toString(peer);
+ final String cmdStr = _gson.toJson(cmds);
+ final String ansStr = _clusterMgr.execute(peerName, agentId, cmdStr, true);
+ final Answer[] answers = _gson.fromJson(ansStr, Answer[].class);
return answers;
- } catch (Exception e) {
+ } catch (final Exception e) {
s_logger.warn("Caught exception while talking to " + currentOwnerId, e);
return null;
}
}
- public String getPeerName(long agentHostId) {
+ public String getPeerName(final long agentHostId) {
- HostVO host = _hostDao.findById(agentHostId);
+ final HostVO host = _hostDao.findById(agentHostId);
if (host != null && host.getManagementServerId() != null) {
if (_clusterMgr.getSelfPeerName().equals(Long.toString(host.getManagementServerId()))) {
return null;
@@ -920,7 +927,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return null;
}
- public Boolean propagateAgentEvent(long agentId, Event event) throws AgentUnavailableException {
+ public Boolean propagateAgentEvent(final long agentId, final Event event) throws AgentUnavailableException {
final String msPeer = getPeerName(agentId);
if (msPeer == null) {
return null;
@@ -929,15 +936,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (s_logger.isDebugEnabled()) {
s_logger.debug("Propagating agent change request event:" + event.toString() + " to agent:" + agentId);
}
- Command[] cmds = new Command[1];
+ final Command[] cmds = new Command[1];
cmds[0] = new ChangeAgentCommand(agentId, event);
- String ansStr = _clusterMgr.execute(msPeer, agentId, _gson.toJson(cmds), true);
+ final String ansStr = _clusterMgr.execute(msPeer, agentId, _gson.toJson(cmds), true);
if (ansStr == null) {
throw new AgentUnavailableException(agentId);
}
- Answer[] answers = _gson.fromJson(ansStr, Answer[].class);
+ final Answer[] answers = _gson.fromJson(ansStr, Answer[].class);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Result for agent change is " + answers[0].getResult());
@@ -958,9 +965,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
if (_agentToTransferIds.size() > 0) {
s_logger.debug("Found " + _agentToTransferIds.size() + " agents to transfer");
// for (Long hostId : _agentToTransferIds) {
- for (Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext();) {
- Long hostId = iterator.next();
- AgentAttache attache = findAttache(hostId);
+ for (final Iterator<Long> iterator = _agentToTransferIds.iterator(); iterator.hasNext();) {
+ final Long hostId = iterator.next();
+ final AgentAttache attache = findAttache(hostId);
// if the thread:
// 1) timed out waiting for the host to reconnect
@@ -968,8 +975,8 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
// 3) if the management server doesn't own the host any more
// remove the host from re-balance list and delete from op_host_transfer DB
// no need to do anything with the real attache as we haven't modified it yet
- Date cutTime = DateUtil.currentGMTTime();
- HostTransferMapVO transferMap =
+ final Date cutTime = DateUtil.currentGMTTime();
+ final HostTransferMapVO transferMap =
_hostTransferDao.findActiveHostTransferMapByHostId(hostId, new Date(cutTime.getTime() - rebalanceTimeOut));
if (transferMap == null) {
@@ -986,7 +993,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
continue;
}
- ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner());
+ final ManagementServerHostVO ms = _mshostDao.findByMsid(transferMap.getFutureOwner());
if (ms != null && ms.getState() != ManagementServerHost.State.Up) {
s_logger.debug("Can't transfer host " + hostId + " as it's future owner is not in UP state: " + ms +
", skipping rebalance for the host");
@@ -999,7 +1006,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
iterator.remove();
try {
_executor.execute(new RebalanceTask(hostId, transferMap.getInitialOwner(), transferMap.getFutureOwner()));
- } catch (RejectedExecutionException ex) {
+ } catch (final RejectedExecutionException ex) {
s_logger.warn("Failed to submit rebalance task for host id=" + hostId + "; postponing the execution");
continue;
}
@@ -1016,21 +1023,21 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
- } catch (Throwable e) {
+ } catch (final Throwable e) {
s_logger.error("Problem with the clustered agent transfer scan check!", e);
}
}
};
}
- private boolean setToWaitForRebalance(final long hostId, long currentOwnerId, long futureOwnerId) {
+ private boolean setToWaitForRebalance(final long hostId, final long currentOwnerId, final long futureOwnerId) {
s_logger.debug("Adding agent " + hostId + " to the list of agents to transfer");
synchronized (_agentToTransferIds) {
return _agentToTransferIds.add(hostId);
}
}
- protected boolean rebalanceHost(final long hostId, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException {
+ protected boolean rebalanceHost(final long hostId, final long currentOwnerId, final long futureOwnerId) throws AgentUnavailableException {
boolean result = true;
if (currentOwnerId == _nodeId) {
@@ -1040,12 +1047,12 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return false;
}
try {
- Answer[] answer = sendRebalanceCommand(futureOwnerId, hostId, currentOwnerId, futureOwnerId, Event.StartAgentRebalance);
+ final Answer[] answer = sendRebalanceCommand(futureOwnerId, hostId, currentOwnerId, futureOwnerId, Event.StartAgentRebalance);
if (answer == null || !answer[0].getResult()) {
result = false;
}
- } catch (Exception ex) {
+ } catch (final Exception ex) {
s_logger.warn("Host " + hostId + " failed to connect to the management server " + futureOwnerId + " as a part of rebalance process", ex);
result = false;
}
@@ -1059,13 +1066,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
} else if (futureOwnerId == _nodeId) {
- HostVO host = _hostDao.findById(hostId);
+ final HostVO host = _hostDao.findById(hostId);
try {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Disconnecting host " + host.getId() + "(" + host.getName() + " as a part of rebalance process without notification");
}
- AgentAttache attache = findAttache(hostId);
+ final AgentAttache attache = findAttache(hostId);
if (attache != null) {
result = handleDisconnect(attache, Event.AgentDisconnected, false, false, true);
}
@@ -1080,7 +1087,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.warn("Failed to disconnect " + host.getId() + "(" + host.getName() + " as a part of rebalance process without notification");
}
- } catch (Exception ex) {
+ } catch (final Exception ex) {
s_logger.warn("Failed to load directly connected host " + host.getId() + "(" + host.getName() + ") to the management server " + _nodeId +
" as a part of rebalance process due to:", ex);
result = false;
@@ -1098,21 +1105,21 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return result;
}
- protected void finishRebalance(final long hostId, long futureOwnerId, Event event) {
+ protected void finishRebalance(final long hostId, final long futureOwnerId, final Event event) {
- boolean success = (event == Event.RebalanceCompleted) ? true : false;
+ final boolean success = event == Event.RebalanceCompleted ? true : false;
if (s_logger.isDebugEnabled()) {
s_logger.debug("Finishing rebalancing for the agent " + hostId + " with event " + event);
}
- AgentAttache attache = findAttache(hostId);
+ final AgentAttache attache = findAttache(hostId);
if (attache == null || !(attache instanceof ClusteredAgentAttache)) {
s_logger.debug("Unable to find forward attache for the host id=" + hostId + ", assuming that the agent disconnected already");
_hostTransferDao.completeAgentTransfer(hostId);
return;
}
- ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache;
+ final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)attache;
if (success) {
@@ -1124,7 +1131,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
while (requestToTransfer != null) {
s_logger.debug("Forwarding request " + requestToTransfer.getSequence() + " held in transfer attache " + hostId + " from the management server " +
_nodeId + " to " + futureOwnerId);
- boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes());
+ final boolean routeResult = routeToPeer(Long.toString(futureOwnerId), requestToTransfer.getBytes());
if (!routeResult) {
logD(requestToTransfer.getBytes(), "Failed to route request to peer");
}
@@ -1147,13 +1154,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.debug("Management server " + _nodeId + " failed to rebalance agent " + hostId);
_hostTransferDao.completeAgentTransfer(hostId);
handleDisconnectWithoutInvestigation(findAttache(hostId), Event.RebalanceFailed, true, true);
- } catch (Exception ex) {
+ } catch (final Exception ex) {
s_logger.warn("Failed to reconnect host id=" + hostId + " as a part of failed rebalance task cleanup");
}
}
protected boolean startRebalance(final long hostId) {
- HostVO host = _hostDao.findById(hostId);
+ final HostVO host = _hostDao.findById(hostId);
if (host == null || host.getRemoved() != null) {
s_logger.warn("Unable to find host record, fail start rebalancing process");
@@ -1161,10 +1168,10 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
synchronized (_agents) {
- ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
+ final ClusteredDirectAgentAttache attache = (ClusteredDirectAgentAttache)_agents.get(hostId);
if (attache != null && attache.getQueueSize() == 0 && attache.getNonRecurringListenersSize() == 0) {
handleDisconnectWithoutInvestigation(attache, Event.StartAgentRebalance, true, true);
- ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId);
+ final ClusteredAgentAttache forwardAttache = (ClusteredAgentAttache)createAttache(hostId);
if (forwardAttache == null) {
s_logger.warn("Unable to create a forward attache for the host " + hostId + " as a part of rebalance process");
return false;
@@ -1186,15 +1193,15 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return true;
}
- protected void cleanupTransferMap(long msId) {
- List<HostTransferMapVO> hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(msId);
+ protected void cleanupTransferMap(final long msId) {
+ final List<HostTransferMapVO> hostsJoingingCluster = _hostTransferDao.listHostsJoiningCluster(msId);
- for (HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) {
+ for (final HostTransferMapVO hostJoingingCluster : hostsJoingingCluster) {
_hostTransferDao.remove(hostJoingingCluster.getId());
}
- List<HostTransferMapVO> hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(msId);
- for (HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) {
+ final List<HostTransferMapVO> hostsLeavingCluster = _hostTransferDao.listHostsLeavingCluster(msId);
+ for (final HostTransferMapVO hostLeavingCluster : hostsLeavingCluster) {
_hostTransferDao.remove(hostLeavingCluster.getId());
}
}
@@ -1204,7 +1211,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
Long currentOwnerId = null;
Long futureOwnerId = null;
- public RebalanceTask(long hostId, long currentOwnerId, long futureOwnerId) {
+ public RebalanceTask(final long hostId, final long currentOwnerId, final long futureOwnerId) {
this.hostId = hostId;
this.currentOwnerId = currentOwnerId;
this.futureOwnerId = futureOwnerId;
@@ -1217,20 +1224,20 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.debug("Rebalancing host id=" + hostId);
}
rebalanceHost(hostId, currentOwnerId, futureOwnerId);
- } catch (Exception e) {
+ } catch (final Exception e) {
s_logger.warn("Unable to rebalance host id=" + hostId, e);
}
}
}
- private String handleScheduleHostScanTaskCommand(ScheduleHostScanTaskCommand cmd) {
+ private String handleScheduleHostScanTaskCommand(final ScheduleHostScanTaskCommand cmd) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting resource manager command: " + _gson.toJson(cmd));
}
try {
scheduleHostScanTask();
- } catch (Exception e) {
+ } catch (final Exception e) {
// Scheduling host scan task in peer MS is a best effort operation during host add, regular host scan
// happens at fixed intervals anyways. So handling any exceptions that may be thrown
s_logger.warn("Exception happened while trying to schedule host scan task on mgmt server " + _clusterMgr.getSelfPeerName() +
@@ -1238,14 +1245,14 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
return null;
}
- Answer[] answers = new Answer[1];
+ final Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, true, null);
return _gson.toJson(answers);
}
- public Answer[] sendToAgent(Long hostId, Command[] cmds, boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
- Commands commands = new Commands(stopOnError ? Command.OnError.Stop : Command.OnError.Continue);
- for (Command cmd : cmds) {
+ public Answer[] sendToAgent(final Long hostId, final Command[] cmds, final boolean stopOnError) throws AgentUnavailableException, OperationTimedoutException {
+ final Commands commands = new Commands(stopOnError ? Command.OnError.Stop : Command.OnError.Continue);
+ for (final Command cmd : cmds) {
commands.addCommand(cmd);
}
return send(hostId, commands);
@@ -1258,7 +1265,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
@Override
- public String dispatch(ClusterServicePdu pdu) {
+ public String dispatch(final ClusterServicePdu pdu) {
if (s_logger.isDebugEnabled()) {
s_logger.debug("Dispatch ->" + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
@@ -1267,13 +1274,13 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
Command[] cmds = null;
try {
cmds = _gson.fromJson(pdu.getJsonPackage(), Command[].class);
- } catch (Throwable e) {
- assert (false);
+ } catch (final Throwable e) {
+ assert false;
s_logger.error("Excection in gson decoding : ", e);
}
if (cmds.length == 1 && cmds[0] instanceof ChangeAgentCommand) { // intercepted
- ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
+ final ChangeAgentCommand cmd = (ChangeAgentCommand)cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent change: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
@@ -1285,16 +1292,16 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.debug("Result is " + result);
}
- } catch (AgentUnavailableException e) {
+ } catch (final AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
return null;
}
- Answer[] answers = new Answer[1];
+ final Answer[] answers = new Answer[1];
answers[0] = new ChangeAgentAnswer(cmd, result);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof TransferAgentCommand) {
- TransferAgentCommand cmd = (TransferAgentCommand)cmds[0];
+ final TransferAgentCommand cmd = (TransferAgentCommand)cmds[0];
if (s_logger.isDebugEnabled()) {
s_logger.debug("Intercepting command for agent rebalancing: agent " + cmd.getAgentId() + " event: " + cmd.getEvent());
@@ -1306,18 +1313,18 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.debug("Result is " + result);
}
- } catch (AgentUnavailableException e) {
+ } catch (final AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
return null;
- } catch (OperationTimedoutException e) {
+ } catch (final OperationTimedoutException e) {
s_logger.warn("Operation timed out", e);
return null;
}
- Answer[] answers = new Answer[1];
+ final Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, result, null);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof PropagateResourceEventCommand) {
- PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0];
+ final PropagateResourceEventCommand cmd = (PropagateResourceEventCommand)cmds[0];
s_logger.debug("Intercepting command to propagate event " + cmd.getEvent().name() + " for host " + cmd.getHostId());
@@ -1325,29 +1332,29 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
try {
result = _resourceMgr.executeUserRequest(cmd.getHostId(), cmd.getEvent());
s_logger.debug("Result is " + result);
- } catch (AgentUnavailableException ex) {
+ } catch (final AgentUnavailableException ex) {
s_logger.warn("Agent is unavailable", ex);
return null;
}
- Answer[] answers = new Answer[1];
+ final Answer[] answers = new Answer[1];
answers[0] = new Answer(cmd, result, null);
return _gson.toJson(answers);
} else if (cmds.length == 1 && cmds[0] instanceof ScheduleHostScanTaskCommand) {
- ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0];
- String response = handleScheduleHostScanTaskCommand(cmd);
+ final ScheduleHostScanTaskCommand cmd = (ScheduleHostScanTaskCommand)cmds[0];
+ final String response = handleScheduleHostScanTaskCommand(cmd);
return response;
}
try {
- long startTick = System.currentTimeMillis();
+ final long startTick = System.currentTimeMillis();
if (s_logger.isDebugEnabled()) {
s_logger.debug("Dispatch -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage());
}
- Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError());
+ final Answer[] answers = sendToAgent(pdu.getAgentId(), cmds, pdu.isStopOnError());
if (answers != null) {
- String jsonReturn = _gson.toJson(answers);
+ final String jsonReturn = _gson.toJson(answers);
if (s_logger.isDebugEnabled()) {
s_logger.debug("Completed dispatching -> " + pdu.getAgentId() + ", json: " + pdu.getJsonPackage() + " in " +
@@ -1361,9 +1368,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
(System.currentTimeMillis() - startTick) + " ms, return null result");
}
}
- } catch (AgentUnavailableException e) {
+ } catch (final AgentUnavailableException e) {
s_logger.warn("Agent is unavailable", e);
- } catch (OperationTimedoutException e) {
+ } catch (final OperationTimedoutException e) {
s_logger.warn("Timed Out", e);
}
@@ -1372,11 +1379,11 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
- public boolean executeAgentUserRequest(long agentId, Event event) throws AgentUnavailableException {
+ public boolean executeAgentUserRequest(final long agentId, final Event event) throws AgentUnavailableException {
return executeUserRequest(agentId, event);
}
- public boolean rebalanceAgent(long agentId, Event event, long currentOwnerId, long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
+ public boolean rebalanceAgent(final long agentId, final Event event, final long currentOwnerId, final long futureOwnerId) throws AgentUnavailableException, OperationTimedoutException {
return executeRebalanceRequest(agentId, currentOwnerId, futureOwnerId, event);
}
@@ -1393,20 +1400,20 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
s_logger.trace("Agent rebalance task check, management server id:" + _nodeId);
}
// initiate agent lb task will be scheduled and executed only once, and only when number of agents
-// loaded exceeds _connectedAgentsThreshold
+ // loaded exceeds _connectedAgentsThreshold
if (!_agentLbHappened) {
QueryBuilder<HostVO> sc = QueryBuilder.create(HostVO.class);
sc.and(sc.entity().getManagementServerId(), Op.NNULL);
sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
- List<HostVO> allManagedRoutingAgents = sc.list();
+ final List<HostVO> allManagedRoutingAgents = sc.list();
sc = QueryBuilder.create(HostVO.class);
sc.and(sc.entity().getType(), Op.EQ, Host.Type.Routing);
- List<HostVO> allAgents = sc.list();
- double allHostsCount = allAgents.size();
- double managedHostsCount = allManagedRoutingAgents.size();
+ final List<HostVO> allAgents = sc.list();
+ final double allHostsCount = allAgents.size();
+ final double managedHostsCount = allManagedRoutingAgents.size();
if (allHostsCount > 0.0) {
- double load = managedHostsCount / allHostsCount;
+ final double load = managedHostsCount / allHostsCount;
if (load >= ConnectedAgentThreshold.value()) {
s_logger.debug("Scheduling agent rebalancing task as the average agent load " + load + " is more than the threshold " +
ConnectedAgentThreshold.value());
@@ -1418,7 +1425,7 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
}
}
}
- } catch (Throwable e) {
+ } catch (final Throwable e) {
s_logger.error("Problem with the clustered agent transfer scan check!", e);
}
}
@@ -1440,9 +1447,9 @@ public class ClusteredAgentManagerImpl extends AgentManagerImpl implements Clust
@Override
public ConfigKey<?>[] getConfigKeys() {
- ConfigKey<?>[] keys = super.getConfigKeys();
+ final ConfigKey<?>[] keys = super.getConfigKeys();
- List<ConfigKey<?>> keysLst = new ArrayList<ConfigKey<?>>();
+ final List<ConfigKey<?>> keysLst = new ArrayList<ConfigKey<?>>();
keysLst.addAll(Arrays.asList(keys));
keysLst.add(EnableLB);
keysLst.add(ConnectedAgentThreshold);