Posted to commits@ambari.apache.org by jo...@apache.org on 2014/08/20 17:30:46 UTC

[45/50] [abbrv] git commit: AMBARI-6880 - Alerts: Send Definitions Down Via Commands to the Agent (jonathanhurley)

AMBARI-6880 - Alerts: Send Definitions Down Via Commands to the Agent (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/8e481286
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/8e481286
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/8e481286

Branch: refs/heads/branch-alerts-dev
Commit: 8e48128648f0a74942b9e3bcc88261bd2728427e
Parents: 9b6c02f
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Fri Aug 15 14:27:27 2014 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Wed Aug 20 10:48:02 2014 -0400

----------------------------------------------------------------------
 .../apache/ambari/server/agent/ActionQueue.java |  51 +++++-
 .../ambari/server/agent/AgentCommand.java       |   3 +-
 .../server/agent/AlertDefinitionCommand.java    | 109 ++++++++++++
 .../ambari/server/agent/HeartBeatHandler.java   |  55 +++++-
 .../ambari/server/agent/HeartBeatResponse.java  |  56 +++---
 .../ambari/server/agent/HeartbeatMonitor.java   |  39 +++--
 .../server/agent/RegistrationResponse.java      |  50 +++++-
 .../server/api/services/AmbariMetaInfo.java     | 174 +++++++++----------
 .../ambari/server/controller/AmbariServer.java  |   6 +-
 .../AlertDefinitionResourceProvider.java        | 101 ++++++++++-
 .../server/state/alert/AlertDefinition.java     |  61 ++++---
 .../state/alert/AlertDefinitionFactory.java     | 151 ++++++++++++++++
 .../server/state/alert/AlertDefinitionHash.java | 165 +++++++++++++-----
 .../ambari/server/agent/TestActionQueue.java    |  88 ++++++++--
 .../server/api/services/AmbariMetaInfoTest.java |   4 +-
 .../AlertDefinitionResourceProviderTest.java    |  55 ++++--
 .../state/alerts/AlertDefinitionHashTest.java   |  86 +++++++--
 17 files changed, 980 insertions(+), 274 deletions(-)
----------------------------------------------------------------------
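
In short, this change stops sending per-cluster alert definition hashes with every heartbeat and instead pushes complete AlertDefinitionCommand payloads through the ActionQueue whenever a host's definitions change or a host registers. A minimal sketch of that flow, condensed from the HeartBeatHandler and AlertDefinitionResourceProvider diffs below (the cluster and host names are placeholders):

    // recompute the definitions and hash for a host after an invalidation
    List<AlertDefinition> definitions =
        alertDefinitionHash.getAlertDefinitions("c1", "host1.example.com");
    String hash = alertDefinitionHash.getHash("c1", "host1.example.com");

    // replace any queued alert definition command so the agent only
    // reschedules its alerts once on its next heartbeat
    AlertDefinitionCommand command = new AlertDefinitionCommand(
        "c1", "host1.example.com", hash, definitions);
    actionQueue.dequeue("host1.example.com", AgentCommandType.ALERT_DEFINITION_COMMAND);
    actionQueue.enqueue("host1.example.com", command);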


http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
index 225c7df..2479f37 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/ActionQueue.java
@@ -18,17 +18,15 @@
 package org.apache.ambari.server.agent;
 
 import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,7 +83,44 @@ public class ActionQueue {
   }
 
   /**
+   * Dequeues all commands of a specified type for the given host.
+   *
+   * @param hostname
+   *          the host to remove commands for (not {@code null}).
+   * @param commandType
+   *          the type of command to remove (not {@code null}).
+   * @return the commands that were removed, or an empty list if there were
+   *         none (never {@code null}).
+   */
+  public List<AgentCommand> dequeue(String hostname,
+      AgentCommandType commandType) {
+    if (null == hostname || null == commandType) {
+      return Collections.emptyList();
+    }
+
+    Queue<AgentCommand> queue = getQueue(hostname);
+    if (null == queue) {
+      return Collections.emptyList();
+    }
+
+    List<AgentCommand> removedCommands = new ArrayList<AgentCommand>(
+        queue.size());
+
+    Iterator<AgentCommand> iterator = queue.iterator();
+    while (iterator.hasNext()) {
+      AgentCommand command = iterator.next();
+      if (command.getCommandType() == commandType) {
+        removedCommands.add(command);
+        iterator.remove();
+      }
+    }
+
+    return removedCommands;
+  }
+
+  /**
    * Try to dequeue command with provided id.
+   *
    * @param hostname
    * @param commandId
    * @return
@@ -99,8 +134,8 @@ public class ActionQueue {
       return null;
     } else {
       AgentCommand c = null;
-      for (Iterator it = q.iterator(); it.hasNext(); ) {
-        AgentCommand ac = (AgentCommand) it.next();
+      for (Iterator<AgentCommand> it = q.iterator(); it.hasNext();) {
+        AgentCommand ac = it.next();
         if (ac instanceof ExecutionCommand && ((ExecutionCommand) ac)
           .getCommandId().equals(commandId)) {
           c = ac;
@@ -111,7 +146,7 @@ public class ActionQueue {
       return c;
     }
   }
-  
+
   public int size(String hostname) {
     Queue<AgentCommand> q = getQueue(hostname);
     if (q == null) {
@@ -125,6 +160,7 @@ public class ActionQueue {
     if (q == null) {
       return null;
     }
+
     List<AgentCommand> l = new ArrayList<AgentCommand>();
 
     AgentCommand command;
@@ -137,6 +173,5 @@ public class ActionQueue {
     } while (command != null);
 
     return l;
-
   }
 }
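
The new type-based dequeue removes only commands of the requested type and leaves everything else queued for the host. A hedged usage sketch (host and cluster names are placeholders; it assumes ActionQueue's existing enqueue(hostname, command) API, a default-constructed StatusCommand, and a java.util.Collections import):

    ActionQueue queue = new ActionQueue();
    queue.enqueue("host1.example.com", new StatusCommand());
    queue.enqueue("host1.example.com", new AlertDefinitionCommand(
        "c1", "host1.example.com", "abc123", Collections.<AlertDefinition>emptyList()));

    // removes only the alert definition command; the status command stays queued
    List<AgentCommand> removed = queue.dequeue(
        "host1.example.com", AgentCommandType.ALERT_DEFINITION_COMMAND);

    assert removed.size() == 1;
    assert queue.size("host1.example.com") == 1;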

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
index 54faf6a..29805a1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentCommand.java
@@ -34,7 +34,8 @@ public abstract class AgentCommand {
     BACKGROUND_EXECUTION_COMMAND,
     STATUS_COMMAND,
     CANCEL_COMMAND,
-    REGISTRATION_COMMAND
+    REGISTRATION_COMMAND,
+    ALERT_DEFINITION_COMMAND
   }
 
   public AgentCommandType getCommandType() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java
new file mode 100644
index 0000000..3c9615f
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AlertDefinitionCommand.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import java.util.List;
+
+import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.AlertDefinitionHash;
+
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * The {@link AlertDefinitionCommand} class is used to encapsulate the
+ * {@link AlertDefinition}s that will be returned to an agent given a requested
+ * hash.
+ */
+public class AlertDefinitionCommand extends AgentCommand {
+  @SerializedName("clusterName")
+  private final String m_clusterName;
+
+  @SerializedName("hostName")
+  private final String m_hostName;
+
+  @SerializedName("hash")
+  private final String m_hash;
+
+  @SerializedName("alertDefinitions")
+  private final List<AlertDefinition> m_definitions;
+
+  /**
+   * Constructor.
+   *
+   * @param clusterName
+   *          the name of the cluster this command is for (not {@code null}).
+   * @param hostName the host that the definitions are for (not {@code null}).
+   * @param hash the hash of all of the host's alert definitions (not {@code null}).
+   * @param definitions the alert definitions to send to the host (not {@code null}).
+   *
+   * @see AlertDefinitionHash
+   */
+  public AlertDefinitionCommand(String clusterName, String hostName,
+      String hash, List<AlertDefinition> definitions) {
+    super(AgentCommandType.ALERT_DEFINITION_COMMAND);
+
+    m_clusterName = clusterName;
+    m_hostName = hostName;
+    m_hash = hash;
+    m_definitions = definitions;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public AgentCommandType getCommandType() {
+    return AgentCommandType.ALERT_DEFINITION_COMMAND;
+  }
+
+  /**
+   * Gets the global hash for all alert definitions for a given host.
+   *
+   * @return the hash (never {@code null}).
+   */
+  public String getHash() {
+    return m_hash;
+  }
+
+  /**
+   * Gets the alert definitions that the agent should schedule.
+   *
+   * @return the alert definitions.
+   */
+  public List<AlertDefinition> getAlertDefinitions() {
+    return m_definitions;
+  }
+
+  /**
+   * Gets the name of the cluster.
+   *
+   * @return the cluster name (not {@code null}).
+   */
+  public String getClusterName() {
+    return m_clusterName;
+  }
+
+  /**
+   * Gets the host name.
+   *
+   * @return the host name (not {@code null}).
+   */
+  public String getHostName() {
+    return m_hostName;
+  }
+}
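
Because the fields are annotated with @SerializedName, a Gson-serialized command carries the clusterName, hostName, hash, and alertDefinitions keys. An illustrative sketch (values are placeholders, not taken from this commit; assumes com.google.gson.Gson and java.util.Collections imports):

    AlertDefinitionCommand command = new AlertDefinitionCommand(
        "c1", "host1.example.com", "a1b2c3d4", Collections.<AlertDefinition>emptyList());

    // the annotated fields serialize under these names:
    // {"clusterName":"c1","hostName":"host1.example.com",
    //  "hash":"a1b2c3d4","alertDefinitions":[]}
    String json = new Gson().toJson(command);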

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 8a818a6..65b7b6f 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -58,6 +58,7 @@ import org.apache.ambari.server.state.ServiceInfo;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.state.alert.AlertDefinition;
 import org.apache.ambari.server.state.alert.AlertDefinitionHash;
 import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
 import org.apache.ambari.server.state.host.HostHealthyHeartbeatEvent;
@@ -223,10 +224,6 @@ public class HeartBeatHandler {
       annotateResponse(hostname, response);
     }
 
-    // send the alert definition hash for this host
-    Map<String, String> alertDefinitionHashes = alertDefinitionHash.getHashes(hostname);
-    response.setAlertDefinitionHash(alertDefinitionHashes);
-
     return response;
   }
 
@@ -597,9 +594,13 @@ public class HeartBeatHandler {
             response.addCancelCommand((CancelCommand) ac);
             break;
           }
+          case ALERT_DEFINITION_COMMAND: {
+            response.addAlertDefinitionCommand((AlertDefinitionCommand) ac);
+            break;
+          }
           default:
-            LOG.error("There is no action for agent command =" +
-                ac.getCommandType().name());
+            LOG.error("There is no action for agent command ="
+                + ac.getCommandType().name());
         }
       }
     }
@@ -677,6 +678,7 @@ public class HeartBeatHandler {
       clusterFsm.addHost(hostname);
       hostObject = clusterFsm.getHost(hostname);
     }
+
     // Resetting host state
     hostObject.setState(HostState.INIT);
 
@@ -693,6 +695,7 @@ public class HeartBeatHandler {
         null != register.getPublicHostname() ? register.getPublicHostname() : hostname,
         new AgentVersion(register.getAgentVersion()), now, register.getHardwareProfile(),
         register.getAgentEnv()));
+
     RegistrationResponse response = new RegistrationResponse();
     if (cmds.isEmpty()) {
       //No status commands needed let the fsm know that status step is done
@@ -706,6 +709,10 @@ public class HeartBeatHandler {
 
     response.setResponseStatus(RegistrationStatus.OK);
 
+    // force the registering agent host to receive its list of alert definitions
+    List<AlertDefinitionCommand> alertDefinitionCommands = getAlertDefinitionCommands(hostname);
+    response.setAlertDefinitionCommands(alertDefinitionCommands);
+
     Long requestId = 0L;
     hostResponseIds.put(hostname, requestId);
     response.setResponseId(requestId);
@@ -771,4 +778,40 @@ public class HeartBeatHandler {
 
     return result;
   }
+
+  /**
+   * Gets the {@link AlertDefinitionCommand} instances that need to be sent for
+   * each cluster that the registering host is a member of.
+   *
+   * @param hostname the name of the registering host (not {@code null}).
+   * @return the commands to send, or {@code null} if the host has no clusters.
+   * @throws AmbariException
+   */
+  private List<AlertDefinitionCommand> getAlertDefinitionCommands(
+      String hostname) throws AmbariException {
+
+    Set<Cluster> hostClusters = clusterFsm.getClustersForHost(hostname);
+    if (null == hostClusters || hostClusters.size() == 0) {
+      return null;
+    }
+
+    List<AlertDefinitionCommand> commands = new ArrayList<AlertDefinitionCommand>();
+
+    // for every cluster this host is a member of, build the command
+    for (Cluster cluster : hostClusters) {
+      String clusterName = cluster.getClusterName();
+      alertDefinitionHash.invalidate(clusterName, hostname);
+
+      List<AlertDefinition> definitions = alertDefinitionHash.getAlertDefinitions(
+          clusterName, hostname);
+
+      String hash = alertDefinitionHash.getHash(clusterName, hostname);
+      AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
+          hostname, hash, definitions);
+
+      commands.add(command);
+    }
+
+    return commands;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java
index 24bd8a2..0dff507 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatResponse.java
@@ -20,7 +20,6 @@ package org.apache.ambari.server.agent;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
 import org.codehaus.jackson.annotate.JsonProperty;
 
@@ -35,20 +34,19 @@ public class HeartBeatResponse {
   private List<StatusCommand> statusCommands = new ArrayList<StatusCommand>();
   private List<CancelCommand> cancelCommands = new ArrayList<CancelCommand>();
 
+  /**
+   * {@link AlertDefinitionCommand}s are used to instruct the agent as to which
+   * alert definitions it needs to schedule.
+   */
+  @JsonProperty("alertDefinitionCommands")
+  private List<AlertDefinitionCommand> alertDefinitionCommands = new ArrayList<AlertDefinitionCommand>();
+
+
   private RegistrationCommand registrationCommand;
 
   private boolean restartAgent = false;
   private boolean hasMappedComponents = false;
 
-  /**
-   * A mapping between cluster name and the alert defintion hash for that
-   * cluster. The alert definition hash for a cluster is a hashed value of all
-   * of the UUIDs for each alert definition that the agent host should be
-   * scheduling. If any of the alert definitions change, their UUID will change
-   * which will cause this hash value to change.
-   */
-  private Map<String, String> alertDefinitionHashes = null;
-
   @JsonProperty("responseId")
   public long getResponseId() {
     return responseId;
@@ -99,6 +97,28 @@ public class HeartBeatResponse {
     this.registrationCommand = registrationCommand;
   }
 
+  /**
+   * Gets the alert definition commands that contain the alert definitions for
+   * each cluster that the host is a member of.
+   *
+   * @return the alert definition commands to send to the agent, or
+   *         {@code null} for none.
+   */
+  public List<AlertDefinitionCommand> getAlertDefinitionCommands() {
+    return alertDefinitionCommands;
+  }
+
+  /**
+   * Sets the alert definition commands that contain the alert definitions for
+   * each cluster that the host is a member of.
+   *
+   * @param commands
+   *          the commands, or {@code null} for none.
+   */
+  public void setAlertDefinitionCommands(List<AlertDefinitionCommand> commands) {
+    alertDefinitionCommands = commands;
+  }
+
   @JsonProperty("restartAgent")
   public boolean isRestartAgent() {
     return restartAgent;
@@ -119,16 +139,6 @@ public class HeartBeatResponse {
     this.hasMappedComponents = hasMappedComponents;
   }
 
-  @JsonProperty("alertDefinitionHashes")
-  public Map<String, String> getAlertDefinitionHash() {
-    return alertDefinitionHashes;
-  }
-
-  @JsonProperty("alertDefinitionHashes")
-  public void setAlertDefinitionHash(Map<String, String> alertDefinitionHashes) {
-    this.alertDefinitionHashes = alertDefinitionHashes;
-  }
-
   public void addExecutionCommand(ExecutionCommand execCmd) {
     executionCommands.add(execCmd);
   }
@@ -141,6 +151,10 @@ public class HeartBeatResponse {
     cancelCommands.add(cancelCmd);
   }
 
+  public void addAlertDefinitionCommand(AlertDefinitionCommand command) {
+    alertDefinitionCommands.add(command);
+  }
+
   @Override
   public String toString() {
     StringBuilder buffer = new StringBuilder("HeartBeatResponse{");
@@ -148,9 +162,9 @@ public class HeartBeatResponse {
     buffer.append(", executionCommands=").append(executionCommands);
     buffer.append(", statusCommands=").append(statusCommands);
     buffer.append(", cancelCommands=").append(cancelCommands);
+    buffer.append(", alertDefinitionCommands=").append(alertDefinitionCommands);
     buffer.append(", registrationCommand=").append(registrationCommand);
     buffer.append(", restartAgent=").append(restartAgent);
-    buffer.append(", alertDefinitionHashes=").append(alertDefinitionHashes);
     buffer.append('}');
     return buffer.toString();
   }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
index 9eab651..5336694 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatMonitor.java
@@ -18,7 +18,6 @@
 package org.apache.ambari.server.agent;
 
 import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.COMMAND_TIMEOUT;
-import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.GLOBAL;
 import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.HOOKS_FOLDER;
 import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.JDK_LOCATION;
 import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.SCRIPT;
@@ -30,6 +29,7 @@ import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.STACK_VER
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -39,7 +39,6 @@ import org.apache.ambari.server.actionmanager.ActionManager;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
 import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
-import org.apache.ambari.server.controller.MaintenanceStateHelper;
 import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
@@ -62,7 +61,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import com.google.inject.Injector;
-import java.util.HashSet;
 
 /**
  * Monitors the node state and heartbeats.
@@ -83,14 +81,14 @@ public class HeartbeatMonitor implements Runnable {
   public HeartbeatMonitor(Clusters clusters, ActionQueue aq, ActionManager am,
                           int threadWakeupInterval, Injector injector) {
     this.clusters = clusters;
-    this.actionQueue = aq;
-    this.actionManager = am;
+    actionQueue = aq;
+    actionManager = am;
     this.threadWakeupInterval = threadWakeupInterval;
-    this.configHelper = injector.getInstance(ConfigHelper.class);
-    this.ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class);
-    this.ambariManagementController = injector.getInstance(
+    configHelper = injector.getInstance(ConfigHelper.class);
+    ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class);
+    ambariManagementController = injector.getInstance(
             AmbariManagementController.class);
-    this.configuration = injector.getInstance(Configuration.class);
+    configuration = injector.getInstance(Configuration.class);
   }
 
   public void shutdown() {
@@ -246,37 +244,38 @@ public class HeartbeatMonitor implements Runnable {
     // apply config group overrides
     //Config clusterConfig = cluster.getDesiredConfigByType(GLOBAL);
     Collection<Config> clusterConfigs = cluster.getAllConfigs();
-    
+
     for(Config clusterConfig: clusterConfigs) {
-      if(!clusterConfig.getType().endsWith("-env"))
+      if(!clusterConfig.getType().endsWith("-env")) {
         continue;
-    
+      }
+
       if (clusterConfig != null) {
         // cluster config for 'global'
         Map<String, String> props = new HashMap<String, String>(clusterConfig.getProperties());
-  
+
         // Apply global properties for this host from all config groups
         Map<String, Map<String, String>> allConfigTags = configHelper
                 .getEffectiveDesiredTags(cluster, hostname);
-  
+
         Map<String, Map<String, String>> configTags = new HashMap<String,
                 Map<String, String>>();
-  
+
         for (Map.Entry<String, Map<String, String>> entry : allConfigTags.entrySet()) {
           if (entry.getKey().equals(clusterConfig.getType())) {
             configTags.put(clusterConfig.getType(), entry.getValue());
           }
         }
-  
+
         Map<String, Map<String, String>> properties = configHelper
                 .getEffectiveConfigProperties(cluster, configTags);
-  
+
         if (!properties.isEmpty()) {
           for (Map<String, String> propertyMap : properties.values()) {
             props.putAll(propertyMap);
           }
         }
-  
+
         configurations.put(clusterConfig.getType(), props);
 
         Map<String, Map<String, String>> attrs = new TreeMap<String, Map<String, String>>();
@@ -298,7 +297,9 @@ public class HeartbeatMonitor implements Runnable {
       Collection<Alert> clusterAlerts = cluster.getAlerts();
       Collection<Alert> alerts = new HashSet<Alert>();
       for (Alert alert : clusterAlerts) {
-        if (!alert.getName().equals("host_alert")) alerts.add(alert);
+        if (!alert.getName().equals("host_alert")) {
+          alerts.add(alert);
+        }
       }
       if (alerts.size() > 0) {
         statusCmd = new NagiosAlertCommand();

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
index dae80bb..8a24560 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/RegistrationResponse.java
@@ -18,6 +18,7 @@
 
 package org.apache.ambari.server.agent;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.codehaus.jackson.annotate.JsonProperty;
@@ -32,6 +33,13 @@ public class RegistrationResponse {
   private RegistrationStatus response;
 
   /**
+   * {@link AlertDefinitionCommand}s are used to instruct the agent as to which
+   * alert definitions it needs to schedule.
+   */
+  @JsonProperty("alertDefinitionCommands")
+  private List<AlertDefinitionCommand> alertDefinitionCommands = new ArrayList<AlertDefinitionCommand>();
+
+  /**
    * exitstatus is a code of error which was rised on server side.
    * exitstatus = 0 (OK - Default)
    * exitstatus = 1 (Registration failed because
@@ -44,12 +52,12 @@ public class RegistrationResponse {
    * log - message, which will be printed to agents  log
    */
   @JsonProperty("log")
-  private String log;  
-  
+  private String log;
+
   //Response id to start with, usually zero.
   @JsonProperty("responseId")
   private long responseId;
-  
+
   @JsonProperty("statusCommands")
   private List<StatusCommand> statusCommands = null;
 
@@ -69,6 +77,28 @@ public class RegistrationResponse {
     this.statusCommands = statusCommands;
   }
 
+  /**
+   * Gets the alert definition commands that contain the alert definitions for
+   * each cluster that the host is a member of.
+   *
+   * @return the alert definition commands to send to the agent, or
+   *         {@code null} for none.
+   */
+  public List<AlertDefinitionCommand> getAlertDefinitionCommands() {
+    return alertDefinitionCommands;
+  }
+
+  /**
+   * Sets the alert definition commands that contain the alert definitions for
+   * each cluster that the host is a member of.
+   *
+   * @param commands
+   *          the commands, or {@code null} for none.
+   */
+  public void setAlertDefinitionCommands(List<AlertDefinitionCommand> commands) {
+    alertDefinitionCommands = commands;
+  }
+
   public long getResponseId() {
     return responseId;
   }
@@ -84,13 +114,15 @@ public class RegistrationResponse {
   public void setLog(String log) {
     this.log = log;
   }
- 
+
   @Override
   public String toString() {
-    return "RegistrationResponse{" +
-            "response=" + response +
-            ", responseId=" + responseId +
-            ", statusCommands=" + statusCommands +
-            '}';
+    StringBuilder buffer = new StringBuilder("RegistrationResponse{");
+    buffer.append("response=").append(response);
+    buffer.append(", responseId=").append(responseId);
+    buffer.append(", statusCommands=").append(statusCommands);
+    buffer.append(", alertDefinitionCommands=").append(alertDefinitionCommands);
+    buffer.append('}');
+    return buffer.toString();
   }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
index 80af575..3347a77 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
@@ -60,9 +60,7 @@ import org.apache.ambari.server.state.Stack;
 import org.apache.ambari.server.state.StackId;
 import org.apache.ambari.server.state.StackInfo;
 import org.apache.ambari.server.state.alert.AlertDefinition;
-import org.apache.ambari.server.state.alert.MetricSource;
-import org.apache.ambari.server.state.alert.Source;
-import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.ambari.server.state.alert.AlertDefinitionFactory;
 import org.apache.ambari.server.state.stack.LatestRepoCallable;
 import org.apache.ambari.server.state.stack.MetricDefinition;
 import org.apache.ambari.server.state.stack.RepositoryXml;
@@ -72,12 +70,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonDeserializationContext;
-import com.google.gson.JsonDeserializer;
-import com.google.gson.JsonElement;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
 import com.google.gson.reflect.TypeToken;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
@@ -133,12 +125,16 @@ public class AmbariMetaInfo {
   private File stackRoot;
   private File serverVersionFile;
   private File customActionRoot;
+
   @Inject
   private MetainfoDAO metainfoDAO;
+
   @Inject
   Injector injector;
-  
-  
+
+  @Inject
+  private AlertDefinitionFactory alertDefinitionFactory;
+
   // Required properties by stack version
   private final Map<StackId, Map<String, Map<String, PropertyInfo>>> requiredProperties =
     new HashMap<StackId, Map<String, Map<String, PropertyInfo>>>();
@@ -153,11 +149,11 @@ public class AmbariMetaInfo {
   public AmbariMetaInfo(Configuration conf) throws Exception {
     String stackPath = conf.getMetadataPath();
     String serverVersionFilePath = conf.getServerVersionFilePath();
-    this.stackRoot = new File(stackPath);
-    this.serverVersionFile = new File(serverVersionFilePath);
-    this.customActionRoot = new File(conf.getCustomActionDefinitionPath());
+    stackRoot = new File(stackPath);
+    serverVersionFile = new File(serverVersionFilePath);
+    customActionRoot = new File(conf.getCustomActionDefinitionPath());
   }
-  
+
   public AmbariMetaInfo(File stackRoot, File serverVersionFile) throws Exception {
     this.stackRoot = stackRoot;
     this.serverVersionFile = serverVersionFile;
@@ -174,6 +170,8 @@ public class AmbariMetaInfo {
     readServerVersion();
     getConfigurationInformation(stackRoot);
     getCustomActionDefinitions(customActionRoot);
+
+    alertDefinitionFactory = injector.getInstance(AlertDefinitionFactory.class);
   }
 
   /**
@@ -190,13 +188,14 @@ public class AmbariMetaInfo {
                                             String serviceName, String componentName) throws AmbariException {
     ComponentInfo component = null;
     List<ComponentInfo> components = getComponentsByService(stackName, version, serviceName);
-    if (components != null)
+    if (components != null) {
       for (ComponentInfo cmp : components) {
         if (cmp.getName().equals(componentName)) {
           component = cmp;
           break;
         }
       }
+    }
     return component;
   }
 
@@ -227,24 +226,27 @@ public class AmbariMetaInfo {
 
     List<ComponentInfo> componentsByService = getComponentsByService(stackName, version, serviceName);
 
-    if (componentsByService.size() == 0)
+    if (componentsByService.size() == 0) {
       throw new StackAccessException("stackName=" + stackName
           + ", stackVersion=" + version
           + ", serviceName=" + serviceName
           + ", componentName=" + componentName);
+    }
 
     ComponentInfo componentResult = null;
 
     for (ComponentInfo component : componentsByService) {
-      if (component.getName().equals(componentName))
+      if (component.getName().equals(componentName)) {
         componentResult = component;
+      }
     }
 
-    if (componentResult == null)
+    if (componentResult == null) {
       throw new StackAccessException("stackName=" + stackName
           + ", stackVersion=" + version
           + ", serviceName=" + serviceName
           + ", componentName=" + componentName);
+    }
 
     return componentResult;
   }
@@ -334,8 +336,9 @@ public class AmbariMetaInfo {
 
     List<RepositoryInfo> repositoriesResult = new ArrayList<RepositoryInfo>();
     for (RepositoryInfo repository : repositories) {
-      if (repository.getOsType().equals(osType))
+      if (repository.getOsType().equals(osType)) {
         repositoriesResult.add(repository);
+      }
     }
     return repositoriesResult;
   }
@@ -345,22 +348,25 @@ public class AmbariMetaInfo {
 
     List<RepositoryInfo> repositories = getRepositories(stackName, version, osType);
 
-    if (repositories.size() == 0)
+    if (repositories.size() == 0) {
       throw new StackAccessException("stackName=" + stackName
           + ", stackVersion=" + version
           + ", osType=" + osType
           + ", repoId=" + repoId);
+    }
 
     RepositoryInfo repoResult = null;
     for (RepositoryInfo repository : repositories) {
-      if (repository.getRepoId().equals(repoId))
+      if (repository.getRepoId().equals(repoId)) {
         repoResult = repository;
+      }
     }
-    if (repoResult == null)
+    if (repoResult == null) {
       throw new StackAccessException("stackName=" + stackName
           + ", stackVersion= " + version
           + ", osType=" + osType
           + ", repoId= " + repoId);
+    }
     return repoResult;
   }
 
@@ -452,10 +458,10 @@ public class AmbariMetaInfo {
     Map<String, Map<String, String>> propertiesResult = new HashMap<String, Map<String, String>>();
 
     ServiceInfo service = getServiceInfo(stackName, version, serviceName);
-    if (service != null)
+    if (service != null) {
       if (serviceName.equals(service.getName())) {
         List<PropertyInfo> properties = service.getProperties();
-        if (properties != null)
+        if (properties != null) {
           for (PropertyInfo propertyInfo : properties) {
             Map<String, String> fileProperties = propertiesResult
                 .get(propertyInfo.getFilename());
@@ -471,7 +477,9 @@ public class AmbariMetaInfo {
             }
 
           }
+        }
       }
+    }
 
     return propertiesResult;
   }
@@ -497,10 +505,11 @@ public class AmbariMetaInfo {
     }
 
     services = stack.getServices();
-    if (services != null)
+    if (services != null) {
       for (ServiceInfo service : services) {
         servicesInfoResult.put(service.getName(), service);
       }
+    }
     return servicesInfoResult;
   }
 
@@ -508,13 +517,15 @@ public class AmbariMetaInfo {
 
     Map<String, ServiceInfo> services = getServices(stackName, version);
 
-    if (services.size() == 0)
+    if (services.size() == 0) {
       throw new StackAccessException("stackName=" + stackName + ", stackVersion=" + version + ", serviceName=" + serviceName);
+    }
 
     ServiceInfo serviceInfo = services.get(serviceName);
 
-    if (serviceInfo == null)
+    if (serviceInfo == null) {
       throw new StackAccessException("stackName=" + stackName + ", stackVersion=" + version + ", serviceName=" + serviceName);
+    }
 
     return serviceInfo;
 
@@ -532,13 +543,14 @@ public class AmbariMetaInfo {
     }
 
     services = stack.getServices();
-    if (services != null)
+    if (services != null) {
       for (ServiceInfo service : services) {
         if (serviceName.equals(service.getName())) {
           serviceInfoResult = service;
           break;
         }
       }
+    }
     return serviceInfoResult;
   }
 
@@ -546,8 +558,9 @@ public class AmbariMetaInfo {
     throws AmbariException {
     List<ServiceInfo> servicesResult = null;
     StackInfo stack = getStackInfo(stackName, version);
-    if (stack != null)
+    if (stack != null) {
       servicesResult = stack.getServices();
+    }
     return servicesResult;
   }
 
@@ -568,18 +581,18 @@ public class AmbariMetaInfo {
     throws AmbariException{
 
     HashSet<String> needRestartServices = new HashSet<String>();
-    
+
     List<ServiceInfo> serviceInfos = getSupportedServices(stackName, version);
-     
-    
+
+
     for (ServiceInfo service : serviceInfos) {
       if (service.isRestartRequiredAfterChange() != null && service.isRestartRequiredAfterChange()) {
         needRestartServices.add(service.getName());
       }
     }
     return needRestartServices;
-  }  
-  
+  }
+
   public List<StackInfo> getSupportedStacks() {
     return stacksResult;
   }
@@ -601,18 +614,21 @@ public class AmbariMetaInfo {
 
     Set<Stack> supportedStackNames = getStackNames();
 
-    if (supportedStackNames.size() == 0)
+    if (supportedStackNames.size() == 0) {
       throw new StackAccessException("stackName=" + stackName);
+    }
 
     Stack stackResult = null;
 
     for (Stack stack : supportedStackNames) {
-      if (stack.getStackName().equals(stackName))
+      if (stack.getStackName().equals(stackName)) {
         stackResult = stack;
+      }
     }
 
-    if (stackResult == null)
+    if (stackResult == null) {
       throw new StackAccessException("stackName=" + stackName);
+    }
 
     return stackResult;
   }
@@ -621,8 +637,9 @@ public class AmbariMetaInfo {
 
     Set<StackInfo> stackVersions = new HashSet<StackInfo>();
     for (StackInfo stackInfo : stacksResult) {
-      if (stackName.equals(stackInfo.getName()))
+      if (stackName.equals(stackInfo.getName())) {
         stackVersions.add(stackInfo);
+      }
     }
     return stackVersions;
   }
@@ -638,9 +655,10 @@ public class AmbariMetaInfo {
       }
     }
 
-    if (stackInfoResult == null)
+    if (stackInfoResult == null) {
       throw new StackAccessException("stackName=" + stackName
           + ", stackVersion=" + version);
+    }
 
     return stackInfoResult;
   }
@@ -659,24 +677,27 @@ public class AmbariMetaInfo {
       throws AmbariException {
     Set<PropertyInfo> properties = getProperties(stackName, version, serviceName);
 
-    if (properties.size() == 0)
+    if (properties.size() == 0) {
       throw new StackAccessException("stackName=" + stackName
           + ", stackVersion=" + version
           + ", serviceName=" + serviceName
           + ", propertyName=" + propertyName);
+    }
 
     Set<PropertyInfo> propertyResult = new HashSet<PropertyInfo>();
 
     for (PropertyInfo property : properties) {
-      if (property.getName().equals(propertyName))
+      if (property.getName().equals(propertyName)) {
         propertyResult.add(property);
+      }
     }
 
-    if (propertyResult.isEmpty())
+    if (propertyResult.isEmpty()) {
       throw new StackAccessException("stackName=" + stackName
           + ", stackVersion=" + version
           + ", serviceName=" + serviceName
           + ", propertyName=" + propertyName);
+    }
 
     return propertyResult;
   }
@@ -703,28 +724,31 @@ public class AmbariMetaInfo {
 
     Set<OperatingSystemInfo> operatingSystems = getOperatingSystems(stackName, version);
 
-    if (operatingSystems.size() == 0)
+    if (operatingSystems.size() == 0) {
       throw new StackAccessException("stackName=" + stackName
           + ", stackVersion=" + version
           + ", osType=" + osType);
+    }
 
     OperatingSystemInfo resultOperatingSystem = null;
 
     for (OperatingSystemInfo operatingSystem : operatingSystems) {
-      if (operatingSystem.getOsType().equals(osType))
+      if (operatingSystem.getOsType().equals(osType)) {
         resultOperatingSystem = operatingSystem;
+      }
     }
 
-    if (resultOperatingSystem == null)
+    if (resultOperatingSystem == null) {
       throw new StackAccessException("stackName=" + stackName
           + ", stackVersion=" + version
           + ", osType=" + osType);
+    }
 
     return resultOperatingSystem;
   }
 
   private void readServerVersion() throws Exception {
-    File versionFile = this.serverVersionFile;
+    File versionFile = serverVersionFile;
     if (!versionFile.exists()) {
       throw new AmbariException("Server version file does not exist.");
     }
@@ -773,10 +797,11 @@ public class AmbariMetaInfo {
         + ", stackRoot = " + stackRootAbsPath);
     }
 
-    if (!stackRoot.isDirectory() && !stackRoot.exists())
+    if (!stackRoot.isDirectory() && !stackRoot.exists()) {
       throw new IOException("" + Configuration.METADETA_DIR_PATH
         + " should be a directory with stack"
         + ", stackRoot = " + stackRootAbsPath);
+    }
 
     StackExtensionHelper stackExtensionHelper = new StackExtensionHelper(injector, stackRoot);
     stackExtensionHelper.fillInfo();
@@ -961,8 +986,9 @@ public class AmbariMetaInfo {
     // validate existing
     RepositoryInfo ri = getRepository(stackName, stackVersion, osType, repoId);
 
-    if (!stackRoot.exists())
+    if (!stackRoot.exists()) {
       throw new StackAccessException("Stack root does not exist.");
+    }
 
     ri.setBaseUrl(newBaseUrl);
 
@@ -1048,7 +1074,7 @@ public class AmbariMetaInfo {
     }
     return requiredProperties;
   }
-  
+
   /**
    * @param stackName the stack name
    * @param stackVersion the stack version
@@ -1058,53 +1084,18 @@ public class AmbariMetaInfo {
    */
   public Set<AlertDefinition> getAlertDefinitions(String stackName, String stackVersion,
       String serviceName) throws AmbariException {
-    
+
     ServiceInfo svc = getService(stackName, stackVersion, serviceName);
+    File alertsFile = svc.getAlertsFile();
 
-    if (null == svc.getAlertsFile() || !svc.getAlertsFile().exists()) {
+    if (null == alertsFile || !alertsFile.exists()) {
       LOG.debug("Alerts file for " + stackName + "/" + stackVersion + "/" + serviceName + " not found.");
       return null;
     }
-    
-    Map<String, List<AlertDefinition>> map = null;
-
-    GsonBuilder builder = new GsonBuilder().registerTypeAdapter(Source.class,
-        new JsonDeserializer<Source>() {
-          @Override
-          public Source deserialize(JsonElement json, Type typeOfT,
-              JsonDeserializationContext context) throws JsonParseException {
-            JsonObject jsonObj = (JsonObject) json;
-
-            SourceType type = SourceType.valueOf(jsonObj.get("type").getAsString());
-            Class<? extends Source> cls = null;
-            
-            switch (type) {
-              case METRIC:
-                cls = MetricSource.class;
-                break;
-              default:
-                break;
-            }
-
-            if (null != cls)
-              return context.deserialize(json, cls);
-            else
-              return null;
-          }
-        });
-    
-    Gson gson = builder.create();
-
-    try {
-      Type type = new TypeToken<Map<String, List<AlertDefinition>>>(){}.getType();
-      map = gson.fromJson(new FileReader(svc.getAlertsFile()), type);
-    } catch (Exception e) {
-      LOG.error ("Could not read the alert definition file", e);
-      throw new AmbariException("Could not read alert definition file", e);
-    }
 
     Set<AlertDefinition> defs = new HashSet<AlertDefinition>();
-    
+    Map<String, List<AlertDefinition>> map = alertDefinitionFactory.getAlertDefinitions(alertsFile);
+
     for (Entry<String, List<AlertDefinition>> entry : map.entrySet()) {
       for (AlertDefinition ad : entry.getValue()) {
         ad.setServiceName(serviceName);
@@ -1114,8 +1105,7 @@ public class AmbariMetaInfo {
       }
       defs.addAll(entry.getValue());
     }
-    
+
     return defs;
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
index e9b0c9e..489fbb2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariServer.java
@@ -59,7 +59,6 @@ import org.apache.ambari.server.controller.internal.ViewPermissionResourceProvid
 import org.apache.ambari.server.controller.nagios.NagiosPropertyProvider;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.PersistenceType;
-import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.dao.BlueprintDAO;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.GroupDAO;
@@ -92,7 +91,6 @@ import org.apache.ambari.server.security.unsecured.rest.CertificateSign;
 import org.apache.ambari.server.security.unsecured.rest.ConnectionInfo;
 import org.apache.ambari.server.state.Clusters;
 import org.apache.ambari.server.state.ConfigHelper;
-import org.apache.ambari.server.state.alert.AlertDefinitionHash;
 import org.apache.ambari.server.utils.StageUtils;
 import org.apache.ambari.server.utils.VersionUtils;
 import org.apache.ambari.server.view.ViewRegistry;
@@ -534,9 +532,7 @@ public class AmbariServer {
         injector.getInstance(Gson.class), ambariMetaInfo);
     StackDependencyResourceProvider.init(ambariMetaInfo);
     ClusterResourceProvider.init(injector.getInstance(BlueprintDAO.class), ambariMetaInfo, injector.getInstance(ConfigHelper.class));
-    AlertDefinitionResourceProvider.init(
-        injector.getInstance(AlertDefinitionDAO.class),
-        injector.getInstance(AlertDefinitionHash.class));
+    AlertDefinitionResourceProvider.init(injector);
     PermissionResourceProvider.init(injector.getInstance(PermissionDAO.class));
     ViewPermissionResourceProvider.init(injector.getInstance(PermissionDAO.class));
     PrivilegeResourceProvider.init(injector.getInstance(PrivilegeDAO.class), injector.getInstance(UserDAO.class),

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java
index 83bd7b1..e3b5d93 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProvider.java
@@ -28,6 +28,9 @@ import java.util.Set;
 import java.util.UUID;
 
 import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.agent.ActionQueue;
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
+import org.apache.ambari.server.agent.AlertDefinitionCommand;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
 import org.apache.ambari.server.controller.spi.NoSuchResourceException;
@@ -42,6 +45,8 @@ import org.apache.ambari.server.controller.utilities.PropertyHelper;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.alert.AlertDefinition;
 import org.apache.ambari.server.state.alert.AlertDefinitionHash;
 import org.apache.ambari.server.state.alert.Scope;
 import org.apache.ambari.server.state.alert.SourceType;
@@ -49,6 +54,7 @@ import org.apache.ambari.server.state.alert.SourceType;
 import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import com.google.inject.Inject;
+import com.google.inject.Injector;
 
 /**
  * ResourceProvider for Alert Definitions
@@ -76,13 +82,16 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
 
   private static AlertDefinitionHash alertDefinitionHash;
 
+  private static ActionQueue actionQueue;
+
   /**
    * @param instance
    */
   @Inject
-  public static void init(AlertDefinitionDAO instance, AlertDefinitionHash adh) {
-    alertDefinitionDAO = instance;
-    alertDefinitionHash = adh;
+  public static void init(Injector injector) {
+    alertDefinitionDAO = injector.getInstance(AlertDefinitionDAO.class);
+    alertDefinitionHash = injector.getInstance(AlertDefinitionHash.class);
+    actionQueue = injector.getInstance(ActionQueue.class);
   }
 
   AlertDefinitionResourceProvider(Set<String> propertyIds,
@@ -117,15 +126,25 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
     throws AmbariException {
     List<AlertDefinitionEntity> entities = new ArrayList<AlertDefinitionEntity>();
 
+    String clusterName = null;
     for (Map<String, Object> requestMap : requestMaps) {
       entities.add(toCreateEntity(requestMap));
+
+      if (null == clusterName) {
+        clusterName = (String) requestMap.get(ALERT_DEF_CLUSTER_NAME);
+      }
     }
 
+    Set<String> invalidatedHosts = new HashSet<String>();
+
     // !!! TODO multi-create in a transaction
     for (AlertDefinitionEntity entity : entities) {
       alertDefinitionDAO.create(entity);
-      alertDefinitionHash.invalidateHosts(entity);
+      invalidatedHosts.addAll(alertDefinitionHash.invalidateHosts(entity));
     }
+
+    // build alert definition commands for all agent hosts affected
+    enqueueAgentCommands(clusterName, invalidatedHosts);
   }
 
   private AlertDefinitionEntity toCreateEntity(Map<String, Object> requestMap)
@@ -248,6 +267,10 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
       throws SystemException, UnsupportedPropertyException,
       NoSuchResourceException, NoSuchParentResourceException {
 
+    String clusterName = null;
+    Set<String> invalidatedHosts = new HashSet<String>();
+    Clusters clusters = getManagementController().getClusters();
+
     for (Map<String, Object> requestPropMap : request.getProperties()) {
       for (Map<String, Object> propertyMap : getPropertyMaps(requestPropMap, predicate)) {
         Long id = (Long) propertyMap.get(ALERT_DEF_ID);
@@ -257,6 +280,17 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
           continue;
         }
 
+        if (null == clusterName) {
+          try {
+            Cluster cluster = clusters.getClusterById(entity.getClusterId());
+            if (null != cluster) {
+              clusterName = cluster.getClusterName();
+            }
+          } catch (AmbariException ae) {
+            throw new IllegalArgumentException("Invalid cluster ID", ae);
+          }
+        }
+
         if (propertyMap.containsKey(ALERT_DEF_NAME)) {
           entity.setDefinitionName((String) propertyMap.get(ALERT_DEF_NAME));
         }
@@ -301,11 +335,13 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
         entity.setHash(UUID.randomUUID().toString());
 
         alertDefinitionDAO.merge(entity);
-
-        alertDefinitionHash.invalidateHosts(entity);
+        invalidatedHosts.addAll(alertDefinitionHash.invalidateHosts(entity));
       }
     }
 
+    // build alert definition commands for all agent hosts affected
+    enqueueAgentCommands(clusterName, invalidatedHosts);
+
     notifyUpdate(Resource.Type.AlertDefinition, request, predicate);
 
     return getRequestStatus(null);
@@ -321,12 +357,17 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
 
     Set<Long> definitionIds = new HashSet<Long>();
 
+    String clusterName = null;
     for (final Resource resource : resources) {
       definitionIds.add((Long) resource.getPropertyValue(ALERT_DEF_ID));
+
+      if (null == clusterName) {
+        clusterName = (String) resource.getPropertyValue(ALERT_DEF_CLUSTER_NAME);
+      }
     }
 
+    final Set<String> invalidatedHosts = new HashSet<String>();
     for (Long definitionId : definitionIds) {
-
       LOG.info("Deleting alert definition {}", definitionId);
 
       final AlertDefinitionEntity entity = alertDefinitionDAO.findById(definitionId.longValue());
@@ -335,12 +376,15 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
         @Override
         public Void invoke() throws AmbariException {
           alertDefinitionDAO.remove(entity);
-          alertDefinitionHash.invalidateHosts(entity);
+          invalidatedHosts.addAll(alertDefinitionHash.invalidateHosts(entity));
           return null;
         }
       });
     }
 
+    // build alert definition commands for all agent hosts affected
+    enqueueAgentCommands(clusterName, invalidatedHosts);
+
     notifyDelete(Resource.Type.AlertDefinition, predicate);
     return getRequestStatus(null);
 
@@ -379,4 +423,45 @@ public class AlertDefinitionResourceProvider extends AbstractControllerResourceP
     return resource;
   }
 
+  /**
+   * Enqueue {@link AlertDefinitionCommand}s for every host specified so that
+   * they will receive a payload of alert definitions that they should be
+   * running.
+   * <p/>
+   * This method is typically called after
+   * {@link AlertDefinitionHash#invalidateHosts(AlertDefinitionEntity)} has
+   * caused a cache invalidation of the alert definition hash.
+   *
+   * @param clusterName
+   *          the name of the cluster (not {@code null}).
+   * @param hosts
+   *          the hosts to push {@link AlertDefinitionCommand}s for.
+   */
+  private void enqueueAgentCommands(String clusterName, Set<String> hosts) {
+    if (null == clusterName) {
+      LOG.warn("Unable to create alert definition agent commands because of a null cluster name");
+      return;
+    }
+
+    if (null == hosts || hosts.size() == 0) {
+      return;
+    }
+
+    for (String hostName : hosts) {
+      List<AlertDefinition> definitions = alertDefinitionHash.getAlertDefinitions(
+          clusterName, hostName);
+
+      String hash = alertDefinitionHash.getHash(clusterName, hostName);
+
+      AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
+          hostName, hash, definitions);
+
+      // unlike other commands, the alert definition commands are really
+      // designed to be 1:1 per change; if multiple invalidations happened
+      // before the next heartbeat, there would be several commands that would
+      // force the agents to reschedule their alerts more than once
+      actionQueue.dequeue(hostName, AgentCommandType.ALERT_DEFINITION_COMMAND);
+      actionQueue.enqueue(hostName, command);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java
index e6f9b39..8d9b3c2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinition.java
@@ -25,10 +25,9 @@ public class AlertDefinition {
 
   private String serviceName = null;
   private String componentName = null;
-  
+
   private String name = null;
-  private String label = null;
-  private String scope = null;
+  private Scope scope = null;
   private int interval = 1;
   private boolean enabled = true;
   private Source source = null;
@@ -39,7 +38,7 @@ public class AlertDefinition {
   public String getServiceName() {
     return serviceName;
   }
-  
+
   /**
    * @param name the service name
    */
@@ -53,70 +52,88 @@ public class AlertDefinition {
   public String getComponentName() {
     return componentName;
   }
-  
+
   /**
-   * 
+   *
    * @param name the component name
    */
   public void setComponentName(String name) {
     componentName = name;
   }
-  
+
   /**
    * @return the name
    */
   public String getName() {
     return name;
   }
-  
+
   /**
-   * @return the label
+   * @param definitionName
+   *          the definition name.
    */
-  public String getLabel() {
-    return label;
+  public void setName(String definitionName) {
+    name = definitionName;
   }
-  
+
   /**
    * @return the scope
    */
-  public String getScope() {
+  public Scope getScope() {
     return scope;
   }
-  
+
+  public void setScope(Scope definitionScope) {
+    scope = definitionScope;
+  }
+
   /**
    * @return the interval
    */
   public int getInterval() {
     return interval;
   }
-  
+
+  public void setInterval(int definitionInterval) {
+    interval = definitionInterval;
+  }
+
   /**
    * @return {@code true} if enabled
    */
   public boolean isEnabled() {
     return enabled;
   }
-  
+
+  public void setEnabled(boolean definitionEnabled) {
+    enabled = definitionEnabled;
+  }
+
   public Source getSource() {
     return source;
   }
-  
+
+  public void setSource(Source definitionSource) {
+    source = definitionSource;
+  }
+
   @Override
   public boolean equals(Object obj) {
-    if (null == obj || !obj.getClass().equals(AlertDefinition.class))
+    if (null == obj || !obj.getClass().equals(AlertDefinition.class)) {
       return false;
-    
+    }
+
     return name.equals(((AlertDefinition) obj).name);
   }
-  
+
   @Override
   public int hashCode() {
     return name.hashCode();
   }
-  
+
   @Override
   public String toString() {
     return name;
   }
-  
+
 }
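
A short sketch of how the new setters and the name-based equals()/hashCode() are expected to behave; the definition, service, and component names below are hypothetical.

    package org.apache.ambari.server.state.alert;

    public class AlertDefinitionSetterSketch {
      public static void main(String[] args) {
        // build a definition by hand using the setters added in this change
        AlertDefinition definition = new AlertDefinition();
        definition.setName("hypothetical_namenode_alert");
        definition.setServiceName("HDFS");
        definition.setComponentName("NAMENODE");
        definition.setInterval(1);
        definition.setEnabled(true);

        // equality and hashing are keyed solely off of the definition name
        AlertDefinition other = new AlertDefinition();
        other.setName("hypothetical_namenode_alert");

        System.out.println(definition.equals(other)); // true
        System.out.println(definition.hashCode() == other.hashCode()); // true
      }
    }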

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
new file mode 100644
index 0000000..1775f88
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionFactory.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.alert;
+
+import java.io.File;
+import java.io.FileReader;
+import java.lang.reflect.Type;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.reflect.TypeToken;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertDefinitionFactory} class is used to construct
+ * {@link AlertDefinition} instances from a variety of sources.
+ */
+@Singleton
+public class AlertDefinitionFactory {
+  /**
+   * Logger.
+   */
+  private final static Logger LOG = LoggerFactory.getLogger(AlertDefinitionFactory.class);
+
+  /**
+   * Builder used for type adapter registration.
+   */
+  private final GsonBuilder m_builder = new GsonBuilder().registerTypeAdapter(
+      Source.class, new AlertDefinitionSourceAdapter());
+
+  /**
+   * Thread safe deserializer.
+   */
+  private final Gson m_gson = m_builder.create();
+
+
+  /**
+   * Gets a mapping of all of the alert definitions defined in the specified
+   * JSON {@link File}.
+   *
+   * @param alertDefinitionFile the JSON file to parse (not {@code null}).
+   * @return the mapping of alert definitions read from the file.
+   * @throws AmbariException
+   *           if there was a problem reading the file or parsing the JSON.
+   */
+  public Map<String, List<AlertDefinition>> getAlertDefinitions(
+      File alertDefinitionFile) throws AmbariException {
+    try {
+      Type type = new TypeToken<Map<String, List<AlertDefinition>>>(){}.getType();
+
+      return m_gson.fromJson(new FileReader(alertDefinitionFile), type);
+    } catch (Exception e) {
+      LOG.error("Could not read the alert definition file", e);
+      throw new AmbariException("Could not read alert definition file", e);
+    }
+  }
+
+  /**
+   * Gets an {@link AlertDefinition} constructed from the specified
+   * {@link AlertDefinitionEntity}.
+   *
+   * @param entity
+   *          the entity to use to construct the {@link AlertDefinition} (not
+   *          {@code null}).
+   * @return the definition or {@code null} if it could not be coerced.
+   */
+  public AlertDefinition coerce(AlertDefinitionEntity entity) {
+    if (null == entity) {
+      return null;
+    }
+
+    AlertDefinition definition = new AlertDefinition();
+    definition.setComponentName(entity.getComponentName());
+    definition.setEnabled(entity.getEnabled());
+    definition.setInterval(entity.getScheduleInterval());
+    definition.setName(entity.getDefinitionName());
+    definition.setScope(entity.getScope());
+    definition.setServiceName(entity.getServiceName());
+
+    try {
+      String sourceJson = entity.getSource();
+      Source source = m_gson.fromJson(sourceJson, Source.class);
+      definition.setSource(source);
+    } catch (Exception exception) {
+      LOG.error(
+          "Unable to deserialized the alert definition source during coercion",
+          exception);
+    }
+
+    return definition;
+  }
+
+  /**
+   * Deserializes {@link Source} implementations.
+   */
+  private static final class AlertDefinitionSourceAdapter implements JsonDeserializer<Source>{
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public Source deserialize(JsonElement json, Type typeOfT,
+        JsonDeserializationContext context) throws JsonParseException {
+      JsonObject jsonObj = (JsonObject) json;
+
+      SourceType type = SourceType.valueOf(jsonObj.get("type").getAsString());
+      Class<? extends Source> cls = null;
+
+      switch (type) {
+        case METRIC:
+          cls = MetricSource.class;
+          break;
+        default:
+          break;
+      }
+
+      if (null != cls) {
+        return context.deserialize(json, cls);
+      } else {
+        return null;
+      }
+    }
+  }
+}
+
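
A minimal usage sketch for the factory, assuming a JSON file whose top level maps a key to a list of definitions; the file path and the direct construction of the factory (it is normally injected by Guice) are assumptions made only to keep the sketch self-contained.

    package org.apache.ambari.server.state.alert;

    import java.io.File;
    import java.util.List;
    import java.util.Map;

    import org.apache.ambari.server.AmbariException;

    public class AlertDefinitionFactorySketch {
      public static void main(String[] args) throws AmbariException {
        // normally injected; constructed directly here for the sketch
        AlertDefinitionFactory factory = new AlertDefinitionFactory();

        // hypothetical path to a file shaped as { "KEY": [ { ...definition... } ] }
        File alertsJson = new File("/tmp/alerts.json");

        Map<String, List<AlertDefinition>> definitionsByKey =
            factory.getAlertDefinitions(alertsJson);

        for (Map.Entry<String, List<AlertDefinition>> entry : definitionsByKey.entrySet()) {
          for (AlertDefinition definition : entry.getValue()) {
            System.out.println(entry.getKey() + " -> " + definition.getName()
                + " (interval=" + definition.getInterval() + ")");
          }
        }
      }
    }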

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
index 7cbd4b3..3a89dd9 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
@@ -29,6 +29,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.controller.RootServiceResponseFactory.Components;
@@ -73,6 +75,9 @@ public class AlertDefinitionHash {
   @Inject
   private AlertDefinitionDAO m_definitionDao;
 
+  @Inject
+  private AlertDefinitionFactory m_factory;
+
   /**
    * All clusters.
    */
@@ -80,9 +85,15 @@ public class AlertDefinitionHash {
   private Clusters m_clusters;
 
   /**
-   * The hashes for all hosts.
+   * !!! TODO: the locking strategy for this class still needs to be worked out
+   */
+  private ReadWriteLock m_lock = new ReentrantReadWriteLock();
+
+  /**
+   * The hashes for all hosts across all clusters. The key is the hostname and
+   * the value is a map of cluster name to hash.
    */
-  private Map<String, String> m_hashes = new ConcurrentHashMap<String, String>();
+  private Map<String, Map<String, String>> m_hashes = new HashMap<String, Map<String, String>>();
 
   /**
    * Gets a unique hash value representing all of the alert definitions that
@@ -101,13 +112,19 @@ public class AlertDefinitionHash {
    * @return the unique hash or {@value #NULL_MD5_HASH} if none.
    */
   public String getHash(String clusterName, String hostName) {
-    String hash = m_hashes.get(hostName);
+    Map<String, String> clusterMapping = m_hashes.get(hostName);
+    if (null == clusterMapping) {
+      clusterMapping = new ConcurrentHashMap<String, String>();
+      m_hashes.put(hostName, clusterMapping);
+    }
+
+    String hash = clusterMapping.get(clusterName);
     if (null != hash) {
       return hash;
     }
 
     hash = hash(clusterName, hostName);
-    m_hashes.put(hostName, hash);
+    clusterMapping.put(clusterName, hash);
 
     return hash;
   }
@@ -123,8 +140,7 @@ public class AlertDefinitionHash {
    * @see #getHash(String, String)
    * @throws AmbariException
    */
-  public Map<String, String> getHashes(String hostName)
-      throws AmbariException {
+  public Map<String, String> getHashes(String hostName) throws AmbariException {
     Set<Cluster> clusters = m_clusters.getClustersForHost(hostName);
     if (null == clusters || clusters.size() == 0) {
       return Collections.emptyMap();
@@ -148,7 +164,8 @@ public class AlertDefinitionHash {
   }
 
   /**
-   * Invalidates the cached hash for the specified agent host.
+   * Invalidates the cached hash for the specified agent host across all
+   * clusters.
    *
    * @param hostName
    *          the host to invalidate the cache for (not {@code null}).
@@ -158,6 +175,22 @@ public class AlertDefinitionHash {
   }
 
   /**
+   * Invalidates the cached hash for the specified agent host in the specified
+   * cluster.
+   *
+   * @param clusterName
+   *          the name of the cluster (not {@code null}).
+   * @param hostName
+   *          the host to invalidate the cache for (not {@code null}).
+   */
+  public void invalidate(String clusterName, String hostName) {
+    Map<String, String> clusterMapping = m_hashes.get(hostName);
+    if (null != clusterMapping) {
+      clusterMapping.remove(clusterName);
+    }
+  }
+
+  /**
    * Gets whether the alert definition hash for the specified host has been
    * calculated and cached.
    *
@@ -165,12 +198,17 @@ public class AlertDefinitionHash {
    *          the host.
    * @return {@code true} if the hash was calculated; {@code false} otherwise.
    */
-  public boolean isHashCached(String hostName) {
-    if (null == hostName) {
+  public boolean isHashCached(String clusterName, String hostName) {
+    if (null == clusterName || null == hostName) {
+      return false;
+    }
+
+    Map<String, String> clusterMapping = m_hashes.get(hostName);
+    if (null == clusterMapping) {
       return false;
     }
 
-    return m_hashes.containsKey(hostName);
+    return clusterMapping.containsKey(clusterName);
   }
 
   /**
@@ -189,7 +227,42 @@ public class AlertDefinitionHash {
    * @return the alert definitions for the host, or an empty set (never
    *         {@code null}).
    */
-  public Set<AlertDefinitionEntity> getAlertDefinitions(String clusterName,
+  public List<AlertDefinition> getAlertDefinitions(
+      String clusterName,
+      String hostName) {
+
+    Set<AlertDefinitionEntity> entities = getAlertDefinitionEntities(
+        clusterName, hostName);
+
+    List<AlertDefinition> definitions = new ArrayList<AlertDefinition>(
+        entities.size());
+
+    for (AlertDefinitionEntity entity : entities) {
+      definitions.add(m_factory.coerce(entity));
+    }
+
+    return definitions;
+  }
+
+
+  /**
+   * Gets the alert definition entities for the specified host. This will include the
+   * following types of alert definitions:
+   * <ul>
+   * <li>Service/Component alerts</li>
+   * <li>Service alerts where the host is a MASTER</li>
+   * <li>Host alerts that are not bound to a service</li>
+   * </ul>
+   *
+   * @param clusterName
+   *          the cluster name (not {@code null}).
+   * @param hostName
+   *          the host name (not {@code null}).
+   * @return the alert definitions for the host, or an empty set (never
+   *         {@code null}).
+   */
+  private Set<AlertDefinitionEntity> getAlertDefinitionEntities(
+      String clusterName,
       String hostName) {
     Set<AlertDefinitionEntity> definitions = new HashSet<AlertDefinitionEntity>();
 
@@ -261,26 +334,21 @@ public class AlertDefinitionHash {
    * @param definition
    *          the definition to use to find the hosts to invalidate (not
    *          {@code null}).
+   * @return the hosts that were invalidated, or an empty set (never
+   *         {@code null}).
    */
-  public void invalidateHosts(AlertDefinitionEntity definition) {
+  public Set<String> invalidateHosts(AlertDefinitionEntity definition) {
     long clusterId = definition.getClusterId();
-
-    // intercept host agent alerts; they affect all hosts
-    String definitionServiceName = definition.getServiceName();
-    String definitionComponentName = definition.getComponentName();
-    if (Services.AMBARI.equals(definitionServiceName)
-        && Components.AMBARI_AGENT.equals(definitionComponentName)) {
-
-      invalidateAll();
-      return;
-    }
+    Set<String> invalidatedHosts = new HashSet<String>();
 
     Cluster cluster = null;
     Map<String, Host> hosts = null;
+    String clusterName = null;
     try {
       cluster = m_clusters.getClusterById(clusterId);
       if (null != cluster) {
-        hosts = m_clusters.getHostsForCluster(cluster.getClusterName());
+        clusterName = cluster.getClusterName();
+        hosts = m_clusters.getHostsForCluster(clusterName);
       }
 
       if (null == cluster) {
@@ -291,25 +359,35 @@ public class AlertDefinitionHash {
     }
 
     if (null == cluster) {
-      return;
+      return invalidatedHosts;
+    }
+
+    // intercept host agent alerts; they affect all hosts
+    String definitionServiceName = definition.getServiceName();
+    String definitionComponentName = definition.getComponentName();
+    if (Services.AMBARI.equals(definitionServiceName)
+        && Components.AMBARI_AGENT.equals(definitionComponentName)) {
+
+      invalidateAll();
+      invalidatedHosts.addAll(hosts.keySet());
+      return invalidatedHosts;
     }
 
     // find all hosts that have the matching service and component
-    if (null != hosts) {
-      for (String hostName : hosts.keySet()) {
-        List<ServiceComponentHost> hostComponents = cluster.getServiceComponentHosts(hostName);
-        if (null == hostComponents || hostComponents.size() == 0) {
-          continue;
-        }
+    for (String hostName : hosts.keySet()) {
+      List<ServiceComponentHost> hostComponents = cluster.getServiceComponentHosts(hostName);
+      if (null == hostComponents || hostComponents.size() == 0) {
+        continue;
+      }
 
-        // if a host has a matching service/component, invalidate it
-        for (ServiceComponentHost component : hostComponents) {
-          String serviceName = component.getServiceName();
-          String componentName = component.getServiceComponentName();
-          if (serviceName.equals(definitionServiceName)
-              && componentName.equals(definitionComponentName)) {
-            invalidate(hostName);
-          }
+      // if a host has a matching service/component, invalidate it
+      for (ServiceComponentHost component : hostComponents) {
+        String serviceName = component.getServiceName();
+        String componentName = component.getServiceComponentName();
+        if (serviceName.equals(definitionServiceName)
+            && componentName.equals(definitionComponentName)) {
+          invalidate(clusterName, hostName);
+          invalidatedHosts.add(hostName);
         }
       }
     }
@@ -320,7 +398,8 @@ public class AlertDefinitionHash {
     if (null == service) {
       LOG.warn("The alert definition {} has an unknown service of {}",
           definition.getDefinitionName(), definitionServiceName);
-      return;
+
+      return invalidatedHosts;
     }
 
     // get all master components of the definition's service; any hosts that
@@ -332,12 +411,15 @@ public class AlertDefinitionHash {
           Map<String, ServiceComponentHost> componentHosts = component.getValue().getServiceComponentHosts();
           if (null != componentHosts) {
             for (String componentHost : componentHosts.keySet()) {
-              invalidate(componentHost);
+              invalidate(clusterName, componentHost);
+              invalidatedHosts.add(componentHost);
             }
           }
         }
       }
     }
+
+    return invalidatedHosts;
   }
 
   /**
@@ -353,7 +435,8 @@ public class AlertDefinitionHash {
    * @return the unique hash or {@value #NULL_MD5_HASH} if none.
    */
   private String hash(String clusterName, String hostName) {
-    Set<AlertDefinitionEntity> definitions = getAlertDefinitions(clusterName,
+    Set<AlertDefinitionEntity> definitions = getAlertDefinitionEntities(
+        clusterName,
         hostName);
 
     // no definitions found for this host, don't bother hashing
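
A toy, self-contained sketch of the two-level cache layout (host, then cluster, then hash) that this change introduces; the class below is only an illustration and is not AlertDefinitionHash itself.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class PerClusterHashCacheSketch {
      // key: hostname, value: map of cluster name to cached hash
      private final Map<String, Map<String, String>> hashes =
          new HashMap<String, Map<String, String>>();

      public void put(String hostName, String clusterName, String hash) {
        Map<String, String> clusterMapping = hashes.get(hostName);
        if (null == clusterMapping) {
          clusterMapping = new ConcurrentHashMap<String, String>();
          hashes.put(hostName, clusterMapping);
        }

        clusterMapping.put(clusterName, hash);
      }

      // invalidating one cluster's entry leaves the host's other clusters cached
      public void invalidate(String hostName, String clusterName) {
        Map<String, String> clusterMapping = hashes.get(hostName);
        if (null != clusterMapping) {
          clusterMapping.remove(clusterName);
        }
      }

      public boolean isCached(String hostName, String clusterName) {
        Map<String, String> clusterMapping = hashes.get(hostName);
        return null != clusterMapping && clusterMapping.containsKey(clusterName);
      }

      public static void main(String[] args) {
        PerClusterHashCacheSketch cache = new PerClusterHashCacheSketch();
        cache.put("c6401.ambari.apache.org", "c1", "abc123");
        cache.put("c6401.ambari.apache.org", "c2", "def456");

        cache.invalidate("c6401.ambari.apache.org", "c1");

        System.out.println(cache.isCached("c6401.ambari.apache.org", "c1")); // false
        System.out.println(cache.isCached("c6401.ambari.apache.org", "c2")); // true
      }
    }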

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/test/java/org/apache/ambari/server/agent/TestActionQueue.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestActionQueue.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestActionQueue.java
index 847db33..c4f5b86 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestActionQueue.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestActionQueue.java
@@ -17,52 +17,55 @@
  */
 package org.apache.ambari.server.agent;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.ambari.server.agent.AgentCommand.AgentCommandType;
+import org.easymock.EasyMock;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class TestActionQueue {
-  
+
   private static Logger LOG = LoggerFactory.getLogger(TestActionQueue.class);
-  
+
   private static int threadCount = 100;
   static class ActionQueueOperation implements Runnable {
-    
+
     enum OpType {
       ENQUEUE,
       DEQUEUE,
       DEQUEUEALL
     }
-  
+
     private volatile boolean shouldRun = true;
     private long [] opCounts;
     private ActionQueue actionQueue;
     private OpType operation;
     private String[] hosts;
-    
+
     public ActionQueueOperation(ActionQueue aq, String [] hosts, OpType op) {
-      this.actionQueue = aq;
-      this.operation = op;
+      actionQueue = aq;
+      operation = op;
       this.hosts = hosts;
       opCounts = new long [hosts.length];
       for (int i = 0; i < hosts.length; i++) {
         opCounts[i] = 0;
       }
     }
-    
+
     public long [] getOpCounts() {
       return opCounts;
     }
-    
+
     public void stop() {
-      this.shouldRun = false;
+      shouldRun = false;
     }
-    
+
     @Override
     public void run() {
       try {
@@ -82,7 +85,7 @@ public class TestActionQueue {
         throw new RuntimeException("Failure", ex);
       }
     }
-    
+
     private void enqueueOp() throws InterruptedException {
       while (shouldRun) {
         int index = 0;
@@ -94,7 +97,7 @@ public class TestActionQueue {
         Thread.sleep(1);
       }
     }
-    
+
     private void dequeueOp() throws InterruptedException {
       while (shouldRun) {
         int index = 0;
@@ -108,7 +111,7 @@ public class TestActionQueue {
         Thread.sleep(1);
       }
     }
-    
+
     private void dequeueAllOp() throws InterruptedException {
       while (shouldRun) {
         int index = 0;
@@ -123,7 +126,7 @@ public class TestActionQueue {
       }
     }
   }
-  
+
   @Test
   public void testConcurrentOperations() throws InterruptedException {
     ActionQueue aq = new ActionQueue();
@@ -185,7 +188,7 @@ public class TestActionQueue {
         }
       }
     }
-    
+
     // Stop all threads
     for (int i = 0; i < threadCount; i++) {
       dequeOperators[i].stop();
@@ -195,7 +198,7 @@ public class TestActionQueue {
     for (Thread consumer : consumers) {
       consumer.join();
     }
-    
+
     for (int h = 0; h<hosts.length; h++) {
       long opsEnqueued = 0;
       long opsDequeued = 0;
@@ -211,4 +214,53 @@ public class TestActionQueue {
       assertEquals(opsDequeued, opsEnqueued);
     }
   }
+
+  /**
+   * @throws Exception
+   */
+  @Test
+  public void testDequeueCommandType() throws Exception {
+    ActionQueue queue = new ActionQueue();
+    String c6401 = "c6401.ambari.apache.org";
+    String c6402 = "c6402.ambari.apache.org";
+
+    queue.enqueue(c6401,
+        EasyMock.createMockBuilder(ExecutionCommand.class).createNiceMock());
+
+    queue.enqueue(c6401,
+        EasyMock.createMockBuilder(StatusCommand.class).createNiceMock());
+
+    queue.enqueue(c6401,
+        EasyMock.createMockBuilder(AlertDefinitionCommand.class).createNiceMock());
+
+    queue.enqueue(c6401,
+        EasyMock.createMockBuilder(StatusCommand.class).createNiceMock());
+
+    queue.enqueue(c6401,
+        EasyMock.createMockBuilder(AlertDefinitionCommand.class).createNiceMock());
+
+    queue.enqueue(c6401,
+        EasyMock.createMockBuilder(StatusCommand.class).createNiceMock());
+
+    queue.enqueue(c6401,
+        EasyMock.createMockBuilder(AlertDefinitionCommand.class).createNiceMock());
+
+    queue.enqueue(c6402,
+        EasyMock.createMockBuilder(ExecutionCommand.class).createNiceMock());
+
+    queue.enqueue(c6402,
+        EasyMock.createMockBuilder(StatusCommand.class).createNiceMock());
+
+    queue.enqueue(c6402,
+        EasyMock.createMockBuilder(AlertDefinitionCommand.class).createNiceMock());
+
+    assertEquals(7, queue.size(c6401));
+
+    List<AgentCommand> commands = queue.dequeue(c6401,
+        AgentCommandType.ALERT_DEFINITION_COMMAND);
+
+    assertEquals(3, commands.size());
+    assertEquals(4, queue.size(c6401));
+    assertEquals(3, queue.size(c6402));
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java b/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java
index 53a78eb..b1b83fa 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/api/services/AmbariMetaInfoTest.java
@@ -24,8 +24,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.lang.reflect.Method;
@@ -108,10 +106,12 @@ public class AmbariMetaInfoTest {
   @Before
   public void before() throws Exception {
     injector = Guice.createInjector(new MockModule());
+
     File stackRoot = new File("src/test/resources/stacks");
     LOG.info("Stacks file " + stackRoot.getAbsolutePath());
     metaInfo = new AmbariMetaInfo(stackRoot, new File("target/version"));
     metaInfo.injector = injector;
+
     try {
       metaInfo.init();
     } catch(Exception e) {

http://git-wip-us.apache.org/repos/asf/ambari/blob/8e481286/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
index 864eb08..d21df88 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertEquals;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -43,6 +44,7 @@ import org.apache.ambari.server.controller.spi.Request;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.utilities.PredicateBuilder;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
 import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
 import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
 import org.apache.ambari.server.state.Cluster;
@@ -54,13 +56,20 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.util.Modules;
+
 /**
  * AlertDefinition tests
  */
 public class AlertDefinitionResourceProviderTest {
 
-  AlertDefinitionDAO dao = null;
-  AlertDefinitionHash definitionHash = null;
+  private AlertDefinitionDAO dao = null;
+  private AlertDefinitionHash definitionHash = null;
+  private Injector m_injector;
 
   private static String DEFINITION_UUID = UUID.randomUUID().toString();
 
@@ -69,7 +78,10 @@ public class AlertDefinitionResourceProviderTest {
     dao = createStrictMock(AlertDefinitionDAO.class);
     definitionHash = createNiceMock(AlertDefinitionHash.class);
 
-    AlertDefinitionResourceProvider.init(dao, definitionHash);
+    m_injector = Guice.createInjector(Modules.override(
+        new InMemoryDefaultTestModule()).with(new MockModule()));
+
+    AlertDefinitionResourceProvider.init(m_injector);
   }
 
   /**
@@ -185,8 +197,9 @@ public class AlertDefinitionResourceProviderTest {
     expectLastCall();
 
     // creating a single definition should invalidate hosts of the definition
-    definitionHash.invalidateHosts(EasyMock.anyObject(AlertDefinitionEntity.class));
-    expectLastCall().once();
+    expect(
+        definitionHash.invalidateHosts(EasyMock.anyObject(AlertDefinitionEntity.class))).andReturn(
+        new HashSet<String>()).once();
 
     replay(amc, clusters, cluster, dao, definitionHash);
 
@@ -235,15 +248,18 @@ public class AlertDefinitionResourceProviderTest {
     Cluster cluster = createMock(Cluster.class);
     expect(amc.getClusters()).andReturn(clusters).atLeastOnce();
     expect(clusters.getCluster((String) anyObject())).andReturn(cluster).atLeastOnce();
-    expect(cluster.getClusterId()).andReturn(Long.valueOf(1)).anyTimes();
+    expect(clusters.getClusterById(EasyMock.anyInt())).andReturn(cluster).atLeastOnce();
+    expect(cluster.getClusterId()).andReturn(Long.valueOf(1)).atLeastOnce();
+    expect(cluster.getClusterName()).andReturn("c1").atLeastOnce();
 
     Capture<AlertDefinitionEntity> entityCapture = new Capture<AlertDefinitionEntity>();
     dao.create(capture(entityCapture));
     expectLastCall();
 
     // updating a single definition should invalidate hosts of the definition
-    definitionHash.invalidateHosts(EasyMock.anyObject(AlertDefinitionEntity.class));
-    expectLastCall().once();
+    expect(
+        definitionHash.invalidateHosts(EasyMock.anyObject(AlertDefinitionEntity.class))).andReturn(
+        new HashSet<String>()).atLeastOnce();
 
     replay(amc, clusters, cluster, dao, definitionHash);
 
@@ -311,8 +327,9 @@ public class AlertDefinitionResourceProviderTest {
     expectLastCall();
 
     // deleting a single definition should invalidate hosts of the definition
-    definitionHash.invalidateHosts(EasyMock.anyObject(AlertDefinitionEntity.class));
-    expectLastCall().once();
+    expect(
+        definitionHash.invalidateHosts(EasyMock.anyObject(AlertDefinitionEntity.class))).andReturn(
+        new HashSet<String>()).atLeastOnce();
 
     replay(amc, clusters, cluster, dao, definitionHash);
 
@@ -351,7 +368,6 @@ public class AlertDefinitionResourceProviderTest {
     Assert.assertEquals(Long.valueOf(1), entity1.getDefinitionId());
 
     verify(amc, clusters, cluster, dao);
-
   }
 
   /**
@@ -385,4 +401,21 @@ public class AlertDefinitionResourceProviderTest {
     return Arrays.asList(entity);
   }
 
+  /**
+   * Guice module that binds the mocked dependencies used by these tests.
+   */
+  private class MockModule implements Module {
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void configure(Binder binder) {
+      binder.bind(AlertDefinitionDAO.class).toInstance(dao);
+      binder.bind(AlertDefinitionHash.class).toInstance(definitionHash);
+      binder.bind(Clusters.class).toInstance(
+          EasyMock.createNiceMock(Clusters.class));
+      binder.bind(Cluster.class).toInstance(
+          EasyMock.createNiceMock(Cluster.class));
+    }
+  }
 }
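
The setup above uses Guice's Modules.override(...) to layer mock bindings over InMemoryDefaultTestModule; a small standalone sketch of that pattern, with all names invented, looks like this:

    import com.google.inject.AbstractModule;
    import com.google.inject.Guice;
    import com.google.inject.Injector;
    import com.google.inject.util.Modules;

    public class GuiceOverrideSketch {
      interface Greeter {
        String greet();
      }

      static class ProductionModule extends AbstractModule {
        @Override
        protected void configure() {
          bind(Greeter.class).toInstance(new Greeter() {
            @Override
            public String greet() {
              return "production";
            }
          });
        }
      }

      static class MockModule extends AbstractModule {
        @Override
        protected void configure() {
          bind(Greeter.class).toInstance(new Greeter() {
            @Override
            public String greet() {
              return "mock";
            }
          });
        }
      }

      public static void main(String[] args) {
        // bindings from the overriding module win over the defaults
        Injector injector = Guice.createInjector(
            Modules.override(new ProductionModule()).with(new MockModule()));

        System.out.println(injector.getInstance(Greeter.class).greet()); // prints "mock"
      }
    }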