You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2018/01/09 18:47:56 UTC
[ambari] branch branch-3.0-perf updated: AMBARI-22742. Add event
for changes in upgrade. (mpapirkovskyy)
This is an automated email from the ASF dual-hosted git repository.
mpapirkovskyy pushed a commit to branch branch-3.0-perf
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/branch-3.0-perf by this push:
new 2fc4855 AMBARI-22742. Add event for changes in upgrade. (mpapirkovskyy)
2fc4855 is described below
commit 2fc4855cda19acd0b4f9f9f45a7edc3a9620a09a
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
AuthorDate: Mon Dec 11 15:08:27 2017 +0200
AMBARI-22742. Add event for changes in upgrade. (mpapirkovskyy)
---
.../internal/UpgradeResourceProvider.java | 22 +-
.../ambari/server/events/AmbariUpdateEvent.java | 1 +
.../server/events/DefaultMessageEmitter.java | 1 +
.../ambari/server/events/RequestUpdateEvent.java | 4 +-
.../ambari/server/events/UpgradeUpdateEvent.java | 277 +++++++++++++++++++++
.../listeners/upgrade/UpgradeUpdateListener.java | 63 +++++
6 files changed, 363 insertions(+), 5 deletions(-)
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
index 1fbf130..08f1392 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UpgradeResourceProvider.java
@@ -62,6 +62,9 @@ import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
import org.apache.ambari.server.controller.spi.SystemException;
import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
+import org.apache.ambari.server.events.UpdateEventType;
+import org.apache.ambari.server.events.UpgradeUpdateEvent;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
import org.apache.ambari.server.orm.dao.RequestDAO;
@@ -253,6 +256,15 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
@Inject
private static UpgradeContextFactory s_upgradeContextFactory;
+ @Inject
+ private StateUpdateEventPublisher stateUpdateEventPublisher;
+
+ @Inject
+ private HostRoleCommandDAO hostRoleCommandDAO;
+
+ @Inject
+ private RequestDAO requestDAO;
+
static {
// properties
PROPERTY_IDS.add(UPGRADE_CLUSTER_NAME);
@@ -442,7 +454,7 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
*
* @return the percent complete, counting ABORTED as zero percent.
*/
- private double calculateAbortedProgress(Map<Long, HostRoleCommandStatusSummaryDTO> summary) {
+ public static double calculateAbortedProgress(Map<Long, HostRoleCommandStatusSummaryDTO> summary) {
// !!! use the raw states to determine percent completes
Map<HostRoleStatus, Integer> countTotals = new HashMap<>();
int totalTasks = 0;
@@ -879,6 +891,8 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
upgradeEntity.setRequestEntity(requestEntity);
s_upgradeDAO.create(upgradeEntity);
+ stateUpdateEventPublisher.publish(UpgradeUpdateEvent
+ .formFullEvent(s_hostRoleCommandDAO, s_requestDAO, upgradeEntity, UpdateEventType.CREATE));
cluster.setUpgradeEntity(upgradeEntity);
return upgradeEntity;
@@ -1510,7 +1524,8 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
if (suspended) {
// set the upgrade to suspended
upgradeEntity.setSuspended(suspended);
- s_upgradeDAO.merge(upgradeEntity);
+ upgradeEntity = s_upgradeDAO.merge(upgradeEntity);
+ stateUpdateEventPublisher.publish(UpgradeUpdateEvent.formUpdateEvent(hostRoleCommandDAO,requestDAO, upgradeEntity));
} else {
// otherwise remove the association with the cluster since it's being
// full aborted
@@ -1530,7 +1545,8 @@ public class UpgradeResourceProvider extends AbstractControllerResourceProvider
UpgradeEntity lastUpgradeItemForCluster = s_upgradeDAO.findLastUpgradeOrDowngradeForCluster(cluster.getClusterId());
lastUpgradeItemForCluster.setSuspended(false);
- s_upgradeDAO.merge(lastUpgradeItemForCluster);
+ lastUpgradeItemForCluster = s_upgradeDAO.merge(lastUpgradeItemForCluster);
+ stateUpdateEventPublisher.publish(UpgradeUpdateEvent.formUpdateEvent(hostRoleCommandDAO, requestDAO, lastUpgradeItemForCluster));
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
index 4a522ba..58dcecf 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/AmbariUpdateEvent.java
@@ -59,6 +59,7 @@ public abstract class AmbariUpdateEvent {
HOST("events.hosts"),
UI_ALERT_DEFINITIONS("events.alert_definitions"),
ALERT_DEFINITIONS("alert_definitions"),
+ UPGRADE("events.upgrade"),
COMMAND("events.commands");
/**
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
index 254a652..88ebc86 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/DefaultMessageEmitter.java
@@ -44,6 +44,7 @@ public class DefaultMessageEmitter extends MessageEmitter {
put(AmbariUpdateEvent.Type.COMMAND, "/commands");
put(AmbariUpdateEvent.Type.ALERT_DEFINITIONS, "/alert_definitions");
put(AmbariUpdateEvent.Type.UI_ALERT_DEFINITIONS, "/events/alert_definitions");
+ put(AmbariUpdateEvent.Type.UPGRADE, "/events/upgrade");
}});
public DefaultMessageEmitter(AgentSessionManager agentSessionManager, SimpMessagingTemplate simpMessagingTemplate) {
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
index 4133c62..7426b11 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/RequestUpdateEvent.java
@@ -197,14 +197,14 @@ public class RequestUpdateEvent extends AmbariUpdateEvent {
if (!id.equals(that.id)) return false;
if (!requestId.equals(that.requestId)) return false;
- return hostName.equals(that.hostName);
+ return hostName != null ? hostName.equals(that.hostName) : that.hostName == null;
}
@Override
public int hashCode() {
int result = id.hashCode();
result = 31 * result + requestId.hashCode();
- result = 31 * result + hostName.hashCode();
+ result = 31 * result + (hostName != null ? hostName.hashCode() : 0);
return result;
}
}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/UpgradeUpdateEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/UpgradeUpdateEvent.java
new file mode 100644
index 0000000..7fbd40d
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/UpgradeUpdateEvent.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.events;
+
+import java.util.Map;
+
+import org.apache.ambari.server.actionmanager.HostRoleStatus;
+import org.apache.ambari.server.controller.internal.CalculatedStatus;
+import org.apache.ambari.server.controller.internal.UpgradeResourceProvider;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.entities.RequestEntity;
+import org.apache.ambari.server.orm.entities.UpgradeEntity;
+import org.apache.ambari.server.state.stack.upgrade.Direction;
+import org.apache.ambari.server.state.stack.upgrade.UpgradeType;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Contains information about cluster upgrade state update. This update will be sent to all subscribed recipients.
+ * Used for messaging to UI.
+ */
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
+public class UpgradeUpdateEvent extends AmbariUpdateEvent {
+
+ @JsonProperty("associated_version")
+ private String associatedVersion;
+
+ @JsonProperty("cluster_id")
+ private Long clusterId;
+
+ @JsonProperty("direction")
+ private Direction direction;
+
+ @JsonProperty("downgrade_allowed")
+ private Boolean downgradeAllowed;
+
+ @JsonProperty("request_id")
+ private Long requestId;
+
+ @JsonProperty("request_status")
+ private HostRoleStatus requestStatus;
+
+ @JsonProperty("skip_failures")
+ private Boolean skipFailures;
+
+ @JsonProperty("skip_service_check_failures")
+ private Boolean skipServiceCheckFailures;
+
+ @JsonProperty("upgrade_type")
+ private UpgradeType upgradeType;
+
+ @JsonProperty("start_time")
+ private Long startTime;
+
+ @JsonProperty("end_time")
+ private Long endTime;
+
+ @JsonProperty("upgrade_id")
+ private Long upgradeId;
+
+ @JsonProperty("suspended")
+ private Boolean suspended;
+
+ @JsonProperty("progress_percent")
+ private Double progressPercent;
+
+ @JsonProperty("revert_allowed")
+ private Boolean revertAllowed;
+
+ @JsonProperty("type")
+ private UpdateEventType type;
+
+ private UpgradeUpdateEvent(UpdateEventType type) {
+ super(Type.UPGRADE);
+ this.type = type;
+ }
+
+ public static UpgradeUpdateEvent formFullEvent(HostRoleCommandDAO hostRoleCommandDAO, RequestDAO requestDAO, UpgradeEntity upgradeEntity,
+ UpdateEventType type) {
+ UpgradeUpdateEvent upgradeUpdateEvent = new UpgradeUpdateEvent(UpdateEventType.CREATE);
+ Map<Long, HostRoleCommandStatusSummaryDTO> summary = hostRoleCommandDAO.findAggregateCounts(
+ upgradeEntity.getRequestId());
+ CalculatedStatus calc = CalculatedStatus.statusFromStageSummary(summary, summary.keySet());
+ double progressPercent;
+ if (calc.getStatus() == HostRoleStatus.ABORTED && upgradeEntity.isSuspended()) {
+ double percent = UpgradeResourceProvider.calculateAbortedProgress(summary);
+ progressPercent = percent*100;
+ } else {
+ progressPercent = calc.getPercent();
+ }
+
+ RequestEntity rentity = requestDAO.findByPK(upgradeEntity.getRequestId());
+
+ upgradeUpdateEvent.setUpgradeId(upgradeEntity.getId());
+ upgradeUpdateEvent.setAssociatedVersion(upgradeEntity.getRepositoryVersion().getVersion());
+ upgradeUpdateEvent.setClusterId(upgradeEntity.getClusterId());
+ upgradeUpdateEvent.setDirection(upgradeEntity.getDirection());
+ upgradeUpdateEvent.setDowngradeAllowed(upgradeEntity.isDowngradeAllowed());
+ upgradeUpdateEvent.setRequestId(upgradeEntity.getRequestId());
+ upgradeUpdateEvent.setRequestStatus(calc.getStatus());
+ upgradeUpdateEvent.setSkipFailures(upgradeEntity.isComponentFailureAutoSkipped());
+ upgradeUpdateEvent.setSkipServiceCheckFailures(upgradeEntity.isServiceCheckFailureAutoSkipped());
+ upgradeUpdateEvent.setUpgradeType(upgradeEntity.getUpgradeType());
+ upgradeUpdateEvent.setStartTime(rentity.getStartTime());
+ upgradeUpdateEvent.setEndTime(rentity.getEndTime());
+ upgradeUpdateEvent.setSuspended(upgradeEntity.isSuspended());
+ upgradeUpdateEvent.setProgressPercent(progressPercent);
+ upgradeUpdateEvent.setRevertAllowed(upgradeEntity.isRevertAllowed());
+
+ return upgradeUpdateEvent;
+ }
+
+ public static UpgradeUpdateEvent formUpdateEvent(HostRoleCommandDAO hostRoleCommandDAO, RequestDAO requestDAO,
+ UpgradeEntity upgradeEntity) {
+ Map<Long, HostRoleCommandStatusSummaryDTO> summary = hostRoleCommandDAO.findAggregateCounts(
+ upgradeEntity.getRequestId());
+ CalculatedStatus calc = CalculatedStatus.statusFromStageSummary(summary, summary.keySet());
+ double progressPercent;
+ if (calc.getStatus() == HostRoleStatus.ABORTED && upgradeEntity.isSuspended()) {
+ double percent = UpgradeResourceProvider.calculateAbortedProgress(summary);
+ progressPercent = percent*100;
+ } else {
+ progressPercent = calc.getPercent();
+ }
+ RequestEntity rentity = requestDAO.findByPK(upgradeEntity.getRequestId());
+
+ UpgradeUpdateEvent upgradeUpdateEvent = new UpgradeUpdateEvent(UpdateEventType.UPDATE);
+ upgradeUpdateEvent.setRequestId(upgradeEntity.getRequestId());
+ upgradeUpdateEvent.setProgressPercent(progressPercent);
+ upgradeUpdateEvent.setSuspended(upgradeEntity.isSuspended());
+ upgradeUpdateEvent.setStartTime(rentity.getStartTime());
+ upgradeUpdateEvent.setEndTime(rentity.getEndTime());
+ upgradeUpdateEvent.setClusterId(upgradeEntity.getClusterId());
+ upgradeUpdateEvent.setRequestStatus(calc.getStatus());
+
+ return upgradeUpdateEvent;
+ }
+
+ public String getAssociatedVersion() {
+ return associatedVersion;
+ }
+
+ public void setAssociatedVersion(String associatedVersion) {
+ this.associatedVersion = associatedVersion;
+ }
+
+ public Long getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(Long clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ public Direction getDirection() {
+ return direction;
+ }
+
+ public void setDirection(Direction direction) {
+ this.direction = direction;
+ }
+
+ public Boolean getDowngradeAllowed() {
+ return downgradeAllowed;
+ }
+
+ public void setDowngradeAllowed(Boolean downgradeAllowed) {
+ this.downgradeAllowed = downgradeAllowed;
+ }
+
+ public Long getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(Long requestId) {
+ this.requestId = requestId;
+ }
+
+ public HostRoleStatus getRequestStatus() {
+ return requestStatus;
+ }
+
+ public void setRequestStatus(HostRoleStatus requestStatus) {
+ this.requestStatus = requestStatus;
+ }
+
+ public Boolean getSkipFailures() {
+ return skipFailures;
+ }
+
+ public void setSkipFailures(Boolean skipFailures) {
+ this.skipFailures = skipFailures;
+ }
+
+ public Boolean getSkipServiceCheckFailures() {
+ return skipServiceCheckFailures;
+ }
+
+ public void setSkipServiceCheckFailures(Boolean skipServiceCheckFailures) {
+ this.skipServiceCheckFailures = skipServiceCheckFailures;
+ }
+
+ public UpgradeType getUpgradeType() {
+ return upgradeType;
+ }
+
+ public void setUpgradeType(UpgradeType upgradeType) {
+ this.upgradeType = upgradeType;
+ }
+
+ public Long getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(Long startTime) {
+ this.startTime = startTime;
+ }
+
+ public Long getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(Long endTime) {
+ this.endTime = endTime;
+ }
+
+ public Long getUpgradeId() {
+ return upgradeId;
+ }
+
+ public void setUpgradeId(Long upgradeId) {
+ this.upgradeId = upgradeId;
+ }
+
+ public Boolean getSuspended() {
+ return suspended;
+ }
+
+ public void setSuspended(Boolean suspended) {
+ this.suspended = suspended;
+ }
+
+ public Double getProgressPercent() {
+ return progressPercent;
+ }
+
+ public void setProgressPercent(Double progressPercent) {
+ this.progressPercent = progressPercent;
+ }
+
+ public Boolean getRevertAllowed() {
+ return revertAllowed;
+ }
+
+ public void setRevertAllowed(Boolean revertAllowed) {
+ this.revertAllowed = revertAllowed;
+ }
+}
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
new file mode 100644
index 0000000..f1e3fd4
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/UpgradeUpdateListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.events.listeners.upgrade;
+
+import org.apache.ambari.server.EagerSingleton;
+import org.apache.ambari.server.events.RequestUpdateEvent;
+import org.apache.ambari.server.events.UpgradeUpdateEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
+import org.apache.ambari.server.orm.dao.RequestDAO;
+import org.apache.ambari.server.orm.dao.UpgradeDAO;
+import org.apache.ambari.server.orm.entities.UpgradeEntity;
+
+import com.google.common.eventbus.Subscribe;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+@Singleton
+@EagerSingleton
+public class UpgradeUpdateListener {
+
+ private StateUpdateEventPublisher stateUpdateEventPublisher;
+
+ @Inject
+ private UpgradeDAO upgradeDAO;
+
+ @Inject
+ private HostRoleCommandDAO hostRoleCommandDAO;
+
+ @Inject
+ private RequestDAO requestDAO;
+
+ @Inject
+ public UpgradeUpdateListener(StateUpdateEventPublisher stateUpdateEventPublisher, AmbariEventPublisher ambariEventPublisher) {
+ stateUpdateEventPublisher.register(this);
+
+ this.stateUpdateEventPublisher = stateUpdateEventPublisher;
+ }
+
+ @Subscribe
+ public void onRequestUpdate(RequestUpdateEvent requestUpdateEvent) {
+ UpgradeEntity upgradeEntity = upgradeDAO.findUpgradeByRequestId(requestUpdateEvent.getRequestId());
+ if (upgradeEntity != null) {
+ stateUpdateEventPublisher.publish(UpgradeUpdateEvent.formUpdateEvent(hostRoleCommandDAO, requestDAO, upgradeEntity));
+ }
+ }
+}
--
To stop receiving notification emails like this one, please contact
['"commits@ambari.apache.org" <co...@ambari.apache.org>'].