You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2015/06/08 13:48:32 UTC

ambari git commit: AMBARI-11775 - Threading Can Cause Aggregate Alerts To Have Duplicate Alert Entries With Different States (jonathanhurley)

Repository: ambari
Updated Branches:
  refs/heads/trunk cbc34867b -> 7b41d3114


AMBARI-11775 - Threading Can Cause Aggregate Alerts To Have Duplicate Alert Entries With Different States (jonathanhurley)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/7b41d311
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/7b41d311
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/7b41d311

Branch: refs/heads/trunk
Commit: 7b41d3114db74082119990d92b9d0f7760ee6261
Parents: cbc3486
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Sun Jun 7 21:16:53 2015 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Mon Jun 8 07:48:26 2015 -0400

----------------------------------------------------------------------
 .../alerts/AlertAggregateListener.java          |  41 ++++-
 .../listeners/alerts/AlertReceivedListener.java |   7 -
 .../ambari/server/orm/dao/AlertSummaryDTO.java  |  50 ++++++
 .../state/alert/AggregateDefinitionMapping.java |   2 +-
 .../alerts/AggregateAlertListenerTest.java      | 167 +++++++++++++++++++
 5 files changed, 255 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
index 4d2add1..ac2f907 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertAggregateListener.java
@@ -19,6 +19,8 @@ package org.apache.ambari.server.events.listeners.alerts;
 
 import java.text.MessageFormat;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.events.AggregateAlertRecalculateEvent;
@@ -35,9 +37,11 @@ import org.apache.ambari.server.state.alert.AggregateSource;
 import org.apache.ambari.server.state.alert.AlertDefinition;
 import org.apache.ambari.server.state.alert.Reporting;
 import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.eventbus.AllowConcurrentEvents;
 import com.google.common.eventbus.Subscribe;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -70,6 +74,13 @@ public class AlertAggregateListener {
   private final AlertEventPublisher m_publisher;
 
   /**
+   * A cache used to store the last state and text of an aggregate alert. We
+   * shouldn't need to fire new aggregate alerts unless the state or text has
+   * changed.
+   */
+  private Map<String, Alert> m_alertCache = new ConcurrentHashMap<String, Alert>();
+
+  /**
    * Used for quick lookups of aggregate alerts.
    */
   @Inject
@@ -122,6 +133,9 @@ public class AlertAggregateListener {
   /**
    * Calculates the aggregate alert state if there is an aggregate alert for the
    * specified alert.
+   * <p/>
+   * This method should not be decoratd with {@link AllowConcurrentEvents} since
+   * it would need extra locking around {@link #m_alertCache}.
    *
    * @param clusterId
    *          the ID of the cluster.
@@ -187,10 +201,29 @@ public class AlertAggregateListener {
       }
     }
 
-    // make a new event and allow others to consume it
-    AlertReceivedEvent aggEvent = new AlertReceivedEvent(clusterId,
-        aggregateAlert);
+    // now that the alert has been created, see if we need to send it; only send
+    // alerts if the state or the text has changed
+    boolean sendAlertEvent = true;
+    Alert cachedAlert = m_alertCache.get(aggregateAlert.getName());
+    if (null != cachedAlert) {
+      AlertState cachedState = cachedAlert.getState();
+      AlertState alertState = aggregateAlert.getState();
+      String cachedText = cachedAlert.getText();
+      String alertText = aggregateAlert.getText();
+
+      if (cachedState == alertState && StringUtils.equals(cachedText, alertText)) {
+        sendAlertEvent = false;
+      }
+    }
+
+    // update the cache
+    m_alertCache.put(aggregateAlert.getName(), aggregateAlert);
 
-    m_publisher.publish(aggEvent);
+    // make a new event and allow others to consume it, but only if the
+    // aggregate has changed
+    if (sendAlertEvent) {
+      AlertReceivedEvent aggEvent = new AlertReceivedEvent(clusterId, aggregateAlert);
+      m_publisher.publish(aggEvent);
+    }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
index 41dd1d5..f28929a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
@@ -17,10 +17,6 @@
  */
 package org.apache.ambari.server.events.listeners.alerts;
 
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.controller.RootServiceResponseFactory.Services;
@@ -39,10 +35,7 @@ import org.apache.ambari.server.state.Alert;
 import org.apache.ambari.server.state.AlertState;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.state.Host;
 import org.apache.ambari.server.state.MaintenanceState;
-import org.apache.ambari.server.state.Service;
-import org.apache.ambari.server.state.ServiceComponentHost;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
index f56875f..0023def 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertSummaryDTO.java
@@ -78,4 +78,54 @@ public class AlertSummaryDTO {
   public int getMaintenanceCount() {
     return maintenanceCount;
   }
+
+  /**
+   * Sets the count of {@link AlertState#OK} states.
+   *
+   * @param okCount
+   *          the okCount to set
+   */
+  public void setOkCount(int okCount) {
+    this.okCount = okCount;
+  }
+
+  /**
+   * Sets the count of {@link AlertState#WARNING} states.
+   *
+   * @param warningCount
+   *          the warningCount to set
+   */
+  public void setWarningCount(int warningCount) {
+    this.warningCount = warningCount;
+  }
+
+  /**
+   * Sets the count of {@link AlertState#CRITICAL} states.
+   *
+   * @param criticalCount
+   *          the criticalCount to set
+   */
+  public void setCriticalCount(int criticalCount) {
+    this.criticalCount = criticalCount;
+  }
+
+  /**
+   * Sets the count of {@link AlertState#UNKNOWN} states.
+   *
+   * @param unknownCount
+   *          the unknownCount to set
+   */
+  public void setUnknownCount(int unknownCount) {
+    this.unknownCount = unknownCount;
+  }
+
+  /**
+   * Sets the count of alerts in maintenance state.
+   *
+   * @param maintenanceCount
+   *          the maintenanceCount to set
+   */
+  public void setMaintenanceCount(int maintenanceCount) {
+    this.maintenanceCount = maintenanceCount;
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
index 104ebef..21ad99b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/alert/AggregateDefinitionMapping.java
@@ -32,7 +32,7 @@ import com.google.inject.Singleton;
  * associated with them.
  */
 @Singleton
-public final class AggregateDefinitionMapping {
+public class AggregateDefinitionMapping {
   /**
    * In-memory mapping of cluster ID to definition name / aggregate definition.
    * This is used for fast lookups when receiving events.

http://git-wip-us.apache.org/repos/asf/ambari/blob/7b41d311/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
new file mode 100644
index 0000000..29969d6
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.state.alerts;
+
+import junit.framework.Assert;
+
+import org.apache.ambari.server.events.AlertReceivedEvent;
+import org.apache.ambari.server.events.AlertStateChangeEvent;
+import org.apache.ambari.server.events.MockEventListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertAggregateListener;
+import org.apache.ambari.server.orm.GuiceJpaInitializer;
+import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.dao.AlertSummaryDTO;
+import org.apache.ambari.server.orm.dao.AlertsDAO;
+import org.apache.ambari.server.orm.entities.AlertCurrentEntity;
+import org.apache.ambari.server.state.Alert;
+import org.apache.ambari.server.state.alert.AggregateDefinitionMapping;
+import org.apache.ambari.server.state.alert.AggregateSource;
+import org.apache.ambari.server.state.alert.AlertDefinition;
+import org.apache.ambari.server.state.alert.Reporting;
+import org.apache.ambari.server.state.alert.Reporting.ReportTemplate;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Binder;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Module;
+import com.google.inject.persist.PersistService;
+import com.google.inject.util.Modules;
+
+/**
+ * Tests the {@link AlertAggregateListener}.
+ */
+public class AggregateAlertListenerTest {
+
+  private Injector m_injector;
+  private MockEventListener m_listener;
+  private AlertsDAO m_alertsDao;
+  private AggregateDefinitionMapping m_aggregateMapping;
+
+  /**
+   *
+   */
+  @Before
+  public void setup() throws Exception {
+    m_injector = Guice.createInjector(Modules.override(
+        new InMemoryDefaultTestModule()).with(new MockModule()));
+
+    m_injector.getInstance(GuiceJpaInitializer.class);
+    m_listener = m_injector.getInstance(MockEventListener.class);
+
+    m_alertsDao = m_injector.getInstance(AlertsDAO.class);
+
+    // !!! need a synchronous op for testing
+    EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector).register(m_listener);
+    EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector).register(m_listener);
+  }
+
+  /**
+   * @throws Exception
+   */
+  @After
+  public void teardown() throws Exception {
+    m_injector.getInstance(PersistService.class).stop();
+    m_injector = null;
+  }
+
+  /**
+   * Tests that the {@link AlertAggregateListener} caches values of the
+   * aggregates and only triggers events when needed.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAlertNoticeCreationFromEvent() throws Exception {
+    AlertCurrentEntity currentEntityMock = EasyMock.createNiceMock(AlertCurrentEntity.class);
+
+    // setup the mocks for the aggregate definition to avoid NPEs
+    AlertDefinition aggregateDefinition = new AlertDefinition();
+    aggregateDefinition.setName("mock-aggregate-alert");
+    AggregateSource aggregateSource = new AggregateSource();
+    aggregateSource.setAlertName("mock-aggregate-alert");
+    Reporting reporting = new Reporting();
+    ReportTemplate criticalTemplate = new ReportTemplate();
+    ReportTemplate okTemplate = new ReportTemplate();
+    criticalTemplate.setValue(.05);
+    criticalTemplate.setText("CRITICAL");
+    okTemplate.setText("OK");
+    reporting.setCritical(criticalTemplate);
+    reporting.setWarning(criticalTemplate);
+    reporting.setOk(okTemplate);
+    aggregateSource.setReporting(reporting);
+    aggregateDefinition.setSource(aggregateSource);
+
+    EasyMock.expect(
+        m_aggregateMapping.getAggregateDefinition(EasyMock.anyLong(), EasyMock.eq("mock-alert"))).andReturn(
+        aggregateDefinition).atLeastOnce();
+
+    AlertSummaryDTO summaryDTO = new AlertSummaryDTO(5,0,0,0,0);
+    EasyMock.expect(
+        m_alertsDao.findAggregateCounts(EasyMock.anyLong(), EasyMock.eq("mock-aggregate-alert"))).andReturn(
+        summaryDTO).atLeastOnce();
+
+    EasyMock.replay(m_alertsDao, m_aggregateMapping);
+
+    // check that we're starting at 0
+    Assert.assertEquals(0, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class));
+
+    // trigger an alert which will trigger the aggregate
+    Alert alert = new Alert("mock-alert", null, null, null, null, null);
+    AlertAggregateListener aggregateListener = m_injector.getInstance(AlertAggregateListener.class);
+    AlertStateChangeEvent event = new AlertStateChangeEvent(0, alert, currentEntityMock, null);
+    aggregateListener.onAlertStateChangeEvent(event);
+
+    // verify that one AlertReceivedEvent was fired (it's the one the listener
+    // creates for the aggregate)
+    Assert.assertEquals(1, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class));
+
+    // fire the same alert event again; the cache in the aggregate listener
+    // should prevent it from firing a new alert received event of its own
+    aggregateListener.onAlertStateChangeEvent(event);
+
+    // check that we're still at 1
+    Assert.assertEquals(1, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class));
+
+    // now change the returned summary DTO so that a new alert will get generated
+    summaryDTO.setOkCount(0);
+    summaryDTO.setCriticalCount(5);
+    aggregateListener.onAlertStateChangeEvent(event);
+    Assert.assertEquals(2, m_listener.getAlertEventReceivedCount(AlertReceivedEvent.class));
+  }
+
+  /**
+   *
+   */
+  private class MockModule implements Module {
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void configure(Binder binder) {
+      m_alertsDao = EasyMock.createMock(AlertsDAO.class);
+      m_aggregateMapping = EasyMock.createMock(AggregateDefinitionMapping.class);
+      binder.bind(AlertsDAO.class).toInstance(m_alertsDao);
+      binder.bind(AggregateDefinitionMapping.class).toInstance(m_aggregateMapping);
+    }
+  }
+}