You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by mp...@apache.org on 2017/06/07 15:51:41 UTC
[3/3] ambari git commit: AMBARI-21193. Add TopologyChange event to
the server (mpapirkovskyy)
AMBARI-21193. Add TopologyChange event to the server (mpapirkovskyy)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c2ab4a3c
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c2ab4a3c
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c2ab4a3c
Branch: refs/heads/branch-3.0-perf
Commit: c2ab4a3ce19fba934f3766f30a4738d8ebbb061e
Parents: 5d09624
Author: Myroslav Papirkovskyi <mp...@hortonworks.com>
Authored: Wed Jun 7 18:50:54 2017 +0300
Committer: Myroslav Papirkovskyi <mp...@hortonworks.com>
Committed: Wed Jun 7 18:50:54 2017 +0300
----------------------------------------------------------------------
.../server/HostNotRegisteredException.java | 33 +++
.../actionmanager/ActionDBAccessorImpl.java | 10 +
.../server/agent/AgentSessionManager.java | 50 +++++
.../ambari/server/agent/HeartBeatHandler.java | 66 +++++-
.../ambari/server/agent/HeartbeatProcessor.java | 45 +++--
.../agent/stomp/AgentReportsController.java | 82 ++++++++
.../server/agent/stomp/HeartbeatController.java | 16 +-
.../agent/stomp/dto/ComponentStatusReport.java | 84 ++++++++
.../agent/stomp/dto/ComponentStatusReports.java | 45 +++++
.../server/agent/stomp/dto/TopologyCluster.java | 61 ++++++
.../agent/stomp/dto/TopologyComponent.java | 136 +++++++++++++
.../server/agent/stomp/dto/TopologyHost.java | 89 +++++++++
.../stomp/dto/TopologyStatusCommandParams.java | 47 +++++
.../server/api/services/AmbariMetaInfo.java | 25 +++
.../configuration/spring/ApiStompConfig.java | 9 +
.../configuration/spring/GuiceBeansConfig.java | 2 -
.../AmbariManagementControllerImpl.java | 73 +++++--
.../controller/internal/CalculatedStatus.java | 33 +++
.../internal/ComponentResourceProvider.java | 24 ++-
.../DeleteHostComponentStatusMetaData.java | 98 +++++++++
.../internal/HostResourceProvider.java | 62 ++++++
.../internal/RequestResourceProvider.java | 24 +--
.../internal/ServiceResourceProvider.java | 9 +-
.../ambari/server/events/AlertUpdateEvent.java | 45 +++++
.../ambari/server/events/AmbariUpdateEvent.java | 26 +++
.../ambari/server/events/CommandEvent.java | 25 +++
.../server/events/ConfigsUpdateEvent.java | 200 +++++++++++++++++++
.../server/events/HostComponentUpdateEvent.java | 94 +++++++++
.../server/events/MetadataUpdateEvent.java | 25 +++
.../server/events/RequestUpdateEvent.java | 108 ++++++++++
.../server/events/TopologyUpdateEvent.java | 63 ++++++
.../listeners/alerts/AlertReceivedListener.java | 10 +
.../listeners/requests/StateUpdateListener.java | 52 +++++
.../listeners/tasks/TaskStatusListener.java | 23 ++-
.../publishers/StateUpdateEventPublisher.java | 45 +++++
.../ambari/server/orm/dao/RequestDAO.java | 4 +-
.../server/orm/entities/ResourceEntity.java | 2 -
.../org/apache/ambari/server/state/Cluster.java | 3 +-
.../org/apache/ambari/server/state/Service.java | 7 +-
.../ambari/server/state/ServiceComponent.java | 7 +-
.../server/state/ServiceComponentHost.java | 3 +-
.../server/state/ServiceComponentImpl.java | 21 +-
.../apache/ambari/server/state/ServiceImpl.java | 33 ++-
.../server/state/cluster/ClusterImpl.java | 52 ++++-
.../svccomphost/ServiceComponentHostImpl.java | 33 ++-
.../server/topology/TopologyDeleteFormer.java | 98 +++++++++
.../server/upgrade/UpgradeCatalog200.java | 1 +
.../configuration/RecoveryConfigHelperTest.java | 3 +-
.../internal/ComponentResourceProviderTest.java | 2 +-
.../internal/ServiceResourceProviderTest.java | 6 +-
.../apache/ambari/server/events/EventsTest.java | 9 +-
.../listeners/tasks/TaskStatusListenerTest.java | 9 +-
.../HostVersionOutOfSyncListenerTest.java | 3 +-
.../server/state/ServiceComponentTest.java | 18 +-
.../apache/ambari/server/state/ServiceTest.java | 3 +-
.../state/alerts/AlertEventPublisherTest.java | 3 +-
.../server/state/cluster/ClusterImplTest.java | 3 +-
.../server/state/cluster/ClusterTest.java | 5 +-
.../server/testing/DBInconsistencyTests.java | 3 +-
59 files changed, 2036 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java b/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java
new file mode 100644
index 0000000..9e7b3a7
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/HostNotRegisteredException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server;
+
+/**
+ * Signals that no host is registered for the websocket session a message arrived on.
+ */
+public class HostNotRegisteredException extends AmbariException {
+
+  /**
+   * Creates the exception for a session that has no registered host.
+   *
+   * @param sessionId sessionId of the websocket message
+   */
+  public HostNotRegisteredException(String sessionId) {
+    super(describe(sessionId));
+  }
+
+  /** Builds the exception message for the given websocket session id. */
+  private static String describe(String sessionId) {
+    return String.format("Host with [%s] sessionId not registered", sessionId);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
index 8c4eae8..d372844 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/actionmanager/ActionDBAccessorImpl.java
@@ -45,8 +45,10 @@ import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.internal.CalculatedStatus;
import org.apache.ambari.server.events.HostsRemovedEvent;
import org.apache.ambari.server.events.RequestFinishedEvent;
+import org.apache.ambari.server.events.RequestUpdateEvent;
import org.apache.ambari.server.events.TaskCreateEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
import org.apache.ambari.server.events.publishers.TaskEventPublisher;
import org.apache.ambari.server.orm.dao.ClusterDAO;
import org.apache.ambari.server.orm.dao.ExecutionCommandDAO;
@@ -67,6 +69,7 @@ import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.security.authorization.AuthorizationHelper;
import org.apache.ambari.server.state.Clusters;
import org.apache.ambari.server.state.Host;
+import org.apache.ambari.server.topology.TopologyManager;
import org.apache.ambari.server.utils.LoopBody;
import org.apache.ambari.server.utils.Parallel;
import org.apache.ambari.server.utils.ParallelLoopResult;
@@ -137,6 +140,12 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
@Inject
AuditLogger auditLogger;
+ @Inject
+ StateUpdateEventPublisher stateUpdateEventPublisher;
+
+ @Inject
+ TopologyManager topologyManager;
+
/**
* Cache for auditlog. It stores a {@link RequestDetails} object for every requests.
* {@link RequestDetails} contains the previous status of the request and a map for tasks.
@@ -360,6 +369,7 @@ public class ActionDBAccessorImpl implements ActionDBAccessor {
requestEntity.setClusterId(clusterId);
requestDAO.create(requestEntity);
+ stateUpdateEventPublisher.publish(new RequestUpdateEvent(requestEntity, hostRoleCommandDAO, topologyManager));
//TODO wire request to cluster
List<StageEntity> stageEntities = new ArrayList<>(request.getStages().size());
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java
new file mode 100644
index 0000000..d9b595a
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/AgentSessionManager.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.server.HostNotRegisteredException;
+import org.apache.ambari.server.state.Host;
+
+import com.google.inject.Singleton;
+
+/**
+ * Registry of agent sessions: remembers which host registered over which session id.
+ */
+@Singleton
+public class AgentSessionManager {
+  // Session id -> host. The map is static, so all instances of this class share it.
+  private static final ConcurrentHashMap<String, Host> registeredHosts = new ConcurrentHashMap<>();
+
+  /**
+   * Binds the host to the given session id. Sessions previously bound to a host with the
+   * same host name are dropped first, so a host stays reachable through its latest session only.
+   *
+   * @param sessionId session id the host registered with
+   * @param registeredHost host that registered
+   */
+  public void register(String sessionId, Host registeredHost) {
+    String hostName = registeredHost.getHostName();
+    // Remove every stale session of this host, not just an arbitrary one. Iterating a
+    // ConcurrentHashMap while removing entries from it is safe.
+    for (Map.Entry<String, Host> session : registeredHosts.entrySet()) {
+      if (session.getValue().getHostName().equals(hostName)) {
+        registeredHosts.remove(session.getKey(), session.getValue());
+      }
+    }
+    registeredHosts.put(sessionId, registeredHost);
+  }
+
+  /**
+   * Returns the host registered for the given session id.
+   *
+   * @param sessionId session id to look up
+   * @return the host bound to the session, never {@code null}
+   * @throws HostNotRegisteredException if no host is bound to the session
+   */
+  public Host getHost(String sessionId) throws HostNotRegisteredException {
+    // Single lookup: a containsKey()/get() pair could return null if the session is
+    // removed by a concurrent register() between the two calls.
+    Host host = registeredHosts.get(sessionId);
+    if (host == null) {
+      throw new HostNotRegisteredException(sessionId);
+    }
+    return host;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
index 3601528..67ec87d 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartBeatHandler.java
@@ -22,18 +22,25 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.HostNotFoundException;
import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.agent.stomp.dto.TopologyCluster;
+import org.apache.ambari.server.agent.stomp.dto.TopologyComponent;
+import org.apache.ambari.server.agent.stomp.dto.TopologyHost;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.configuration.Configuration;
+import org.apache.ambari.server.events.TopologyUpdateEvent;
import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileReader;
import org.apache.ambari.server.serveraction.kerberos.KerberosIdentityDataFileReaderFactory;
import org.apache.ambari.server.serveraction.kerberos.KerberosServerAction;
@@ -44,6 +51,8 @@ import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.ConfigHelper;
import org.apache.ambari.server.state.Host;
import org.apache.ambari.server.state.HostState;
+import org.apache.ambari.server.state.Service;
+import org.apache.ambari.server.state.ServiceComponent;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.ServiceInfo;
import org.apache.ambari.server.state.StackId;
@@ -260,7 +269,9 @@ public class HeartBeatHandler {
return response;
}
-
+ /**
+ * Hands component status reports received for a host over to the heartbeat processor.
+ *
+ * @param componentStatuses component statuses reported for the host
+ * @param hostname name of the host the statuses were reported for
+ * @throws AmbariException if the heartbeat processor fails to process the reports
+ */
+ public void handleComponentReportStatus(List<ComponentStatus> componentStatuses, String hostname) throws AmbariException {
+ heartbeatProcessor.processStatusReports(componentStatuses, hostname);
+ }
protected void processRecoveryReport(RecoveryReport recoveryReport, String hostname) throws AmbariException {
LOG.debug("Received recovery report: " + recoveryReport.toString());
@@ -470,6 +481,59 @@ public class HeartBeatHandler {
}
/**
+ * Is used during agent registering to provide base info about clusters topology.
+ * @return filled TopologyUpdateEvent with info about all components and hosts in all clusters
+ * @throws InvalidStateTransitionException
+ * @throws AmbariException
+ */
+ //TODO need move to better place
+  public TopologyUpdateEvent getInitialClusterTopology()
+      throws InvalidStateTransitionException, AmbariException {
+    Map<String, TopologyCluster> topologyByClusterId = new HashMap<>();
+
+    for (Cluster cluster : clusterFsm.getClusters().values()) {
+      Collection<Host> hostsOfCluster = cluster.getHosts();
+      Set<TopologyComponent> components = new HashSet<>();
+      Set<TopologyHost> hosts = new HashSet<>();
+
+      // Each host of the cluster is described by its id, name, rack info and IPv4 address.
+      for (Host host : hostsOfCluster) {
+        TopologyHost topologyHost = new TopologyHost(host.getHostId(), host.getHostName(),
+            host.getRackInfo(), host.getIPv4());
+        hosts.add(topologyHost);
+      }
+
+      for (Service service : cluster.getServices().values()) {
+        for (ServiceComponent component : service.getServiceComponents().values()) {
+          Map<String, ServiceComponentHost> hostComponents = component.getServiceComponentHosts();
+          if (hostComponents.isEmpty()) {
+            // Components that are not mapped to any host are left out of the topology.
+            continue;
+          }
+
+          //TODO will be a need to change to multi-instance usage
+          ServiceComponentHost anyHostComponent = hostComponents.values().iterator().next();
+          String serviceName = anyHostComponent.getServiceName();
+          String componentName = anyHostComponent.getServiceComponentName();
+
+          // Translate the names of the hosts running the component into host ids.
+          Set<String> componentHostNames = cluster.getHosts(serviceName, componentName);
+          Set<Long> componentHostIds = new HashSet<>();
+          for (Host host : hostsOfCluster) {
+            if (componentHostNames.contains(host.getHostName())) {
+              componentHostIds.add(host.getHostId());
+            }
+          }
+
+          StackId stackId = cluster.getDesiredStackVersion();
+          TopologyComponent topologyComponent = TopologyComponent.newBuilder()
+              .setComponentName(componentName)
+              .setServiceName(serviceName)
+              .setVersion(anyHostComponent.getVersion())
+              .setHostIds(componentHostIds)
+              .setStatusCommandParams(ambariMetaInfo.getStatusCommandParams(stackId, serviceName, componentName))
+              .build();
+          components.add(topologyComponent);
+        }
+      }
+
+      // Clusters are keyed by the string form of their id.
+      topologyByClusterId.put(Long.toString(cluster.getClusterId()),
+          new TopologyCluster(components, hosts));
+    }
+
+    return new TopologyUpdateEvent(topologyByClusterId, TopologyUpdateEvent.EventType.UPDATE);
+  }
+
+ /**
* Annotate the response with some housekeeping details.
* hasMappedComponents - indicates if any components are mapped to the host
* hasPendingTasks - indicates if any tasks are pending for the host (they may not be sent yet)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
index c1028dc..47109df 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/HeartbeatProcessor.java
@@ -583,10 +583,17 @@ public class HeartbeatProcessor extends AbstractService{
* @throws AmbariException
*/
protected void processStatusReports(HeartBeat heartbeat) throws AmbariException {
- String hostname = heartbeat.getHostname();
+ processStatusReports(heartbeat.getComponentStatus(), heartbeat.getHostname());
+ }
+
+ /**
+ * Process reports of status commands
+ * @throws AmbariException
+ */
+ public void processStatusReports(List<ComponentStatus> componentStatuses, String hostname) throws AmbariException {
Set<Cluster> clusters = clusterFsm.getClustersForHost(hostname);
for (Cluster cl : clusters) {
- for (ComponentStatus status : heartbeat.componentStatus) {
+ for (ComponentStatus status : componentStatuses) {
if (status.getClusterName().equals(cl.getClusterName())) {
try {
Service svc = cl.getService(status.getServiceName());
@@ -597,22 +604,24 @@ public class HeartbeatProcessor extends AbstractService{
componentName);
ServiceComponentHost scHost = svcComp.getServiceComponentHost(
hostname);
- org.apache.ambari.server.state.State prevState = scHost.getState();
- org.apache.ambari.server.state.State liveState =
- org.apache.ambari.server.state.State.valueOf(org.apache.ambari.server.state.State.class,
- status.getStatus());
- //ignore reports from status commands if component is in INIT or any "in progress" state
- if (prevState.equals(org.apache.ambari.server.state.State.INSTALLED)
- || prevState.equals(org.apache.ambari.server.state.State.STARTED)
- || prevState.equals(org.apache.ambari.server.state.State.UNKNOWN)) {
- scHost.setState(liveState);
- if (!prevState.equals(liveState)) {
- LOG.info("State of service component " + componentName
- + " of service " + status.getServiceName()
- + " of cluster " + status.getClusterName()
- + " has changed from " + prevState + " to " + liveState
- + " at host " + hostname
- + " according to STATUS_COMMAND report");
+ if (status.getStatus() != null) {
+ org.apache.ambari.server.state.State prevState = scHost.getState();
+ org.apache.ambari.server.state.State liveState =
+ org.apache.ambari.server.state.State.valueOf(org.apache.ambari.server.state.State.class,
+ status.getStatus());
+ //ignore reports from status commands if component is in INIT or any "in progress" state
+ if (prevState.equals(org.apache.ambari.server.state.State.INSTALLED)
+ || prevState.equals(org.apache.ambari.server.state.State.STARTED)
+ || prevState.equals(org.apache.ambari.server.state.State.UNKNOWN)) {
+ scHost.setState(liveState);
+ if (!prevState.equals(liveState)) {
+ LOG.info("State of service component " + componentName
+ + " of service " + status.getServiceName()
+ + " of cluster " + status.getClusterName()
+ + " has changed from " + prevState + " to " + liveState
+ + " at host " + hostname
+ + " according to STATUS_COMMAND report");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
new file mode 100644
index 0000000..68b7f3b
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/AgentReportsController.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent.stomp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import javax.ws.rs.WebApplicationException;
+
+import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.agent.AgentSessionManager;
+import org.apache.ambari.server.agent.ComponentStatus;
+import org.apache.ambari.server.agent.HeartBeatHandler;
+import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReport;
+import org.apache.ambari.server.agent.stomp.dto.ComponentStatusReports;
+import org.apache.ambari.server.state.cluster.ClustersImpl;
+import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.MessageMapping;
+import org.springframework.messaging.simp.annotation.SendToUser;
+import org.springframework.messaging.simp.annotation.SubscribeMapping;
+import org.springframework.stereotype.Controller;
+
+import com.google.inject.Injector;
+
+/**
+ * STOMP controller for reports that agents send under the {@code /reports} destination prefix.
+ */
+@Controller
+@SendToUser("/")
+@MessageMapping("/reports")
+public class AgentReportsController {
+  private static final Log LOG = LogFactory.getLog(AgentReportsController.class);
+  private final HeartBeatHandler hh;
+  private final ClustersImpl clusters;
+  private final AgentSessionManager agentSessionManager;
+
+  /**
+   * Looks up the collaborators of this controller from the given injector.
+   *
+   * @param injector Guice injector providing the heartbeat handler, clusters and session manager
+   */
+  public AgentReportsController(Injector injector) {
+    hh = injector.getInstance(HeartBeatHandler.class);
+    clusters = injector.getInstance(ClustersImpl.class);
+    agentSessionManager = injector.getInstance(AgentSessionManager.class);
+  }
+
+  /**
+   * Converts the component status reports of a message into {@link ComponentStatus} objects and
+   * passes them to the heartbeat handler for the host registered with the sending session.
+   *
+   * @param simpSessionId id of the session the message was received on
+   * @param message reports grouped in lists; a missing map or list is treated as empty
+   * @throws AmbariException if a cluster cannot be resolved, the session has no registered host,
+   *         or processing of the statuses fails
+   */
+  @SubscribeMapping("/component_status")
+  public void handleComponentReportStatus(@Header String simpSessionId, ComponentStatusReports message)
+      throws WebApplicationException, InvalidStateTransitionException, AmbariException {
+    List<ComponentStatus> statuses = new ArrayList<>();
+    Map<String, List<ComponentStatusReport>> clustersReports = message.getComponentStatusReports();
+    if (clustersReports != null) {
+      for (List<ComponentStatusReport> clusterReports : clustersReports.values()) {
+        if (clusterReports == null) {
+          continue;
+        }
+        for (ComponentStatusReport report : clusterReports) {
+          if (report.getCommand() == null) {
+            // Without a command the value cannot be attributed to either status field;
+            // skip the entry instead of failing the whole message with an NPE.
+            LOG.warn("Ignoring status report without command for component " + report.getComponentName());
+            continue;
+          }
+          statuses.add(toComponentStatus(report));
+        }
+      }
+    }
+
+    hh.handleComponentReportStatus(statuses,
+        agentSessionManager.getHost(simpSessionId).getHostName());
+  }
+
+  /** Maps a single report onto the {@link ComponentStatus} structure used by the heartbeat handler. */
+  private ComponentStatus toComponentStatus(ComponentStatusReport report) throws AmbariException {
+    ComponentStatus componentStatus = new ComponentStatus();
+    componentStatus.setClusterName(clusters.getCluster(report.getClusterId()).getClusterName());
+    componentStatus.setComponentName(report.getComponentName());
+    componentStatus.setServiceName(report.getServiceName());
+    // The reported value is already a String; it is passed through as is so that a missing
+    // (null) value reaches the processor instead of raising an NPE here.
+    if (report.getCommand() == ComponentStatusReport.CommandStatusCommand.STATUS) {
+      componentStatus.setStatus(report.getStatus());
+    } else {
+      componentStatus.setSecurityState(report.getStatus());
+    }
+    return componentStatus;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java
index 510f200..b56449e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/HeartbeatController.java
@@ -20,15 +20,19 @@ package org.apache.ambari.server.agent.stomp;
import javax.ws.rs.WebApplicationException;
import org.apache.ambari.server.AmbariException;
+import org.apache.ambari.server.agent.AgentSessionManager;
import org.apache.ambari.server.agent.HeartBeat;
import org.apache.ambari.server.agent.HeartBeatHandler;
import org.apache.ambari.server.agent.HeartBeatResponse;
import org.apache.ambari.server.agent.Register;
import org.apache.ambari.server.agent.RegistrationResponse;
import org.apache.ambari.server.agent.RegistrationStatus;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
+import org.apache.ambari.server.state.cluster.ClustersImpl;
import org.apache.ambari.server.state.fsm.InvalidStateTransitionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.annotation.SendToUser;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
@@ -42,14 +46,21 @@ import com.google.inject.Injector;
public class HeartbeatController {
private static Log LOG = LogFactory.getLog(HeartbeatController.class);
private final HeartBeatHandler hh;
+ private final ClustersImpl clusters;
+ private final AgentSessionManager agentSessionManager;
+ private final StateUpdateEventPublisher stateUpdateEventPublisher;
public HeartbeatController(Injector injector) {
hh = injector.getInstance(HeartBeatHandler.class);
+ clusters = injector.getInstance(ClustersImpl.class);
+ agentSessionManager = injector.getInstance(AgentSessionManager.class);
+ stateUpdateEventPublisher = injector.getInstance(StateUpdateEventPublisher.class);
}
@SubscribeMapping("/register")
- public RegistrationResponse register(Register message)
- throws WebApplicationException, InvalidStateTransitionException {
+ public RegistrationResponse register(@Header String simpSessionId, Register message)
+ throws WebApplicationException, InvalidStateTransitionException, AmbariException {
+
/* Call into the heartbeat handler */
RegistrationResponse response = null;
@@ -64,6 +75,7 @@ public class HeartbeatController {
response.setLog(ex.getMessage());
return response;
}
+ stateUpdateEventPublisher.publish(hh.getInitialClusterTopology());
return response;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentStatusReport.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentStatusReport.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentStatusReport.java
new file mode 100644
index 0000000..b430510
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentStatusReport.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent.stomp.dto;
+
+/**
+ * Single entry of a component status report: the value reported for one component
+ * of one service in one cluster, together with the command that produced it.
+ */
+public class ComponentStatusReport {
+ // name of the component the report is about
+ private String componentName;
+ // status command the reported value belongs to
+ private CommandStatusCommand command;
+ // reported value; AgentReportsController stores it as the component status for STATUS
+ // reports and as the security state otherwise
+ private String status;
+ // name of the service the component belongs to
+ private String serviceName;
+ // id of the cluster the component belongs to
+ private Long clusterId;
+
+ /**
+ * Creates an empty report; all fields stay {@code null} until set.
+ */
+ public ComponentStatusReport() {
+ }
+
+ /**
+ * Creates a fully populated report.
+ *
+ * @param componentName name of the component
+ * @param command status command the value belongs to
+ * @param status reported value
+ * @param serviceName name of the service
+ * @param clusterId id of the cluster
+ */
+ public ComponentStatusReport(String componentName, CommandStatusCommand command, String status, String serviceName, Long clusterId) {
+ this.componentName = componentName;
+ this.command = command;
+ this.status = status;
+ this.serviceName = serviceName;
+ this.clusterId = clusterId;
+ }
+
+ public String getComponentName() {
+ return componentName;
+ }
+
+ public void setComponentName(String componentName) {
+ this.componentName = componentName;
+ }
+
+ public CommandStatusCommand getCommand() {
+ return command;
+ }
+
+ public void setCommand(CommandStatusCommand command) {
+ this.command = command;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public void setServiceName(String serviceName) {
+ this.serviceName = serviceName;
+ }
+
+ public Long getClusterId() {
+ return clusterId;
+ }
+
+ public void setClusterId(Long clusterId) {
+ this.clusterId = clusterId;
+ }
+
+ /**
+ * Kinds of status command a report can belong to.
+ */
+ public enum CommandStatusCommand {
+ STATUS,
+ SECURITY_STATUS
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentStatusReports.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentStatusReports.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentStatusReports.java
new file mode 100644
index 0000000..2a3e4bc
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/ComponentStatusReports.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent.stomp.dto;
+
+import java.util.List;
+import java.util.Map;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Container for component status reports, grouped into lists under string keys.
+ */
+public class ComponentStatusReports {
+
+ // Bound to the JSON property "clustersReports".
+ // NOTE(review): the map key is presumably a cluster identifier - confirm against the agent side.
+ @JsonProperty("clustersReports")
+ private Map<String, List<ComponentStatusReport>> componentStatusReports;
+
+ /**
+ * Creates an empty container; the report map stays {@code null} until set.
+ */
+ public ComponentStatusReports() {
+ }
+
+ /**
+ * Creates a container holding the given reports.
+ *
+ * @param componentStatusReports lists of reports by key
+ */
+ public ComponentStatusReports(Map<String, List<ComponentStatusReport>> componentStatusReports) {
+ this.componentStatusReports = componentStatusReports;
+ }
+
+ /**
+ * @return the report map as stored; {@code null} if it was never set
+ */
+ public Map<String, List<ComponentStatusReport>> getComponentStatusReports() {
+ return componentStatusReports;
+ }
+
+ public void setComponentStatusReports(Map<String, List<ComponentStatusReport>> componentStatusReports) {
+ this.componentStatusReports = componentStatusReports;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java
new file mode 100644
index 0000000..4ddb8a6
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyCluster.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent.stomp.dto;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+/**
+ * Holds the components and hosts that make up (or changed in) one cluster's topology.
+ * Serialized with {@code NON_EMPTY} inclusion, so null or empty sets are left out of the JSON.
+ */
+@JsonInclude(JsonInclude.Include.NON_EMPTY)
+public class TopologyCluster {
+  private Set<TopologyComponent> topologyComponents = new HashSet<>();
+  private Set<TopologyHost> topologyHosts = new HashSet<>();
+
+  /** Creates a cluster topology with empty component and host sets. */
+  public TopologyCluster() {
+  }
+
+  /**
+   * Creates a cluster topology backed by the given sets (stored by reference).
+   *
+   * @param topologyComponents components of the cluster; may be {@code null}
+   * @param topologyHosts hosts of the cluster; may be {@code null}
+   */
+  public TopologyCluster(Set<TopologyComponent> topologyComponents, Set<TopologyHost> topologyHosts) {
+    this.topologyComponents = topologyComponents;
+    this.topologyHosts = topologyHosts;
+  }
+
+  /**
+   * @return the component set as stored; {@code null} if a null set was supplied
+   */
+  public Set<TopologyComponent> getTopologyComponents() {
+    return topologyComponents;
+  }
+
+  /**
+   * @param topologyComponents the new component set (stored by reference); may be {@code null}
+   */
+  public void setTopologyComponents(Set<TopologyComponent> topologyComponents) {
+    this.topologyComponents = topologyComponents;
+  }
+
+  /**
+   * @return the host set as stored; {@code null} if a null set was supplied
+   */
+  public Set<TopologyHost> getTopologyHosts() {
+    return topologyHosts;
+  }
+
+  /**
+   * @param topologyHosts the new host set (stored by reference); may be {@code null}
+   */
+  public void setTopologyHosts(Set<TopologyHost> topologyHosts) {
+    this.topologyHosts = topologyHosts;
+  }
+
+  /**
+   * Adds a host to this cluster's topology.
+   *
+   * @param topologyHost the host to add
+   */
+  public void addTopologyHost(TopologyHost topologyHost) {
+    // The constructor and setter accept null, so create the set on demand instead of failing with an NPE.
+    if (topologyHosts == null) {
+      topologyHosts = new HashSet<>();
+    }
+    topologyHosts.add(topologyHost);
+  }
+
+  /**
+   * Adds a component to this cluster's topology.
+   *
+   * @param topologyComponent the component to add
+   */
+  public void addTopologyComponent(TopologyComponent topologyComponent) {
+    // The constructor and setter accept null, so create the set on demand instead of failing with an NPE.
+    if (topologyComponents == null) {
+      topologyComponents = new HashSet<>();
+    }
+    topologyComponents.add(topologyComponent);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java
new file mode 100644
index 0000000..019c933
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyComponent.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.agent.stomp.dto;
+
+import java.util.Set;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+/**
+ * Describes one service component inside a topology message: its name, owning service,
+ * version, the ids of the hosts it is placed on and its status command parameters.
+ * Null properties are omitted from the JSON output ({@code NON_NULL} inclusion).
+ *
+ * <p>Instances are obtained through {@link #newBuilder()}. Two components are considered
+ * equal when both their component name and version match; the other fields are ignored.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class TopologyComponent {
+  private String componentName;
+  private String serviceName;
+  private String version;
+  private Set<Long> hostIds;
+  private TopologyStatusCommandParams statusCommandParams;
+
+  private TopologyComponent() {
+  }
+
+  /**
+   * @return a builder bound to a fresh, empty {@link TopologyComponent}
+   */
+  public static Builder newBuilder() {
+    return new TopologyComponent().new Builder();
+  }
+
+  /**
+   * Fluent builder. It is an inner class that writes straight into its enclosing
+   * {@link TopologyComponent}, so {@link #build()} returns that same instance on every call.
+   */
+  public class Builder {
+    private Builder() {
+
+    }
+
+    public Builder setComponentName(String componentName) {
+      TopologyComponent.this.setComponentName(componentName);
+      return this;
+    }
+
+    public Builder setServiceName(String serviceName) {
+      TopologyComponent.this.setServiceName(serviceName);
+      return this;
+    }
+
+    public Builder setVersion(String version) {
+      TopologyComponent.this.setVersion(version);
+      return this;
+    }
+
+    public Builder setHostIds(Set<Long> hostIds) {
+      TopologyComponent.this.setHostIds(hostIds);
+      return this;
+    }
+
+    public Builder setStatusCommandParams(TopologyStatusCommandParams statusCommandParams) {
+      TopologyComponent.this.setStatusCommandParams(statusCommandParams);
+      return this;
+    }
+
+    /**
+     * @return the component this builder has been populating
+     */
+    public TopologyComponent build() {
+      return TopologyComponent.this;
+    }
+  }
+
+  public String getComponentName() {
+    return componentName;
+  }
+
+  public void setComponentName(String componentName) {
+    this.componentName = componentName;
+  }
+
+  public String getServiceName() {
+    return serviceName;
+  }
+
+  public void setServiceName(String serviceName) {
+    this.serviceName = serviceName;
+  }
+
+  public String getVersion() {
+    return version;
+  }
+
+  public void setVersion(String version) {
+    this.version = version;
+  }
+
+  public Set<Long> getHostIds() {
+    return hostIds;
+  }
+
+  public void setHostIds(Set<Long> hostIds) {
+    this.hostIds = hostIds;
+  }
+
+  /**
+   * Adds a host id to this component, creating the id set first when none has been assigned yet.
+   *
+   * @param hostId id of the host to add
+   */
+  public void addHostId(Long hostId) {
+    // hostIds has no initializer, so a component built without setHostIds() would otherwise NPE here.
+    if (this.hostIds == null) {
+      this.hostIds = new java.util.HashSet<>();
+    }
+    this.hostIds.add(hostId);
+  }
+
+  public TopologyStatusCommandParams getStatusCommandParams() {
+    return statusCommandParams;
+  }
+
+  public void setStatusCommandParams(TopologyStatusCommandParams statusCommandParams) {
+    this.statusCommandParams = statusCommandParams;
+  }
+
+  /**
+   * Compares by component name and version only; a null value equals only another null.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    TopologyComponent that = (TopologyComponent) o;
+
+    // Name and version may be unset (the class is serialized with NON_NULL), so compare null-safely.
+    if (componentName != null ? !componentName.equals(that.componentName) : that.componentName != null) {
+      return false;
+    }
+    return version != null ? version.equals(that.version) : that.version == null;
+  }
+
+  /**
+   * Hashes the same fields as {@link #equals(Object)}; a null field contributes 0.
+   */
+  @Override
+  public int hashCode() {
+    int result = componentName != null ? componentName.hashCode() : 0;
+    result = 31 * result + (version != null ? version.hashCode() : 0);
+    return result;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java
new file mode 100644
index 0000000..1718711
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyHost.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent.stomp.dto;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+/**
+ * Describes one host inside a topology message: id, host name, rack name and IPv4 address.
+ * Null properties are omitted from the JSON output ({@code NON_NULL} inclusion).
+ * Identity is defined by the host id alone.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class TopologyHost {
+  private Long hostId;
+  private String hostName;
+  private String rackName;
+  private String ipv4;
+
+  /** Creates a host with every field unset. */
+  public TopologyHost() {
+  }
+
+  /**
+   * Creates a host that carries only its id.
+   *
+   * @param hostId the host id
+   */
+  public TopologyHost(Long hostId) {
+    this.hostId = hostId;
+  }
+
+  /**
+   * Creates a fully described host.
+   *
+   * @param hostId the host id
+   * @param hostName the host name
+   * @param rackName the rack name
+   * @param ipv4 the IPv4 address
+   */
+  public TopologyHost(Long hostId, String hostName, String rackName, String ipv4) {
+    this.hostId = hostId;
+    this.hostName = hostName;
+    this.rackName = rackName;
+    this.ipv4 = ipv4;
+  }
+
+  public Long getHostId() {
+    return hostId;
+  }
+
+  public void setHostId(Long hostId) {
+    this.hostId = hostId;
+  }
+
+  public String getHostName() {
+    return hostName;
+  }
+
+  public void setHostName(String hostName) {
+    this.hostName = hostName;
+  }
+
+  public String getRackName() {
+    return rackName;
+  }
+
+  public void setRackName(String rackName) {
+    this.rackName = rackName;
+  }
+
+  public String getIpv4() {
+    return ipv4;
+  }
+
+  public void setIpv4(String ipv4) {
+    this.ipv4 = ipv4;
+  }
+
+  /**
+   * Compares by host id only; a host without an id equals only another host without an id.
+   */
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    TopologyHost that = (TopologyHost) o;
+
+    // The no-arg constructor leaves hostId null, so compare null-safely.
+    return hostId != null ? hostId.equals(that.hostId) : that.hostId == null;
+  }
+
+  /**
+   * Hashes the host id; a host without an id hashes to 0.
+   */
+  @Override
+  public int hashCode() {
+    return hostId != null ? hostId.hashCode() : 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyStatusCommandParams.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyStatusCommandParams.java b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyStatusCommandParams.java
new file mode 100644
index 0000000..1574066
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/agent/stomp/dto/TopologyStatusCommandParams.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.agent.stomp.dto;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+
+/**
+ * Pair of values attached to a topology component: a script name and a service package folder.
+ * Null properties are omitted from the JSON output ({@code NON_NULL} inclusion).
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class TopologyStatusCommandParams {
+ private String script;
+ private String servicePackageFolder;
+
+ /**
+ * @param script the script name; may be {@code null}
+ * @param servicePackageFolder the service package folder
+ */
+ public TopologyStatusCommandParams(String script, String servicePackageFolder) {
+ this.script = script;
+ this.servicePackageFolder = servicePackageFolder;
+ }
+
+ /**
+ * @return the script name; may be {@code null}
+ */
+ public String getScript() {
+ return script;
+ }
+
+ /**
+ * @param script the new script name
+ */
+ public void setScript(String script) {
+ this.script = script;
+ }
+
+ /**
+ * @return the service package folder
+ */
+ public String getServicePackageFolder() {
+ return servicePackageFolder;
+ }
+
+ /**
+ * @param servicePackageFolder the new service package folder
+ */
+ public void setServicePackageFolder(String servicePackageFolder) {
+ this.servicePackageFolder = servicePackageFolder;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
index c655c62..4887d52 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/api/services/AmbariMetaInfo.java
@@ -43,6 +43,7 @@ import javax.xml.bind.JAXBException;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.ParentObjectNotFoundException;
import org.apache.ambari.server.StackAccessException;
+import org.apache.ambari.server.agent.stomp.dto.TopologyStatusCommandParams;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.controller.RootServiceResponseFactory.Services;
import org.apache.ambari.server.controller.utilities.PropertyHelper;
@@ -61,6 +62,7 @@ import org.apache.ambari.server.stack.StackManager;
import org.apache.ambari.server.stack.StackManagerFactory;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
+import org.apache.ambari.server.state.CommandScriptDefinition;
import org.apache.ambari.server.state.ComponentInfo;
import org.apache.ambari.server.state.DependencyInfo;
import org.apache.ambari.server.state.ExtensionInfo;
@@ -1486,5 +1488,28 @@ public class AmbariMetaInfo {
return versionDefinitions;
}
+ //TODO will be a need to change to multi-instance usage
+ public TopologyStatusCommandParams getStatusCommandParams(StackId stackId, String serviceName, String componentName) throws AmbariException {
+ ServiceInfo serviceInfo = getService(stackId.getStackName(),
+ stackId.getStackVersion(), serviceName);
+ ComponentInfo componentInfo = getComponent(
+ stackId.getStackName(), stackId.getStackVersion(),
+ serviceName, componentName);
+
+ String scriptName = null;
+ CommandScriptDefinition script = componentInfo.getCommandScript();
+ if (serviceInfo.getSchemaVersion().equals(AmbariMetaInfo.SCHEMA_VERSION_2)) {
+ if (script != null) {
+ scriptName = script.getScript();
+ } else {
+ String message = String.format("Component %s of service %s has not " +
+ "command script defined", componentName, serviceName);
+ throw new AmbariException(message);
+ }
+ }
+ String servicePackageFolder = serviceInfo.getServicePackageFolder();
+ return new TopologyStatusCommandParams(scriptName, servicePackageFolder);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
index f248e17..0d8a08b 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/ApiStompConfig.java
@@ -18,8 +18,10 @@
package org.apache.ambari.server.configuration.spring;
import org.apache.ambari.server.api.stomp.TestController;
+import org.apache.ambari.server.events.listeners.requests.StateUpdateListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
@@ -27,6 +29,8 @@ import org.springframework.web.socket.config.annotation.AbstractWebSocketMessage
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
+import com.google.inject.Injector;
+
@Configuration
@EnableWebSocketMessageBroker
@ComponentScan(basePackageClasses = {TestController.class})
@@ -34,6 +38,11 @@ import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
public class ApiStompConfig extends AbstractWebSocketMessageBrokerConfigurer {
private static final Logger LOG = LoggerFactory.getLogger(ApiStompConfig.class);
+ /**
+ * Exposes a {@link StateUpdateListener} as a Spring bean, constructed with the Guice injector.
+ *
+ * @param injector the Guice injector handed to the listener
+ * @return the new listener instance
+ */
+ @Bean
+ public StateUpdateListener requestStatusListener(Injector injector) {
+ return new StateUpdateListener(injector);
+ }
+
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/v1")
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java
index 52e5a98..50c3aba 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/spring/GuiceBeansConfig.java
@@ -23,13 +23,11 @@ import org.apache.ambari.server.security.authorization.AmbariLocalUserProvider;
import org.apache.ambari.server.security.authorization.AmbariPamAuthenticationProvider;
import org.apache.ambari.server.security.authorization.AmbariUserAuthorizationFilter;
import org.apache.ambari.server.security.authorization.PermissionHelper;
-import org.apache.ambari.server.security.authorization.Users;
import org.apache.ambari.server.security.authorization.internal.AmbariInternalAuthenticationProvider;
import org.apache.ambari.server.security.ldap.AmbariLdapDataPopulator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.crypto.password.PasswordEncoder;
import com.google.inject.Injector;
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
index 751ce08..52497f2 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AmbariManagementControllerImpl.java
@@ -65,6 +65,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import javax.persistence.RollbackException;
@@ -88,10 +89,13 @@ import org.apache.ambari.server.actionmanager.Stage;
import org.apache.ambari.server.actionmanager.StageFactory;
import org.apache.ambari.server.agent.ExecutionCommand;
import org.apache.ambari.server.agent.ExecutionCommand.KeyNames;
+import org.apache.ambari.server.agent.stomp.dto.TopologyCluster;
+import org.apache.ambari.server.agent.stomp.dto.TopologyComponent;
import org.apache.ambari.server.api.services.AmbariMetaInfo;
import org.apache.ambari.server.api.services.LoggingService;
import org.apache.ambari.server.configuration.Configuration;
import org.apache.ambari.server.configuration.Configuration.DatabaseType;
+import org.apache.ambari.server.controller.internal.DeleteHostComponentStatusMetaData;
import org.apache.ambari.server.controller.internal.DeleteStatusMetaData;
import org.apache.ambari.server.controller.internal.RequestOperationLevel;
import org.apache.ambari.server.controller.internal.RequestResourceFilter;
@@ -105,7 +109,9 @@ import org.apache.ambari.server.controller.metrics.MetricsCollectorHAManager;
import org.apache.ambari.server.controller.metrics.timeline.cache.TimelineMetricCacheProvider;
import org.apache.ambari.server.controller.spi.Resource;
import org.apache.ambari.server.customactions.ActionDefinition;
+import org.apache.ambari.server.events.TopologyUpdateEvent;
import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
import org.apache.ambari.server.metadata.ActionMetadata;
import org.apache.ambari.server.metadata.RoleCommandOrder;
import org.apache.ambari.server.metadata.RoleCommandOrderProvider;
@@ -200,6 +206,7 @@ import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStartEvent
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostStopEvent;
import org.apache.ambari.server.state.svccomphost.ServiceComponentHostUpgradeEvent;
import org.apache.ambari.server.topology.Setting;
+import org.apache.ambari.server.topology.TopologyDeleteFormer;
import org.apache.ambari.server.utils.SecretReference;
import org.apache.ambari.server.utils.StageUtils;
import org.apache.commons.collections.CollectionUtils;
@@ -316,6 +323,12 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
@Inject
private StackDAO stackDAO;
+ @Inject
+ private StateUpdateEventPublisher stateUpdateEventPublisher;
+
+ @Inject
+ TopologyDeleteFormer topologyDeleteFormer;
+
/**
* The KerberosHelper to help setup for enabling for disabling Kerberos
*/
@@ -676,6 +689,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
setMonitoringServicesRestartRequired(requests);
// now doing actual work
persistServiceComponentHosts(requests);
+ stateUpdateEventPublisher.publish(getAddedComponentsTopologyEvent(requests));
}
void persistServiceComponentHosts(Set<ServiceComponentHostRequest> requests)
@@ -707,6 +721,40 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
}
}
+ /**
+  * Builds the {@code ADD} topology event for the host components named in the given requests.
+  *
+  * <p>For every request a {@link TopologyComponent} is created that lists the ids of all
+  * cluster hosts currently carrying that component (not only the request's own host).
+  * The components are grouped per cluster, keyed by the cluster id rendered as a string.
+  *
+  * @param requests the host component requests that were just persisted
+  * @return the event describing the added components
+  * @throws AmbariException if a cluster, service, component or host component lookup fails
+  */
+ private TopologyUpdateEvent getAddedComponentsTopologyEvent(Set<ServiceComponentHostRequest> requests)
+     throws AmbariException {
+   Map<String, TopologyCluster> updatesByClusterId = new HashMap<>();
+
+   for (ServiceComponentHostRequest request : requests) {
+     String serviceName = request.getServiceName();
+     String componentName = request.getComponentName();
+
+     Cluster cluster = clusters.getCluster(request.getClusterName());
+     Collection<Host> allHosts = cluster.getHosts();
+     ServiceComponent serviceComponent = cluster.getService(serviceName).getServiceComponent(componentName);
+
+     // Translate the names of the hosts running this component into host ids.
+     Set<String> componentHostNames = cluster.getHosts(serviceName, componentName);
+     Set<Long> hostIds = allHosts.stream()
+         .filter(host -> componentHostNames.contains(host.getHostName()))
+         .map(Host::getHostId)
+         .collect(Collectors.toSet());
+
+     ServiceComponentHost hostComponent = serviceComponent.getServiceComponentHost(request.getHostname());
+     StackId stackId = cluster.getDesiredStackVersion();
+
+     TopologyComponent added = TopologyComponent.newBuilder()
+         .setComponentName(hostComponent.getServiceComponentName())
+         .setServiceName(hostComponent.getServiceName())
+         .setVersion(hostComponent.getVersion())
+         .setHostIds(hostIds)
+         .setStatusCommandParams(ambariMetaInfo.getStatusCommandParams(stackId, serviceName, componentName))
+         .build();
+
+     String clusterId = Long.toString(cluster.getClusterId());
+     updatesByClusterId
+         .computeIfAbsent(clusterId, id -> new TopologyCluster())
+         .addTopologyComponent(added);
+   }
+
+   return new TopologyUpdateEvent(updatesByClusterId, TopologyUpdateEvent.EventType.ADD);
+ }
+
private void setMonitoringServicesRestartRequired(
Set<ServiceComponentHostRequest> requests) throws AmbariException {
@@ -3434,7 +3482,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
}
Map<ServiceComponent, Set<ServiceComponentHost>> safeToRemoveSCHs = new HashMap<>();
- DeleteStatusMetaData deleteStatusMetaData = new DeleteStatusMetaData();
+ DeleteHostComponentStatusMetaData deleteMetaData = new DeleteHostComponentStatusMetaData();
for (ServiceComponentHostRequest request : expanded) {
@@ -3465,29 +3513,28 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
}
safeToRemoveSCHs.get(component).add(componentHost);
} catch (Exception ex) {
- deleteStatusMetaData.addException(request.getHostname() + "/" + request.getComponentName(), ex);
+ deleteMetaData.addException(request.getHostname() + "/" + request.getComponentName(), ex);
}
}
for (Entry<ServiceComponent, Set<ServiceComponentHost>> entry : safeToRemoveSCHs.entrySet()) {
for (ServiceComponentHost componentHost : entry.getValue()) {
try {
- deleteHostComponent(entry.getKey(), componentHost);
- deleteStatusMetaData.addDeletedKey(componentHost.getHostName() + "/" + componentHost.getServiceComponentName());
-
+ deleteHostComponent(entry.getKey(), componentHost, deleteMetaData);
} catch (Exception ex) {
- deleteStatusMetaData.addException(componentHost.getHostName() + "/" + componentHost.getServiceComponentName(), ex);
+ deleteMetaData.addException(componentHost.getHostName() + "/" + componentHost.getServiceComponentName(), ex);
}
}
}
//Do not break behavior for existing clients where delete request contains only 1 host component.
//Response for these requests will have empty body with appropriate error code.
- if (deleteStatusMetaData.getDeletedKeys().size() + deleteStatusMetaData.getExceptionForKeys().size() == 1) {
- if (deleteStatusMetaData.getDeletedKeys().size() == 1) {
+ if (deleteMetaData.getDeletedKeys().size() + deleteMetaData.getExceptionForKeys().size() == 1) {
+ if (deleteMetaData.getDeletedKeys().size() == 1) {
+ topologyDeleteFormer.processDeleteMetaData(deleteMetaData);
return null;
}
- Exception ex = deleteStatusMetaData.getExceptionForKeys().values().iterator().next();
+ Exception ex = deleteMetaData.getExceptionForKeys().values().iterator().next();
if (ex instanceof AmbariException) {
throw (AmbariException)ex;
} else {
@@ -3499,10 +3546,12 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
if (!safeToRemoveSCHs.isEmpty()) {
setMonitoringServicesRestartRequired(requests);
}
- return deleteStatusMetaData;
+ topologyDeleteFormer.processDeleteMetaData(deleteMetaData);
+ return deleteMetaData;
}
- private void deleteHostComponent(ServiceComponent serviceComponent, ServiceComponentHost componentHost) throws AmbariException {
+ private void deleteHostComponent(ServiceComponent serviceComponent, ServiceComponentHost componentHost,
+ DeleteHostComponentStatusMetaData deleteMetaData) throws AmbariException {
String included_hostname = componentHost.getHostName();
String serviceName = serviceComponent.getServiceName();
String master_component_name = null;
@@ -3510,7 +3559,7 @@ public class AmbariManagementControllerImpl implements AmbariManagementControlle
HostComponentAdminState desiredAdminState = componentHost.getComponentAdminState();
State slaveState = componentHost.getState();
//Delete hostcomponents
- serviceComponent.deleteServiceComponentHosts(componentHost.getHostName());
+ serviceComponent.deleteServiceComponentHosts(componentHost.getHostName(), deleteMetaData);
// If deleted hostcomponents support decomission and were decommited and stopped
if (AmbariCustomCommandExecutionHelper.masterToSlaveMappingForDecom.containsValue(slave_component_name)
&& desiredAdminState.equals(HostComponentAdminState.DECOMMISSIONED)
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
index 3c0164c..081737a 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/CalculatedStatus.java
@@ -32,10 +32,13 @@ import org.apache.ambari.server.actionmanager.HostRoleStatus;
import org.apache.ambari.server.actionmanager.Request;
import org.apache.ambari.server.actionmanager.Stage;
import org.apache.ambari.server.events.listeners.tasks.TaskStatusListener;
+import org.apache.ambari.server.orm.dao.HostRoleCommandDAO;
import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.orm.entities.StageEntityPK;
+import org.apache.ambari.server.topology.LogicalRequest;
+import org.apache.ambari.server.topology.TopologyManager;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
@@ -424,6 +427,36 @@ public class CalculatedStatus {
}
/**
+ * Calculates the status for specified request by id.
+ * @param s_hostRoleCommandDAO is used to retrieve the map of stage-to-summary value objects
+ * @param topologyManager topology manager
+ * @param requestId the request id
+ * @return the calculated status
+ */
+ public static CalculatedStatus statusFromRequest(HostRoleCommandDAO s_hostRoleCommandDAO,
+ TopologyManager topologyManager, Long requestId) {
+ Map<Long, HostRoleCommandStatusSummaryDTO> summary = s_hostRoleCommandDAO.findAggregateCounts(requestId);
+
+ // get summaries from TopologyManager for logical requests
+ summary.putAll(topologyManager.getStageSummaries(requestId));
+
+ // summary might be empty due to delete host have cleared all
+ // HostRoleCommands or due to hosts haven't registered yet with the cluster
+ // when the cluster is provisioned with a Blueprint
+ LogicalRequest logicalRequest = topologyManager.getRequest(requestId);
+ if (summary.isEmpty() && null != logicalRequest) {
+ // in this case, it appears that there are no tasks but this is a logical
+ // topology request, so it's a matter of hosts simply not registering yet
+ // for tasks to be created
+ return CalculatedStatus.PENDING;
+ } else {
+ // there are either tasks or this is not a logical request, so do normal
+ // status calculations
+ return CalculatedStatus.statusFromStageSummary(summary, summary.keySet());
+ }
+ }
+
+ /**
* Calculates the overall status of an upgrade. If there are no tasks, then a
* status of {@link HostRoleStatus#COMPLETED} is returned.
*
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java
index 65cfcaa..5a56919 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ComponentResourceProvider.java
@@ -61,9 +61,11 @@ import org.apache.ambari.server.state.ServiceComponentFactory;
import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.topology.TopologyDeleteFormer;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
+import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.google.inject.persist.Transactional;
@@ -103,6 +105,9 @@ public class ComponentResourceProvider extends AbstractControllerResourceProvide
private MaintenanceStateHelper maintenanceStateHelper;
+ @Inject
+ private TopologyDeleteFormer topologyDeleteFormer;
+
// ----- Constructors ----------------------------------------------------
/**
@@ -709,6 +714,7 @@ public class ComponentResourceProvider extends AbstractControllerResourceProvide
protected RequestStatusResponse deleteComponents(Set<ServiceComponentRequest> requests) throws AmbariException, AuthorizationException {
Clusters clusters = getManagementController().getClusters();
AmbariMetaInfo ambariMetaInfo = getManagementController().getAmbariMetaInfo();
+ DeleteHostComponentStatusMetaData deleteMetaData = new DeleteHostComponentStatusMetaData();
for (ServiceComponentRequest request : requests) {
Validate.notEmpty(request.getComponentName(), "component name should be non-empty");
@@ -721,28 +727,32 @@ public class ComponentResourceProvider extends AbstractControllerResourceProvide
ServiceComponent sc = s.getServiceComponent(request.getComponentName());
if (sc != null) {
- deleteHostComponentsForServiceComponent(sc, request);
+ deleteHostComponentsForServiceComponent(sc, request, deleteMetaData);
+ topologyDeleteFormer.processDeleteMetaDataException(deleteMetaData);
sc.setDesiredState(State.DISABLED);
- s.deleteServiceComponent(request.getComponentName());
+ s.deleteServiceComponent(request.getComponentName(), deleteMetaData);
+ topologyDeleteFormer.processDeleteMetaDataException(deleteMetaData);
}
}
+ topologyDeleteFormer.processDeleteMetaData(deleteMetaData);
return null;
}
- private void deleteHostComponentsForServiceComponent(ServiceComponent sc, ServiceComponentRequest request) throws AmbariException {
+ private void deleteHostComponentsForServiceComponent(ServiceComponent sc, ServiceComponentRequest request,
+ DeleteHostComponentStatusMetaData deleteMetaData) throws AmbariException {
for (ServiceComponentHost sch : sc.getServiceComponentHosts().values()) {
if (!sch.getDesiredState().isRemovableState()) {
- throw new AmbariException("Found non removable host component when trying to delete service component." +
+ deleteMetaData.setAmbariException(new AmbariException("Found non removable host component when trying to delete service component." +
" To remove host component, it must be in DISABLED/INIT/INSTALLED/INSTALL_FAILED/UNKNOWN" +
"/UNINSTALLED/INSTALLING state."
+ ", request=" + request.toString()
- + ", current state=" + sc.getDesiredState() + ".");
-
+ + ", current state=" + sc.getDesiredState() + "."));
+ return;
}
}
for (ServiceComponentHost sch : sc.getServiceComponentHosts().values()) {
- sch.delete();
+ sch.delete(deleteMetaData);
}
}
private Cluster getClusterForRequest(final ServiceComponentRequest request, final Clusters clusters) throws AmbariException {
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/DeleteHostComponentStatusMetaData.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/DeleteHostComponentStatusMetaData.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/DeleteHostComponentStatusMetaData.java
new file mode 100644
index 0000000..1a4e34a
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/DeleteHostComponentStatusMetaData.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.controller.internal;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.apache.ambari.server.AmbariException;
+
+/**
+ * Delete-status metadata that additionally tracks which host components were
+ * removed and an optional {@link AmbariException} recorded by the deleting
+ * code through {@link #setAmbariException(AmbariException)}.
+ * <p>
+ * Instances are not thread-safe (see {@link NotThreadSafe}).
+ */
+@NotThreadSafe
+public class DeleteHostComponentStatusMetaData extends DeleteStatusMetaData {
+  /** Host components registered through {@link #addDeletedHostComponent}. */
+  private final Set<HostComponent> removedHostComponents;
+
+  /** Exception stored by the caller; {@code null} until one is set. */
+  private AmbariException ambariException;
+
+  public DeleteHostComponentStatusMetaData() {
+    removedHostComponents = new HashSet<>();
+  }
+
+  /**
+   * Records a removed host component and registers its deleted key.
+   *
+   * @param componentName name of the removed component
+   * @param hostName      host name; only used to build the deleted key
+   *                      {@code componentName + "/" + hostName}
+   * @param hostId        id of the host the component was removed from
+   * @param clusterId     id of the owning cluster, as a string
+   * @param version       version stored with the removed component
+   */
+  public void addDeletedHostComponent(String componentName, String hostName, Long hostId,
+                                      String clusterId, String version) {
+    removedHostComponents.add(new HostComponent(componentName, hostId, clusterId, version));
+    addDeletedKey(componentName + "/" + hostName);
+  }
+
+  /**
+   * @return the live (mutable) set of removed host components
+   */
+  public Set<HostComponent> getRemovedHostComponents() {
+    return removedHostComponents;
+  }
+
+  /**
+   * @return the stored exception, or {@code null} if none was set
+   */
+  public AmbariException getAmbariException() {
+    return ambariException;
+  }
+
+  /**
+   * @param ambariException exception to store; replaces any previous value
+   */
+  public void setAmbariException(AmbariException ambariException) {
+    this.ambariException = ambariException;
+  }
+
+  /**
+   * Plain holder describing a single removed host component.
+   */
+  public class HostComponent {
+    private String componentName;
+    private Long hostId;
+    private String clusterId;
+    private String version;
+
+    public HostComponent(String componentName, Long hostId, String clusterId, String version) {
+      this.componentName = componentName;
+      this.hostId = hostId;
+      this.clusterId = clusterId;
+      this.version = version;
+    }
+
+    public String getComponentName() {
+      return componentName;
+    }
+
+    public void setComponentName(String componentName) {
+      this.componentName = componentName;
+    }
+
+    public Long getHostId() {
+      return hostId;
+    }
+
+    /**
+     * Sets the host id; this is the setter matching {@link #getHostId()}.
+     *
+     * @param hostId id of the host
+     */
+    public void setHostId(Long hostId) {
+      this.hostId = hostId;
+    }
+
+    /**
+     * Misnamed setter kept for backward compatibility: despite its name it
+     * assigns the host <em>id</em>, not a host name.
+     *
+     * @param hostId id of the host
+     * @deprecated use {@link #setHostId(Long)} instead
+     */
+    @Deprecated
+    public void setHostName(Long hostId) {
+      setHostId(hostId);
+    }
+
+    public String getClusterId() {
+      return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+      this.clusterId = clusterId;
+    }
+
+    public String getVersion() {
+      return version;
+    }
+
+    public void setVersion(String version) {
+      this.version = version;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
index f604a7f..1b16b5e 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostResourceProvider.java
@@ -35,6 +35,8 @@ import org.apache.ambari.server.DuplicateResourceException;
import org.apache.ambari.server.HostNotFoundException;
import org.apache.ambari.server.ObjectNotFoundException;
import org.apache.ambari.server.ParentObjectNotFoundException;
+import org.apache.ambari.server.agent.stomp.dto.TopologyCluster;
+import org.apache.ambari.server.agent.stomp.dto.TopologyHost;
import org.apache.ambari.server.controller.AmbariManagementController;
import org.apache.ambari.server.controller.ConfigurationRequest;
import org.apache.ambari.server.controller.HostRequest;
@@ -52,6 +54,8 @@ import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException;
import org.apache.ambari.server.controller.spi.SystemException;
import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
import org.apache.ambari.server.controller.utilities.PropertyHelper;
+import org.apache.ambari.server.events.TopologyUpdateEvent;
+import org.apache.ambari.server.events.publishers.StateUpdateEventPublisher;
import org.apache.ambari.server.security.authorization.AuthorizationException;
import org.apache.ambari.server.security.authorization.AuthorizationHelper;
import org.apache.ambari.server.security.authorization.ResourceType;
@@ -167,6 +171,9 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
@Inject
private static TopologyManager topologyManager;
+ @Inject
+ private StateUpdateEventPublisher stateUpdateEventPublisher;
+
// ----- Constructors ----------------------------------------------------
/**
@@ -508,6 +515,7 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
Map<String, Map<String, String>> hostAttributes = new HashMap<>();
Set<String> allClusterSet = new HashSet<>();
+ Map<String, TopologyCluster> addedTopologies = new HashMap<>();
for (HostRequest hostRequest : hostRequests) {
if (hostRequest.getHostname() != null &&
!hostRequest.getHostname().isEmpty() &&
@@ -521,6 +529,15 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
if (hostRequest.getHostAttributes() != null) {
hostAttributes.put(hostRequest.getHostname(), hostRequest.getHostAttributes());
}
+ String clusterId = Long.toString(clusters.getCluster(hostRequest.getClusterName()).getClusterId());
+ if (!addedTopologies.containsKey(clusterId)) {
+ addedTopologies.put(clusterId, new TopologyCluster());
+ }
+ Host addedHost = clusters.getHost(hostRequest.getHostname());
+ addedTopologies.get(clusterId).addTopologyHost(new TopologyHost(addedHost.getHostId(),
+ addedHost.getHostName(),
+ addedHost.getRackInfo(),
+ addedHost.getIPv4()));
}
}
clusters.updateHostWithClusterAndAttributes(hostClustersMap, hostAttributes);
@@ -528,6 +545,8 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
for (String clusterName : allClusterSet) {
clusters.getCluster(clusterName).recalculateAllClusterVersionStates();
}
+ TopologyUpdateEvent topologyUpdateEvent = new TopologyUpdateEvent(addedTopologies, TopologyUpdateEvent.EventType.ADD);
+ stateUpdateEventPublisher.publish(topologyUpdateEvent);
}
private void createHostResource(Clusters clusters, Set<String> duplicates,
@@ -704,12 +723,14 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
}
}
+ Map<String, TopologyCluster> topologyUpdates = new HashMap<>();
for (HostRequest request : requests) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received an updateHost request"
+ ", hostname=" + request.getHostname()
+ ", request=" + request);
}
+ TopologyHost topologyHost = new TopologyHost();
Host host = clusters.getHost(request.getHostname());
@@ -717,6 +738,7 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
Cluster cluster = clusters.getCluster(clusterName);
Long clusterId = cluster.getClusterId();
Long resourceId = cluster.getResourceId();
+ topologyHost.setHostId(host.getHostId());
try {
// The below method call throws an exception when trying to create a duplicate mapping in the clusterhostmapping
@@ -742,6 +764,7 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
throw new AuthorizationException("The authenticated user is not authorized to update host rack information");
}
host.setRackInfo(requestRackInfo);
+ topologyHost.setRackName(requestRackInfo);
}
if (null != request.getPublicHostName()) {
@@ -749,6 +772,7 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
throw new AuthorizationException("The authenticated user is not authorized to update host attributes");
}
host.setPublicHostName(request.getPublicHostName());
+ topologyHost.setHostName(request.getPublicHostName());
}
if (null != clusterName && null != request.getMaintenanceState()) {
@@ -811,6 +835,13 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
}
}
+ if (!topologyUpdates.containsKey(clusterId.toString())) {
+ topologyUpdates.put(clusterId.toString(), new TopologyCluster());
+ }
+ topologyUpdates.get(clusterId.toString()).addTopologyHost(topologyHost);
+ TopologyUpdateEvent topologyUpdateEvent = new TopologyUpdateEvent(topologyUpdates,
+ TopologyUpdateEvent.EventType.UPDATE);
+ stateUpdateEventPublisher.publish(topologyUpdateEvent);
//todo: if attempt was made to update a property other than those
//todo: that are allowed above, should throw exception
}
@@ -874,9 +905,11 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
Set<String> hostsClusters = new HashSet<>();
Set<String> hostNames = new HashSet<>();
Set<Cluster> allClustersWithHosts = new HashSet<>();
+ Map<String, TopologyCluster> topologyUpdates = new HashMap<>();
for (HostRequest hostRequest : requests) {
// Assume the user also wants to delete it entirely, including all clusters.
String hostname = hostRequest.getHostname();
+ Long hostId = clusters.getHost(hostname).getHostId();
hostNames.add(hostname);
if (hostRequest.getClusterName() != null) {
@@ -911,6 +944,24 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
for (String key : componentDeleteStatus.getDeletedKeys()) {
deleteStatusMetaData.addDeletedKey(key);
}
+ /*for (DeleteHostComponentStatusMetaData.HostComponent hostComponent : componentDeleteStatus.getRemovedHostComponents()) {
+ String clusterId = hostComponent.getClusterId();
+ if (!topologyUpdates.containsKey(clusterId)) {
+ topologyUpdates.put(clusterId, new TopologyCluster());
+ }
+ TopologyComponent deletedComponent = new TopologyComponent(hostComponent.getComponentName(),
+ null,
+ hostComponent.getVersion(),
+ new HashSet<>(Arrays.asList(hostId)),
+ null);
+ if (!topologyUpdates.get(clusterId).getTopologyComponents().contains(deletedComponent)) {
+ topologyUpdates.get(clusterId).addTopologyComponent(deletedComponent);
+ } else {
+ topologyUpdates.get(clusterId).getTopologyComponents()
+ .stream().filter(t -> t.equals(deletedComponent))
+ .forEach(t -> t.addHostId(hostId));
+ }
+ }*/
for (String key : componentDeleteStatus.getExceptionForKeys().keySet()) {
deleteStatusMetaData.addException(key, componentDeleteStatus.getExceptionForKeys().get(key));
}
@@ -920,7 +971,15 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
hostsClusters.add(hostRequest.getClusterName());
}
try {
+ Set<Cluster> hostClusters = new HashSet<>(clusters.getClustersForHost(hostname));
clusters.deleteHost(hostname);
+ for (Cluster cluster : hostClusters) {
+ String clusterId = Long.toString(cluster.getClusterId());
+ if (!topologyUpdates.containsKey(clusterId)) {
+ topologyUpdates.put(clusterId, new TopologyCluster());
+ }
+ topologyUpdates.get(clusterId).getTopologyHosts().add(new TopologyHost(hostId));
+ }
deleteStatusMetaData.addDeletedKey(hostname);
} catch (Exception ex) {
deleteStatusMetaData.addException(hostname, ex);
@@ -934,6 +993,9 @@ public class HostResourceProvider extends AbstractControllerResourceProvider {
for (String clustername : hostsClusters) {
clusters.getCluster(clustername).recalculateAllClusterVersionStates();
}
+ TopologyUpdateEvent topologyUpdateEvent = new TopologyUpdateEvent(topologyUpdates,
+ TopologyUpdateEvent.EventType.DELETE);
+ stateUpdateEventPublisher.publish(topologyUpdateEvent);
}
private void validateHostInDeleteFriendlyState(HostRequest hostRequest, Clusters clusters, boolean forceDelete) throws AmbariException {
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
index 9704b33..81dc340 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/RequestResourceProvider.java
@@ -67,7 +67,6 @@ import org.apache.ambari.server.security.authorization.ResourceType;
import org.apache.ambari.server.security.authorization.RoleAuthorization;
import org.apache.ambari.server.state.Cluster;
import org.apache.ambari.server.state.Clusters;
-import org.apache.ambari.server.topology.LogicalRequest;
import org.apache.ambari.server.topology.TopologyManager;
import org.apache.commons.lang.StringUtils;
@@ -738,32 +737,13 @@ public class RequestResourceProvider extends AbstractControllerResourceProvider
setResourceProperty(resource, REQUEST_SOURCE_SCHEDULE, null, requestedPropertyIds);
}
-
- Map<Long, HostRoleCommandStatusSummaryDTO> summary = s_hostRoleCommandDAO.findAggregateCounts(entity.getRequestId());
-
- // get summaries from TopologyManager for logical requests
- summary.putAll(topologyManager.getStageSummaries(entity.getRequestId()));
-
- // summary might be empty due to delete host have cleared all
- // HostRoleCommands or due to hosts haven't registered yet with the cluster
- // when the cluster is provisioned with a Blueprint
- final CalculatedStatus status;
- LogicalRequest logicalRequest = topologyManager.getRequest(entity.getRequestId());
- if (summary.isEmpty() && null != logicalRequest) {
- // in this case, it appears that there are no tasks but this is a logical
- // topology request, so it's a matter of hosts simply not registering yet
- // for tasks to be created
- status = CalculatedStatus.PENDING;
- } else {
- // there are either tasks or this is not a logical request, so do normal
- // status calculations
- status = CalculatedStatus.statusFromStageSummary(summary, summary.keySet());
- }
+ final CalculatedStatus status = CalculatedStatus.statusFromRequest(s_hostRoleCommandDAO, topologyManager, entity.getRequestId());
setResourceProperty(resource, REQUEST_STATUS_PROPERTY_ID, status.getStatus().toString(), requestedPropertyIds);
setResourceProperty(resource, REQUEST_PROGRESS_PERCENT_ID, status.getPercent(), requestedPropertyIds);
int taskCount = 0;
+ Map<Long, HostRoleCommandStatusSummaryDTO> summary = s_hostRoleCommandDAO.findAggregateCounts(entity.getRequestId());
for (HostRoleCommandStatusSummaryDTO dto : summary.values()) {
taskCount += dto.getTaskTotal();
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/c2ab4a3c/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
index 6556852..33cab7c 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java
@@ -72,6 +72,7 @@ import org.apache.ambari.server.state.ServiceComponentHost;
import org.apache.ambari.server.state.ServiceInfo;
import org.apache.ambari.server.state.StackId;
import org.apache.ambari.server.state.State;
+import org.apache.ambari.server.topology.TopologyDeleteFormer;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
@@ -123,6 +124,9 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider
@Inject
private KerberosHelper kerberosHelper;
+ @Inject
+ private TopologyDeleteFormer topologyDeleteFormer;
+
// ----- Constructors ----------------------------------------------------
/**
@@ -884,9 +888,12 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider
}
}
+ DeleteHostComponentStatusMetaData deleteMetaData = new DeleteHostComponentStatusMetaData();
for (Service service : removable) {
- service.getCluster().deleteService(service.getName());
+ service.getCluster().deleteService(service.getName(), deleteMetaData);
+ topologyDeleteFormer.processDeleteMetaDataException(deleteMetaData);
}
+ topologyDeleteFormer.processDeleteMetaData(deleteMetaData);
return null;
}