You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by wu...@apache.org on 2022/11/24 08:19:29 UTC
[ambari] branch trunk updated: AMBARI-25576: Primary key duplication error during flushing alerts from alerts cache (#3572)
This is an automated email from the ASF dual-hosted git repository.
wuzhiguo pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new f7d5cb242e AMBARI-25576: Primary key duplication error during flushing alerts from alerts cache (#3572)
f7d5cb242e is described below
commit f7d5cb242ef0b66f16f04fc83143b85056e099db
Author: Yu Hou <52...@qq.com>
AuthorDate: Thu Nov 24 16:19:24 2022 +0800
AMBARI-25576: Primary key duplication error during flushing alerts from alerts cache (#3572)
---
.../listeners/alerts/AlertReceivedListener.java | 29 +-------
.../apache/ambari/server/orm/dao/AlertsDAO.java | 77 ++++++++++++++++++++--
.../ambari/server/utils/EventBusSynchronizer.java | 49 ++++++++++++++
.../ambari/server/orm/dao/AlertsDAOTest.java | 29 +++++++-
.../state/alerts/AggregateAlertListenerTest.java | 2 +
5 files changed, 152 insertions(+), 34 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
index db80e8cb16..57972ec6ab 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/alerts/AlertReceivedListener.java
@@ -66,7 +66,6 @@ import com.google.common.util.concurrent.Striped;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
-import com.google.inject.persist.Transactional;
/**
* The {@link AlertReceivedListener} class handles {@link AlertReceivedEvent}
@@ -385,7 +384,7 @@ public class AlertReceivedListener {
// invokes the EntityManager create/merge on various entities in a single
// transaction
- saveEntities(toMerge, toCreateHistoryAndMerge);
+ m_alertsDao.saveEntities(toMerge, toCreateHistoryAndMerge);
// broadcast events
for (AlertEvent eventToFire : alertEvents) {
@@ -445,32 +444,6 @@ public class AlertReceivedListener {
}
}
- /**
- * Saves alert and alert history entities in single transaction
- * @param toMerge - merge alert only
- * @param toCreateHistoryAndMerge - create new history, merge alert
- */
- @Transactional
- void saveEntities(List<AlertCurrentEntity> toMerge,
- List<AlertCurrentEntity> toCreateHistoryAndMerge) {
- for (AlertCurrentEntity entity : toMerge) {
- m_alertsDao.merge(entity, m_configuration.isAlertCacheEnabled());
- }
-
- for (AlertCurrentEntity entity : toCreateHistoryAndMerge) {
- m_alertsDao.create(entity.getAlertHistory());
- m_alertsDao.merge(entity);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Alert State Merged: CurrentId {}, CurrentTimestamp {}, HistoryId {}, HistoryState {}",
- entity.getAlertId(), entity.getLatestTimestamp(),
- entity.getAlertHistory().getAlertId(),
- entity.getAlertHistory().getAlertState());
- }
- }
- }
-
/**
* Gets whether the specified alert is valid for its reported cluster,
* service, component, and host. This method is necessary for the following
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
index dceafcb0f1..374e1275fa 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/orm/dao/AlertsDAO.java
@@ -985,8 +985,18 @@ public class AlertsDAO implements Cleanable {
* the alert to merge (not {@code null}).
* @return the updated alert with merged content (never {@code null}).
*/
- @Transactional
public AlertHistoryEntity merge(AlertHistoryEntity alert) {
+ if (m_configuration.isAlertCacheEnabled()) {
+ synchronized (this) {
+ return mergeTransactional(alert);
+ }
+ } else {
+ return mergeTransactional(alert);
+ }
+ }
+
+ @Transactional
+ protected AlertHistoryEntity mergeTransactional(AlertHistoryEntity alert) {
return m_entityManagerProvider.get().merge(alert);
}
@@ -1033,8 +1043,18 @@ public class AlertsDAO implements Cleanable {
* the current alert to merge (not {@code null}).
* @return the updated current alert with merged content (never {@code null}).
*/
- @Transactional
public AlertCurrentEntity merge(AlertCurrentEntity alert) {
+ if (m_configuration.isAlertCacheEnabled()) {
+ synchronized (this) {
+ return mergeTransactional(alert);
+ }
+ } else {
+ return mergeTransactional(alert);
+ }
+ }
+
+ @Transactional
+ protected AlertCurrentEntity mergeTransactional(AlertCurrentEntity alert) {
// perform the JPA merge
alert = m_entityManagerProvider.get().merge(alert);
@@ -1174,12 +1194,18 @@ public class AlertsDAO implements Cleanable {
* Writes all cached {@link AlertCurrentEntity} instances to the database and
* clears the cache.
*/
- @Transactional
public void flushCachedEntitiesToJPA() {
- if (!m_configuration.isAlertCacheEnabled()) {
+ if (m_configuration.isAlertCacheEnabled()) {
+ synchronized (this) {
+ flushCachedEntitiesToJPATransactional();
+ }
+ } else {
LOG.warn("Unable to flush cached alerts to JPA because caching is not enabled");
- return;
}
+ }
+
+ @Transactional
+ protected void flushCachedEntitiesToJPATransactional() {
// capture for logging purposes
long cachedEntityCount = m_currentAlertCache.size();
@@ -1212,6 +1238,10 @@ public class AlertsDAO implements Cleanable {
AlertCacheKey key = AlertCacheKey.build(alert);
AlertCurrentEntity cachedEntity = m_currentAlertCache.getIfPresent(key);
if (null != cachedEntity) {
+ if (cachedEntity.getAlertHistory() == null) {
+ LOG.warn("There is current entity with null history in the cache, currentId: {}, persisted historyId: {}",
+ cachedEntity.getAlertId(), alert.getHistoryId());
+ }
alert = cachedEntity;
}
@@ -1584,4 +1614,41 @@ public class AlertsDAO implements Cleanable {
return affectedRows;
}
+ /**
+ * Saves alert and alert history entities in single transaction
+ * @param toMerge - merge alert only
+ * @param toCreateHistoryAndMerge - create new history, merge alert
+ */
+ public void saveEntities(List<AlertCurrentEntity> toMerge,
+ List<AlertCurrentEntity> toCreateHistoryAndMerge) {
+ if (m_configuration.isAlertCacheEnabled()) {
+ synchronized (this) {
+ saveEntitiesTransactional(toMerge, toCreateHistoryAndMerge);
+ }
+ } else {
+ saveEntitiesTransactional(toMerge, toCreateHistoryAndMerge);
+ }
+ }
+
+ @Transactional
+ protected void saveEntitiesTransactional(List<AlertCurrentEntity> toMerge,
+ List<AlertCurrentEntity> toCreateHistoryAndMerge) {
+ for (AlertCurrentEntity entity : toMerge) {
+ merge(entity, m_configuration.isAlertCacheEnabled());
+ }
+
+ for (AlertCurrentEntity entity : toCreateHistoryAndMerge) {
+ create(entity.getAlertHistory());
+ merge(entity);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Alert State Merged: CurrentId {}, CurrentTimestamp {}, HistoryId {}, HistoryState {}",
+ entity.getAlertId(), entity.getLatestTimestamp(),
+ entity.getAlertHistory().getAlertId(),
+ entity.getAlertHistory().getAlertState());
+ }
+ }
+ }
+
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
index c72ffbe032..ecde673255 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
@@ -25,10 +25,13 @@ import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeList
import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
+import org.apache.ambari.server.events.listeners.services.ServiceUpdateListener;
import org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener;
import org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncListener;
+import org.apache.ambari.server.events.listeners.upgrade.UpgradeUpdateListener;
import org.apache.ambari.server.events.publishers.AlertEventPublisher;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.STOMPUpdatePublisher;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
@@ -97,6 +100,24 @@ public class EventBusSynchronizer {
return synchronizedBus;
}
+ /**
+ * Force both {@link EventBus} from {@link STOMPUpdatePublisher} to be serial
+ * and synchronous. Also register the known listeners. Registering known
+ * listeners is necessary since the event bus was replaced.
+ *
+ * @param injector
+ */
+ public static void synchronizeSTOMPUpdatePublisher(Injector injector) {
+ EventBus agentEventBus = new EventBus();
+ EventBus apiEventBus = new EventBus();
+ STOMPUpdatePublisher publisher = injector.getInstance(STOMPUpdatePublisher.class);
+
+ replaceSTOMPEventBuses(STOMPUpdatePublisher.class, publisher, agentEventBus, apiEventBus);
+
+ // register common agent event listeners
+ registerSTOMPApiListeners(injector, apiEventBus);
+ }
+
/**
* Register the normal listeners with the replaced synchronous bus.
*
@@ -125,6 +146,18 @@ public class EventBusSynchronizer {
synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
}
+ /**
+ * Register the normal listeners with the replaced synchronous bus.
+ *
+ * @param injector
+ * @param synchronizedBus
+ */
+ private static void registerSTOMPApiListeners(Injector injector,
+ EventBus synchronizedBus) {
+ synchronizedBus.register(injector.getInstance(ServiceUpdateListener.class));
+ synchronizedBus.register(injector.getInstance(UpgradeUpdateListener.class));
+ }
+
private static void replaceEventBus(Class<?> eventPublisherClass,
Object instance, EventBus eventBus) {
@@ -136,4 +169,20 @@ public class EventBusSynchronizer {
throw new RuntimeException(exception);
}
}
+
+ private static void replaceSTOMPEventBuses(Class<?> eventPublisherClass,
+ Object instance, EventBus agentEventBus, EventBus apiEventBus) {
+
+ try {
+ Field agentEventBusField = eventPublisherClass.getDeclaredField("agentEventBus");
+ agentEventBusField.setAccessible(true);
+ agentEventBusField.set(instance, agentEventBus);
+
+ Field apiEventBusField = eventPublisherClass.getDeclaredField("apiEventBus");
+ apiEventBusField.setAccessible(true);
+ apiEventBusField.set(instance, apiEventBus);
+ } catch (Exception exception) {
+ throw new RuntimeException(exception);
+ }
+ }
}
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
index 49f5679e4f..26c49d698c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
@@ -48,6 +48,9 @@ import org.apache.ambari.server.controller.spi.SortRequest;
import org.apache.ambari.server.controller.spi.SortRequest.Order;
import org.apache.ambari.server.controller.spi.SortRequestProperty;
import org.apache.ambari.server.controller.utilities.PredicateBuilder;
+import org.apache.ambari.server.events.publishers.HostComponentUpdateEventPublisher;
+import org.apache.ambari.server.events.publishers.RequestUpdateEventPublisher;
+import org.apache.ambari.server.events.publishers.ServiceUpdateEventPublisher;
import org.apache.ambari.server.orm.AlertDaoHelper;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -68,13 +71,17 @@ import org.apache.ambari.server.state.ServiceFactory;
import org.apache.ambari.server.state.alert.Scope;
import org.apache.ambari.server.state.alert.SourceType;
import org.apache.ambari.server.utils.EventBusSynchronizer;
+import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
+import com.google.inject.Module;
import com.google.inject.persist.UnitOfWork;
+import com.google.inject.util.Modules;
/**
* Tests {@link AlertsDAO}.
@@ -102,7 +109,8 @@ public class AlertsDAOTest {
*/
@Before
public void setup() throws Exception {
- m_injector = Guice.createInjector(new InMemoryDefaultTestModule());
+ m_injector = Guice.createInjector(Modules.override(
+ new InMemoryDefaultTestModule()).with(new MockModule()));
m_injector.getInstance(GuiceJpaInitializer.class);
m_injector.getInstance(UnitOfWork.class).begin();
@@ -117,6 +125,8 @@ public class AlertsDAOTest {
// !!! need a synchronous op for testing
EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector);
+ EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector);
+ EventBusSynchronizer.synchronizeSTOMPUpdatePublisher(m_injector);
// install YARN so there is at least 1 service installed and no
// unexpected alerts since the test YARN service doesn't have any alerts
@@ -1472,4 +1482,21 @@ public class AlertsDAOTest {
currentAlerts = m_dao.findCurrent();
assertEquals(4, currentAlerts.size());
}
+
+ private class MockModule implements Module {
+
+ @Override
+ public void configure(Binder binder) {
+ HostComponentUpdateEventPublisher hostComponentUpdateEventPublisher =
+ EasyMock.createNiceMock(HostComponentUpdateEventPublisher.class);
+ RequestUpdateEventPublisher requestUpdateEventPublisher =
+ EasyMock.createNiceMock(RequestUpdateEventPublisher.class);
+ ServiceUpdateEventPublisher serviceUpdateEventPublisher =
+ EasyMock.createNiceMock(ServiceUpdateEventPublisher.class);
+
+ binder.bind(HostComponentUpdateEventPublisher.class).toInstance(hostComponentUpdateEventPublisher);
+ binder.bind(RequestUpdateEventPublisher.class).toInstance(requestUpdateEventPublisher);
+ binder.bind(ServiceUpdateEventPublisher.class).toInstance(serviceUpdateEventPublisher);
+ }
+ }
}
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
index 503c11fe38..2a63b4207c 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AggregateAlertListenerTest.java
@@ -112,6 +112,8 @@ public class AggregateAlertListenerTest {
EasyMock.expect(
m_alertsDao.findAggregateCounts(EasyMock.anyLong(), EasyMock.eq("mock-aggregate-alert"))).andReturn(
summaryDTO).atLeastOnce();
+ m_alertsDao.saveEntities(EasyMock.anyObject(), EasyMock.anyObject());
+ EasyMock.expectLastCall().anyTimes();
EasyMock.replay(m_alertsDao, m_aggregateMapping, currentEntityMock);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ambari.apache.org
For additional commands, e-mail: commits-help@ambari.apache.org