You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by jo...@apache.org on 2015/04/14 20:00:10 UTC
ambari git commit: AMBARI-10456 - Ambari Server Deadlock When Mapping
Hosts (jonathanhurley)
Repository: ambari
Updated Branches:
refs/heads/trunk ceb196b83 -> f11e2f063
AMBARI-10456 - Ambari Server Deadlock When Mapping Hosts (jonathanhurley)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f11e2f06
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f11e2f06
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f11e2f06
Branch: refs/heads/trunk
Commit: f11e2f0630f922cf7102fe4291d5c80230b4cb8c
Parents: ceb196b
Author: Jonathan Hurley <jh...@hortonworks.com>
Authored: Tue Apr 14 10:59:57 2015 -0400
Committer: Jonathan Hurley <jh...@hortonworks.com>
Committed: Tue Apr 14 13:58:27 2015 -0400
----------------------------------------------------------------------
.../events/publishers/AmbariEventPublisher.java | 10 +-
.../server/state/cluster/ClustersImpl.java | 101 ++++----------
.../services/AmbariServerAlertService.java | 28 ++--
.../server/agent/TestHeartbeatHandler.java | 4 +
.../apache/ambari/server/events/EventsTest.java | 13 +-
.../server/orm/dao/AlertDispatchDAOTest.java | 19 +--
.../ambari/server/orm/dao/AlertsDAOTest.java | 15 +-
.../state/alerts/AlertEventPublisherTest.java | 25 +---
.../alerts/AlertStateChangedEventTest.java | 20 +--
.../state/alerts/InitialAlertEventTest.java | 26 +---
.../state/cluster/AlertDataManagerTest.java | 9 +-
.../state/cluster/ClusterDeadlockTest.java | 4 -
.../state/cluster/ClustersDeadlockTest.java | 136 +++++++++++++++++-
.../server/utils/EventBusSynchronizer.java | 137 +++++++++++++++++++
14 files changed, 350 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
index 96e66a62..05194e4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/AmbariEventPublisher.java
@@ -17,16 +17,19 @@
*/
package org.apache.ambari.server.events.publishers;
+import java.util.concurrent.Executors;
+
import org.apache.ambari.server.events.AmbariEvent;
+import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.inject.Singleton;
/**
* The {@link AmbariEventPublisher} is used to publish instances of
- * {@link AmbariEvent} to any {@link Subscribe} interested. It uses a
- * single-threaded, serial {@link EventBus}.
+ * {@link AmbariEvent} to any {@link Subscribe} methods interested. It uses a
+ * single-threaded {@link AsyncEventBus}.
*/
@Singleton
public class AmbariEventPublisher {
@@ -40,7 +43,8 @@ public class AmbariEventPublisher {
* Constructor.
*/
public AmbariEventPublisher() {
- m_eventBus = new EventBus("ambari-event-bus");
+ m_eventBus = new AsyncEventBus("ambari-event-bus",
+ Executors.newSingleThreadExecutor());
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
index 9cf1f5a..c7a8ddb 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/cluster/ClustersImpl.java
@@ -28,7 +28,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.persistence.RollbackException;
@@ -230,29 +229,25 @@ public class ClustersImpl implements Clusters {
public Cluster getCluster(String clusterName)
throws AmbariException {
checkLoaded();
- r.lock();
- try {
- if (!clusters.containsKey(clusterName)) {
- throw new ClusterNotFoundException(clusterName);
- }
- return clusters.get(clusterName);
- } finally {
- r.unlock();
+
+ Cluster cluster = clusters.get(clusterName);
+ if (null == cluster) {
+ throw new ClusterNotFoundException(clusterName);
}
+
+ return cluster;
}
@Override
public Cluster getClusterById(long id) throws AmbariException {
checkLoaded();
- r.lock();
- try {
- if (!clustersById.containsKey(id)) {
- throw new ClusterNotFoundException("clusterID=" + id);
- }
- return clustersById.get(id);
- } finally {
- r.unlock();
+
+ Cluster cluster = clustersById.get(id);
+ if (null == cluster) {
+ throw new ClusterNotFoundException("clusterID=" + id);
}
+
+ return clustersById.get(id);
}
@Override
@@ -281,15 +276,8 @@ public class ClustersImpl implements Clusters {
@Override
public List<Host> getHosts() {
checkLoaded();
- r.lock();
- try {
- List<Host> hostList = new ArrayList<Host>(hosts.size());
- hostList.addAll(hosts.values());
- return hostList;
- } finally {
- r.unlock();
- }
+ return new ArrayList<Host>(hosts.values());
}
@Override
@@ -315,15 +303,12 @@ public class ClustersImpl implements Clusters {
@Override
public Host getHost(String hostname) throws AmbariException {
checkLoaded();
- r.lock();
- try {
- if (!hosts.containsKey(hostname)) {
- throw new HostNotFoundException(hostname);
- }
- return hosts.get(hostname);
- } finally {
- r.unlock();
+
+ if (!hosts.containsKey(hostname)) {
+ throw new HostNotFoundException(hostname);
}
+
+ return hosts.get(hostname);
}
/**
@@ -421,41 +406,19 @@ public class ClustersImpl implements Clusters {
private Map<String, Host> getHostsMap(Collection<String> hostSet) throws
HostNotFoundException {
checkLoaded();
+
Map<String, Host> hostMap = new HashMap<String, Host>();
- r.lock();
- try {
- for (String host : hostSet) {
- if (!hosts.containsKey(host)) {
- throw new HostNotFoundException(host);
- } else {
- hostMap.put(host, hosts.get(host));
- }
- }
- } finally {
- r.unlock();
- }
- return hostMap;
- }
- private Map<String, Cluster> getClustersMap(Collection<String> clusterSet) throws
- ClusterNotFoundException {
- checkLoaded();
- Map<String, Cluster> clusterMap = new HashMap<String, Cluster>();
- r.lock();
- try {
- for (String c : clusterSet) {
- if (c != null) {
- if (!clusters.containsKey(c)) {
- throw new ClusterNotFoundException(c);
- } else {
- clusterMap.put(c, clusters.get(c));
- }
- }
+ for (String hostName : hostSet) {
+ Host host = hosts.get(hostName);
+ if (null == hostName) {
+ throw new HostNotFoundException(hostName);
}
- } finally {
- r.unlock();
+
+ hostMap.put(hostName, host);
}
- return clusterMap;
+
+ return hostMap;
}
/**
@@ -533,14 +496,8 @@ public class ClustersImpl implements Clusters {
w.unlock();
}
- ReadWriteLock clusterLock = cluster.getClusterGlobalLock();
- clusterLock.writeLock().lock();
- try {
- host.refresh();
- cluster.refresh();
- } finally {
- clusterLock.writeLock().unlock();
- }
+ cluster.refresh();
+ host.refresh();
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java
index 89f9656..6669e7b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/state/services/AmbariServerAlertService.java
@@ -146,7 +146,7 @@ public class AmbariServerAlertService extends AbstractScheduledService {
protected void runOneIteration() throws Exception {
Map<String, Cluster> clusterMap = m_clustersProvider.get().getClusters();
for (Cluster cluster : clusterMap.values()) {
- // get all of the cluster alerts for the server
+ // get all of the cluster alerts for AMBARI/AMBARI_SERVER
List<AlertDefinitionEntity> entities = m_dao.findByServiceComponent(
cluster.getClusterId(), Services.AMBARI.name(),
Components.AMBARI_SERVER.name());
@@ -155,17 +155,25 @@ public class AmbariServerAlertService extends AbstractScheduledService {
for (AlertDefinitionEntity entity : entities) {
String definitionName = entity.getDefinitionName();
ScheduledAlert scheduledAlert = m_futureMap.get(definitionName);
- ScheduledFuture<?> scheduledFuture = scheduledAlert.getScheduledFuture();
+
+ // disabled or new alerts may not have anything mapped yet
+ ScheduledFuture<?> scheduledFuture = null;
+ if (null != scheduledAlert) {
+ scheduledFuture = scheduledAlert.getScheduledFuture();
+ }
// if the definition is not enabled, ensure it's not scheduled and
// then continue to the next one
if (!entity.getEnabled()) {
- unschedule(definitionName, scheduledFuture);
+ if (null != scheduledFuture) {
+ unschedule(definitionName, scheduledFuture);
+ }
+
continue;
}
- // if there is no future, then schedule it
- if (null == scheduledFuture) {
+ // if the definition hasn't been scheduled, then schedule it
+ if (null == scheduledAlert || null == scheduledFuture) {
scheduleRunnable(entity);
continue;
}
@@ -190,12 +198,14 @@ public class AmbariServerAlertService extends AbstractScheduledService {
*
* @param scheduledFuture
*/
- private void unschedule(String definitionName,
- ScheduledFuture<?> scheduledFuture) {
- scheduledFuture.cancel(true);
+ private void unschedule(String definitionName, ScheduledFuture<?> scheduledFuture) {
+
m_futureMap.remove(definitionName);
- LOG.info("Unscheduled server alert {}", definitionName);
+ if (null != scheduledFuture) {
+ scheduledFuture.cancel(true);
+ LOG.info("Unscheduled server alert {}", definitionName);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
index fe4ba60..9cf7a99 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/agent/TestHeartbeatHandler.java
@@ -105,6 +105,7 @@ import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostInstallEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.apache.ambari.server.utils.StageUtils;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils;
@@ -2423,6 +2424,9 @@ public class TestHeartbeatHandler {
@Test
public void testInstallPackagesWithVersion() throws Exception {
+ // required since this test method checks the DAO result of handling a
+ // heartbeat which performs some async tasks
+ EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
final HostRoleCommand command = new HostRoleCommand(DummyHostname1,
Role.DATANODE, null, null);
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java
index 09c335a..6073677 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/EventsTest.java
@@ -17,14 +17,12 @@
*/
package org.apache.ambari.server.events;
-import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
import org.apache.ambari.server.orm.OrmTestHelper;
@@ -42,6 +40,7 @@ import org.apache.ambari.server.state.ServiceComponentHostFactory;
import org.apache.ambari.server.state.ServiceFactory;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -66,7 +65,6 @@ public class EventsTest {
private ServiceFactory m_serviceFactory;
private ServiceComponentFactory m_componentFactory;
private ServiceComponentHostFactory m_schFactory;
- private AmbariEventPublisher m_eventPublisher;
private MockEventListener m_listener;
private OrmTestHelper m_helper;
@@ -78,20 +76,13 @@ public class EventsTest {
m_injector = Guice.createInjector(new InMemoryDefaultTestModule());
m_injector.getInstance(GuiceJpaInitializer.class);
- m_eventPublisher = m_injector.getInstance(AmbariEventPublisher.class);
- EventBus synchronizedBus = new EventBus();
-
m_helper = m_injector.getInstance(OrmTestHelper.class);
// register mock listener
+ EventBus synchronizedBus = EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector);
m_listener = m_injector.getInstance(MockEventListener.class);
synchronizedBus.register(m_listener);
- // !!! need a synchronous op for testing
- Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
- field.setAccessible(true);
- field.set(m_eventPublisher, synchronizedBus);
-
m_clusters = m_injector.getInstance(Clusters.class);
m_serviceFactory = m_injector.getInstance(ServiceFactory.class);
m_componentFactory = m_injector.getInstance(ServiceComponentFactory.class);
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
index 8768ffc..92866d7 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertDispatchDAOTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -44,8 +43,6 @@ import org.apache.ambari.server.controller.spi.SortRequest;
import org.apache.ambari.server.controller.spi.SortRequest.Order;
import org.apache.ambari.server.controller.spi.SortRequestProperty;
import org.apache.ambari.server.controller.utilities.PredicateBuilder;
-import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.AlertDaoHelper;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -64,11 +61,11 @@ import org.apache.ambari.server.state.ServiceComponentHostFactory;
import org.apache.ambari.server.state.ServiceFactory;
import org.apache.ambari.server.state.alert.Scope;
import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.eventbus.EventBus;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.persist.PersistService;
@@ -92,8 +89,6 @@ public class AlertDispatchDAOTest {
private ServiceComponentFactory m_componentFactory;
private ServiceComponentHostFactory m_schFactory;
private AlertDaoHelper m_alertHelper;
- private AmbariEventPublisher m_eventPublisher;
- private EventBus m_synchronizedBus;
/**
*
@@ -111,13 +106,9 @@ public class AlertDispatchDAOTest {
m_schFactory = m_injector.getInstance(ServiceComponentHostFactory.class);
m_clusters = m_injector.getInstance(Clusters.class);
m_alertHelper = m_injector.getInstance(AlertDaoHelper.class);
- m_eventPublisher = m_injector.getInstance(AmbariEventPublisher.class);
// !!! need a synchronous op for testing
- m_synchronizedBus = new EventBus();
- Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
- field.setAccessible(true);
- field.set(m_eventPublisher, m_synchronizedBus);
+ EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector);
m_cluster = m_clusters.getClusterById(m_helper.createCluster());
m_helper.initializeClusterWithStack(m_cluster);
@@ -844,8 +835,6 @@ public class AlertDispatchDAOTest {
*/
@Test
public void testFindDefaultGroup() throws Exception {
- m_synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class));
-
List<AlertGroupEntity> groups = m_dao.findAllGroups();
assertNotNull(groups);
assertEquals(10, groups.size());
@@ -870,8 +859,6 @@ public class AlertDispatchDAOTest {
*/
@Test
public void testDefaultGroupAutomaticCreation() throws Exception {
- m_synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class));
-
List<AlertGroupEntity> groups = m_dao.findAllGroups();
assertNotNull(groups);
assertEquals(10, groups.size());
@@ -916,8 +903,6 @@ public class AlertDispatchDAOTest {
*/
@Test(expected = AmbariException.class)
public void testDefaultGroupInvalidServiceNoCreation() throws Exception {
- m_synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class));
-
List<AlertGroupEntity> groups = m_dao.findAllGroups();
assertNotNull(groups);
assertEquals(10, groups.size());
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
index 9c8ea7d..e6a95ae 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/orm/dao/AlertsDAOTest.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
@@ -45,8 +44,6 @@ import org.apache.ambari.server.controller.spi.SortRequest;
import org.apache.ambari.server.controller.spi.SortRequest.Order;
import org.apache.ambari.server.controller.spi.SortRequestProperty;
import org.apache.ambari.server.controller.utilities.PredicateBuilder;
-import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeListener;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.AlertDaoHelper;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -66,11 +63,11 @@ import org.apache.ambari.server.state.ServiceComponentHostFactory;
import org.apache.ambari.server.state.ServiceFactory;
import org.apache.ambari.server.state.alert.Scope;
import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.eventbus.EventBus;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.persist.PersistService;
@@ -93,8 +90,6 @@ public class AlertsDAOTest {
private ServiceFactory m_serviceFactory;
private ServiceComponentFactory m_componentFactory;
private ServiceComponentHostFactory m_schFactory;
- private AmbariEventPublisher m_eventPublisher;
- private EventBus m_synchronizedBus;
private AlertDaoHelper m_alertHelper;
@@ -111,15 +106,11 @@ public class AlertsDAOTest {
m_serviceFactory = m_injector.getInstance(ServiceFactory.class);
m_componentFactory = m_injector.getInstance(ServiceComponentFactory.class);
m_schFactory = m_injector.getInstance(ServiceComponentHostFactory.class);
- m_eventPublisher = m_injector.getInstance(AmbariEventPublisher.class);
m_clusters = m_injector.getInstance(Clusters.class);
m_alertHelper = m_injector.getInstance(AlertDaoHelper.class);
// !!! need a synchronous op for testing
- m_synchronizedBus = new EventBus();
- Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
- field.setAccessible(true);
- field.set(m_eventPublisher, m_synchronizedBus);
+ EventBusSynchronizer.synchronizeAmbariEventPublisher(m_injector);
// install YARN so there is at least 1 service installed and no
// unexpected alerts since the test YARN service doesn't have any alerts
@@ -908,8 +899,6 @@ public class AlertsDAOTest {
*/
@Test
public void testMaintenanceMode() throws Exception {
- m_synchronizedBus.register(m_injector.getInstance(AlertMaintenanceModeListener.class));
-
m_helper.installHdfsService(m_cluster, m_serviceFactory,
m_componentFactory, m_schFactory, HOSTNAME);
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
index 19b5d46..1c4567f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertEventPublisherTest.java
@@ -17,19 +17,13 @@
*/
package org.apache.ambari.server.state.alerts;
-import java.lang.reflect.Field;
import java.util.UUID;
import junit.framework.Assert;
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.events.AlertDefinitionChangedEvent;
import org.apache.ambari.server.events.AlertDefinitionDeleteEvent;
import org.apache.ambari.server.events.AmbariEvent;
-import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
import org.apache.ambari.server.orm.OrmTestHelper;
@@ -53,11 +47,11 @@ import org.apache.ambari.server.state.alert.Reporting;
import org.apache.ambari.server.state.alert.Reporting.ReportTemplate;
import org.apache.ambari.server.state.alert.Scope;
import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.eventbus.EventBus;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -77,10 +71,8 @@ public class AlertEventPublisherTest {
private String clusterName;
private Injector injector;
private ServiceFactory serviceFactory;
- private AmbariMetaInfo metaInfo;
private OrmTestHelper ormHelper;
private AggregateDefinitionMapping aggregateMapping;
- private AmbariEventPublisher eventPublisher;
/**
*
@@ -90,18 +82,7 @@ public class AlertEventPublisherTest {
injector = Guice.createInjector(new InMemoryDefaultTestModule());
injector.getInstance(GuiceJpaInitializer.class);
- eventPublisher = injector.getInstance(AmbariEventPublisher.class);
- EventBus synchronizedBus = new EventBus();
-
- // force singleton init via Guice so the listener registers with the bus
- synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class));
- synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
- synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class));
-
- // !!! need a synchronous op for testing
- Field field = AmbariEventPublisher.class.getDeclaredField("m_eventBus");
- field.setAccessible(true);
- field.set(eventPublisher, synchronizedBus);
+ EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
dispatchDao = injector.getInstance(AlertDispatchDAO.class);
definitionDao = injector.getInstance(AlertDefinitionDAO.class);
@@ -111,8 +92,6 @@ public class AlertEventPublisherTest {
ormHelper = injector.getInstance(OrmTestHelper.class);
aggregateMapping = injector.getInstance(AggregateDefinitionMapping.class);
- metaInfo = injector.getInstance(AmbariMetaInfo.class);
-
clusterName = "foo";
clusters.addCluster(clusterName);
cluster = clusters.getCluster(clusterName);
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
index b64afed..7144625 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/AlertStateChangedEventTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.ambari.server.state.alerts;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashSet;
@@ -25,8 +24,6 @@ import java.util.List;
import java.util.Set;
import org.apache.ambari.server.events.AlertStateChangeEvent;
-import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
import org.apache.ambari.server.events.publishers.AlertEventPublisher;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -38,12 +35,12 @@ import org.apache.ambari.server.orm.entities.AlertNoticeEntity;
import org.apache.ambari.server.orm.entities.AlertTargetEntity;
import org.apache.ambari.server.state.Alert;
import org.apache.ambari.server.state.AlertState;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.eventbus.EventBus;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
@@ -71,20 +68,13 @@ public class AlertStateChangedEventTest {
injector.getInstance(GuiceJpaInitializer.class);
- // force singleton init via Guice so the listener registers with the bus
- injector.getInstance(AlertServiceStateListener.class);
- injector.getInstance(AlertStateChangedListener.class);
-
dispatchDao = injector.getInstance(AlertDispatchDAO.class);
- eventPublisher = injector.getInstance(AlertEventPublisher.class);
-
- EventBus synchronizedBus = new EventBus();
- synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
// !!! need a synchronous op for testing
- Field field = AlertEventPublisher.class.getDeclaredField("m_eventBus");
- field.setAccessible(true);
- field.set(eventPublisher, synchronizedBus);
+ EventBusSynchronizer.synchronizeAlertEventPublisher(injector);
+ EventBusSynchronizer.synchronizeAmbariEventPublisher(injector);
+
+ eventPublisher = injector.getInstance(AlertEventPublisher.class);
}
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java
index 4e55c49..73bf6c4 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/alerts/InitialAlertEventTest.java
@@ -17,17 +17,11 @@
*/
package org.apache.ambari.server.state.alerts;
-import java.lang.reflect.Field;
-
import junit.framework.Assert;
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.events.AlertReceivedEvent;
import org.apache.ambari.server.events.InitialAlertEvent;
import org.apache.ambari.server.events.MockEventListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
import org.apache.ambari.server.events.publishers.AlertEventPublisher;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
@@ -41,6 +35,7 @@ import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Service;
import org.apache.ambari.server.state.ServiceFactory;
import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -68,7 +63,6 @@ public class InitialAlertEventTest {
private Cluster m_cluster;
private String m_clusterName;
private ServiceFactory m_serviceFactory;
- private AmbariMetaInfo m_metaInfo;
/**
*
@@ -83,28 +77,18 @@ public class InitialAlertEventTest {
// get a mock listener
m_listener = m_injector.getInstance(MockEventListener.class);
- m_alertsDao = m_injector.getInstance(AlertsDAO.class);
-
// create the publisher and mock listener
m_eventPublisher = m_injector.getInstance(AlertEventPublisher.class);
- EventBus synchronizedBus = new EventBus();
// register listeners needed
+ EventBus synchronizedBus = EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector);
synchronizedBus.register(m_listener);
- synchronizedBus.register(m_injector.getInstance(AlertLifecycleListener.class));
- synchronizedBus.register(m_injector.getInstance(AlertServiceStateListener.class));
- synchronizedBus.register(m_injector.getInstance(AlertReceivedListener.class));
-
- // !!! need a synchronous op for testing
- Field field = AlertEventPublisher.class.getDeclaredField("m_eventBus");
- field.setAccessible(true);
- field.set(m_eventPublisher, synchronizedBus);
m_definitionDao = m_injector.getInstance(AlertDefinitionDAO.class);
m_clusters = m_injector.getInstance(Clusters.class);
m_serviceFactory = m_injector.getInstance(ServiceFactory.class);
- m_metaInfo = m_injector.getInstance(AmbariMetaInfo.class);
+ m_alertsDao = m_injector.getInstance(AlertsDAO.class);
m_clusterName = "c1";
m_clusters.addCluster(m_clusterName);
@@ -190,12 +174,14 @@ public class InitialAlertEventTest {
/**
*
*/
- private class MockModule implements Module {
+ private static class MockModule implements Module {
/**
* {@inheritDoc}
*/
@Override
public void configure(Binder binder) {
+ // synchronize on the ambari event bus for this test to work properly
+ EventBusSynchronizer.synchronizeAmbariEventPublisher(binder);
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
index acf7911..c289bcc 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/AlertDataManagerTest.java
@@ -65,11 +65,11 @@ import org.apache.ambari.server.state.alert.Reporting.ReportTemplate;
import org.apache.ambari.server.state.alert.Scope;
import org.apache.ambari.server.state.alert.Source;
import org.apache.ambari.server.state.alert.SourceType;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.gson.Gson;
import com.google.inject.Guice;
@@ -396,12 +396,9 @@ public class AlertDataManagerTest {
m_dao.merge(current);
}
- AlertEventPublisher publisher = m_injector.getInstance(AlertEventPublisher.class);
-
// !!! need a synchronous op for testing
- field = AlertEventPublisher.class.getDeclaredField("m_eventBus");
- field.setAccessible(true);
- field.set(publisher, new EventBus());
+ AlertEventPublisher publisher = m_injector.getInstance(AlertEventPublisher.class);
+ EventBusSynchronizer.synchronizeAlertEventPublisher(m_injector);
final AtomicReference<Alert> ref = new AtomicReference<Alert>();
publisher.register(new TestListener() {
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
index 766105d..ff039a9 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClusterDeadlockTest.java
@@ -27,7 +27,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.ServiceComponentNotFoundException;
import org.apache.ambari.server.ServiceNotFoundException;
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
import org.apache.ambari.server.orm.OrmTestHelper;
@@ -80,9 +79,6 @@ public class ClusterDeadlockTest {
private ServiceComponentHostFactory serviceComponentHostFactory;
@Inject
- private AmbariMetaInfo metaInfo;
-
- @Inject
private OrmTestHelper helper;
private StackId stackId = new StackId("HDP-0.1");
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
index 839b25f..d771eba 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/state/cluster/ClustersDeadlockTest.java
@@ -26,7 +26,9 @@ import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
-import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.ServiceComponentNotFoundException;
+import org.apache.ambari.server.ServiceNotFoundException;
import org.apache.ambari.server.orm.GuiceJpaInitializer;
import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
import org.apache.ambari.server.orm.OrmTestHelper;
@@ -34,7 +36,14 @@ import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.RepositoryVersionState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
+import org.apache.ambari.server.state.ServiceComponentFactory;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.apache.ambari.server.state.ServiceComponentHostFactory;
+import org.apache.ambari.server.state.ServiceFactory;
import org.apache.ambari.server.state.StackId;
+import org.apache.ambari.server.state.State;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -55,6 +64,8 @@ public class ClustersDeadlockTest {
private final AtomicInteger hostNameCounter = new AtomicInteger(0);
+ private final StackId stackId = new StackId("HDP-0.1");
+
@Inject
private Injector injector;
@@ -62,7 +73,13 @@ public class ClustersDeadlockTest {
private Clusters clusters;
@Inject
- private AmbariMetaInfo metaInfo;
+ private ServiceFactory serviceFactory;
+
+ @Inject
+ private ServiceComponentFactory serviceComponentFactory;
+
+ @Inject
+ private ServiceComponentHostFactory serviceComponentHostFactory;
@Inject
private OrmTestHelper helper;
@@ -81,6 +98,9 @@ public class ClustersDeadlockTest {
cluster.setDesiredStackVersion(stackId);
helper.getOrCreateRepositoryVersion(stackId.getStackName(), stackId.getStackVersion());
cluster.createClusterVersion(stackId.getStackName(), stackId.getStackVersion(), "admin", RepositoryVersionState.UPGRADING);
+
+ // install HDFS
+ installService("HDFS");
}
@After
@@ -89,7 +109,7 @@ public class ClustersDeadlockTest {
}
/**
- * Tests that no deadlock exists when adding hosts from reading from the
+ * Tests that no deadlock exists when adding hosts while reading from the
* cluster.
*
* @throws Exception
@@ -117,7 +137,34 @@ public class ClustersDeadlockTest {
}
/**
- * Tests that no deadlock exists when adding hosts from reading from the
+ * Tests that no deadlock exists when adding hosts while reading from the
+ * cluster. This test ensures that there are service components installed on
+ * the hosts so that the cluster health report does some more work.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 35000)
+ public void testDeadlockWhileMappingHostsWithExistingServices()
+ throws Exception {
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < NUMBER_OF_THREADS; i++) {
+ ClusterReaderThread readerThread = new ClusterReaderThread();
+ ClustersHostAndComponentMapperThread writerThread = new ClustersHostAndComponentMapperThread();
+
+ threads.add(readerThread);
+ threads.add(writerThread);
+
+ readerThread.start();
+ writerThread.start();
+ }
+
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ }
+
+ /**
+ * Tests that no deadlock exists when adding hosts while reading from the
* cluster.
*
* @throws Exception
@@ -194,6 +241,38 @@ public class ClustersDeadlockTest {
}
/**
+ * The {@link ClustersHostAndComponentMapperThread} is used to map hosts to a
+ * cluster over and over. This will also add components to the hosts that are
+ * being mapped to further exercise the cluster health report concurrency.
+ */
+ private final class ClustersHostAndComponentMapperThread extends Thread {
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < NUMBER_OF_HOSTS; i++) {
+ String hostName = "c64-" + hostNameCounter.getAndIncrement();
+ clusters.addHost(hostName);
+ setOsFamily(clusters.getHost(hostName), "redhat", "6.4");
+ clusters.getHost(hostName).persist();
+ clusters.mapHostToCluster(hostName, CLUSTER_NAME);
+
+ // create DATANODE on this host so that we end up exercising the
+ // cluster health report since we need a service component host
+ createNewServiceComponentHost("HDFS", "DATANODE", hostName);
+
+ Thread.sleep(10);
+ }
+ } catch (Exception exception) {
+ throw new RuntimeException(exception);
+ }
+ }
+ }
+
+ /**
* The {@link ClustersHostUnMapperThread} is used to unmap hosts to a cluster
* over and over.
*/
@@ -235,4 +314,53 @@ public class ClustersDeadlockTest {
hostAttributes.put("os_release_version", osVersion);
host.setHostAttributes(hostAttributes);
}
+
+ private Service installService(String serviceName) throws AmbariException {
+ Service service = null;
+
+ try {
+ service = cluster.getService(serviceName);
+ } catch (ServiceNotFoundException e) {
+ service = serviceFactory.createNew(cluster, serviceName);
+ cluster.addService(service);
+ service.persist();
+ }
+
+ return service;
+ }
+
+ private ServiceComponent addServiceComponent(Service service,
+ String componentName) throws AmbariException {
+ ServiceComponent serviceComponent = null;
+ try {
+ serviceComponent = service.getServiceComponent(componentName);
+ } catch (ServiceComponentNotFoundException e) {
+ serviceComponent = serviceComponentFactory.createNew(service,
+ componentName);
+ service.addServiceComponent(serviceComponent);
+ serviceComponent.setDesiredState(State.INSTALLED);
+ serviceComponent.persist();
+ }
+
+ return serviceComponent;
+ }
+
+ private ServiceComponentHost createNewServiceComponentHost(String svc,
+ String svcComponent, String hostName) throws AmbariException {
+ Assert.assertNotNull(cluster.getConfigGroups());
+ Service s = installService(svc);
+ ServiceComponent sc = addServiceComponent(s, svcComponent);
+
+ ServiceComponentHost sch = serviceComponentHostFactory.createNew(sc,
+ hostName);
+
+ sc.addServiceComponentHost(sch);
+ sch.setDesiredState(State.INSTALLED);
+ sch.setState(State.INSTALLED);
+ sch.setDesiredStackVersion(stackId);
+ sch.setStackVersion(stackId);
+
+ sch.persist();
+ return sch;
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/f11e2f06/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
new file mode 100644
index 0000000..4b0c031
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.lang.reflect.Field;
+
+import org.apache.ambari.server.events.listeners.alerts.AlertAggregateListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
+import org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+
+/**
+ * The {@link EventBusSynchronizer} is used to replace the Guava
+ * {@link AsyncEventBus} held by an event publisher with a synchronous, serial
+ * {@link EventBus} instance. This enables tests that verify the outcome of
+ * asynchronous events by executing the events on the current thread serially.
+ */
+public class EventBusSynchronizer {
+
+ /**
+ * Force the {@link EventBus} from {@link AmbariEventPublisher} to be serial
+ * and synchronous.
+ *
+ * @param binder
+ */
+ public static void synchronizeAmbariEventPublisher(Binder binder) {
+ EventBus synchronizedBus = new EventBus();
+ AmbariEventPublisher ambariEventPublisher = new AmbariEventPublisher();
+
+ replaceEventBus(AmbariEventPublisher.class, ambariEventPublisher,
+ synchronizedBus);
+
+ binder.bind(AmbariEventPublisher.class).toInstance(ambariEventPublisher);
+ }
+
+ /**
+ * Force the {@link EventBus} from {@link AmbariEventPublisher} to be serial
+ * and synchronous. Also register the known listeners. Registering known
+ * listeners is necessary since the event bus was replaced.
+ *
+ * @param injector
+ */
+ public static EventBus synchronizeAmbariEventPublisher(Injector injector) {
+ EventBus synchronizedBus = new EventBus();
+ AmbariEventPublisher publisher = injector.getInstance(AmbariEventPublisher.class);
+
+ replaceEventBus(AmbariEventPublisher.class, publisher, synchronizedBus);
+
+ // register common ambari event listeners
+ registerAmbariListeners(injector, synchronizedBus);
+
+ return synchronizedBus;
+ }
+
+ /**
+ * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial
+ * and synchronous. Also register the known listeners. Registering known
+ * listeners is necessary since the event bus was replaced.
+ *
+ * @param injector
+ */
+ public static EventBus synchronizeAlertEventPublisher(Injector injector) {
+ EventBus synchronizedBus = new EventBus();
+ AlertEventPublisher publisher = injector.getInstance(AlertEventPublisher.class);
+
+ replaceEventBus(AlertEventPublisher.class, publisher, synchronizedBus);
+
+ // register common alert event listeners
+ registerAlertListeners(injector, synchronizedBus);
+
+ return synchronizedBus;
+ }
+
+ /**
+ * Register the common Ambari event listeners with the replaced synchronous bus.
+ *
+ * @param injector
+ * @param synchronizedBus
+ */
+ private static void registerAmbariListeners(Injector injector,
+ EventBus synchronizedBus) {
+ synchronizedBus.register(injector.getInstance(AlertMaintenanceModeListener.class));
+ synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class));
+ synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class));
+ synchronizedBus.register(injector.getInstance(DistributeRepositoriesActionListener.class));
+ }
+
+ /**
+ * Register the common alert event listeners with the replaced synchronous bus.
+ *
+ * @param injector
+ * @param synchronizedBus
+ */
+ private static void registerAlertListeners(Injector injector,
+ EventBus synchronizedBus) {
+ synchronizedBus.register(injector.getInstance(AlertAggregateListener.class));
+ synchronizedBus.register(injector.getInstance(AlertReceivedListener.class));
+ synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
+ }
+
+ private static void replaceEventBus(Class<?> eventPublisherClass,
+ Object instance, EventBus eventBus) {
+
+ try {
+ Field field = eventPublisherClass.getDeclaredField("m_eventBus");
+ field.setAccessible(true);
+ field.set(instance, eventBus);
+ } catch (Exception exception) {
+ throw new RuntimeException(exception);
+ }
+ }
+}