You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by st...@apache.org on 2016/04/12 09:27:39 UTC
ambari git commit: AMBARI-15803. Restarting ambari-server after
successful blueprint deploy of large cluster makes it unresponsive. (stoader)
Repository: ambari
Updated Branches:
refs/heads/trunk 1fed70cab -> 2d25706df
AMBARI-15803. Restarting ambari-server after successful blueprint deploy of large cluster makes it unresponsive. (stoader)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2d25706d
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2d25706d
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2d25706d
Branch: refs/heads/trunk
Commit: 2d25706dfb62fb557b6186d0a8c0f512724f39ea
Parents: 1fed70c
Author: Toader, Sebastian <st...@hortonworks.com>
Authored: Tue Apr 12 09:27:22 2016 +0200
Committer: Toader, Sebastian <st...@hortonworks.com>
Committed: Tue Apr 12 09:27:22 2016 +0200
----------------------------------------------------------------------
.../ambari/server/topology/LogicalRequest.java | 59 +++-
.../server/topology/LogicalRequestTest.java | 341 +++++++++++++++++++
2 files changed, 398 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/2d25706d/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
index 6de615a..7ec6088 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/LogicalRequest.java
@@ -25,9 +25,12 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
+import javax.annotation.Nullable;
+
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
import org.apache.ambari.server.actionmanager.HostRoleStatus;
@@ -40,12 +43,18 @@ import org.apache.ambari.server.orm.dao.HostRoleCommandStatusSummaryDTO;
import org.apache.ambari.server.orm.entities.HostRoleCommandEntity;
import org.apache.ambari.server.orm.entities.StageEntity;
import org.apache.ambari.server.orm.entities.TopologyHostGroupEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostInfoEntity;
import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
import org.apache.ambari.server.state.host.HostImpl;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+
/**
* Logical Request implementation used to provision a cluster deployed by Blueprints.
*/
@@ -393,6 +402,25 @@ public class LogicalRequest extends Request {
private void createHostRequests(ClusterTopology topology,
TopologyLogicalRequestEntity requestEntity) {
+ Collection<TopologyHostGroupEntity> hostGroupEntities = requestEntity.getTopologyRequestEntity().getTopologyHostGroupEntities();
+ Map<String, Set<String>> allReservedHostNamesByHostGroups = getReservedHostNamesByHostGroupName(hostGroupEntities);
+ Map<String, Set<String>> pendingReservedHostNamesByHostGroups = new HashMap<>(allReservedHostNamesByHostGroups);
+
+ for (TopologyHostRequestEntity hostRequestEntity : requestEntity.getTopologyHostRequestEntities()) {
+ TopologyHostGroupEntity hostGroupEntity = hostRequestEntity.getTopologyHostGroupEntity();
+ String hostGroupName = hostGroupEntity.getName();
+ String hostName = hostRequestEntity.getHostName();
+
+ if (hostName != null && pendingReservedHostNamesByHostGroups.containsKey(hostGroupName)) {
+ Set<String> pendingReservedHostNamesInHostGroup = pendingReservedHostNamesByHostGroups.get(hostGroupName);
+
+ if (pendingReservedHostNamesInHostGroup != null) {
+ pendingReservedHostNamesInHostGroup.remove(hostName);
+ }
+ }
+ }
+
+
for (TopologyHostRequestEntity hostRequestEntity : requestEntity.getTopologyHostRequestEntities()) {
Long hostRequestId = hostRequestEntity.getId();
synchronized (hostIdCounter) {
@@ -401,9 +429,11 @@ public class LogicalRequest extends Request {
}
}
TopologyHostGroupEntity hostGroupEntity = hostRequestEntity.getTopologyHostGroupEntity();
+ Set<String> pendingReservedHostsInGroup = pendingReservedHostNamesByHostGroups.get(hostGroupEntity.getName());
+
+ // get next host name from pending host names
+ String reservedHostName = Iterables.getFirst(pendingReservedHostsInGroup, null);
- String reservedHostName = hostGroupEntity.
- getTopologyHostInfoEntities().iterator().next().getFqdn();
//todo: move predicate processing to host request
HostRequest hostRequest = new HostRequest(getRequestId(), hostRequestId,
@@ -413,6 +443,7 @@ public class LogicalRequest extends Request {
if (! hostRequest.isCompleted()) {
if (reservedHostName != null) {
requestsWithReservedHosts.put(reservedHostName, hostRequest);
+ pendingReservedHostsInGroup.remove(reservedHostName);
LOG.info("LogicalRequest.createHostRequests: created new request for a reserved request ID = {} for host name = {}",
hostRequest.getId(), reservedHostName);
} else {
@@ -423,6 +454,30 @@ public class LogicalRequest extends Request {
}
}
+ /**
+ * Returns a map where the keys are the host group names and the values are the
+ * sets of non-empty FQDNs of the hosts listed in the corresponding host group.
+ * @param hostGroups the host group entities whose host info entries are collected
+ * @return a new map from host group name to the FQDNs found for that group
+ */
+ private Map<String, Set<String>> getReservedHostNamesByHostGroupName(Collection<TopologyHostGroupEntity> hostGroups) {
+ Map<String, Set<String>> reservedHostNamesByHostGroups = new HashMap<>();
+
+ for (TopologyHostGroupEntity hostGroupEntity: hostGroups) {
+ String hostGroupName = hostGroupEntity.getName();
+
+ if ( !reservedHostNamesByHostGroups.containsKey(hostGroupName) )
+ reservedHostNamesByHostGroups.put(hostGroupName, new HashSet<String>()); // every group gets an entry, even if it ends up empty
+
+ for (TopologyHostInfoEntity hostInfoEntity: hostGroupEntity.getTopologyHostInfoEntities()) {
+ if (StringUtils.isNotEmpty(hostInfoEntity.getFqdn())) { // host info entries without an FQDN are skipped
+ reservedHostNamesByHostGroups.get(hostGroupName).add(hostInfoEntity.getFqdn());
+ }
+ }
+ }
+ return reservedHostNamesByHostGroups;
+ }
+
private synchronized static AmbariManagementController getController() {
if (controller == null) {
controller = AmbariServer.getController();
http://git-wip-us.apache.org/repos/asf/ambari/blob/2d25706d/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
new file mode 100644
index 0000000..e979173
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/LogicalRequestTest.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.topology;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import javax.annotation.Nullable;
+
+import org.apache.ambari.server.controller.AmbariManagementController;
+import org.apache.ambari.server.controller.AmbariServer;
+import org.apache.ambari.server.controller.internal.ProvisionAction;
+import org.apache.ambari.server.orm.entities.TopologyHostGroupEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostInfoEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostRequestEntity;
+import org.apache.ambari.server.orm.entities.TopologyHostTaskEntity;
+import org.apache.ambari.server.orm.entities.TopologyLogicalRequestEntity;
+import org.apache.ambari.server.orm.entities.TopologyRequestEntity;
+import org.apache.ambari.server.state.Cluster;
+import org.apache.ambari.server.state.Clusters;
+import org.easymock.EasyMockRule;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.powermock.api.easymock.PowerMock.mockStatic;
+
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(AmbariServer.class)
+public class LogicalRequestTest extends EasyMockSupport { // builds LogicalRequest instances from persisted request entities
+
+ @Rule
+ public EasyMockRule mocks = new EasyMockRule(this);
+
+ @Mock
+ private TopologyRequest replayedTopologyRequest;
+
+ @Mock
+ private ClusterTopology clusterTopology;
+
+ @Mock
+ private TopologyLogicalRequestEntity logicalRequestEntity;
+
+ @Mock
+ private AmbariManagementController controller;
+
+ @Mock
+ private AmbariContext ambariContext;
+
+ private final long clusterId = 2L;
+ private final String clusterName = "myCluster";
+
+ @Mock
+ private Clusters clusters;
+
+ @Mock
+ private Cluster cluster;
+
+ @Mock
+ private Blueprint blueprint;
+
+ @Mock
+ private HostGroup hostGroup1;
+
+ @Mock
+ private HostGroup hostGroup2;
+
+
+ @Before
+ public void setup() throws Exception {
+ resetAll(); // drop expectations recorded by a previous test
+
+ expect(controller.getClusters()).andReturn(clusters).anyTimes();
+ expect(clusters.getClusterById(clusterId)).andReturn(cluster).anyTimes();
+ expect(cluster.getClusterName()).andReturn(clusterName).anyTimes();
+
+ String topologyReqestDescription = "Provision cluster";
+
+ expect(replayedTopologyRequest.getDescription()).andReturn(topologyReqestDescription).anyTimes();
+ expect(clusterTopology.getAmbariContext()).andReturn(ambariContext).anyTimes();
+ expect(clusterTopology.getClusterId()).andReturn(clusterId).anyTimes();
+ expect(clusterTopology.getProvisionAction()).andReturn(ProvisionAction.INSTALL_ONLY).anyTimes();
+ expect(clusterTopology.getBlueprint()).andReturn(blueprint).anyTimes();
+ expect(blueprint.getName()).andReturn("blueprintDef").anyTimes();
+
+ PowerMock.reset(AmbariServer.class);
+
+ mockStatic(AmbariServer.class);
+ expect(AmbariServer.getController()).andReturn(controller).anyTimes(); // static lookup is stubbed to hand back the mock controller
+
+ PowerMock.replay(AmbariServer.class);
+
+
+
+ }
+
+ @Test
+ public void testPersistedRequestsWithReservedHosts() throws Exception {
+ // Given
+ Long requestId = 1L;
+
+ TopologyHostInfoEntity host1 = new TopologyHostInfoEntity();
+ host1.setId(100L);
+ host1.setFqdn("host1");
+
+ TopologyHostInfoEntity host2 = new TopologyHostInfoEntity();
+ host2.setId(101L);
+ host2.setFqdn("host2");
+
+ TopologyHostInfoEntity host3 = new TopologyHostInfoEntity();
+ host3.setId(103L);
+ host3.setFqdn("host3");
+
+ TopologyHostInfoEntity host4 = new TopologyHostInfoEntity();
+ host4.setId(104L);
+ host4.setFqdn("host4");
+
+
+ TopologyHostGroupEntity hostGroupEntity1 = new TopologyHostGroupEntity();
+ hostGroupEntity1.setTopologyHostInfoEntities(ImmutableSet.of(host1, host2, host3, host4));
+ hostGroupEntity1.setName("host_group_1");
+
+ // host requests that have already been matched to a registered host (host name is set)
+ TopologyHostRequestEntity hostRequestEntityHost1Matched = new TopologyHostRequestEntity();
+ hostRequestEntityHost1Matched.setId(1L);
+ hostRequestEntityHost1Matched.setHostName(host1.getFqdn()); //host request matched host1
+ hostRequestEntityHost1Matched.setTopologyHostGroupEntity(hostGroupEntity1);
+ hostRequestEntityHost1Matched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+ expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq(host1.getFqdn()))).andReturn(true).anyTimes();
+
+
+ TopologyHostRequestEntity hostRequestEntityHost2Matched = new TopologyHostRequestEntity();
+ hostRequestEntityHost2Matched.setId(2L);
+ hostRequestEntityHost2Matched.setHostName(host2.getFqdn()); // host request matched host2
+ hostRequestEntityHost2Matched.setTopologyHostGroupEntity(hostGroupEntity1);
+ hostRequestEntityHost2Matched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+ expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq(host2.getFqdn()))).andReturn(true).anyTimes();
+
+ // host requests that have not been matched to any registered host yet (no host name set)
+ TopologyHostRequestEntity hostRequestEntityHost3NotMatched = new TopologyHostRequestEntity();
+ hostRequestEntityHost3NotMatched.setId(3L);
+ hostRequestEntityHost3NotMatched.setTopologyHostGroupEntity(hostGroupEntity1);
+ hostRequestEntityHost3NotMatched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+ expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq(host3.getFqdn()))).andReturn(false).anyTimes();
+
+ TopologyHostRequestEntity hostRequestEntityHost4NotMatched = new TopologyHostRequestEntity();
+ hostRequestEntityHost4NotMatched.setId(4L);
+ hostRequestEntityHost4NotMatched.setTopologyHostGroupEntity(hostGroupEntity1);
+ hostRequestEntityHost4NotMatched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+ expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq(host4.getFqdn()))).andReturn(false).anyTimes();
+
+ Collection<TopologyHostRequestEntity> reservedHostRequestEntities = ImmutableSet.of(
+ hostRequestEntityHost1Matched,
+ hostRequestEntityHost2Matched,
+ hostRequestEntityHost3NotMatched,
+ hostRequestEntityHost4NotMatched);
+
+ hostGroupEntity1.setTopologyHostRequestEntities(reservedHostRequestEntities);
+
+ TopologyRequestEntity topologyRequestEntity = new TopologyRequestEntity();
+ topologyRequestEntity.setTopologyHostGroupEntities(Collections.singleton(hostGroupEntity1));
+
+ expect(logicalRequestEntity.getTopologyHostRequestEntities()).andReturn(reservedHostRequestEntities).atLeastOnce();
+
+
+ expect(logicalRequestEntity.getTopologyRequestEntity()).andReturn(topologyRequestEntity).atLeastOnce();
+ expect(blueprint.getHostGroup(eq("host_group_1"))).andReturn(hostGroup1).atLeastOnce();
+ expect(hostGroup1.containsMasterComponent()).andReturn(false).atLeastOnce();
+
+ replayAll();
+
+
+ // When
+
+ LogicalRequest req = new LogicalRequest(requestId, replayedTopologyRequest, clusterTopology, logicalRequestEntity);
+
+ // Then
+ verifyAll();
+
+ // the reserved host names are the FQDNs of the host group that
+ // no persisted host request has been matched to yet
+ Collection<String> actualReservedHosts = req.getReservedHosts();
+
+ Collection<String> expectedReservedHosts = ImmutableSet.of(host3.getFqdn(), host4.getFqdn());
+
+ assertEquals(expectedReservedHosts, actualReservedHosts);
+
+ Collection<HostRequest> actualCompletedHostReqs = req.getCompletedHostRequests();
+
+ assertEquals(2, actualCompletedHostReqs.size());
+
+ Optional<HostRequest> completedHostReq1 = Iterables.tryFind(actualCompletedHostReqs, new Predicate<HostRequest>() {
+ @Override
+ public boolean apply(@Nullable HostRequest input) {
+ return "host1".equals(input.getHostName());
+ }
+ });
+
+ Optional<HostRequest> completedHostReq2 = Iterables.tryFind(actualCompletedHostReqs, new Predicate<HostRequest>() {
+ @Override
+ public boolean apply(@Nullable HostRequest input) {
+ return "host2".equals(input.getHostName());
+ }
+ });
+
+ assertTrue(completedHostReq1.isPresent() && completedHostReq2.isPresent());
+
+ }
+
+
+
+ @Test
+ public void testPersistedRequestsWithHostPredicate() throws Exception {
+ // Given
+ Long requestId = 1L;
+
+ TopologyHostInfoEntity host = new TopologyHostInfoEntity();
+ host.setId(800L);
+ host.setPredicate("Hosts/host_name.in(host[1-4])"); // only a predicate is set on this entry, no FQDN
+
+
+ TopologyHostGroupEntity hostGroupEntity2 = new TopologyHostGroupEntity();
+ hostGroupEntity2.setTopologyHostInfoEntities(ImmutableSet.of(host));
+ hostGroupEntity2.setName("host_group_2");
+
+ // host requests that have already been matched to a registered host (host name is set)
+ TopologyHostRequestEntity hostRequestEntityHost1Matched = new TopologyHostRequestEntity();
+ hostRequestEntityHost1Matched.setId(1L);
+ hostRequestEntityHost1Matched.setHostName("host1"); //host request matched host1
+ hostRequestEntityHost1Matched.setTopologyHostGroupEntity(hostGroupEntity2);
+ hostRequestEntityHost1Matched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+ expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq("host1"))).andReturn(true).anyTimes();
+
+
+ TopologyHostRequestEntity hostRequestEntityHost2Matched = new TopologyHostRequestEntity();
+ hostRequestEntityHost2Matched.setId(2L);
+ hostRequestEntityHost2Matched.setHostName("host2"); // host request matched host2
+ hostRequestEntityHost2Matched.setTopologyHostGroupEntity(hostGroupEntity2);
+ hostRequestEntityHost2Matched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+ expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq("host2"))).andReturn(true).anyTimes();
+
+ // host requests that have not been matched to any registered host yet (no host name set)
+ TopologyHostRequestEntity hostRequestEntityHost3NotMatched = new TopologyHostRequestEntity();
+ hostRequestEntityHost3NotMatched.setId(3L);
+ hostRequestEntityHost3NotMatched.setTopologyHostGroupEntity(hostGroupEntity2);
+ hostRequestEntityHost3NotMatched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+ expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq("host3"))).andReturn(false).anyTimes();
+
+ TopologyHostRequestEntity hostRequestEntityHost4NotMatched = new TopologyHostRequestEntity();
+ hostRequestEntityHost4NotMatched.setId(4L);
+ hostRequestEntityHost4NotMatched.setTopologyHostGroupEntity(hostGroupEntity2);
+ hostRequestEntityHost4NotMatched.setTopologyHostTaskEntities(Collections.<TopologyHostTaskEntity>emptySet());
+ expect(ambariContext.isHostRegisteredWithCluster(eq(clusterId), eq("host4"))).andReturn(false).anyTimes();
+
+ Collection<TopologyHostRequestEntity> reservedHostRequestEntities = ImmutableSet.of(
+ hostRequestEntityHost1Matched,
+ hostRequestEntityHost2Matched,
+ hostRequestEntityHost3NotMatched,
+ hostRequestEntityHost4NotMatched);
+
+ hostGroupEntity2.setTopologyHostRequestEntities(reservedHostRequestEntities);
+
+ TopologyRequestEntity topologyRequestEntity = new TopologyRequestEntity();
+ topologyRequestEntity.setTopologyHostGroupEntities(Collections.singleton(hostGroupEntity2));
+
+ expect(logicalRequestEntity.getTopologyHostRequestEntities()).andReturn(reservedHostRequestEntities).atLeastOnce();
+
+
+ expect(logicalRequestEntity.getTopologyRequestEntity()).andReturn(topologyRequestEntity).atLeastOnce();
+ expect(blueprint.getHostGroup(eq("host_group_2"))).andReturn(hostGroup2).atLeastOnce();
+ expect(hostGroup2.containsMasterComponent()).andReturn(false).atLeastOnce();
+
+ replayAll();
+
+
+ // When
+
+ LogicalRequest req = new LogicalRequest(requestId, replayedTopologyRequest, clusterTopology, logicalRequestEntity);
+
+ // Then
+ verifyAll();
+
+ // there should be no reserved hosts when the host group is defined by a host predicate
+ Collection<String> actualReservedHosts = req.getReservedHosts();
+ Collection<String> expectedReservedHosts = Collections.emptySet();
+
+ assertEquals(expectedReservedHosts, actualReservedHosts);
+
+ Collection<HostRequest> actualCompletedHostReqs = req.getCompletedHostRequests();
+
+ assertEquals(2, actualCompletedHostReqs.size());
+
+ Optional<HostRequest> completedHostReq1 = Iterables.tryFind(actualCompletedHostReqs, new Predicate<HostRequest>() {
+ @Override
+ public boolean apply(@Nullable HostRequest input) {
+ return "host1".equals(input.getHostName());
+ }
+ });
+
+ Optional<HostRequest> completedHostReq2 = Iterables.tryFind(actualCompletedHostReqs, new Predicate<HostRequest>() {
+ @Override
+ public boolean apply(@Nullable HostRequest input) {
+ return "host2".equals(input.getHostName());
+ }
+ });
+
+ assertTrue(completedHostReq1.isPresent() && completedHostReq2.isPresent());
+
+ }
+}