You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2015/06/08 13:48:32 UTC
ambari git commit: AMBARI-11775 - Threading Can Cause Aggregate
Alerts To Have Duplicate Alert Entries With Different States (jonathanhurley)
Repository: ambari
Updated Branches:
refs/heads/trunk cbc34867b -> 7b41d3114
AMBARI-11775 - Threading Can Cause Aggregate Alerts To Have Duplicate Alert Entries With Different States (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7b41d311
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7b41d311
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7b41d311
Branch: refs/heads/trunk
Commit: 7b41d3114db74082119990d92b9d0f7760ee6261
Parents: cbc3486
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Sun Jun 7 21:16:53 2015 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Mon Jun 8 07:48:26 2015 -0400
----------------------------------------------------------------------
.../alerts/AlertAggregateListener.java | 41 ++++-
.../listeners/alerts/AlertReceivedListener.java | 7 -
.../ambari/server/orm/dao/AlertSummaryDTO.java | 50 ++++++
.../state/alert/AggregateDefinitionMapping.java | 2 +-
.../alerts/AggregateAlertListenerTest.java | 167 +++++++++++++++++++
5 files changed, 255 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
index 4d2add1..ac2f907 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
@@ -19,6 +19,8 @@ package org.apache.ambari.server.events.listeners.alerts;
import java.text.MessageFormat;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.ambari.server.EagerSingleton;
import org.apache.ambari.server.events.AggregateAlertRecalculateEvent;
@@ -35,9 +37,11 @@ import org.apache.ambari.server.state.alert.AggregateSource;
import org.apache.ambari.server.state.alert.AlertDefinition;
import org.apache.ambari.server.state.alert.Reporting;
import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -70,6 +74,13 @@ public class AlertAggregateListener {
private final AlertEventPublisher m_publisher;
/**
+ * A cache used to store the last state and text of an aggregate alert. We
+ * shouldn't need to fire new aggregate alerts unless the state or text has
+ * changed.
+ */
+ private Map<String, Alert> m_alertCache = new ConcurrentHashMap<String, Alert>();
+
+ /**
* Used for quick lookups of aggregate alerts.
*/
@Inject
@@ -122,6 +133,9 @@ public class AlertAggregateListener {
/**
* Calculates the aggregate alert state if there is an aggregate alert for the
* specified alert.
+ * <p/>
+ * This method should not be decorated with {@link AllowConcurrentEvents} since
+ * it would need extra locking around {@link #m_alertCache}.
*
* @param clusterId
* the ID of the cluster.
@@ -187,10 +201,29 @@ public class AlertAggregateListener {
}
}
- // make a new event and allow others to consume it
- AlertReceivedEvent aggEvent = new AlertReceivedEvent(clusterId,
- aggregateAlert);
+ // now that the alert has been created, see if we need to send it; only send
+ // alerts if the state or the text has changed
+ boolean sendAlertEvent = true;
+ Alert cachedAlert = m_alertCache.get(aggregateAlert.getName());
+ if (null != cachedAlert) {
+ AlertState cachedState = cachedAlert.getState();
+ AlertState alertState = aggregateAlert.getState();
+ String cachedText = cachedAlert.getText();
+ String alertText = aggregateAlert.getText();
+
+ if (cachedState == alertState && StringUtils.equals(cachedText, alertText)) {
+ sendAlertEvent = false;
+ }
+ }
+
+ // update the cache
+ m_alertCache.put(aggregateAlert.getName(), aggregateAlert);
- m_publisher.publish(aggEvent);
+ // make a new event and allow others to consume it, but only if the
+ // aggregate has changed
+ if (sendAlertEvent) {
+ AlertReceivedEvent aggEvent = new AlertReceivedEvent(clusterId, aggregateAlert);
+ m_publisher.publish(aggEvent);
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
index 41dd1d5..f28929a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
@@ -17,10 +17,6 @@
*/
package org.apache.ambari.server.events.listeners.alerts;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.EagerSingleton;
import org.apache.ambari.server.controller.RootServiceResponseFactory.Services;
@@ -39,10 +35,7 @@ import org.apache.ambari.server.state.Alert;
import org.apache.ambari.server.state.AlertState;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.MaintenanceState;
-import org.apache.ambari.server.state.Service;
-import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
index f56875f..0023def 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
@@ -78,4 +78,54 @@ public class AlertSummaryDTO {
public int getMaintenanceCount() {
return maintenanceCount;
}
+
+ /**
+ * Sets the count of {@link AlertState#OK} states.
+ *
+ * @param okCount
+ * the okCount to set
+ */
+ public void setOkCount(int okCount) {
+ this.okCount = okCount;
+ }
+
+ /**
+ * Sets the count of {@link AlertState#WARNING} states.
+ *
+ * @param warningCount
+ * the warningCount to set
+ */
+ public void setWarningCount(int warningCount) {
+ this.warningCount = warningCount;
+ }
+
+ /**
+ * Sets the count of {@link AlertState#CRITICAL} states.
+ *
+ * @param criticalCount
+ * the criticalCount to set
+ */
+ public void setCriticalCount(int criticalCount) {
+ this.criticalCount = criticalCount;
+ }
+
+ /**
+ * Sets the count of {@link AlertState#UNKNOWN} states.
+ *
+ * @param unknownCount
+ * the unknownCount to set
+ */
+ public void setUnknownCount(int unknownCount) {
+ this.unknownCount = unknownCount;
+ }
+
+ /**
+ * Sets the count of alerts in maintenance state.
+ *
+ * @param maintenanceCount
+ * the maintenanceCount to set
+ */
+ public void setMaintenanceCount(int maintenanceCount) {
+ this.maintenanceCount = maintenanceCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
index 104ebef..21ad99b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
@@ -32,7 +32,7 @@ import com.google.inject.Singleton;
* associated with them.
*/
@Singleton
-public final class AggregateDefinitionMapping {
+public class AggregateDefinitionMapping {
/**
* In-memory mapping of cluster ID to definition name / aggregate definition.
* This is used for fast lookups when receiving events.
http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
new file mode 100644
index 0000000..29969d6
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.alerts;
+
+import junit.framework.Assert;
+
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.AlertStateChangeEvent;
+import org.apache.ambari.server.events.MockEventListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertAggregateListener;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.AlertSummaryDTO;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.alert.AggregateDefinitionMapping;
+import org.apache.ambari.server.state.alert.AggregateSource;
+import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.Reporting;
+import org.apache.ambari.server.state.alert.Reporting.ReportTemplate;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.persist.PersistService;
+import com.google.inject.util.Modules;
+
+/**
+ * Tests the {@link AlertAggregateListener}.
+ */
+public class AggregateAlertListenerTest {
+
+ private Injector m_injector;
+ private MockEventListener m_listener;
+ private AlertsDAO m_alertsDao;
+ private AggregateDefinitionMapping m_aggregateMapping;
+
+ /**
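+ * Creates the injector with mocked bindings and registers the mock listener on the synchronized event publishers.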
+ */
+ @Before
+ public void setup() throws Exception {
+ m_injector = Guice.createInjector(Modules.override(
+ new InMemoryDefaultTestModule()).with(new MockModule()));
+
+ m_injector.getInstance(GuiceJpaInitializer.class);
+ m_listener = m_injector.getInstance(MockEventListener.class);
+
+ m_alertsDao = m_injector.getInstance(AlertsDAO.class);
+
+ // !!! need a synchronous op for testing
+ EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector).register(m_listener);
+ EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector).register(m_listener);
+ }
+
+ /**
+ * @throws Exception
+ */
+ @After
+ public void teardown() throws Exception {
+ m_injector.getInstance(PersistService.class).stop();
+ m_injector = null;
+ }
+
+ /**
+ * Tests that the {@link AlertAggregateListener} caches values of the
+ * aggregates and only triggers events when needed.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testAlertNoticeCreationFromEvent() throws Exception {
+ AlertCurrentEntity currentEntityMock = EasyMock.createNiceMock(AlertCurrentEntity.class);
+
+ // setup the mocks for the aggregate definition to avoid NPEs
+ AlertDefinition aggregateDefinition = new AlertDefinition();
+ aggregateDefinition.setName("mock-aggregate-alert");
+ AggregateSource aggregateSource = new AggregateSource();
+ aggregateSource.setAlertName("mock-aggregate-alert");
+ Reporting reporting = new Reporting();
+ ReportTemplate criticalTemplate = new ReportTemplate();
+ ReportTemplate okTemplate = new ReportTemplate();
+ criticalTemplate.setValue(.05);
+ criticalTemplate.setText("CRITICAL");
+ okTemplate.setText("OK");
+ reporting.setCritical(criticalTemplate);
+ reporting.setWarning(criticalTemplate);
+ reporting.setOk(okTemplate);
+ aggregateSource.setReporting(reporting);
+ aggregateDefinition.setSource(aggregateSource);
+
+ EasyMock.expect(
+ m_aggregateMapping.getAggregateDefinition(EasyMock.anyLong(), EasyMock.eq("mock-alert"))).andReturn(
+ aggregateDefinition).atLeastOnce();
+
+ AlertSummaryDTO summaryDTO = new AlertSummaryDTO(5,0,0,0,0);
+ EasyMock.expect(
+ m_alertsDao.findAggregateCounts(EasyMock.anyLong(), EasyMock.eq("mock-aggregate-alert"))).andReturn(
+ summaryDTO).atLeastOnce();
+
+ EasyMock.replay(m_alertsDao, m_aggregateMapping);
+
+ // check that we're starting at 0
+ Assert.assertEquals(0, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class));
+
+ // trigger an alert which will trigger the aggregate
+ Alert alert = new Alert("mock-alert", null, null, null, null, null);
+ AlertAggregateListener aggregateListener = m_injector.getInstance(AlertAggregateListener.class);
+ AlertStateChangeEvent event = new AlertStateChangeEvent(0, alert, currentEntityMock, null);
+ aggregateListener.onAlertStateChangeEvent(event);
+
+ // verify that one AlertReceivedEvent was fired (it's the one the listener
+ // creates for the aggregate)
+ Assert.assertEquals(1, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class));
+
+ // fire the same alert event again; the cache in the aggregate listener
+ // should prevent it from firing a new alert received event of its own
+ aggregateListener.onAlertStateChangeEvent(event);
+
+ // check that we're still at 1
+ Assert.assertEquals(1, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class));
+
+ // now change the returned summary DTO so that a new alert will get generated
+ summaryDTO.setOkCount(0);
+ summaryDTO.setCriticalCount(5);
+ aggregateListener.onAlertStateChangeEvent(event);
+ Assert.assertEquals(2, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class));
+ }
+
+ /**
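+ * Guice module that binds EasyMock mocks of {@link AlertsDAO} and {@link AggregateDefinitionMapping}.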
+ */
+ private class MockModule implements Module {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void configure(Binder binder) {
+ m_alertsDao = EasyMock.createMock(AlertsDAO.class);
+ m_aggregateMapping = EasyMock.createMock(AggregateDefinitionMapping.class);
+ binder.bind(AlertsDAO.class).toInstance(m_alertsDao);
+ binder.bind(AggregateDefinitionMapping.class).toInstance(m_aggregateMapping);
+ }
+ }
+}