You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by tb...@apache.org on 2015/03/07 15:49:13 UTC
ambari git commit: AMBARI-9965 - RU - Improve performance for large
cluster in StackVersionListener (tbeerbower)
Repository: ambari
Updated Branches:
refs/heads/trunk d451aced8 -> e1d4d9766
AMBARI-9965 - RU - Improve performance for large cluster in StackVersionListener (tbeerbower)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e1d4d976
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e1d4d976
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e1d4d976
Branch: refs/heads/trunk
Commit: e1d4d9766a222e9375a114e6f959499cabf05ea8
Parents: d451ace
Author: tbeerbower <tb...@hortonworks.com>
Authored: Sat Mar 7 09:48:13 2015 -0500
Committer: tbeerbower <tb...@hortonworks.com>
Committed: Sat Mar 7 09:48:28 2015 -0500
----------------------------------------------------------------------
.../ambari/server/agent/HeartBeatHandler.java | 69 ++++++++-------
.../listeners/upgrade/StackVersionListener.java | 8 +-
.../publishers/VersionEventPublisher.java | 65 ++++++++++++++
.../upgrade/StackVersionListenerTest.java | 60 +++++++++++++
.../publishers/VersionEventPublisherTest.java | 90 ++++++++++++++++++++
5 files changed, 259 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 539af00..8833148 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -31,7 +31,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
-import com.google.common.reflect.TypeToken;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.HostNotFoundException;
import org.apache.ambari.server.Role;
@@ -51,6 +50,7 @@ import org.apache.ambari.server.events.AlertReceivedEvent;
import org.apache.ambari.server.events.HostComponentVersionEvent;
import org.apache.ambari.server.events.publishers.AlertEventPublisher;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.VersionEventPublisher;
import org.apache.ambari.server.metadata.ActionMetadata;
import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
import org.apache.ambari.server.serveraction.kerberos.KerberosActionDataFile;
@@ -153,6 +153,9 @@ public class HeartBeatHandler {
@Inject
private AmbariEventPublisher ambariEventPublisher;
+ @Inject
+ private VersionEventPublisher versionEventPublisher;
+
/**
* KerberosPrincipalHostDAO used to set and get Kerberos principal details
*/
@@ -523,14 +526,13 @@ public class HeartBeatHandler {
//Json structure for component version was incorrect
//do nothing, pass this data further for processing
}
- if (structuredOutput != null && StringUtils.isNotBlank(structuredOutput.getVersion())) {
- handleComponentVersionReceived(scHost, structuredOutput.getVersion());
- }
- // Safer to recalculate the version even if we don't detect a difference in the value.
- // This is useful in case that a manual database edit is done while ambari-server is stopped.
- // TODO should be included into handleComponentVersionReceived() after RU becomes stable
- HostComponentVersionEvent event = new HostComponentVersionEvent(cl, scHost);
- ambariEventPublisher.publish(event);
+
+ String newVersion = structuredOutput == null ? null : structuredOutput.getVersion();
+
+ // Pass true to always publish a version event. It is safer to recalculate the version even if we don't
+ // detect a difference in the value. This is useful in case that a manual database edit is done while
+ // ambari-server is stopped.
+ handleComponentVersionReceived(cl, scHost, newVersion, true);
}
// Updating stack version, if needed
@@ -676,12 +678,9 @@ public class HeartBeatHandler {
scHost.setProcesses(list);
}
if (extra.containsKey("version")) {
- boolean versionWasUpdated = handleComponentVersionReceived(scHost, extra.get("version").toString());
- if (versionWasUpdated) {
- // TODO should be included into handleComponentVersionReceived() after RU becomes stable
- HostComponentVersionEvent event = new HostComponentVersionEvent(cl, scHost);
- ambariEventPublisher.publish(event);
- }
+ String version = extra.get("version").toString();
+
+ handleComponentVersionReceived(cl, scHost, version, false);
}
} catch (Exception e) {
@@ -734,27 +733,39 @@ public class HeartBeatHandler {
}
/**
- * Updates version of service component and sets upgrade state if needed.
+ * Updates the version of the given service component, sets the upgrade state (if needed)
+ * and publishes a version event through the version event publisher.
*
- * @param scHost service component host
- * @param newVersion new version of service component
- *
- * @return true if component version was updated to new one
+ * @param cluster the cluster
+ * @param scHost service component host
+ * @param newVersion new version of service component
+ * @param alwaysPublish if true, always publish a version event; if false,
+ * only publish if the component version was updated
*/
- private boolean handleComponentVersionReceived(ServiceComponentHost scHost, String newVersion) {
- final String previousVersion = scHost.getVersion();
- if (!StringUtils.equals(previousVersion, newVersion)) {
- scHost.setVersion(newVersion);
- if (previousVersion != null && !previousVersion.equalsIgnoreCase(State.UNKNOWN.toString())) {
- scHost.setUpgradeState(UpgradeState.COMPLETE);
+ private void handleComponentVersionReceived(Cluster cluster, ServiceComponentHost scHost,
+ String newVersion, boolean alwaysPublish) {
+
+ boolean updated = false;
+
+ if (StringUtils.isNotBlank(newVersion)) {
+ final String previousVersion = scHost.getVersion();
+ if (!StringUtils.equals(previousVersion, newVersion)) {
+ scHost.setVersion(newVersion);
+ if (previousVersion != null && !previousVersion.equalsIgnoreCase(State.UNKNOWN.toString())) {
+ scHost.setUpgradeState(UpgradeState.COMPLETE);
+ }
+ updated = true;
}
- return true;
}
- return false;
+
+ if (updated || alwaysPublish) {
+ HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, scHost);
+ versionEventPublisher.publish(event);
+ }
}
/**
- * Adds commands from action queue to a heartbeat responce
+ * Adds commands from action queue to a heartbeat response.
*/
protected void sendCommands(String hostname, HeartBeatResponse response)
throws AmbariException {
http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
index 5460092..b09a273 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
@@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.ambari.server.EagerSingleton;
import org.apache.ambari.server.events.HostComponentVersionEvent;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.VersionEventPublisher;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.slf4j.Logger;
@@ -58,11 +58,11 @@ public class StackVersionListener {
/**
* Constructor.
*
- * @param ambariEventPublisher
+ * @param eventPublisher the publisher
*/
@Inject
- public StackVersionListener(AmbariEventPublisher ambariEventPublisher) {
- ambariEventPublisher.register(this);
+ public StackVersionListener(VersionEventPublisher eventPublisher) {
+ eventPublisher.register(this);
}
@Subscribe
http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java
new file mode 100644
index 0000000..3a11f38
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Singleton;
+import org.apache.ambari.server.events.HostComponentVersionEvent;
+
+/**
+ * The {@link VersionEventPublisher} is used to publish instances of
+ * {@link HostComponentVersionEvent} to any {@link com.google.common.eventbus.Subscribe} interested.
+ * It uses a single-threaded, serial {@link EventBus}.
+ */
+@Singleton
+public class VersionEventPublisher {
+ /**
+ * A single threaded event bus for processing version events serially.
+ */
+ private final EventBus m_eventBus;
+
+ /**
+ * Constructor.
+ */
+ public VersionEventPublisher() {
+ m_eventBus = new EventBus("version-event-bus");
+ }
+
+ /**
+ * Publishes the specified event to all registered listeners that
+ * {@link com.google.common.eventbus.Subscribe} to any of the
+ * {@link org.apache.ambari.server.events.HostComponentVersionEvent} instances.
+ *
+ * @param event the event
+ */
+ public void publish(HostComponentVersionEvent event) {
+ m_eventBus.post(event);
+ }
+
+ /**
+ * Register a listener to receive events. The listener should use the
+ * {@link com.google.common.eventbus.Subscribe} annotation.
+ *
+ * @param object
+ * the listener to receive events.
+ */
+ public void register(Object object) {
+ m_eventBus.register(object);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
new file mode 100644
index 0000000..b44ac30
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.upgrade;
+
+import org.apache.ambari.server.events.HostComponentVersionEvent;
+import org.apache.ambari.server.events.publishers.VersionEventPublisher;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.*;
+
+/**
+ * StackVersionListener tests.
+ */
+public class StackVersionListenerTest {
+
+ @Test
+ public void testOnAmbariEvent() throws Exception {
+
+ VersionEventPublisher publisher = createNiceMock(VersionEventPublisher.class);
+
+ Cluster cluster = createNiceMock(Cluster.class);
+ ServiceComponentHost sch = createNiceMock(ServiceComponentHost.class);
+
+ expect(cluster.getClusterId()).andReturn(99L);
+ cluster.recalculateClusterVersionState("1.0.0");
+
+ expect(sch.recalculateHostVersionState()).andReturn("1.0.0").anyTimes();
+
+ replay(cluster, sch);
+
+ HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, sch);
+
+ StackVersionListener listener = new StackVersionListener(publisher);
+
+ listener.onAmbariEvent(event);
+
+ verify(cluster, sch);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java
new file mode 100644
index 0000000..071c6f0
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.ambari.server.events.HostComponentVersionEvent;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * VersionEventPublisher tests.
+ */
+public class VersionEventPublisherTest {
+
+ private Injector injector;
+
+ @Before
+ public void setup() throws Exception {
+ injector = Guice.createInjector();
+ }
+
+ @Test
+ public void testPublish() throws Exception {
+
+ Cluster cluster = createNiceMock(Cluster.class);
+ ServiceComponentHost sch = createNiceMock(ServiceComponentHost.class);
+
+ expect(cluster.getClusterId()).andReturn(99L);
+
+ replay(cluster, sch);
+
+ VersionEventPublisher publisher = injector.getInstance(VersionEventPublisher.class);
+
+ Listener listener = injector.getInstance(Listener.class);
+
+ HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, sch);
+
+ publisher.publish(event);
+
+ assertEquals(event, listener.getLastEvent());
+
+ verify(cluster, sch);
+ }
+
+ private static class Listener {
+
+ private HostComponentVersionEvent lastEvent = null;
+
+ @Inject
+ private Listener(VersionEventPublisher eventPublisher) {
+ eventPublisher.register(this);
+ }
+
+ @Subscribe
+ public void onEvent(HostComponentVersionEvent event) {
+ lastEvent = event;
+ }
+
+ public HostComponentVersionEvent getLastEvent() {
+ return lastEvent;
+ }
+ }
+}
\ No newline at end of file