You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2015/10/12 16:47:14 UTC
ambari git commit: AMBARI-13355. Journal node went in critical state
on Ambari : message on UI : Connection failed: [Errno 111] Connection refused
to 0.0.0.0:8480 (aonishuk)
Repository: ambari
Updated Branches:
refs/heads/branch-2.1 7ba85ef21 -> f0fec9578
AMBARI-13355. Journal node went in critical state on Ambari : message on UI : Connection failed: [Errno 111] Connection refused to 0.0.0.0:8480 (aonishuk)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/f0fec957
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/f0fec957
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/f0fec957
Branch: refs/heads/branch-2.1
Commit: f0fec9578b0aed69f8426b01aa32c5a586360563
Parents: 7ba85ef
Author: Andrew Onishuk <ao...@hortonworks.com>
Authored: Mon Oct 12 17:46:18 2015 +0300
Committer: Andrew Onishuk <ao...@hortonworks.com>
Committed: Mon Oct 12 17:47:06 2015 +0300
----------------------------------------------------------------------
.../server/upgrade/AbstractUpgradeCatalog.java | 14 ++
.../server/upgrade/SchemaUpgradeHelper.java | 3 +
.../server/upgrade/UpgradeCatalog213.java | 80 ++++++++++-
.../server/utils/EventBusSynchronizer.java | 139 +++++++++++++++++++
.../common-services/HDFS/2.1.0.2.0/alerts.json | 25 ++--
.../server/upgrade/UpgradeCatalog213Test.java | 23 +++
.../server/utils/EventBusSynchronizer.java | 139 -------------------
7 files changed, 273 insertions(+), 150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/f0fec957/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java
index 62cd868..ed68313 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/AbstractUpgradeCatalog.java
@@ -341,6 +341,20 @@ public abstract class AbstractUpgradeCatalog implements UpgradeCatalog {
}
/**
+ * This method returns Map of clusters.
+ * Map can be empty or with some objects, but never be null.
+ */
+ protected Map<String, Cluster> getCheckedClusterMap(Clusters clusters) {
+ if (clusters != null) {
+ Map<String, Cluster> clusterMap = clusters.getClusters();
+ if (clusterMap != null) {
+ return clusterMap;
+ }
+ }
+ return new HashMap<>();
+ }
+
+ /**
* Create a new cluster scoped configuration with the new properties added
* with the values from the coresponding xml files.
*
http://git-wip-us.apache.org/repos/asf/ambari/blob/f0fec957/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
index cd57df1..7453d4b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/SchemaUpgradeHelper.java
@@ -30,6 +30,7 @@ import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.ControllerModule;
import org.apache.ambari.server.orm.DBAccessor;
+import org.apache.ambari.server.utils.EventBusSynchronizer;
import org.apache.ambari.server.utils.VersionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,6 +182,8 @@ public class SchemaUpgradeHelper {
catalogBinder.addBinding().to(UpgradeCatalog212.class);
catalogBinder.addBinding().to(UpgradeCatalog213.class);
catalogBinder.addBinding().to(FinalUpgradeCatalog.class);
+
+ EventBusSynchronizer.synchronizeAmbariEventPublisher(binder());
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/f0fec957/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
index dcdbb85..2c152e4 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog213.java
@@ -18,20 +18,28 @@
package org.apache.ambari.server.upgrade;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.orm.dao.AlertDefinitionDAO;
import org.apache.ambari.server.orm.dao.DaoUtils;
+import org.apache.ambari.server.orm.entities.AlertDefinitionEntity;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Config;
+import org.apache.ambari.server.state.alert.SourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
/**
* Upgrade catalog for version 2.1.3.
@@ -46,7 +54,7 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
/**
* Logger.
*/
- private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog212.class);
+ private static final Logger LOG = LoggerFactory.getLogger(UpgradeCatalog213.class);
@Inject
DaoUtils daoUtils;
@@ -104,6 +112,76 @@ public class UpgradeCatalog213 extends AbstractUpgradeCatalog {
protected void executeDMLUpdates() throws AmbariException, SQLException {
addMissingConfigs();
updateAMSConfigs();
+ updateAlertDefinitions();
+ }
+
+ /**
+ * Modifies the JSON of some of the alert definitions which have changed
+ * between Ambari versions.
+ */
+ protected void updateAlertDefinitions() {
+ LOG.info("Updating alert definitions.");
+ AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class);
+ AlertDefinitionDAO alertDefinitionDAO = injector.getInstance(AlertDefinitionDAO.class);
+ Clusters clusters = ambariManagementController.getClusters();
+
+ Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters);
+ for (final Cluster cluster : clusterMap.values()) {
+ final AlertDefinitionEntity alertDefinitionEntity = alertDefinitionDAO.findByName(
+ cluster.getClusterId(), "journalnode_process");
+
+ if (alertDefinitionEntity != null) {
+ String source = alertDefinitionEntity.getSource();
+
+ alertDefinitionEntity.setSource(modifyJournalnodeProcessAlertSource(source));
+ alertDefinitionEntity.setSourceType(SourceType.WEB);
+ alertDefinitionEntity.setHash(UUID.randomUUID().toString());
+
+ alertDefinitionDAO.merge(alertDefinitionEntity);
+ LOG.info("journalnode_process alert definition was updated.");
+ }
+ }
+ }
+
+ /**
+ * Modifies type of the journalnode_process alert to WEB.
+ * Changes reporting text and uri according to the WEB type.
+ * Removes default_port property.
+ */
+ String modifyJournalnodeProcessAlertSource(String source) {
+ JsonObject rootJson = new JsonParser().parse(source).getAsJsonObject();
+
+ rootJson.remove("type");
+ rootJson.addProperty("type", "WEB");
+
+ rootJson.remove("default_port");
+
+ rootJson.remove("uri");
+ JsonObject uriJson = new JsonObject();
+ uriJson.addProperty("http", "{{hdfs-site/dfs.journalnode.http-address}}");
+ uriJson.addProperty("https", "{{hdfs-site/dfs.journalnode.https-address}}");
+ uriJson.addProperty("kerberos_keytab", "{{hdfs-site/dfs.web.authentication.kerberos.keytab}}");
+ uriJson.addProperty("kerberos_principal", "{{hdfs-site/dfs.web.authentication.kerberos.principal}}");
+ uriJson.addProperty("https_property", "{{hdfs-site/dfs.http.policy}}");
+ uriJson.addProperty("https_property_value", "HTTPS_ONLY");
+ uriJson.addProperty("connection_timeout", 5.0);
+ rootJson.add("uri", uriJson);
+
+ rootJson.getAsJsonObject("reporting").getAsJsonObject("ok").remove("text");
+ rootJson.getAsJsonObject("reporting").getAsJsonObject("ok").addProperty(
+ "text", "HTTP {0} response in {2:.3f}s");
+
+ rootJson.getAsJsonObject("reporting").getAsJsonObject("warning").remove("text");
+ rootJson.getAsJsonObject("reporting").getAsJsonObject("warning").addProperty(
+ "text", "HTTP {0} response from {1} in {2:.3f}s ({3})");
+ rootJson.getAsJsonObject("reporting").getAsJsonObject("warning").remove("value");
+
+ rootJson.getAsJsonObject("reporting").getAsJsonObject("critical").remove("text");
+ rootJson.getAsJsonObject("reporting").getAsJsonObject("critical").addProperty("text",
+ "Connection failed to {1} ({3})");
+ rootJson.getAsJsonObject("reporting").getAsJsonObject("critical").remove("value");
+
+ return rootJson.toString();
}
protected void addMissingConfigs() throws AmbariException {
http://git-wip-us.apache.org/repos/asf/ambari/blob/f0fec957/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
new file mode 100644
index 0000000..d9be622
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.lang.reflect.Field;
+
+import org.apache.ambari.server.events.listeners.alerts.AlertAggregateListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
+import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
+import org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener;
+import org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncListener;
+import org.apache.ambari.server.events.publishers.AlertEventPublisher;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+
+import com.google.common.eventbus.AsyncEventBus;
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Binder;
+import com.google.inject.Injector;
+
+/**
+ * The {@link EventBusSynchronizer} is used to replace the {@link AsyncEventBus}
+ * used by Guava with a synchronous, serial {@link EventBus} instance. This
+ * enables testing that relies on testing the outcome of asynchronous events by
+ * executing the events on the current thread serially.
+ */
+public class EventBusSynchronizer {
+
+ /**
+ * Force the {@link EventBus} from {@link AmbariEventPublisher} to be serial
+ * and synchronous.
+ *
+ * @param binder
+ */
+ public static void synchronizeAmbariEventPublisher(Binder binder) {
+ EventBus synchronizedBus = new EventBus();
+ AmbariEventPublisher ambariEventPublisher = new AmbariEventPublisher();
+
+ replaceEventBus(AmbariEventPublisher.class, ambariEventPublisher,
+ synchronizedBus);
+
+ binder.bind(AmbariEventPublisher.class).toInstance(ambariEventPublisher);
+ }
+
+ /**
+ * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial
+ * and synchronous. Also register the known listeners. Registering known
+ * listeners is necessary since the event bus was replaced.
+ *
+ * @param injector
+ */
+ public static EventBus synchronizeAmbariEventPublisher(Injector injector) {
+ EventBus synchronizedBus = new EventBus();
+ AmbariEventPublisher publisher = injector.getInstance(AmbariEventPublisher.class);
+
+ replaceEventBus(AmbariEventPublisher.class, publisher, synchronizedBus);
+
+ // register common ambari event listeners
+ registerAmbariListeners(injector, synchronizedBus);
+
+ return synchronizedBus;
+ }
+
+ /**
+ * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial
+ * and synchronous. Also register the known listeners. Registering known
+ * listeners is necessary since the event bus was replaced.
+ *
+ * @param injector
+ */
+ public static EventBus synchronizeAlertEventPublisher(Injector injector) {
+ EventBus synchronizedBus = new EventBus();
+ AlertEventPublisher publisher = injector.getInstance(AlertEventPublisher.class);
+
+ replaceEventBus(AlertEventPublisher.class, publisher, synchronizedBus);
+
+ // register common alert event listeners
+ registerAlertListeners(injector, synchronizedBus);
+
+ return synchronizedBus;
+ }
+
+ /**
+ * Register the normal listeners with the replaced synchronous bus.
+ *
+ * @param injector
+ * @param synchronizedBus
+ */
+ private static void registerAmbariListeners(Injector injector,
+ EventBus synchronizedBus) {
+ synchronizedBus.register(injector.getInstance(AlertMaintenanceModeListener.class));
+ synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class));
+ synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class));
+ synchronizedBus.register(injector.getInstance(DistributeRepositoriesActionListener.class));
+ synchronizedBus.register(injector.getInstance(HostVersionOutOfSyncListener.class));
+ }
+
+ /**
+ * Register the normal listeners with the replaced synchronous bus.
+ *
+ * @param injector
+ * @param synchronizedBus
+ */
+ private static void registerAlertListeners(Injector injector,
+ EventBus synchronizedBus) {
+ synchronizedBus.register(injector.getInstance(AlertAggregateListener.class));
+ synchronizedBus.register(injector.getInstance(AlertReceivedListener.class));
+ synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
+ }
+
+ private static void replaceEventBus(Class<?> eventPublisherClass,
+ Object instance, EventBus eventBus) {
+
+ try {
+ Field field = eventPublisherClass.getDeclaredField("m_eventBus");
+ field.setAccessible(true);
+ field.set(instance, eventBus);
+ } catch (Exception exception) {
+ throw new RuntimeException(exception);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/f0fec957/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json
index 2ea9446..1eda00f 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/alerts.json
@@ -585,26 +585,31 @@
"JOURNALNODE": [
{
"name": "journalnode_process",
- "label": "JournalNode Process",
- "description": "This host-level alert is triggered if the JournalNode process cannot be confirmed to be up and listening on the network.",
+ "label": "JournalNode Web UI",
+ "description": "This host-level alert is triggered if the JournalNode Web UI is unreachable.",
"interval": 1,
"scope": "HOST",
"enabled": true,
"source": {
- "type": "PORT",
- "uri": "{{hdfs-site/dfs.journalnode.http-address}}",
- "default_port": 8480,
+ "type": "WEB",
+ "uri": {
+ "http": "{{hdfs-site/dfs.journalnode.http-address}}",
+ "https": "{{hdfs-site/dfs.journalnode.https-address}}",
+ "kerberos_keytab": "{{hdfs-site/dfs.web.authentication.kerberos.keytab}}",
+ "kerberos_principal": "{{hdfs-site/dfs.web.authentication.kerberos.principal}}",
+ "https_property": "{{hdfs-site/dfs.http.policy}}",
+ "https_property_value": "HTTPS_ONLY",
+ "connection_timeout": 5.0
+ },
"reporting": {
"ok": {
- "text": "TCP OK - {0:.3f}s response on port {1}"
+ "text": "HTTP {0} response in {2:.3f}s"
},
"warning": {
- "text": "TCP OK - {0:.3f}s response on port {1}",
- "value": 1.5
+ "text": "HTTP {0} response from {1} in {2:.3f}s ({3})"
},
"critical": {
- "text": "Connection failed: {0} to {1}:{2}",
- "value": 5.0
+ "text": "Connection failed to {1} ({3})"
}
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/f0fec957/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
index a54dee2..73b3a18 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog213Test.java
@@ -97,16 +97,20 @@ public class UpgradeCatalog213Test {
public void testExecuteDMLUpdates() throws Exception {
Method addMissingConfigs = UpgradeCatalog213.class.getDeclaredMethod("addMissingConfigs");
Method updateAMSConfigs = UpgradeCatalog213.class.getDeclaredMethod("updateAMSConfigs");
+ Method updateAlertDefinitions = UpgradeCatalog213.class.getDeclaredMethod("updateAlertDefinitions");
UpgradeCatalog213 upgradeCatalog213 = createMockBuilder(UpgradeCatalog213.class)
.addMockedMethod(addMissingConfigs)
.addMockedMethod(updateAMSConfigs)
+ .addMockedMethod(updateAlertDefinitions)
.createMock();
upgradeCatalog213.addMissingConfigs();
expectLastCall().once();
upgradeCatalog213.updateAMSConfigs();
expectLastCall().once();
+ upgradeCatalog213.updateAlertDefinitions();
+ expectLastCall().once();
replay(upgradeCatalog213);
@@ -258,6 +262,25 @@ public class UpgradeCatalog213Test {
easyMockSupport.verifyAll();
}
+ @Test
+ public void testModifyJournalnodeProcessAlertSource() throws Exception {
+ UpgradeCatalog213 upgradeCatalog213 = new UpgradeCatalog213(injector);
+ String alertSource = "{\"uri\":\"{{hdfs-site/dfs.journalnode.http-address}}\",\"default_port\":8480," +
+ "\"type\":\"PORT\",\"reporting\":{\"ok\":{\"text\":\"TCP OK - {0:.3f}s response on port {1}\"}," +
+ "\"warning\":{\"text\":\"TCP OK - {0:.3f}s response on port {1}\",\"value\":1.5}," +
+ "\"critical\":{\"text\":\"Connection failed: {0} to {1}:{2}\",\"value\":5.0}}}";
+ String expected = "{\"reporting\":{\"ok\":{\"text\":\"HTTP {0} response in {2:.3f}s\"}," +
+ "\"warning\":{\"text\":\"HTTP {0} response from {1} in {2:.3f}s ({3})\"}," +
+ "\"critical\":{\"text\":\"Connection failed to {1} ({3})\"}},\"type\":\"WEB\"," +
+ "\"uri\":{\"http\":\"{{hdfs-site/dfs.journalnode.http-address}}\"," +
+ "\"https\":\"{{hdfs-site/dfs.journalnode.https-address}}\"," +
+ "\"kerberos_keytab\":\"{{hdfs-site/dfs.web.authentication.kerberos.keytab}}\"," +
+ "\"kerberos_principal\":\"{{hdfs-site/dfs.web.authentication.kerberos.principal}}\"," +
+ "\"https_property\":\"{{hdfs-site/dfs.http.policy}}\"," +
+ "\"https_property_value\":\"HTTPS_ONLY\",\"connection_timeout\":5.0}}";
+ Assert.assertEquals(expected, upgradeCatalog213.modifyJournalnodeProcessAlertSource(alertSource));
+ }
+
/**
* @param dbAccessor
* @return
http://git-wip-us.apache.org/repos/asf/ambari/blob/f0fec957/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
deleted file mode 100644
index d9be622..0000000
--- a/ambari-server/src/test/java/org/apache/ambari/server/utils/EventBusSynchronizer.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ambari.server.utils;
-
-import java.lang.reflect.Field;
-
-import org.apache.ambari.server.events.listeners.alerts.AlertAggregateListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertLifecycleListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertMaintenanceModeListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertServiceStateListener;
-import org.apache.ambari.server.events.listeners.alerts.AlertStateChangedListener;
-import org.apache.ambari.server.events.listeners.upgrade.DistributeRepositoriesActionListener;
-import org.apache.ambari.server.events.listeners.upgrade.HostVersionOutOfSyncListener;
-import org.apache.ambari.server.events.publishers.AlertEventPublisher;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
-
-import com.google.common.eventbus.AsyncEventBus;
-import com.google.common.eventbus.EventBus;
-import com.google.inject.Binder;
-import com.google.inject.Injector;
-
-/**
- * The {@link EventBusSynchronizer} is used to replace the {@link AsyncEventBus}
- * used by Guava with a synchronous, serial {@link EventBus} instance. This
- * enables testing that relies on testing the outcome of asynchronous events by
- * executing the events on the current thread serially.
- */
-public class EventBusSynchronizer {
-
- /**
- * Force the {@link EventBus} from {@link AmbariEventPublisher} to be serial
- * and synchronous.
- *
- * @param binder
- */
- public static void synchronizeAmbariEventPublisher(Binder binder) {
- EventBus synchronizedBus = new EventBus();
- AmbariEventPublisher ambariEventPublisher = new AmbariEventPublisher();
-
- replaceEventBus(AmbariEventPublisher.class, ambariEventPublisher,
- synchronizedBus);
-
- binder.bind(AmbariEventPublisher.class).toInstance(ambariEventPublisher);
- }
-
- /**
- * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial
- * and synchronous. Also register the known listeners. Registering known
- * listeners is necessary since the event bus was replaced.
- *
- * @param injector
- */
- public static EventBus synchronizeAmbariEventPublisher(Injector injector) {
- EventBus synchronizedBus = new EventBus();
- AmbariEventPublisher publisher = injector.getInstance(AmbariEventPublisher.class);
-
- replaceEventBus(AmbariEventPublisher.class, publisher, synchronizedBus);
-
- // register common ambari event listeners
- registerAmbariListeners(injector, synchronizedBus);
-
- return synchronizedBus;
- }
-
- /**
- * Force the {@link EventBus} from {@link AlertEventPublisher} to be serial
- * and synchronous. Also register the known listeners. Registering known
- * listeners is necessary since the event bus was replaced.
- *
- * @param injector
- */
- public static EventBus synchronizeAlertEventPublisher(Injector injector) {
- EventBus synchronizedBus = new EventBus();
- AlertEventPublisher publisher = injector.getInstance(AlertEventPublisher.class);
-
- replaceEventBus(AlertEventPublisher.class, publisher, synchronizedBus);
-
- // register common alert event listeners
- registerAlertListeners(injector, synchronizedBus);
-
- return synchronizedBus;
- }
-
- /**
- * Register the normal listeners with the replaced synchronous bus.
- *
- * @param injector
- * @param synchronizedBus
- */
- private static void registerAmbariListeners(Injector injector,
- EventBus synchronizedBus) {
- synchronizedBus.register(injector.getInstance(AlertMaintenanceModeListener.class));
- synchronizedBus.register(injector.getInstance(AlertLifecycleListener.class));
- synchronizedBus.register(injector.getInstance(AlertServiceStateListener.class));
- synchronizedBus.register(injector.getInstance(DistributeRepositoriesActionListener.class));
- synchronizedBus.register(injector.getInstance(HostVersionOutOfSyncListener.class));
- }
-
- /**
- * Register the normal listeners with the replaced synchronous bus.
- *
- * @param injector
- * @param synchronizedBus
- */
- private static void registerAlertListeners(Injector injector,
- EventBus synchronizedBus) {
- synchronizedBus.register(injector.getInstance(AlertAggregateListener.class));
- synchronizedBus.register(injector.getInstance(AlertReceivedListener.class));
- synchronizedBus.register(injector.getInstance(AlertStateChangedListener.class));
- }
-
- private static void replaceEventBus(Class<?> eventPublisherClass,
- Object instance, EventBus eventBus) {
-
- try {
- Field field = eventPublisherClass.getDeclaredField("m_eventBus");
- field.setAccessible(true);
- field.set(instance, eventBus);
- } catch (Exception exception) {
- throw new RuntimeException(exception);
- }
- }
-}