You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2014/11/25 22:50:35 UTC
[2/3] ambari git commit: AMBARI-8430 - Current Alerts Should Be
Cleaned Up With Ambari Cluster/Service/Component/Host Changes
(jonathanhurley)
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
new file mode 100644
index 0000000..6d8f34a
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import java.text.MessageFormat;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertSummaryDTO;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.alert.AggregateDefinitionMapping;
+import org.apache.ambari.server.state.alert.AggregateSource;
+import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.Reporting;
+import org.apache.ambari.server.state.alert.SourceType;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertAggregateListener} is used to listen for all incoming
+ * {@link AlertReceivedEvent} instances and determine if there exists a
+ * {@link SourceType#AGGREGATE} alert that needs to run.
+ */
+@Singleton
+@EagerSingleton
+public class AlertAggregateListener {
+
+ @Inject
+ private AlertsDAO m_alertsDao = null;
+
+ /**
+ * The event publisher used to receive incoming events and publish new events
+ * when an aggregate alert is run.
+ */
+ private final AlertEventPublisher m_publisher;
+
+ /**
+ * Used for quick lookups of aggregate alerts.
+ */
+ @Inject
+ private AggregateDefinitionMapping m_aggregateMapping;
+
+ @Inject
+ public AlertAggregateListener(AlertEventPublisher publisher) {
+ m_publisher = publisher;
+ m_publisher.register(this);
+ }
+
+ /**
+ * Consume an alert that was received.
+ */
+ @Subscribe
+ public void onAlertEvent(AlertReceivedEvent event) {
+ AlertDefinition def = m_aggregateMapping.getAggregateDefinition(
+ event.getClusterId(), event.getAlert().getName());
+
+ if (null == def || null == m_alertsDao) {
+ return;
+ }
+
+ AggregateSource as = (AggregateSource) def.getSource();
+
+ AlertSummaryDTO summary = m_alertsDao.findAggregateCounts(
+ event.getClusterId(), as.getAlertName());
+
+ Alert alert = new Alert(def.getName(), null, def.getServiceName(),
+ null, null, AlertState.UNKNOWN);
+
+ alert.setLabel(def.getLabel());
+ alert.setTimestamp(System.currentTimeMillis());
+
+ if (0 == summary.getOkCount()) {
+ alert.setText("Cannot determine, there are no records");
+ } else if (summary.getUnknownCount() > 0) {
+ alert.setText("There are alerts with status UNKNOWN.");
+ } else {
+ Reporting reporting = as.getReporting();
+
+ int numerator = summary.getCriticalCount() + summary.getWarningCount();
+ int denominator = summary.getOkCount();
+ double value = (double)(numerator) / denominator;
+
+ if (value > reporting.getCritical().getValue()) {
+ alert.setState(AlertState.CRITICAL);
+ alert.setText(MessageFormat.format(reporting.getCritical().getText(),
+ denominator, numerator));
+
+ } else if (value > reporting.getWarning().getValue()) {
+ alert.setState(AlertState.WARNING);
+ alert.setText(MessageFormat.format(reporting.getWarning().getText(),
+ denominator, numerator));
+
+ } else {
+ alert.setState(AlertState.OK);
+ alert.setText(MessageFormat.format(reporting.getOk().getText(),
+ denominator, numerator));
+ }
+
+ }
+
+ // make a new event and allow others to consume it
+ AlertReceivedEvent aggEvent = new AlertReceivedEvent(event.getClusterId(), alert);
+
+ m_publisher.publish(aggEvent);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertDefinitionDisabledListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertDefinitionDisabledListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertDefinitionDisabledListener.java
new file mode 100644
index 0000000..010526b
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertDefinitionDisabledListener.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.AlertDefinitionDisabledEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+
+/**
+ * The {@link AlertDefinitionDisabledListener} handles event relating to the
+ * disabling of an alert definition.
+ */
+@EagerSingleton
+public class AlertDefinitionDisabledListener {
+ /**
+ * Used for deleting the alert notices when a definition is disabled.
+ */
+ @Inject
+ private AlertsDAO m_alertsDao = null;
+
+ /**
+ * Constructor.
+ *
+ * @param publisher
+ * the publisher to register this listener with (not {@code null}).
+ */
+ @Inject
+ public AlertDefinitionDisabledListener(AmbariEventPublisher publisher) {
+ publisher.register(this);
+ }
+
+ /**
+ * Removes any {@link AlertCurrentEntity} instance associated with the
+ * specified alert definition.
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onEvent(AlertDefinitionDisabledEvent event) {
+ m_alertsDao.removeCurrentDisabledAlerts();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHashInvalidationListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHashInvalidationListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHashInvalidationListener.java
new file mode 100644
index 0000000..0accaf9
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHashInvalidationListener.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.agent.AlertDefinitionCommand;
+import org.apache.ambari.server.agent.HeartBeatResponse;
+import org.apache.ambari.server.events.AlertHashInvalidationEvent;
+import org.apache.ambari.server.events.ServiceComponentUninstalledEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.state.alert.AlertDefinitionHash;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertHashInvalidationListener} is used to respond to
+ * {@link AlertHashInvalidationEvent} instances and ensure that the
+ * {@link AlertDefinitionCommand}s are enqueued for the
+ * {@link HeartBeatResponse}.
+ */
+@Singleton
+@EagerSingleton
+public class AlertHashInvalidationListener {
+ /**
+ * Logger.
+ */
+ private static Logger LOG = LoggerFactory.getLogger(AlertHashInvalidationListener.class);
+
+ /**
+ * Invalidates hosts so that they can receive updated alert definition
+ * commands.
+ */
+ @Inject
+ private Provider<AlertDefinitionHash> m_alertDefinitionHash;
+
+ /**
+ * Constructor.
+ *
+ * @param publisher
+ */
+ @Inject
+ public AlertHashInvalidationListener(AmbariEventPublisher publisher) {
+ publisher.register(this);
+ }
+
+ /**
+ * Handles {@link AlertHashInvalidationEvent} by performing the following
+ * tasks:
+ * <ul>
+ * <li>Enqueuing {@link AlertDefinitionCommand}</li>
+ * </ul>
+ *
+ * @param event
+ * the event being handled.
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onAmbariEvent(AlertHashInvalidationEvent event) {
+ LOG.debug("An alert definition hash invalidation event was received: {}",
+ event);
+
+ Collection<String> hosts = event.getHosts();
+ long clusterId = event.getClusterId();
+
+ // no hosts, nothing to do
+ if (null == hosts || hosts.isEmpty()) {
+ return;
+ }
+
+ m_alertDefinitionHash.get().enqueueAgentCommands(clusterId, hosts);
+ }
+
+ /**
+ * Handles {@link AlertHashInvalidationEvent} by performing the following
+ * tasks:
+ * <ul>
+ * <li>Alert has invalidation</li>
+ * <li>Enqueuing {@link AlertDefinitionCommand}</li>
+ * </ul>
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onEvent(ServiceComponentUninstalledEvent event) {
+ long clusterId = event.getClusterId();
+ String hostName = event.getHostName();
+
+ if (null == hostName) {
+ return;
+ }
+
+ // invalidate hash and enqueue commands
+ m_alertDefinitionHash.get().invalidate(hostName);
+ m_alertDefinitionHash.get().enqueueAgentCommands(clusterId,
+ Collections.singletonList(hostName));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHostListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHostListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHostListener.java
new file mode 100644
index 0000000..51e5e67
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertHostListener.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.HostRemovedEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertHostListener} class handles {@link HostRemovedEvent} and
+ * ensures that {@link AlertCurrentEntity} instances are properly cleaned up
+ */
+@Singleton
+@EagerSingleton
+public class AlertHostListener {
+ /**
+ * Logger.
+ */
+ private static Log LOG = LogFactory.getLog(AlertHostListener.class);
+
+ /**
+ * Used for removing current alerts when a service is removed.
+ */
+ @Inject
+ private AlertsDAO m_alertsDao;
+
+ /**
+ * Constructor.
+ *
+ * @param publisher
+ */
+ @Inject
+ public AlertHostListener(AmbariEventPublisher publisher) {
+ publisher.register(this);
+ }
+
+ /**
+ * Removes any current alerts associated with the specified host.
+ *
+ * @param event
+ * the published event being handled (not {@code null}).
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onAmbariEvent(HostRemovedEvent event) {
+ LOG.debug(event);
+
+ // remove any current alerts for the removed host
+ m_alertsDao.removeCurrentByHost(event.getHostName());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertLifecycleListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertLifecycleListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertLifecycleListener.java
new file mode 100644
index 0000000..df95d3d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertLifecycleListener.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import java.util.Set;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
+import org.apache.ambari.server.events.AlertHashInvalidationEvent;
+import org.apache.ambari.server.events.AlertDefinitionRegistrationEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.state.alert.AggregateDefinitionMapping;
+import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.AlertDefinitionHash;
+import org.apache.ambari.server.state.alert.SourceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertLifecycleListener} handles events that are part of the alert
+ * infrastructure lifecycle such as definition registration events.
+ */
+@Singleton
+@EagerSingleton
+public class AlertLifecycleListener {
+ /**
+ * Logger.
+ */
+ private static Logger LOG = LoggerFactory.getLogger(AlertLifecycleListener.class);
+
+ /**
+ * Used for quick lookups of aggregate alerts.
+ */
+ @Inject
+ private AggregateDefinitionMapping m_aggregateMapping;
+
+ /**
+ * Invalidates hosts so that they can receive updated alert definition
+ * commands.
+ */
+ @Inject
+ private Provider<AlertDefinitionHash> m_alertDefinitionHash;
+
+ /**
+ * Used to publish events when an alert definition has a lifecycle event.
+ */
+ @Inject
+ private AmbariEventPublisher m_eventPublisher;
+
+ /**
+ * Constructor.
+ *
+ * @param publisher
+ */
+ @Inject
+ public AlertLifecycleListener(AmbariEventPublisher publisher) {
+ publisher.register(this);
+ }
+
+ /**
+ * Handles {@link AlertDefinitionRegistrationEvent} by performing the
+ * following tasks:
+ * <ul>
+ * <li>Registration with {@link AggregateDefinitionMapping}</li>
+ * </ul>
+ *
+ * @param event
+ * the event being handled.
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onAmbariEvent(AlertDefinitionRegistrationEvent event) {
+ AlertDefinition definition = event.getDefinition();
+
+ LOG.debug("Registering alert definition {}", definition);
+
+ if (definition.getSource().getType() == SourceType.AGGREGATE) {
+ m_aggregateMapping.registerAggregate(event.getClusterId(), definition);
+ }
+ }
+
+ /**
+ * Handles {@link AlertDefinitionDeleteEvent} by performing the following
+ * tasks:
+ * <ul>
+ * <li>Removal from with {@link AggregateDefinitionMapping}</li>
+ * <li>{@link AlertDefinitionHash} invalidation</li>
+ * </ul>
+ *
+ * @param event
+ * the event being handled.
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onAmbariEvent(AlertDefinitionDeleteEvent event) {
+ AlertDefinition definition = event.getDefinition();
+
+ LOG.debug("Removing alert definition {}", definition);
+
+ if (null == definition) {
+ return;
+ }
+
+ m_aggregateMapping.removeAssociatedAggregate(event.getClusterId(),
+ definition.getName());
+
+ // invalidate and publish
+ AlertDefinitionHash hashHelper = m_alertDefinitionHash.get();
+ Set<String> invalidatedHosts = hashHelper.invalidateHosts(definition);
+ AlertHashInvalidationEvent hashInvalidationEvent = new AlertHashInvalidationEvent(
+ definition.getClusterId(), invalidatedHosts);
+
+ m_eventPublisher.publish(hashInvalidationEvent);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertMaintenanceModeListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertMaintenanceModeListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertMaintenanceModeListener.java
new file mode 100644
index 0000000..cbd99a5
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertMaintenanceModeListener.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import java.util.List;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.controller.MaintenanceStateHelper;
+import org.apache.ambari.server.events.MaintenanceModeEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
+import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertMaintenanceModeListener} handles events that relate to
+ * Maintenance Mode changes.
+ */
+@Singleton
+@EagerSingleton
+public class AlertMaintenanceModeListener {
+ /**
+ * Logger.
+ */
+ private static Logger LOG = LoggerFactory.getLogger(AlertMaintenanceModeListener.class);
+
+ /**
+ * Used for updating the MM of current alerts.
+ */
+ @Inject
+ private AlertsDAO m_alertsDao = null;
+
+ /**
+ * Used to assist in determining implied maintenance state.
+ */
+ @Inject
+ private Provider<MaintenanceStateHelper> m_maintenanceHelper;
+
+ /**
+ * Used to lookup MM states.
+ */
+ @Inject
+ private Provider<Clusters> m_clusters;
+
+ /**
+ * Constructor.
+ *
+ * @param publisher
+ */
+ @Inject
+ public AlertMaintenanceModeListener(AmbariEventPublisher publisher) {
+ publisher.register(this);
+ }
+
+ /**
+ * Handles {@link MaintenanceModeEvent} by performing the following tasks:
+ * <ul>
+ * <li>Iterates through all {@link AlertNoticeEntity}, updating the MM state</li>
+ * </ul>
+ *
+ * @param event
+ * the event being handled.
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onEvent(MaintenanceModeEvent event) {
+ List<AlertCurrentEntity> currentAlerts = m_alertsDao.findCurrent();
+
+ for( AlertCurrentEntity currentAlert : currentAlerts ){
+ MaintenanceState currentState = currentAlert.getMaintenanceState();
+ AlertHistoryEntity history = currentAlert.getAlertHistory();
+ AlertDefinitionEntity definition = history.getAlertDefinition();
+
+ long clusterId = history.getClusterId();
+ String hostName = history.getHostName();
+ String serviceName = history.getServiceName();
+ String componentName = history.getComponentName();
+
+ try {
+ Cluster cluster = m_clusters.get().getClusterById(clusterId);
+ if (null == cluster) {
+ LOG.warn("Unable to find cluster with ID {}", clusterId);
+ continue;
+ }
+
+ Service service = cluster.getService(serviceName);
+ if (null == service) {
+ LOG.warn("Unable to find service named {} in cluster {}",
+ serviceName, cluster.getClusterName());
+
+ continue;
+ }
+
+ // if this is a service-level alert, then check explicitely against the
+ // service for the MM state
+ if (null == componentName) {
+ MaintenanceState serviceState = service.getMaintenanceState();
+ if (currentState != serviceState) {
+ currentAlert.setMaintenanceState(serviceState);
+ m_alertsDao.merge(currentAlert);
+ }
+ }
+ // the presence of a component name means that it's a component alert
+ // which require a host
+ else {
+ if (hostName == null) {
+ LOG.warn("The alert {} for component {} must have a host",
+ definition.getDefinitionName(), componentName);
+
+ continue;
+ }
+
+ List<ServiceComponentHost> serviceComponentHosts = cluster.getServiceComponentHosts(hostName);
+ if (null == serviceComponentHosts) {
+ LOG.warn(
+ "Unable to find service components on host {} for {} in cluster {}",
+ hostName, serviceName, cluster.getClusterName());
+
+ continue;
+ }
+
+ ServiceComponentHost serviceComponentHost = null;
+ for (ServiceComponentHost sch : serviceComponentHosts) {
+ if (componentName.equals(sch.getServiceComponentName())) {
+ serviceComponentHost = sch;
+ break;
+ }
+ }
+
+ if (null == serviceComponentHost) {
+ LOG.warn("Unable to find component {} of {} on host {}",
+ componentName, serviceName, hostName);
+
+ continue;
+ }
+
+ MaintenanceState effectiveState = m_maintenanceHelper.get().getEffectiveState(
+ serviceComponentHost);
+
+ switch (effectiveState) {
+ case OFF:
+ if (currentState != MaintenanceState.OFF) {
+ currentAlert.setMaintenanceState(MaintenanceState.OFF);
+ m_alertsDao.merge(currentAlert);
+ }
+
+ break;
+ case ON:
+ case IMPLIED_FROM_HOST:
+ case IMPLIED_FROM_SERVICE:
+ case IMPLIED_FROM_SERVICE_AND_HOST:
+ if (currentState == MaintenanceState.OFF) {
+ currentAlert.setMaintenanceState(MaintenanceState.ON);
+ m_alertsDao.merge(currentAlert);
+ }
+
+ break;
+ default:
+ break;
+ }
+ }
+ } catch (AmbariException ambariException) {
+ LOG.error("Unable to put alert '{}' for host {} into maintenance mode",
+ definition.getDefinitionName(), hostName, ambariException);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
new file mode 100644
index 0000000..83ee231
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
@@ -0,0 +1,337 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.controller.RootServiceResponseFactory.Services;
+import org.apache.ambari.server.events.AlertEvent;
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.AlertStateChangeEvent;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertReceivedListener} class handles {@link AlertReceivedEvent}
+ * and updates the appropriate DAOs. It may also fire new
+ * {@link AlertStateChangeEvent} when an {@link AlertState} change is detected.
+ */
+@Singleton
+@EagerSingleton
+public class AlertReceivedListener {
+ /**
+ * Logger.
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(AlertReceivedListener.class);
+
+ @Inject
+ private AlertsDAO m_alertsDao;
+
+ @Inject
+ private AlertDefinitionDAO m_definitionDao;
+
+ /**
+ * Used for looking up whether an alert has a valid service/component/host
+ */
+ @Inject
+ private Provider<Clusters> m_clusters;
+
+ /**
+ * Receives and publishes {@link AlertEvent} instances.
+ */
+ private AlertEventPublisher m_alertEventPublisher;
+
+ /**
+ * Constructor.
+ *
+ * @param publisher
+ */
+ @Inject
+ public AlertReceivedListener(AlertEventPublisher publisher) {
+ m_alertEventPublisher = publisher;
+ m_alertEventPublisher.register(this);
+ }
+
+ /**
+ * Adds an alert. Checks for a new state before creating a new history record.
+ *
+ * @param event
+ * the event to handle.
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onAlertEvent(AlertReceivedEvent event) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(event.toString());
+ }
+
+ Alert alert = event.getAlert();
+ long clusterId = event.getClusterId();
+
+ AlertDefinitionEntity definition = m_definitionDao.findByName(clusterId,
+ alert.getName());
+
+ if (null == definition) {
+ LOG.warn(
+ "Received an alert for {} which is a definition that does not exist anymore",
+ alert.getName());
+
+ return;
+ }
+
+ // it's possible that a definition which is disabled will still have a
+ // running alert returned; this will ensure we don't record it
+ if (!definition.getEnabled()) {
+ LOG.debug(
+ "Received an alert for {} which is disabled. No more alerts should be received for this definition.",
+ alert.getName());
+
+ return;
+ }
+
+ // jobs that were running when a service/component/host was changed
+ // which invalidate the alert should not be reported
+ if (!isValid(alert)) {
+ return;
+ }
+
+ AlertCurrentEntity current = null;
+
+ if (null == alert.getHost() || definition.isHostIgnored()) {
+ current = m_alertsDao.findCurrentByNameNoHost(clusterId, alert.getName());
+ } else {
+ current = m_alertsDao.findCurrentByHostAndName(clusterId, alert.getHost(),
+ alert.getName());
+ }
+
+ if (null == current) {
+ AlertHistoryEntity history = createHistory(clusterId, definition, alert);
+
+ current = new AlertCurrentEntity();
+ current.setMaintenanceState(MaintenanceState.OFF);
+ current.setAlertHistory(history);
+ current.setLatestTimestamp(alert.getTimestamp());
+ current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
+
+ m_alertsDao.create(current);
+
+ } else if (alert.getState() == current.getAlertHistory().getAlertState()) {
+ current.setLatestTimestamp(alert.getTimestamp());
+ current.setLatestText(alert.getText());
+
+ current = m_alertsDao.merge(current);
+ } else {
+ LOG.debug(
+ "Alert State Changed: CurrentId {}, CurrentTimestamp {}, HistoryId {}, HistoryState {}",
+ current.getAlertId(), current.getLatestTimestamp(),
+ current.getAlertHistory().getAlertId(),
+ current.getAlertHistory().getAlertState());
+
+ AlertHistoryEntity oldHistory = current.getAlertHistory();
+ AlertState oldState = oldHistory.getAlertState();
+
+ // insert history, update current
+ AlertHistoryEntity history = createHistory(clusterId,
+ oldHistory.getAlertDefinition(), alert);
+
+ // manually create the new history entity since we are merging into
+ // an existing current entity
+ m_alertsDao.create(history);
+
+ current.setAlertHistory(history);
+ current.setLatestTimestamp(Long.valueOf(alert.getTimestamp()));
+ current.setOriginalTimestamp(Long.valueOf(alert.getTimestamp()));
+
+ current = m_alertsDao.merge(current);
+
+ LOG.debug(
+ "Alert State Merged: CurrentId {}, CurrentTimestamp {}, HistoryId {}, HistoryState {}",
+ current.getAlertId(), current.getLatestTimestamp(),
+ current.getAlertHistory().getAlertId(),
+ current.getAlertHistory().getAlertState());
+
+ // broadcast the alert changed event for other subscribers
+ AlertStateChangeEvent alertChangedEvent = new AlertStateChangeEvent(
+ event.getClusterId(), event.getAlert(), current,
+ oldState);
+
+ m_alertEventPublisher.publish(alertChangedEvent);
+ }
+ }
+
+ /**
+ * Gets whether the specified alert is valid for its reported cluster,
+ * service, component, and host. This method is necessary for the case where a
+ * component has been removed from a host, but the alert data is going to be
+ * returned before the agent alert job can be unscheduled.
+ *
+ * @param alert
+ * the alert.
+ * @return {@code true} if the alert is for a valid combination of
+ * cluster/service/component/host.
+ */
+ private boolean isValid(Alert alert) {
+ String clusterName = alert.getCluster();
+ String serviceName = alert.getService();
+ String componentName = alert.getComponent();
+ String hostName = alert.getHost();
+
+ // if the alert is not bound to a cluster, then it's most likely a
+ // host alert and is always valid
+ if( null == clusterName ){
+ return true;
+ }
+
+ // AMBARI is always a valid service
+ String ambariServiceName = Services.AMBARI.name();
+ if (ambariServiceName.equals(serviceName)) {
+ return true;
+ }
+
+ final Cluster cluster;
+ try {
+ cluster = m_clusters.get().getCluster(clusterName);
+ if (null == cluster) {
+ LOG.error("Unable to process alert {} for an invalid cluster named {}",
+ alert.getName(), clusterName);
+
+ return false;
+ }
+ } catch (AmbariException ambariException) {
+ LOG.error("Unable to process alert {} for an invalid cluster named {}",
+ alert.getName(), clusterName, ambariException);
+
+ return false;
+ }
+
+ Map<String, Service> services = cluster.getServices();
+ Service service = services.get(serviceName);
+ if (null == service) {
+ LOG.error("Unable to process alert {} for an invalid service named {}",
+ alert.getName(), serviceName);
+
+ return false;
+ }
+
+ if (null != hostName) {
+ List<Host> hosts = m_clusters.get().getHosts();
+ if (null == hosts) {
+ LOG.error("Unable to process alert {} for an invalid host named {}",
+ alert.getName(), hostName);
+
+ return false;
+ }
+
+ boolean validHost = false;
+ for (Host host : hosts) {
+ if (hostName.equals(host.getHostName())) {
+ validHost = true;
+ break;
+ }
+ }
+
+ if (!validHost) {
+ LOG.error("Unable to process alert {} for an invalid host named {}",
+ alert.getName(), hostName);
+
+ return false;
+ }
+ }
+
+ // if the alert is for a host/component then verify that the component
+ // is actually installed on that host
+ if (null != hostName && null != componentName) {
+ boolean validServiceComponentHost = false;
+ List<ServiceComponentHost> serviceComponentHosts = cluster.getServiceComponentHosts(hostName);
+
+ for (ServiceComponentHost serviceComponentHost : serviceComponentHosts) {
+ if (componentName.equals(serviceComponentHost.getServiceComponentName())) {
+ validServiceComponentHost = true;
+ break;
+ }
+ }
+
+ if (!validServiceComponentHost) {
+ LOG.warn(
+ "Unable to process alert {} for an invalid service {} and component {} on host {}",
+ alert.getName(), serviceName, componentName, hostName);
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Convenience to create a new alert.
+ *
+ * @param clusterId
+ * the cluster id
+ * @param definition
+ * the definition
+ * @param alert
+ * the alert data
+ * @return the new history record
+ */
+ private AlertHistoryEntity createHistory(long clusterId,
+ AlertDefinitionEntity definition, Alert alert) {
+ AlertHistoryEntity history = new AlertHistoryEntity();
+ history.setAlertDefinition(definition);
+ history.setAlertInstance(alert.getInstance());
+ history.setAlertLabel(alert.getLabel());
+ history.setAlertState(alert.getState());
+ history.setAlertText(alert.getText());
+ history.setAlertTimestamp(Long.valueOf(alert.getTimestamp()));
+ history.setClusterId(Long.valueOf(clusterId));
+ history.setComponentName(alert.getComponent());
+ history.setServiceName(alert.getService());
+
+ // only set a host for the history item if the alert definition says to
+ if (definition.isHostIgnored()) {
+ history.setHostName(null);
+ } else {
+ history.setHostName(alert.getHost());
+ }
+
+ return history;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceComponentHostListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceComponentHostListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceComponentHostListener.java
new file mode 100644
index 0000000..92008e3
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceComponentHostListener.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.ServiceComponentUninstalledEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+
+/**
+ * The {@link AlertServiceComponentHostListener} handles event relating to the
+ * disabling of an alert definition.
+ */
+@EagerSingleton
+public class AlertServiceComponentHostListener {
+
+ /**
+ * Used for deleting the alert notices when a definition is disabled.
+ */
+ @Inject
+ private AlertsDAO m_alertsDao = null;
+
+ /**
+ * Constructor.
+ *
+ * @param publisher
+ */
+ @Inject
+ public AlertServiceComponentHostListener(AmbariEventPublisher publisher) {
+ publisher.register(this);
+ }
+
+ /**
+ * Removes any {@link AlertCurrentEntity} for the given service, component and
+ * host.
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onEvent(ServiceComponentUninstalledEvent event) {
+ String serviceName = event.getServiceName();
+ String componentName = event.getComponentName();
+ String hostName = event.getHostName();
+
+ m_alertsDao.removeCurrentByServiceComponentHost(serviceName, componentName,
+ hostName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceStateListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceStateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceStateListener.java
new file mode 100644
index 0000000..b56f23d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertServiceStateListener.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import java.text.MessageFormat;
+import java.util.Set;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.ControllerModule;
+import org.apache.ambari.server.events.ServiceInstalledEvent;
+import org.apache.ambari.server.events.ServiceRemovedEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
+import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.AlertGroupEntity;
+import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.AlertDefinitionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertServiceStateListener} class handles
+ * {@link ServiceInstalledEvent} and {@link ServiceRemovedEvent} and ensures
+ * that {@link AlertDefinitionEntity} and {@link AlertGroupEntity} instances are
+ * correctly populated or cleaned up.
+ */
+@Singleton
+@EagerSingleton
+public class AlertServiceStateListener {
+ /**
+ * Logger.
+ */
+ private static Log LOG = LogFactory.getLog(AlertServiceStateListener.class);
+
+ /**
+ * Services metainfo; injected lazily as a {@link Provider} since JPA is not
+ * fully initialized when this singleton is eagerly instantiated. See
+ * {@link AmbariServer#main(String[])} and the ordering of
+ * {@link ControllerModule} and {@link GuiceJpaInitializer}.
+ */
+ @Inject
+ private Provider<AmbariMetaInfo> m_metaInfoProvider;
+
+ /**
+ * Used when a service is installed to read alert definitions from the stack
+ * and coerce them into {@link AlertDefinitionEntity}.
+ */
+ @Inject
+ private AlertDefinitionFactory m_alertDefinitionFactory;
+
+ /**
+ * Used when a service is installed to insert a default
+ * {@link AlertGroupEntity} into the database.
+ */
+ @Inject
+ private AlertDispatchDAO m_alertDispatchDao;
+
+ /**
+ * Used when a service is installed to insert {@link AlertDefinitionEntity}
+ * into the database.
+ */
+ @Inject
+ private AlertDefinitionDAO m_definitionDao;
+
+ /**
+ * Used for removing current alerts when a service is removed.
+ */
+ @Inject
+ private AlertsDAO m_alertsDao;
+
+ /**
+ * Constructor.
+ *
+ * @param publisher
+ */
+ @Inject
+ public AlertServiceStateListener(AmbariEventPublisher publisher) {
+ publisher.register(this);
+ }
+
+ /**
+ * Handles service installed events by populating the database with all known
+ * alert definitions for the newly installed service and creates the service's
+ * default alert group.
+ *
+ * @param event
+ * the published event being handled (not {@code null}).
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onAmbariEvent(ServiceInstalledEvent event) {
+ LOG.debug(event);
+
+ long clusterId = event.getClusterId();
+ String stackName = event.getStackName();
+ String stackVersion = event.getStackVersion();
+ String serviceName = event.getServiceName();
+
+ // create the default alert group for the new service; this MUST be done
+ // before adding definitions so that they are properly added to the
+ // default group
+ AlertGroupEntity serviceAlertGroup = new AlertGroupEntity();
+ serviceAlertGroup.setClusterId(clusterId);
+ serviceAlertGroup.setDefault(true);
+ serviceAlertGroup.setGroupName(serviceName);
+ serviceAlertGroup.setServiceName(serviceName);
+
+ m_alertDispatchDao.create(serviceAlertGroup);
+
+ // populate alert definitions for the new service from the database, but
+ // don't worry about sending down commands to the agents; the host
+ // components are not yet bound to the hosts so we'd have no way of knowing
+ // which hosts are invalidated; do that in another impl
+ try {
+ Set<AlertDefinition> alertDefinitions = m_metaInfoProvider.get().getAlertDefinitions(
+ stackName, stackVersion, serviceName);
+
+ for (AlertDefinition definition : alertDefinitions) {
+ AlertDefinitionEntity entity = m_alertDefinitionFactory.coerce(
+ clusterId,
+ definition);
+
+ m_definitionDao.create(entity);
+ }
+ } catch (AmbariException ae) {
+ String message = MessageFormat.format(
+ "Unable to populate alert definitions from the database during installation of {0}",
+ serviceName);
+ LOG.error(message, ae);
+ }
+ }
+
+ /**
+ * Removes any current alerts associated with the specified service and the
+ * service's default alert group.
+ *
+ * @param event
+ * the published event being handled (not {@code null}).
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onAmbariEvent(ServiceRemovedEvent event) {
+ LOG.debug(event);
+
+ // remove any current alerts
+ m_alertsDao.removeCurrentByService(event.getServiceName());
+
+ // remove the default group for the service
+ AlertGroupEntity group = m_alertDispatchDao.findGroupByName(
+ event.getClusterId(), event.getServiceName());
+
+ if (null != group && group.isDefault()) {
+ m_alertDispatchDao.remove(group);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertStateChangedListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertStateChangedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertStateChangedListener.java
new file mode 100644
index 0000000..d84c449
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertStateChangedListener.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.alerts;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.AlertStateChangeEvent;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.orm.dao.AlertDispatchDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
+import org.apache.ambari.server.orm.entities.AlertGroupEntity;
+import org.apache.ambari.server.orm.entities.AlertHistoryEntity;
+import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
+import org.apache.ambari.server.orm.entities.AlertTargetEntity;
+import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.NotificationState;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.eventbus.AllowConcurrentEvents;
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The {@link AlertStateChangedListener} class response to
+ * {@link AlertStateChangeEvent} and updates {@link AlertNoticeEntity} instances
+ * in the database.
+ */
+@Singleton
+@EagerSingleton
+public class AlertStateChangedListener {
+
+ /**
+ * Logger.
+ */
+ private static Log LOG = LogFactory.getLog(AlertStateChangedListener.class);
+
+ /**
+ * Used for looking up groups and targets.
+ */
+ @Inject
+ private AlertDispatchDAO m_alertsDispatchDao;
+
+ /**
+ * Constructor.
+ *
+ * @param publisher
+ */
+ @Inject
+ public AlertStateChangedListener(AlertEventPublisher publisher) {
+ publisher.register(this);
+ }
+
+ /**
+ * Listens for when an alert's state has changed and creates
+ * {@link AlertNoticeEntity} instances when appropriate to notify
+ * {@link AlertTargetEntity}.
+ * <p/>
+ * {@link AlertNoticeEntity} are only created when the target has the
+ * {@link AlertState} in its list of states.
+ */
+ @Subscribe
+ @AllowConcurrentEvents
+ public void onAlertEvent(AlertStateChangeEvent event) {
+ LOG.debug(event);
+
+ // don't create any outbound alert notices if in MM
+ AlertCurrentEntity currentAlert = event.getCurrentAlert();
+ if (null != currentAlert
+ && currentAlert.getMaintenanceState() != MaintenanceState.OFF) {
+ return;
+ }
+
+ AlertHistoryEntity history = event.getNewHistoricalEntry();
+ AlertDefinitionEntity definition = history.getAlertDefinition();
+
+ List<AlertGroupEntity> groups = m_alertsDispatchDao.findGroupsByDefinition(definition);
+
+ // for each group, determine if there are any targets that need to receive
+ // a notification about the alert state change event
+ for (AlertGroupEntity group : groups) {
+ Set<AlertTargetEntity> targets = group.getAlertTargets();
+ if (null == targets || targets.size() == 0) {
+ continue;
+ }
+
+ for (AlertTargetEntity target : targets) {
+ if (!isAlertTargetInterested(target, history)) {
+ continue;
+ }
+
+ AlertNoticeEntity notice = new AlertNoticeEntity();
+ notice.setUuid(UUID.randomUUID().toString());
+ notice.setAlertTarget(target);
+ notice.setAlertHistory(event.getNewHistoricalEntry());
+ notice.setNotifyState(NotificationState.PENDING);
+
+ m_alertsDispatchDao.create(notice);
+ }
+ }
+ }
+
+ /**
+ * Gets whether the {@link AlertTargetEntity} is interested in receiving a
+ * notification about the {@link AlertHistoryEntity}'s state change.
+ *
+ * @param target
+ * the target (not {@code null}).
+ * @param history
+ * the history entry that represents the state change (not
+ * {@code null}).
+ * @return {@code true} if the target cares about this state change,
+ * {@code false} otherwise.
+ */
+ private boolean isAlertTargetInterested(AlertTargetEntity target,
+ AlertHistoryEntity history) {
+ Set<AlertState> alertStates = target.getAlertStates();
+ if (null != alertStates && alertStates.size() > 0) {
+ if (!alertStates.contains(history.getAlertState())) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
index 0236e03..96e66a62 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
@@ -29,7 +29,7 @@ import com.google.inject.Singleton;
* single-threaded, serial {@link EventBus}.
*/
@Singleton
-public final class AmbariEventPublisher {
+public class AmbariEventPublisher {
/**
* A single threaded event bus for processing Ambari events in serial.
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
index 8a8dfe6..1127dd1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
@@ -436,6 +436,87 @@ public class AlertsDAO {
}
/**
+ * Remove all current alerts that are disabled.
+ *
+ * @return the number of alerts removed.
+ */
+ @Transactional
+ public int removeCurrentDisabledAlerts() {
+ TypedQuery<AlertCurrentEntity> query = entityManagerProvider.get().createNamedQuery(
+ "AlertCurrentEntity.removeDisabled", AlertCurrentEntity.class);
+
+ return query.executeUpdate();
+ }
+
+ /**
+ * Remove the current alert that matches the given service. This is used in
+ * cases where the service was removed from the cluster.
+ *
+ * @param serviceName
+ * the name of the service that the current alerts are being removed
+ * for (not {@code null}).
+ * @return the number of alerts removed.
+ */
+ @Transactional
+ public int removeCurrentByService(String serviceName) {
+
+ TypedQuery<AlertCurrentEntity> query = entityManagerProvider.get().createNamedQuery(
+ "AlertCurrentEntity.removeByService", AlertCurrentEntity.class);
+
+ query.setParameter("serviceName", serviceName);
+ return query.executeUpdate();
+ }
+
+ /**
+ * Remove the current alert that matches the given host. This is used in cases
+ * where the host was removed from the cluster.
+ *
+ * @param hostName
+ * the name of the host that the current alerts are being removed for
+ * (not {@code null}).
+ * @return the number of alerts removed.
+ */
+ @Transactional
+ public int removeCurrentByHost(String hostName) {
+
+ TypedQuery<AlertCurrentEntity> query = entityManagerProvider.get().createNamedQuery(
+ "AlertCurrentEntity.removeByHost", AlertCurrentEntity.class);
+
+ query.setParameter("hostName", hostName);
+ return query.executeUpdate();
+ }
+
+ /**
+ * Remove the current alert that matches the given service, component and
+ * host. This is used in cases where the component was removed from the host.
+ *
+ * @param serviceName
+ * the name of the service that the current alerts are being removed
+ * for (not {@code null}).
+ * @param componentName
+ * the name of the component that the current alerts are being
+ * removed for (not {@code null}).
+ * @param hostName
+ * the name of the host that the current alerts are being removed for
+ * (not {@code null}).
+ * @return the number of alerts removed.
+ */
+ @Transactional
+ public int removeCurrentByServiceComponentHost(String serviceName,
+ String componentName,
+ String hostName) {
+
+ TypedQuery<AlertCurrentEntity> query = entityManagerProvider.get().createNamedQuery(
+ "AlertCurrentEntity.removeByHostComponent", AlertCurrentEntity.class);
+
+ query.setParameter("serviceName", serviceName);
+ query.setParameter("componentName", componentName);
+ query.setParameter("hostName", hostName);
+
+ return query.executeUpdate();
+ }
+
+ /**
* Persists a new alert.
*
* @param alert
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
index 8ca297b..08c7fb3 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/entities/AlertCurrentEntity.java
@@ -53,7 +53,11 @@ import org.apache.ambari.server.state.MaintenanceState;
@NamedQuery(name = "AlertCurrentEntity.findByHostAndName", query = "SELECT alert FROM AlertCurrentEntity alert WHERE alert.alertHistory.clusterId = :clusterId AND alert.alertHistory.alertDefinition.definitionName = :definitionName AND alert.alertHistory.hostName = :hostName"),
@NamedQuery(name = "AlertCurrentEntity.findByNameAndNoHost", query = "SELECT alert FROM AlertCurrentEntity alert WHERE alert.alertHistory.clusterId = :clusterId AND alert.alertHistory.alertDefinition.definitionName = :definitionName AND alert.alertHistory.hostName IS NULL"),
@NamedQuery(name = "AlertCurrentEntity.removeByHistoryId", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertHistory.alertId = :historyId"),
- @NamedQuery(name = "AlertCurrentEntity.removeByDefinitionId", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertDefinition.definitionId = :definitionId") })
+ @NamedQuery(name = "AlertCurrentEntity.removeByDefinitionId", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertDefinition.definitionId = :definitionId"),
+ @NamedQuery(name = "AlertCurrentEntity.removeDisabled", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertDefinition.enabled = 0"),
+ @NamedQuery(name = "AlertCurrentEntity.removeByService", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertHistory.serviceName = :serviceName"),
+ @NamedQuery(name = "AlertCurrentEntity.removeByHost", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertHistory.hostName = :hostName"),
+ @NamedQuery(name = "AlertCurrentEntity.removeByHostComponent", query = "DELETE FROM AlertCurrentEntity alert WHERE alert.alertHistory.serviceName = :serviceName AND alert.alertHistory.componentName = :componentName AND alert.alertHistory.hostName = :hostName") })
public class AlertCurrentEntity {
@Id
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java b/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
index 3675f87..3211cfc 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/Alert.java
@@ -178,10 +178,16 @@ public class Alert {
/**
* @return
*/
+ @JsonProperty("cluster")
public String getCluster() {
return cluster;
}
+ @JsonProperty("cluster")
+ public void setCluster(String cluster){
+ this.cluster = cluster;
+ }
+
@Override
public int hashCode() {
int result = alertHashCode();
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
index 7fa5afe..85e8314 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.controller.ServiceResponse;
import org.apache.ambari.server.events.MaintenanceModeEvent;
import org.apache.ambari.server.events.ServiceInstalledEvent;
+import org.apache.ambari.server.events.ServiceRemovedEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ClusterServiceDAO;
@@ -699,6 +700,14 @@ public class ServiceImpl implements Service {
if (persisted) {
removeEntities();
persisted = false;
+
+ // publish the service removed event
+ StackId stackId = cluster.getDesiredStackVersion();
+
+ ServiceRemovedEvent event = new ServiceRemovedEvent(getClusterId(),
+ stackId.getStackName(), stackId.getStackVersion(), getName());
+
+ eventPublisher.publish(event);
}
} finally {
readWriteLock.writeLock().unlock();
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
index da3a572..abffa5d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AlertDefinitionHash.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.state.alert;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,8 +30,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.ActionQueue;
@@ -80,6 +80,9 @@ public class AlertDefinitionHash {
@Inject
private AlertDefinitionDAO m_definitionDao;
+ /**
+ * Used to coerce {@link AlertDefinitionEntity} into {@link AlertDefinition}.
+ */
@Inject
private AlertDefinitionFactory m_factory;
@@ -89,6 +92,10 @@ public class AlertDefinitionHash {
@Inject
private Provider<Clusters> m_clusters;
+ /**
+ * Used to enqueue {@link AlertDefinitionCommand} on the heartbeat response
+ * queue.
+ */
@Inject
private ActionQueue m_actionQueue;
@@ -101,9 +108,11 @@ public class AlertDefinitionHash {
private Provider<ConfigHelper> m_configHelper;
/**
- * !!! TODO: this class needs some thoughts on locking
+ * Due to the nature of the asynchronous events for alerts and Ambari, this
+ * lock will ensure that only a single writer is writing to the
+ * {@link ActionQueue}.
*/
- private ReadWriteLock m_lock = new ReentrantReadWriteLock();
+ private ReentrantLock m_actionQueueLock = new ReentrantLock();
/**
* The hashes for all hosts for any cluster. The key is the hostname and the
@@ -146,33 +155,6 @@ public class AlertDefinitionHash {
}
/**
- * Gets a mapping between cluster and alert definition hashes for all of the
- * clusters that the given host belongs to.
- *
- * @param hostName
- * the host name (not {@code null}).
- * @return a mapping between cluster and alert definition hash or an empty map
- * (never @code null).
- * @see #getHash(String, String)
- * @throws AmbariException
- */
- public Map<String, String> getHashes(String hostName) throws AmbariException {
- Set<Cluster> clusters = m_clusters.get().getClustersForHost(hostName);
- if (null == clusters || clusters.size() == 0) {
- return Collections.emptyMap();
- }
-
- Map<String, String> hashes = new HashMap<String, String>();
- for (Cluster cluster : clusters) {
- String clusterName = cluster.getClusterName();
- String hash = getHash(clusterName, hostName);
- hashes.put(clusterName, hash);
- }
-
- return hashes;
- }
-
- /**
* Invalidate all cached hashes causing subsequent lookups to recalculate.
*/
public void invalidateAll() {
@@ -207,7 +189,7 @@ public class AlertDefinitionHash {
}
/**
- * Gets whether the alert definition has for the specified host has been
+ * Gets whether the alert definition hash for the specified host has been
* calculated and cached.
*
* @param hostName
@@ -314,7 +296,7 @@ public class AlertDefinitionHash {
* @return the hosts that were invalidated, or an empty set (never
* {@code null}).
*/
- public Set<String> invalidateHosts(long clusterId,
+ private Set<String> invalidateHosts(long clusterId,
SourceType definitionSourceType, String definitionName,
String definitionServiceName, String definitionComponentName) {
@@ -388,9 +370,12 @@ public class AlertDefinitionHash {
return affectedHosts;
}
+ String ambariServiceName = Services.AMBARI.name();
+ String agentComponentName = Components.AMBARI_AGENT.name();
+
// intercept host agent alerts; they affect all hosts
- if (Services.AMBARI.equals(definitionServiceName)
- && Components.AMBARI_AGENT.equals(definitionComponentName)) {
+ if (ambariServiceName.equals(definitionServiceName)
+ && agentComponentName.equals(definitionComponentName)) {
affectedHosts.addAll(hosts.keySet());
return affectedHosts;
}
@@ -456,7 +441,7 @@ public class AlertDefinitionHash {
* @param hosts
* the hosts to push {@link AlertDefinitionCommand}s for.
*/
- public void enqueueAgentCommands(long clusterId, Set<String> hosts) {
+ public void enqueueAgentCommands(long clusterId, Collection<String> hosts) {
String clusterName = null;
try {
@@ -483,7 +468,7 @@ public class AlertDefinitionHash {
* @param hosts
* the hosts to push {@link AlertDefinitionCommand}s for.
*/
- public void enqueueAgentCommands(String clusterName, Set<String> hosts) {
+ private void enqueueAgentCommands(String clusterName, Collection<String> hosts) {
if (null == clusterName) {
LOG.warn("Unable to create alert definition agent commands because of a null cluster name");
return;
@@ -493,29 +478,39 @@ public class AlertDefinitionHash {
return;
}
- for (String hostName : hosts) {
- List<AlertDefinition> definitions = getAlertDefinitions(clusterName,
- hostName);
+ try {
+ m_actionQueueLock.lock();
+ for (String hostName : hosts) {
+ List<AlertDefinition> definitions = getAlertDefinitions(clusterName,
+ hostName);
- String hash = getHash(clusterName, hostName);
+ String hash = getHash(clusterName, hostName);
- AlertDefinitionCommand command = new AlertDefinitionCommand(clusterName,
- hostName, hash, definitions);
+ AlertDefinitionCommand command = new AlertDefinitionCommand(
+ clusterName, hostName, hash, definitions);
- try {
- Cluster cluster = m_clusters.get().getCluster(clusterName);
- command.addConfigs(m_configHelper.get(), cluster);
- } catch (AmbariException ae) {
- LOG.warn("Unable to add configurations to alert definition command", ae);
- }
+ try {
+ Cluster cluster = m_clusters.get().getCluster(clusterName);
+ command.addConfigs(m_configHelper.get(), cluster);
+ } catch (AmbariException ae) {
+ LOG.warn("Unable to add configurations to alert definition command",
+ ae);
+ }
+
+ // unlike other commands, the alert definitions commands are really
+ // designed to be 1:1 per change; if multiple invalidations happened
+ // before the next heartbeat, there would be several commands that would
+ // force the agents to reschedule their alerts more than once
+ m_actionQueue.dequeue(hostName,
+ AgentCommandType.ALERT_DEFINITION_COMMAND);
- // unlike other commands, the alert definitions commands are really
- // designed to be 1:1 per change; if multiple invalidations happened
- // before the next heartbeat, there would be several commands that would
- // force the agents to reschedule their alerts more than once
- m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_DEFINITION_COMMAND);
- m_actionQueue.dequeue(hostName, AgentCommandType.ALERT_EXECUTION_COMMAND);
- m_actionQueue.enqueue(hostName, command);
+ m_actionQueue.dequeue(hostName,
+ AgentCommandType.ALERT_EXECUTION_COMMAND);
+
+ m_actionQueue.enqueue(hostName, command);
+ }
+ } finally {
+ m_actionQueueLock.unlock();
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
index 9cdb498..991d289 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
@@ -39,6 +39,8 @@ import org.apache.ambari.server.HostNotFoundException;
import org.apache.ambari.server.agent.DiskInfo;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.events.HostRemovedEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ClusterVersionDAO;
import org.apache.ambari.server.orm.dao.ConfigGroupHostMappingDAO;
@@ -123,6 +125,12 @@ public class ClustersImpl implements Clusters {
@Inject
private SecurityHelper securityHelper;
+ /**
+ * Used to publish events relating to cluster CRUD operations.
+ */
+ @Inject
+ private AmbariEventPublisher eventPublisher;
+
@Inject
public ClustersImpl() {
clusters = new ConcurrentHashMap<String, Cluster>();
@@ -417,10 +425,11 @@ public class ClustersImpl implements Clusters {
r.lock();
try {
for (String host : hostSet) {
- if (!hosts.containsKey(host))
+ if (!hosts.containsKey(host)) {
throw new HostNotFoundException(host);
- else
+ } else {
hostMap.put(host, hosts.get(host));
+ }
}
} finally {
r.unlock();
@@ -437,10 +446,11 @@ public class ClustersImpl implements Clusters {
try {
for (String c : clusterSet) {
if (c != null) {
- if (!clusters.containsKey(c))
+ if (!clusters.containsKey(c)) {
throw new ClusterNotFoundException(c);
- else
+ } else {
clusterMap.put(c, clusters.get(c));
+ }
}
}
} finally {
@@ -574,6 +584,7 @@ public class ClustersImpl implements Clusters {
}
+ @Override
public void debugDump(StringBuilder sb) {
r.lock();
try {
@@ -645,13 +656,13 @@ public class ClustersImpl implements Clusters {
w.unlock();
}
}
-
+
@Override
public void unmapHostFromCluster(String hostname, String clusterName)
throws AmbariException {
checkLoaded();
-
+
w.lock();
try {
@@ -679,9 +690,9 @@ public class ClustersImpl implements Clusters {
} finally {
w.unlock();
}
-
+
}
-
+
@Transactional
private void unmapHostClusterEntities(String hostName, long clusterId) {
HostEntity hostEntity = hostDAO.findByName(hostName);
@@ -703,14 +714,15 @@ public class ClustersImpl implements Clusters {
}
}
}
-
+
@Override
public void deleteHost(String hostname) throws AmbariException {
checkLoaded();
- if (!hosts.containsKey(hostname))
+ if (!hosts.containsKey(hostname)) {
return;
-
+ }
+
w.lock();
try {
@@ -725,12 +737,17 @@ public class ClustersImpl implements Clusters {
hostDAO.refresh(entity);
hostDAO.remove(entity);
hosts.remove(hostname);
+
+ // publish the event
+ HostRemovedEvent event = new HostRemovedEvent(hostname);
+ eventPublisher.publish(event);
+
} catch (Exception e) {
throw new AmbariException("Could not remove host", e);
} finally {
w.unlock();
}
-
+
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
index a3fec84..98a1826 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/svccomphost/ServiceComponentHostImpl.java
@@ -31,7 +31,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.agent.AlertDefinitionCommand;
import org.apache.ambari.server.controller.ServiceComponentHostResponse;
+import org.apache.ambari.server.events.AlertHashInvalidationEvent;
import org.apache.ambari.server.events.MaintenanceModeEvent;
+import org.apache.ambari.server.events.ServiceComponentUninstalledEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.dao.HostComponentDesiredStateDAO;
import org.apache.ambari.server.orm.dao.HostComponentStateDAO;
@@ -46,13 +48,13 @@ import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntity;
import org.apache.ambari.server.orm.entities.ServiceComponentDesiredStateEntityPK;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.SecurityState;
import org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostComponentAdminState;
import org.apache.ambari.server.state.HostConfig;
import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.MaintenanceState;
+import org.apache.ambari.server.state.SecurityState;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.ServiceComponentHostEvent;
@@ -561,12 +563,17 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
return;
}
+ // invalidate the host
String hostName = impl.getHostName();
impl.alertDefinitionHash.invalidate(impl.getClusterName(), hostName);
- impl.alertDefinitionHash.enqueueAgentCommands(impl.getClusterName(),
- Collections.singleton(hostName));
+ // publish the event
+ AlertHashInvalidationEvent hashInvalidationEvent = new AlertHashInvalidationEvent(
+ impl.getClusterId(), Collections.singletonList(hostName));
+
+ impl.eventPublisher.publish(hashInvalidationEvent);
impl.updateLastOpInfo(event.getType(), event.getOpTimestamp());
+
}
}
@@ -819,8 +826,9 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
public void setDesiredSecurityState(SecurityState securityState) throws AmbariException {
- if(!securityState.isEndpoint())
+ if(!securityState.isEndpoint()) {
throw new AmbariException("The security state must be an endpoint state");
+ }
clusterGlobalLock.readLock().lock();
try {
@@ -1402,6 +1410,8 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
@Override
public void delete() {
+ boolean fireRemovalEvent = false;
+
clusterGlobalLock.writeLock().lock();
try {
writeLock.lock();
@@ -1409,12 +1419,12 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
if (persisted) {
removeEntities();
persisted = false;
+ fireRemovalEvent = true;
}
+
clusters.getCluster(getClusterName()).removeServiceComponentHost(this);
} catch (AmbariException ex) {
- if (LOG.isDebugEnabled()) {
- LOG.error(ex.getMessage());
- }
+ LOG.error("Unable to remove a service component from a host", ex);
} finally {
writeLock.unlock();
}
@@ -1422,6 +1432,23 @@ public class ServiceComponentHostImpl implements ServiceComponentHost {
clusterGlobalLock.writeLock().unlock();
}
+ // publish event for the removal of the SCH after the removal is
+ // completed, but only if it was persisted
+ if (fireRemovalEvent) {
+ long clusterId = getClusterId();
+ StackId stackId = getStackVersion();
+ String stackVersion = stackId.getStackVersion();
+ String stackName = stackId.getStackName();
+ String serviceName = getServiceName();
+ String componentName = getServiceComponentName();
+ String hostName = getHostName();
+
+ ServiceComponentUninstalledEvent event = new ServiceComponentUninstalledEvent(
+ clusterId, stackName, stackVersion, serviceName, componentName,
+ hostName);
+
+ eventPublisher.publish(event);
+ }
}
@Transactional
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
index 19d9f87..1f57397 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AmbariManagementControllerTest.java
@@ -83,6 +83,7 @@ import org.apache.ambari.server.controller.internal.RequestResourceFilter;
import org.apache.ambari.server.controller.internal.ServiceResourceProviderTest;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.customactions.ActionDefinition;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.metadata.ActionMetadata;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -93,7 +94,6 @@ import org.apache.ambari.server.security.authorization.Users;
import org.apache.ambari.server.serveraction.ServerAction;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.RepositoryVersionState;
import org.apache.ambari.server.state.Config;
import org.apache.ambari.server.state.ConfigFactory;
import org.apache.ambari.server.state.ConfigHelper;
@@ -103,6 +103,7 @@ import org.apache.ambari.server.state.HostComponentAdminState;
import org.apache.ambari.server.state.HostState;
import org.apache.ambari.server.state.MaintenanceState;
import org.apache.ambari.server.state.RepositoryInfo;
+import org.apache.ambari.server.state.RepositoryVersionState;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentFactory;
@@ -123,6 +124,7 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStoppedEve
import org.apache.ambari.server.utils.StageUtils;
import org.apache.commons.collections.CollectionUtils;
import org.easymock.Capture;
+import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -8383,8 +8385,8 @@ public class AmbariManagementControllerTest {
controller.deleteHostComponents(schRequests);
ServiceComponentHost sch = sc1.getServiceComponentHost(host1);
assertTrue(sch.isRestartRequired());
- }
-
+ }
+
@Test
public void testDeleteHost() throws Exception {
String clusterName = "foo1";
@@ -8878,8 +8880,13 @@ public class AmbariManagementControllerTest {
properties.setProperty(Configuration.OS_VERSION_KEY,
"centos5");
properties.setProperty(Configuration.SHARED_RESOURCES_DIR_KEY, "src/test/resources/");
+
try {
install(new ControllerModule(properties));
+
+ // ambari events interfere with the workflow of this test
+ bind(AmbariEventPublisher.class).toInstance(
+ EasyMock.createMock(AmbariEventPublisher.class));
} catch (Exception e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/024a301b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
index 9b78c00..e94c012 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/AlertDefinitionResourceProviderTest.java
@@ -410,9 +410,7 @@ public class AlertDefinitionResourceProviderTest {
Cluster cluster = createMock(Cluster.class);
expect(amc.getClusters()).andReturn(clusters).atLeastOnce();
expect(clusters.getCluster((String) anyObject())).andReturn(cluster).atLeastOnce();
- expect(clusters.getClusterById(EasyMock.anyInt())).andReturn(cluster).atLeastOnce();
expect(cluster.getClusterId()).andReturn(Long.valueOf(1)).atLeastOnce();
- expect(cluster.getClusterName()).andReturn("c1").atLeastOnce();
Capture<AlertDefinitionEntity> entityCapture = new Capture<AlertDefinitionEntity>();
dao.create(capture(entityCapture));