You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by wu...@apache.org on 2022/11/24 08:19:29 UTC

[ambari] branch trunk updated: AMBARI-25576: Primary key duplication error during flushing alerts from alerts cache (#3572)

This is an automated email from the ASF dual-hosted git repository.

wuzhiguo pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f7d5cb242e AMBARI-25576: Primary key duplication error during flushing alerts from alerts cache (#3572)
f7d5cb242e is described below

commit f7d5cb242ef0b66f16f04fc83143b85056e099db
Author: Yu Hou <52...@qq.com>
AuthorDate: Thu Nov 24 16:19:24 2022 +0800

    AMBARI-25576: Primary key duplication error during flushing alerts from alerts cache (#3572)
---
 .../listeners/alerts/AlertReceivedListener.java    | 29 +-------
 .../apache/ambari/server/orm/dao/AlertsDAO.java    | 77 ++++++++++++++++++++--
 .../ambari/server/utils/EventBusSynchronizer.java  | 49 ++++++++++++++
 .../ambari/server/orm/dao/AlertsDAOTest.java       | 29 +++++++-
 .../state/alerts/AggregateAlertListenerTest.java   |  2 +
 5 files changed, 152 insertions(+), 34 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
index db80e8cb16..57972ec6ab 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
@@ -66,7 +66,6 @@ import com.google.common.util.concurrent.Striped;
 import com.google.inject.Inject;
 import com.google.inject.Provider;
 import com.google.inject.Singleton;
-import com.google.inject.persist.Transactional;
 
 /**
  * The {@link AlertReceivedListener} class handles {@link AlertReceivedEvent}
@@ -385,7 +384,7 @@ public class AlertReceivedListener {
 
     // invokes the EntityManager create/merge on various entities in a single
     // transaction
-    saveEntities(toMerge, toCreateHistoryAndMerge);
+    m_alertsDao.saveEntities(toMerge, toCreateHistoryAndMerge);
 
     // broadcast events
     for (AlertEvent eventToFire : alertEvents) {
@@ -445,32 +444,6 @@ public class AlertReceivedListener {
     }
   }
 
-  /**
-   * Saves alert and alert history entities in single transaction
-   * @param toMerge - merge alert only
-   * @param toCreateHistoryAndMerge - create new history, merge alert
-   */
-  @Transactional
-  void saveEntities(List<AlertCurrentEntity> toMerge,
-      List<AlertCurrentEntity> toCreateHistoryAndMerge) {
-    for (AlertCurrentEntity entity : toMerge) {
-      m_alertsDao.merge(entity, m_configuration.isAlertCacheEnabled());
-    }
-
-    for (AlertCurrentEntity entity : toCreateHistoryAndMerge) {
-      m_alertsDao.create(entity.getAlertHistory());
-      m_alertsDao.merge(entity);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(
-            "Alert State Merged: CurrentId {}, CurrentTimestamp {}, HistoryId {}, HistoryState {}",
-            entity.getAlertId(), entity.getLatestTimestamp(),
-            entity.getAlertHistory().getAlertId(),
-            entity.getAlertHistory().getAlertState());
-      }
-    }
-  }
-
   /**
    * Gets whether the specified alert is valid for its reported cluster,
    * service, component, and host. This method is necessary for the following
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
index dceafcb0f1..374e1275fa 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
@@ -985,8 +985,18 @@ public class AlertsDAO implements Cleanable {
    *          the alert to merge (not {@code null}).
    * @return the updated alert with merged content (never {@code null}).
    */
-  @Transactional
   public AlertHistoryEntity merge(AlertHistoryEntity alert) {
+    if (m_configuration.isAlertCacheEnabled()) {
+      synchronized (this) {
+        return mergeTransactional(alert);
+      }
+    } else {
+      return mergeTransactional(alert);
+    }
+  }
+
+  @Transactional
+  protected AlertHistoryEntity mergeTransactional(AlertHistoryEntity alert) {
     return m_entityManagerProvider.get().merge(alert);
   }
 
@@ -1033,8 +1043,18 @@ public class AlertsDAO implements Cleanable {
    *          the current alert to merge (not {@code null}).
    * @return the updated current alert with merged content (never {@code null}).
    */
-  @Transactional
   public AlertCurrentEntity merge(AlertCurrentEntity alert) {
+    if (m_configuration.isAlertCacheEnabled()) {
+      synchronized (this) {
+        return mergeTransactional(alert);
+      }
+    } else {
+      return mergeTransactional(alert);
+    }
+  }
+
+  @Transactional
+  protected AlertCurrentEntity mergeTransactional(AlertCurrentEntity alert) {
     // perform the JPA merge
     alert = m_entityManagerProvider.get().merge(alert);
 
@@ -1174,12 +1194,18 @@ public class AlertsDAO implements Cleanable {
    * Writes all cached {@link AlertCurrentEntity} instances to the database and
    * clears the cache.
    */
-  @Transactional
   public void flushCachedEntitiesToJPA() {
-    if (!m_configuration.isAlertCacheEnabled()) {
+    if (m_configuration.isAlertCacheEnabled()) {
+      synchronized (this) {
+        flushCachedEntitiesToJPATransactional();
+      }
+    } else {
       LOG.warn("Unable to flush cached alerts to JPA because caching is not enabled");
-      return;
     }
+  }
+
+  @Transactional
+  protected void flushCachedEntitiesToJPATransactional() {
 
     // capture for logging purposes
     long cachedEntityCount = m_currentAlertCache.size();
@@ -1212,6 +1238,10 @@ public class AlertsDAO implements Cleanable {
       AlertCacheKey key = AlertCacheKey.build(alert);
       AlertCurrentEntity cachedEntity = m_currentAlertCache.getIfPresent(key);
       if (null != cachedEntity) {
+        if (cachedEntity.getAlertHistory() == null) {
+          LOG.warn("There is current entity with null history in the cache, currentId: {}, persisted historyId: {}",
+              cachedEntity.getAlertId(), alert.getHistoryId());
+        }
         alert = cachedEntity;
       }
 
@@ -1584,4 +1614,41 @@ public class AlertsDAO implements Cleanable {
     return affectedRows;
   }
 
+  /**
+   * Saves alert and alert history entities in single transaction
+   * @param toMerge - merge alert only
+   * @param toCreateHistoryAndMerge - create new history, merge alert
+   */
+  public void saveEntities(List<AlertCurrentEntity> toMerge,
+                                 List<AlertCurrentEntity> toCreateHistoryAndMerge) {
+    if (m_configuration.isAlertCacheEnabled()) {
+      synchronized (this) {
+        saveEntitiesTransactional(toMerge, toCreateHistoryAndMerge);
+      }
+    } else {
+      saveEntitiesTransactional(toMerge, toCreateHistoryAndMerge);
+    }
+  }
+
+  @Transactional
+  protected void saveEntitiesTransactional(List<AlertCurrentEntity> toMerge,
+                    List<AlertCurrentEntity> toCreateHistoryAndMerge) {
+    for (AlertCurrentEntity entity : toMerge) {
+      merge(entity, m_configuration.isAlertCacheEnabled());
+    }
+
+    for (AlertCurrentEntity entity : toCreateHistoryAndMerge) {
+      create(entity.getAlertHistory());
+      merge(entity);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Alert State Merged: CurrentId {}, CurrentTimestamp {}, HistoryId {}, HistoryState {}",
+            entity.getAlertId(), entity.getLatestTimestamp(),
+            entity.getAlertHistory().getAlertId(),
+            entity.getAlertHistory().getAlertState());
+      }
+    }
+  }
+
 }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
index c72ffbe032..ecde673255 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
@@ -25,10 +25,13 @@ import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeList
 import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
 import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
 import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
+import org.apache.ambari.server.events.listeners.services.ServiceUpdateListener;
 import org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener;
 import org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncListener;
+import org.apache.ambari.server.events.listeners.upgrade.UpgradeUpdateListener;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
 
 import com.google.common.eventbus.AsyncEventBus;
 import com.google.common.eventbus.EventBus;
@@ -97,6 +100,24 @@ public class EventBusSynchronizer {
     return synchronizedBus;
   }
 
+  /**
+   * Force both {@link EventBus} from {@link STOMPUpdatePublisher} to be serial
+   * and synchronous. Also register the known listeners. Registering known
+   * listeners is necessary since the event bus was replaced.
+   *
+   * @param injector
+   */
+  public static void synchronizeSTOMPUpdatePublisher(Injector injector) {
+    EventBus agentEventBus = new EventBus();
+    EventBus apiEventBus = new EventBus();
+    STOMPUpdatePublisher publisher = injector.getInstance(STOMPUpdatePublisher.class);
+
+    replaceSTOMPEventBuses(STOMPUpdatePublisher.class, publisher, agentEventBus, apiEventBus);
+
+    // register common agent event listeners
+    registerSTOMPApiListeners(injector, apiEventBus);
+  }
+
   /**
    * Register the normal listeners with the replaced synchronous bus.
    *
@@ -125,6 +146,18 @@ public class EventBusSynchronizer {
     synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
   }
 
+  /**
+   * Register the normal listeners with the replaced synchronous bus.
+   *
+   * @param injector
+   * @param synchronizedBus
+   */
+  private static void registerSTOMPApiListeners(Injector injector,
+      EventBus synchronizedBus) {
+    synchronizedBus.register(injector.getInstance(ServiceUpdateListener.class));
+    synchronizedBus.register(injector.getInstance(UpgradeUpdateListener.class));
+  }
+
   private static void replaceEventBus(Class<?> eventPublisherClass,
       Object instance, EventBus eventBus) {
 
@@ -136,4 +169,20 @@ public class EventBusSynchronizer {
       throw new RuntimeException(exception);
     }
   }
+
+  private static void replaceSTOMPEventBuses(Class<?> eventPublisherClass,
+      Object instance, EventBus agentEventBus, EventBus apiEventBus) {
+
+    try {
+      Field agentEventBusField = eventPublisherClass.getDeclaredField("agentEventBus");
+      agentEventBusField.setAccessible(true);
+      agentEventBusField.set(instance, agentEventBus);
+
+      Field apiEventBusField = eventPublisherClass.getDeclaredField("apiEventBus");
+      apiEventBusField.setAccessible(true);
+      apiEventBusField.set(instance, apiEventBus);
+    } catch (Exception exception) {
+      throw new RuntimeException(exception);
+    }
+  }
 }
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
index 49f5679e4f..26c49d698c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
@@ -48,6 +48,9 @@ import org.apache.ambari.server.controller.spi.SortRequest;
 import org.apache.ambari.server.controller.spi.SortRequest.Order;
 import org.apache.ambari.server.controller.spi.SortRequestProperty;
 import org.apache.ambari.server.controller.utilities.PredicateBuilder;
+import org.apache.ambari.server.events.publishers.HostComponentUpdateEventPublisher;
+import org.apache.ambari.server.events.publishers.RequestUpdateEventPublisher;
+import org.apache.ambari.server.events.publishers.ServiceUpdateEventPublisher;
 import org.apache.ambari.server.orm.AlertDaoHelper;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -68,13 +71,17 @@ import org.apache.ambari.server.state.ServiceFactory;
 import org.apache.ambari.server.state.alert.Scope;
 import org.apache.ambari.server.state.alert.SourceType;
 import org.apache.ambari.server.utils.EventBusSynchronizer;
+import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
+import com.google.inject.Module;
 import com.google.inject.persist.UnitOfWork;
+import com.google.inject.util.Modules;
 
 /**
  * Tests {@link AlertsDAO}.
@@ -102,7 +109,8 @@ public class AlertsDAOTest {
    */
   @Before
   public void setup() throws Exception {
-    m_injector = Guice.createInjector(new InMemoryDefaultTestModule());
+    m_injector = Guice.createInjector(Modules.override(
+        new InMemoryDefaultTestModule()).with(new MockModule()));
     m_injector.getInstance(GuiceJpaInitializer.class);
     m_injector.getInstance(UnitOfWork.class).begin();
 
@@ -117,6 +125,8 @@ public class AlertsDAOTest {
 
     // !!! need a synchronous op for testing
     EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector);
+    EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector);
+    EventBusSynchronizer.synchronizeSTOMPUpdatePublisher(m_injector);
 
     // install YARN so there is at least 1 service installed and no
     // unexpected alerts since the test YARN service doesn't have any alerts
@@ -1472,4 +1482,21 @@ public class AlertsDAOTest {
     currentAlerts = m_dao.findCurrent();
     assertEquals(4, currentAlerts.size());
   }
+
+  private class MockModule implements Module {
+
+    @Override
+    public void configure(Binder binder) {
+      HostComponentUpdateEventPublisher hostComponentUpdateEventPublisher =
+          EasyMock.createNiceMock(HostComponentUpdateEventPublisher.class);
+      RequestUpdateEventPublisher requestUpdateEventPublisher =
+          EasyMock.createNiceMock(RequestUpdateEventPublisher.class);
+      ServiceUpdateEventPublisher serviceUpdateEventPublisher =
+          EasyMock.createNiceMock(ServiceUpdateEventPublisher.class);
+
+      binder.bind(HostComponentUpdateEventPublisher.class).toInstance(hostComponentUpdateEventPublisher);
+      binder.bind(RequestUpdateEventPublisher.class).toInstance(requestUpdateEventPublisher);
+      binder.bind(ServiceUpdateEventPublisher.class).toInstance(serviceUpdateEventPublisher);
+    }
+  }
 }
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
index 503c11fe38..2a63b4207c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
@@ -112,6 +112,8 @@ public class AggregateAlertListenerTest {
     EasyMock.expect(
         m_alertsDao.findAggregateCounts(EasyMock.anyLong(), EasyMock.eq("mock-aggregate-alert"))).andReturn(
         summaryDTO).atLeastOnce();
+    m_alertsDao.saveEntities(EasyMock.anyObject(), EasyMock.anyObject());
+    EasyMock.expectLastCall().anyTimes();
 
     EasyMock.replay(m_alertsDao, m_aggregateMapping, currentEntityMock);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ambari.apache.org
For additional commands, e-mail: commits-help@ambari.apache.org