You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2018/01/09 18:47:56 UTC

[ambari] branch branch-3.0-perf updated: AMBARI-22742. Add event for changes in upgrade. (mpapirkovskyy)

This is an automated email from the ASF dual-hosted git repository.

mpapirkovskyy pushed a commit to branch branch-3.0-perf
in repository https://gitbox.apache.org/repos/asf/ambari.git


The following commit(s) were added to refs/heads/branch-3.0-perf by this push:
     new 2fc4855  AMBARI-22742. Add event for changes in upgrade. (mpapirkovskyy)
2fc4855 is described below

commit 2fc4855cda19acd0b4f9f9f45a7edc3a9620a09a
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
AuthorDate: Mon Dec 11 15:08:27 2017 +0200

    AMBARI-22742. Add event for changes in upgrade. (mpapirkovskyy)
---
 .../internal/UpgradeResourceProvider.java          |  22 +-
 .../ambari/server/events/AmbariUpdateEvent.java    |   1 +
 .../server/events/DefaultMessageEmitter.java       |   1 +
 .../ambari/server/events/RequestUpdateEvent.java   |   4 +-
 .../ambari/server/events/UpgradeUpdateEvent.java   | 277 +++++++++++++++++++++
 .../listeners/upgrade/UpgradeUpdateListener.java   |  63 +++++
 6 files changed, 363 insertions(+), 5 deletions(-)

diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
index 1fbf130..08f1392 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
@@ -62,6 +62,9 @@ import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
 import org.apache.ambari.server.controller.spi.SystemException;
 import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.events.UpdateEventType;
+import org.apache.ambari.server.events.UpgradeUpdateEvent;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
 import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
 import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
 import org.apache.ambari.server.orm.dao.RequestDAO;
@@ -253,6 +256,15 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
   @Inject
   private static UpgradeContextFactory s_upgradeContextFactory;
 
+  @Inject
+  private StateUpdateEventPublisher stateUpdateEventPublisher;
+
+  @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+
+  @Inject
+  private RequestDAO requestDAO;
+
   static {
     // properties
     PROPERTY_IDS.add(UPGRADE_CLUSTER_NAME);
@@ -442,7 +454,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
    *
    * @return the percent complete, counting ABORTED as zero percent.
    */
-  private double calculateAbortedProgress(Map<Long, HostRoleCommandStatusSummaryDTO> summary) {
+  public static double calculateAbortedProgress(Map<Long, HostRoleCommandStatusSummaryDTO> summary) {
     // !!! use the raw states to determine percent completes
     Map<HostRoleStatus, Integer> countTotals = new HashMap<>();
     int totalTasks = 0;
@@ -879,6 +891,8 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
     upgradeEntity.setRequestEntity(requestEntity);
     s_upgradeDAO.create(upgradeEntity);
 
+    stateUpdateEventPublisher.publish(UpgradeUpdateEvent
+        .formFullEvent(s_hostRoleCommandDAO, s_requestDAO, upgradeEntity, UpdateEventType.CREATE));
     cluster.setUpgradeEntity(upgradeEntity);
 
     return upgradeEntity;
@@ -1510,7 +1524,8 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
       if (suspended) {
         // set the upgrade to suspended
         upgradeEntity.setSuspended(suspended);
-        s_upgradeDAO.merge(upgradeEntity);
+        upgradeEntity = s_upgradeDAO.merge(upgradeEntity);
+        stateUpdateEventPublisher.publish(UpgradeUpdateEvent.formUpdateEvent(hostRoleCommandDAO,requestDAO, upgradeEntity));
       } else {
         // otherwise remove the association with the cluster since it's being
         // full aborted
@@ -1530,7 +1545,8 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
 
       UpgradeEntity lastUpgradeItemForCluster = s_upgradeDAO.findLastUpgradeOrDowngradeForCluster(cluster.getClusterId());
       lastUpgradeItemForCluster.setSuspended(false);
-      s_upgradeDAO.merge(lastUpgradeItemForCluster);
+      lastUpgradeItemForCluster = s_upgradeDAO.merge(lastUpgradeItemForCluster);
+      stateUpdateEventPublisher.publish(UpgradeUpdateEvent.formUpdateEvent(hostRoleCommandDAO, requestDAO, lastUpgradeItemForCluster));
     }
   }
 
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
index 4a522ba..58dcecf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
@@ -59,6 +59,7 @@ public abstract class AmbariUpdateEvent {
     HOST("events.hosts"),
     UI_ALERT_DEFINITIONS("events.alert_definitions"),
     ALERT_DEFINITIONS("alert_definitions"),
+    UPGRADE("events.upgrade"),
     COMMAND("events.commands");
 
     /**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
index 254a652..88ebc86 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
@@ -44,6 +44,7 @@ public class DefaultMessageEmitter extends MessageEmitter {
         put(AmbariUpdateEvent.Type.COMMAND, "/commands");
         put(AmbariUpdateEvent.Type.ALERT_DEFINITIONS, "/alert_definitions");
         put(AmbariUpdateEvent.Type.UI_ALERT_DEFINITIONS, "/events/alert_definitions");
+        put(AmbariUpdateEvent.Type.UPGRADE, "/events/upgrade");
   }});
 
   public DefaultMessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
index 4133c62..7426b11 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
@@ -197,14 +197,14 @@ public class RequestUpdateEvent extends AmbariUpdateEvent {
 
       if (!id.equals(that.id)) return false;
       if (!requestId.equals(that.requestId)) return false;
-      return hostName.equals(that.hostName);
+      return hostName != null ? hostName.equals(that.hostName) : that.hostName == null;
     }
 
     @Override
     public int hashCode() {
       int result = id.hashCode();
       result = 31 * result + requestId.hashCode();
-      result = 31 * result + hostName.hashCode();
+      result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
       return result;
     }
   }
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/UpgradeUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/UpgradeUpdateEvent.java
new file mode 100644
index 0000000..7fbd40d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/UpgradeUpdateEvent.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.Map;
+
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.controller.internal.UpgradeResourceProvider;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.UpgradeEntity;
+import org.apache.ambari.server.state.stack.upgrade.Direction;
+import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Contains information about a cluster upgrade state update. This update is sent to all subscribed recipients.
+ * Used for messaging to the UI.
+ */
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
+public class UpgradeUpdateEvent extends AmbariUpdateEvent {
+
+  @JsonProperty("associated_version")
+  private String associatedVersion;
+
+  @JsonProperty("cluster_id")
+  private Long clusterId;
+
+  @JsonProperty("direction")
+  private Direction direction;
+
+  @JsonProperty("downgrade_allowed")
+  private Boolean downgradeAllowed;
+
+  @JsonProperty("request_id")
+  private Long requestId;
+
+  @JsonProperty("request_status")
+  private HostRoleStatus requestStatus;
+
+  @JsonProperty("skip_failures")
+  private Boolean skipFailures;
+
+  @JsonProperty("skip_service_check_failures")
+  private Boolean skipServiceCheckFailures;
+
+  @JsonProperty("upgrade_type")
+  private UpgradeType upgradeType;
+
+  @JsonProperty("start_time")
+  private Long startTime;
+
+  @JsonProperty("end_time")
+  private Long endTime;
+
+  @JsonProperty("upgrade_id")
+  private Long upgradeId;
+
+  @JsonProperty("suspended")
+  private Boolean suspended;
+
+  @JsonProperty("progress_percent")
+  private Double progressPercent;
+
+  @JsonProperty("revert_allowed")
+  private Boolean revertAllowed;
+
+  @JsonProperty("type")
+  private UpdateEventType type;
+
+  private UpgradeUpdateEvent(UpdateEventType type) {
+    super(Type.UPGRADE);
+    this.type = type;
+  }
+
+  /**
+   * Builds an event with the full state of the given upgrade, including calculated status and progress.
+   * @param type the kind of change to stamp on the event; {@code null} falls back to CREATE
+   * @return the populated event
+   */
+  public static UpgradeUpdateEvent formFullEvent(HostRoleCommandDAO hostRoleCommandDAO, RequestDAO requestDAO, UpgradeEntity upgradeEntity,
+                                                 UpdateEventType type) {
+    UpgradeUpdateEvent upgradeUpdateEvent = new UpgradeUpdateEvent(type != null ? type : UpdateEventType.CREATE);
+    Map<Long, HostRoleCommandStatusSummaryDTO> summary = hostRoleCommandDAO.findAggregateCounts(upgradeEntity.getRequestId());
+    CalculatedStatus calc = CalculatedStatus.statusFromStageSummary(summary, summary.keySet());
+    // the aborted-progress calculation (scaled to percent) applies only to a suspended upgrade whose status is ABORTED
+    boolean suspendedAbort = calc.getStatus() == HostRoleStatus.ABORTED && upgradeEntity.isSuspended();
+    double progressPercent = suspendedAbort
+        ? UpgradeResourceProvider.calculateAbortedProgress(summary) * 100 : calc.getPercent();
+    RequestEntity rentity = requestDAO.findByPK(upgradeEntity.getRequestId());
+
+    upgradeUpdateEvent.setUpgradeId(upgradeEntity.getId());
+    upgradeUpdateEvent.setAssociatedVersion(upgradeEntity.getRepositoryVersion().getVersion());
+    upgradeUpdateEvent.setClusterId(upgradeEntity.getClusterId());
+    upgradeUpdateEvent.setDirection(upgradeEntity.getDirection());
+    upgradeUpdateEvent.setDowngradeAllowed(upgradeEntity.isDowngradeAllowed());
+    upgradeUpdateEvent.setRequestId(upgradeEntity.getRequestId());
+    upgradeUpdateEvent.setRequestStatus(calc.getStatus());
+    upgradeUpdateEvent.setSkipFailures(upgradeEntity.isComponentFailureAutoSkipped());
+    upgradeUpdateEvent.setSkipServiceCheckFailures(upgradeEntity.isServiceCheckFailureAutoSkipped());
+    upgradeUpdateEvent.setUpgradeType(upgradeEntity.getUpgradeType());
+    upgradeUpdateEvent.setStartTime(rentity.getStartTime());
+    upgradeUpdateEvent.setEndTime(rentity.getEndTime());
+    upgradeUpdateEvent.setSuspended(upgradeEntity.isSuspended());
+    upgradeUpdateEvent.setProgressPercent(progressPercent);
+    upgradeUpdateEvent.setRevertAllowed(upgradeEntity.isRevertAllowed());
+
+    return upgradeUpdateEvent;
+  }
+
+  /**
+   * Builds an UPDATE event carrying the request id, cluster id, calculated status and progress,
+   * suspended flag and request start/end times of the given upgrade.
+   * @return the populated event
+   */
+  public static UpgradeUpdateEvent formUpdateEvent(HostRoleCommandDAO hostRoleCommandDAO, RequestDAO requestDAO,
+                                                   UpgradeEntity upgradeEntity) {
+    Map<Long, HostRoleCommandStatusSummaryDTO> counts = hostRoleCommandDAO.findAggregateCounts(upgradeEntity.getRequestId());
+    CalculatedStatus status = CalculatedStatus.statusFromStageSummary(counts, counts.keySet());
+    // the aborted-progress calculation (scaled to percent) applies only to a suspended upgrade whose status is ABORTED
+    double progress = (status.getStatus() == HostRoleStatus.ABORTED && upgradeEntity.isSuspended())
+        ? UpgradeResourceProvider.calculateAbortedProgress(counts) * 100 : status.getPercent();
+    RequestEntity request = requestDAO.findByPK(upgradeEntity.getRequestId());
+
+    UpgradeUpdateEvent event = new UpgradeUpdateEvent(UpdateEventType.UPDATE);
+    event.setClusterId(upgradeEntity.getClusterId());
+    event.setRequestId(upgradeEntity.getRequestId());
+    event.setRequestStatus(status.getStatus());
+    event.setProgressPercent(progress);
+    event.setSuspended(upgradeEntity.isSuspended());
+    event.setStartTime(request.getStartTime());
+    event.setEndTime(request.getEndTime());
+
+    return event;
+  }
+
+  public String getAssociatedVersion() {
+    return associatedVersion;
+  }
+
+  public void setAssociatedVersion(String associatedVersion) {
+    this.associatedVersion = associatedVersion;
+  }
+
+  public Long getClusterId() {
+    return clusterId;
+  }
+
+  public void setClusterId(Long clusterId) {
+    this.clusterId = clusterId;
+  }
+
+  public Direction getDirection() {
+    return direction;
+  }
+
+  public void setDirection(Direction direction) {
+    this.direction = direction;
+  }
+
+  public Boolean getDowngradeAllowed() {
+    return downgradeAllowed;
+  }
+
+  public void setDowngradeAllowed(Boolean downgradeAllowed) {
+    this.downgradeAllowed = downgradeAllowed;
+  }
+
+  public Long getRequestId() {
+    return requestId;
+  }
+
+  public void setRequestId(Long requestId) {
+    this.requestId = requestId;
+  }
+
+  public HostRoleStatus getRequestStatus() {
+    return requestStatus;
+  }
+
+  public void setRequestStatus(HostRoleStatus requestStatus) {
+    this.requestStatus = requestStatus;
+  }
+
+  public Boolean getSkipFailures() {
+    return skipFailures;
+  }
+
+  public void setSkipFailures(Boolean skipFailures) {
+    this.skipFailures = skipFailures;
+  }
+
+  public Boolean getSkipServiceCheckFailures() {
+    return skipServiceCheckFailures;
+  }
+
+  public void setSkipServiceCheckFailures(Boolean skipServiceCheckFailures) {
+    this.skipServiceCheckFailures = skipServiceCheckFailures;
+  }
+
+  public UpgradeType getUpgradeType() {
+    return upgradeType;
+  }
+
+  public void setUpgradeType(UpgradeType upgradeType) {
+    this.upgradeType = upgradeType;
+  }
+
+  public Long getStartTime() {
+    return startTime;
+  }
+
+  public void setStartTime(Long startTime) {
+    this.startTime = startTime;
+  }
+
+  public Long getEndTime() {
+    return endTime;
+  }
+
+  public void setEndTime(Long endTime) {
+    this.endTime = endTime;
+  }
+
+  public Long getUpgradeId() {
+    return upgradeId;
+  }
+
+  public void setUpgradeId(Long upgradeId) {
+    this.upgradeId = upgradeId;
+  }
+
+  public Boolean getSuspended() {
+    return suspended;
+  }
+
+  public void setSuspended(Boolean suspended) {
+    this.suspended = suspended;
+  }
+
+  public Double getProgressPercent() {
+    return progressPercent;
+  }
+
+  public void setProgressPercent(Double progressPercent) {
+    this.progressPercent = progressPercent;
+  }
+
+  public Boolean getRevertAllowed() {
+    return revertAllowed;
+  }
+
+  public void setRevertAllowed(Boolean revertAllowed) {
+    this.revertAllowed = revertAllowed;
+  }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
new file mode 100644
index 0000000..f1e3fd4
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.upgrade;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.RequestUpdateEvent;
+import org.apache.ambari.server.events.UpgradeUpdateEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.dao.UpgradeDAO;
+import org.apache.ambari.server.orm.entities.UpgradeEntity;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/** Listens for {@link RequestUpdateEvent}s and publishes an {@link UpgradeUpdateEvent} when the request belongs to an upgrade. */
+@Singleton
+@EagerSingleton
+public class UpgradeUpdateListener {
+
+  private final StateUpdateEventPublisher stateUpdateEventPublisher;
+
+  @Inject
+  private UpgradeDAO upgradeDAO;
+  @Inject
+  private HostRoleCommandDAO hostRoleCommandDAO;
+  @Inject
+  private RequestDAO requestDAO;
+
+  @Inject
+  public UpgradeUpdateListener(StateUpdateEventPublisher stateUpdateEventPublisher, AmbariEventPublisher ambariEventPublisher) {
+    // store the publisher before registering so a handler invoked right after registration never sees it unset
+    this.stateUpdateEventPublisher = stateUpdateEventPublisher;
+    stateUpdateEventPublisher.register(this);
+  }
+
+  /** Looks up the upgrade for the event's request id and, if one exists, publishes its update event. */
+  @Subscribe
+  public void onRequestUpdate(RequestUpdateEvent requestUpdateEvent) {
+    UpgradeEntity upgradeEntity = upgradeDAO.findUpgradeByRequestId(requestUpdateEvent.getRequestId());
+    if (upgradeEntity != null) {
+      stateUpdateEventPublisher.publish(UpgradeUpdateEvent.formUpdateEvent(hostRoleCommandDAO, requestDAO, upgradeEntity));
+    }
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@ambari.apache.org" <co...@ambari.apache.org>'].