You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by tb...@apache.org on 2015/03/07 15:49:13 UTC

ambari git commit: AMBARI-9965 - RU - Improve performance for large cluster in StackVersionListener (tbeerbower)

Repository: ambari
Updated Branches:
  refs/heads/trunk d451aced8 -> e1d4d9766


AMBARI-9965 - RU - Improve performance for large cluster in StackVersionListener (tbeerbower)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e1d4d976
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e1d4d976
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e1d4d976

Branch: refs/heads/trunk
Commit: e1d4d9766a222e9375a114e6f959499cabf05ea8
Parents: d451ace
Author: tbeerbower <tb...@hortonworks.com>
Authored: Sat Mar 7 09:48:13 2015 -0500
Committer: tbeerbower <tb...@hortonworks.com>
Committed: Sat Mar 7 09:48:28 2015 -0500

----------------------------------------------------------------------
 .../ambari/server/agent/HeartBeatHandler.java   | 69 ++++++++-------
 .../listeners/upgrade/StackVersionListener.java |  8 +-
 .../publishers/VersionEventPublisher.java       | 65 ++++++++++++++
 .../upgrade/StackVersionListenerTest.java       | 60 +++++++++++++
 .../publishers/VersionEventPublisherTest.java   | 90 ++++++++++++++++++++
 5 files changed, 259 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 539af00..8833148 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -31,7 +31,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Pattern;
 
-import com.google.common.reflect.TypeToken;
 import org.apache.ambari.server.AmbariException;
 import org.apache.ambari.server.HostNotFoundException;
 import org.apache.ambari.server.Role;
@@ -51,6 +50,7 @@ import org.apache.ambari.server.events.AlertReceivedEvent;
 import org.apache.ambari.server.events.HostComponentVersionEvent;
 import org.apache.ambari.server.events.publishers.AlertEventPublisher;
 import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.VersionEventPublisher;
 import org.apache.ambari.server.metadata.ActionMetadata;
 import org.apache.ambari.server.orm.dao.KerberosPrincipalHostDAO;
 import org.apache.ambari.server.serveraction.kerberos.KerberosActionDataFile;
@@ -153,6 +153,9 @@ public class HeartBeatHandler {
   @Inject
   private AmbariEventPublisher ambariEventPublisher;
 
+  @Inject
+  private VersionEventPublisher versionEventPublisher;
+
   /**
    * KerberosPrincipalHostDAO used to set and get Kerberos principal details
    */
@@ -523,14 +526,13 @@ public class HeartBeatHandler {
                 //Json structure for component version was incorrect
                 //do nothing, pass this data further for processing
               }
-              if (structuredOutput != null && StringUtils.isNotBlank(structuredOutput.getVersion())) {
-                handleComponentVersionReceived(scHost, structuredOutput.getVersion());
-              }
-              // Safer to recalculate the version even if we don't detect a difference in the value.
-              // This is useful in case that a manual database edit is done while ambari-server is stopped.
-              // TODO should be included into handleComponentVersionReceived() after RU becomes stable
-              HostComponentVersionEvent event = new HostComponentVersionEvent(cl, scHost);
-              ambariEventPublisher.publish(event);
+
+              String newVersion = structuredOutput == null ? null : structuredOutput.getVersion();
+
+              // Pass true to always publish a version event.  It is safer to recalculate the version even if we don't
+              // detect a difference in the value.  This is useful in case a manual database edit is done while
+              // ambari-server is stopped.
+              handleComponentVersionReceived(cl, scHost, newVersion, true);
             }
 
             // Updating stack version, if needed
@@ -676,12 +678,9 @@ public class HeartBeatHandler {
                     scHost.setProcesses(list);
                   }
                   if (extra.containsKey("version")) {
-                    boolean versionWasUpdated = handleComponentVersionReceived(scHost, extra.get("version").toString());
-                    if (versionWasUpdated) {
-                      // TODO should be included into handleComponentVersionReceived() after RU becomes stable
-                      HostComponentVersionEvent event = new HostComponentVersionEvent(cl, scHost);
-                      ambariEventPublisher.publish(event);
-                    }
+                    String version = extra.get("version").toString();
+
+                    handleComponentVersionReceived(cl, scHost, version, false);
                   }
 
                 } catch (Exception e) {
@@ -734,27 +733,39 @@ public class HeartBeatHandler {
   }
 
   /**
-   * Updates version of service component and sets upgrade state if needed.
+   * Updates the version of the given service component, sets the upgrade state (if needed)
+   * and publishes a version event through the version event publisher.
    *
-   * @param scHost service component host
-   * @param newVersion new version of service component
-   *
-   * @return true if component version was updated to new one
+   * @param cluster        the cluster
+   * @param scHost         service component host
+   * @param newVersion     new version of service component
+   * @param alwaysPublish  if true, always publish a version event; if false,
+   *                       only publish if the component version was updated
    */
-  private boolean handleComponentVersionReceived(ServiceComponentHost scHost, String newVersion) {
-    final String previousVersion = scHost.getVersion();
-    if (!StringUtils.equals(previousVersion, newVersion)) {
-      scHost.setVersion(newVersion);
-      if (previousVersion != null && !previousVersion.equalsIgnoreCase(State.UNKNOWN.toString())) {
-        scHost.setUpgradeState(UpgradeState.COMPLETE);
+  private void handleComponentVersionReceived(Cluster cluster, ServiceComponentHost scHost,
+                                              String newVersion, boolean alwaysPublish) {
+
+    boolean updated = false;
+
+    if (StringUtils.isNotBlank(newVersion)) {
+      final String previousVersion = scHost.getVersion();
+      if (!StringUtils.equals(previousVersion, newVersion)) {
+        scHost.setVersion(newVersion);
+        if (previousVersion != null && !previousVersion.equalsIgnoreCase(State.UNKNOWN.toString())) {
+          scHost.setUpgradeState(UpgradeState.COMPLETE);
+        }
+        updated = true;
       }
-      return true;
     }
-    return false;
+
+    if (updated || alwaysPublish) {
+      HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, scHost);
+      versionEventPublisher.publish(event);
+    }
   }
 
   /**
-   * Adds commands from action queue to a heartbeat responce
+   * Adds commands from action queue to a heartbeat response.
    */
   protected void sendCommands(String hostname, HeartBeatResponse response)
       throws AmbariException {

http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
index 5460092..b09a273 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListener.java
@@ -22,7 +22,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.ambari.server.EagerSingleton;
 import org.apache.ambari.server.events.HostComponentVersionEvent;
-import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.VersionEventPublisher;
 import org.apache.ambari.server.state.Cluster;
 import org.apache.ambari.server.state.ServiceComponentHost;
 import org.slf4j.Logger;
@@ -58,11 +58,11 @@ public class StackVersionListener {
   /**
    * Constructor.
    *
-   * @param ambariEventPublisher
+   * @param eventPublisher  the publisher
    */
   @Inject
-  public StackVersionListener(AmbariEventPublisher ambariEventPublisher) {
-    ambariEventPublisher.register(this);
+  public StackVersionListener(VersionEventPublisher eventPublisher) {
+    eventPublisher.register(this);
   }
 
   @Subscribe

http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java
new file mode 100644
index 0000000..3a11f38
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/publishers/VersionEventPublisher.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Singleton;
+import org.apache.ambari.server.events.HostComponentVersionEvent;
+
+/**
+ * The {@link VersionEventPublisher} is used to publish instances of
+ * {@link HostComponentVersionEvent} to any interested {@link com.google.common.eventbus.Subscribe} listener.
+ * It uses a single-threaded, serial {@link EventBus}.
+ */
+@Singleton
+public class VersionEventPublisher {
+  /**
+   * A single threaded event bus for processing version events serially.
+   */
+  private final EventBus m_eventBus;
+
+  /**
+   * Constructor.
+   */
+  public VersionEventPublisher() {
+    m_eventBus = new EventBus("version-event-bus");
+  }
+
+  /**
+   * Publishes the specified event to all registered listeners whose
+   * {@link com.google.common.eventbus.Subscribe} methods accept
+   * {@link org.apache.ambari.server.events.HostComponentVersionEvent} instances.
+   *
+   * @param event the event
+   */
+  public void publish(HostComponentVersionEvent event) {
+    m_eventBus.post(event);
+  }
+
+  /**
+   * Register a listener to receive events. The listener should use the
+   * {@link com.google.common.eventbus.Subscribe} annotation.
+   *
+   * @param object
+   *          the listener to receive events.
+   */
+  public void register(Object object) {
+    m_eventBus.register(object);
+  }
+}

http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
new file mode 100644
index 0000000..b44ac30
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/listeners/upgrade/StackVersionListenerTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.upgrade;
+
+import org.apache.ambari.server.events.HostComponentVersionEvent;
+import org.apache.ambari.server.events.publishers.VersionEventPublisher;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.*;
+
+/**
+ * StackVersionListener tests.
+ */
+public class StackVersionListenerTest {
+
+  @Test
+  public void testOnAmbariEvent() throws Exception {
+
+    VersionEventPublisher publisher = createNiceMock(VersionEventPublisher.class);
+
+    Cluster cluster = createNiceMock(Cluster.class);
+    ServiceComponentHost sch = createNiceMock(ServiceComponentHost.class);
+
+    expect(cluster.getClusterId()).andReturn(99L);
+    cluster.recalculateClusterVersionState("1.0.0");
+
+    expect(sch.recalculateHostVersionState()).andReturn("1.0.0").anyTimes();
+
+    replay(cluster, sch);
+
+    HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, sch);
+
+    StackVersionListener listener = new StackVersionListener(publisher);
+
+    listener.onAmbariEvent(event);
+
+    verify(cluster, sch);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/e1d4d976/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java b/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java
new file mode 100644
index 0000000..071c6f0
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/events/publishers/VersionEventPublisherTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events.publishers;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import org.apache.ambari.server.events.HostComponentVersionEvent;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.ServiceComponentHost;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.createNiceMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * VersionEventPublisher tests.
+ */
+public class VersionEventPublisherTest {
+
+  private Injector injector;
+
+  @Before
+  public void setup() throws Exception {
+    injector = Guice.createInjector();
+  }
+
+  @Test
+  public void testPublish() throws Exception {
+
+    Cluster cluster = createNiceMock(Cluster.class);
+    ServiceComponentHost sch = createNiceMock(ServiceComponentHost.class);
+
+    expect(cluster.getClusterId()).andReturn(99L);
+
+    replay(cluster, sch);
+
+    VersionEventPublisher publisher = injector.getInstance(VersionEventPublisher.class);
+
+    Listener listener = injector.getInstance(Listener.class);
+
+    HostComponentVersionEvent event = new HostComponentVersionEvent(cluster, sch);
+
+    publisher.publish(event);
+
+    assertEquals(event, listener.getLastEvent());
+
+    verify(cluster, sch);
+  }
+
+  private static class Listener {
+
+    private HostComponentVersionEvent lastEvent = null;
+
+    @Inject
+    private Listener(VersionEventPublisher eventPublisher) {
+      eventPublisher.register(this);
+    }
+
+    @Subscribe
+    public void onEvent(HostComponentVersionEvent event) {
+      lastEvent = event;
+    }
+
+    public HostComponentVersionEvent getLastEvent() {
+      return lastEvent;
+    }
+  }
+}
\ No newline at end of file