You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ao...@apache.org on 2017/09/28 13:25:08 UTC
[19/50] [abbrv] ambari git commit: AMBARI-22012. BP deploys failing
intermittently with error (magyari_sandor)
AMBARI-22012. BP deploys failing intermittently with error (magyari_sandor)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/b653ee0a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/b653ee0a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/b653ee0a
Branch: refs/heads/branch-3.0-perf
Commit: b653ee0a5967a8255f087f43a1cb6a878e765833
Parents: 6633a7b
Author: Sandor Magyari <sm...@hortonworks.com>
Authored: Fri Sep 22 17:48:54 2017 +0200
Committer: Sandor Magyari <sm...@hortonworks.com>
Committed: Mon Sep 25 21:19:53 2017 +0200
----------------------------------------------------------------------
.../events/ClusterConfigFinishedEvent.java | 15 +++-
.../ambari/server/topology/TopologyManager.java | 88 +++++++++++++-------
.../topology/tasks/ConfigureClusterTask.java | 13 ++-
.../tasks/ConfigureClusterTaskFactory.java | 3 +-
.../server/utils/ManagedThreadPoolExecutor.java | 83 ++++++++++++++++++
.../ClusterDeployWithStartOnlyTest.java | 2 +-
...InstallWithoutStartOnComponentLevelTest.java | 2 +-
.../ClusterInstallWithoutStartTest.java | 2 +-
.../topology/ConfigureClusterTaskTest.java | 18 +++-
.../utils/ManagedThreadPoolExecutorTest.java | 51 ++++++++++++
10 files changed, 236 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/b653ee0a/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigFinishedEvent.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigFinishedEvent.java b/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigFinishedEvent.java
index cdb86ac..f0574d0 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigFinishedEvent.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/ClusterConfigFinishedEvent.java
@@ -23,17 +23,27 @@ package org.apache.ambari.server.events;
* cluster configuration is successfully updated.
*/
public class ClusterConfigFinishedEvent extends AmbariEvent {
+
+ private final long clusterId;
private final String clusterName;
- public ClusterConfigFinishedEvent(String clusterName) {
+ public ClusterConfigFinishedEvent(long clusterId, String clusterName) {
super(AmbariEventType.CLUSTER_CONFIG_FINISHED);
+ this.clusterId = clusterId;
this.clusterName = clusterName;
}
/**
+ * Get the cluster id
+ * @return
+ */
+ public long getClusterId() {
+ return clusterId;
+ }
+
+ /**
* Get the cluster name
- *
* @return
*/
public String getClusterName() {
@@ -46,6 +56,7 @@ public class ClusterConfigFinishedEvent extends AmbariEvent {
@Override
public String toString() {
StringBuilder buffer = new StringBuilder("ClusterConfigChangedEvent{");
+ buffer.append("clusterId=").append(getClusterId());
buffer.append("clusterName=").append(getClusterName());
buffer.append("}");
return buffer.toString();
http://git-wip-us.apache.org/repos/asf/ambari/blob/b653ee0a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
index 3af62e8..9769fae 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/TopologyManager.java
@@ -29,9 +29,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.ambari.server.AmbariException;
import org.apache.ambari.server.actionmanager.HostRoleCommand;
@@ -73,6 +74,7 @@ import org.apache.ambari.server.state.quicklinksprofile.QuickLinksProfile;
import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
import org.apache.ambari.server.topology.tasks.ConfigureClusterTaskFactory;
import org.apache.ambari.server.topology.validators.TopologyValidatorService;
+import org.apache.ambari.server.utils.ManagedThreadPoolExecutor;
import org.apache.ambari.server.utils.RetryHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,9 +104,23 @@ public class TopologyManager {
private static final String CLUSTER_CONFIG_TASK_MAX_TIME_IN_MILLIS_PROPERTY_NAME = "cluster_configure_task_timeout";
private PersistedState persistedState;
+
+ /**
+ * Single threaded executor to execute async tasks. At the moment it's only used to execute ConfigureClusterTask.
+ */
private final ExecutorService executor = Executors.newSingleThreadExecutor();
- private final Executor taskExecutor; // executes TopologyTasks
- private final boolean parallelTaskCreationEnabled;
+
+ /**
+ * Thread pool size for topology task executors.
+ */
+ private int topologyTaskExecutorThreadPoolSize;
+ /**
+ * There is one ExecutorService for each cluster to execute TopologyTasks.
+ * TopologyTasks are submitted into ExecutorService for each cluster,
+ * however the ExecutorService is started only after cluster configuration is finished.
+ */
+ private final Map<Long, ManagedThreadPoolExecutor> topologyTaskExecutorServiceMap = new HashMap<>();
+
private Collection<String> hostsToIgnore = new HashSet<>();
private final List<HostImpl> availableHosts = new LinkedList<>();
private final Map<String, LogicalRequest> reservedHosts = new HashMap<>();
@@ -158,17 +174,15 @@ public class TopologyManager {
private Map<Long, Boolean> clusterProvisionWithBlueprintCreationFinished = new HashMap<>();
public TopologyManager() {
- parallelTaskCreationEnabled = false;
- taskExecutor = executor;
+ topologyTaskExecutorThreadPoolSize = 1;
}
@Inject
public TopologyManager(Configuration configuration) {
- int threadPoolSize = configuration.getParallelTopologyTaskCreationThreadPoolSize();
- parallelTaskCreationEnabled = configuration.isParallelTopologyTaskCreationEnabled() && threadPoolSize > 1;
- taskExecutor = parallelTaskCreationEnabled
- ? Executors.newFixedThreadPool(threadPoolSize)
- : executor;
+ topologyTaskExecutorThreadPoolSize = configuration.getParallelTopologyTaskCreationThreadPoolSize();
+ if (!configuration.isParallelTopologyTaskCreationEnabled()) {
+ topologyTaskExecutorThreadPoolSize = 1;
+ }
}
// executed by the IoC framework after creating the object (guice)
@@ -310,6 +324,10 @@ public class TopologyManager {
// set provision action requested
topology.setProvisionAction(request.getProvisionAction());
+
+ // create task executor for TopologyTasks
+ getOrCreateTopologyTaskExecutor(clusterId);
+
// persist request
LogicalRequest logicalRequest = RetryHelper.executeWithRetry(new Callable<LogicalRequest>() {
@Override
@@ -325,15 +343,6 @@ public class TopologyManager {
addClusterConfigRequest(topology, new ClusterConfigurationRequest(ambariContext, topology, true,
stackAdvisorBlueprintProcessor, securityType == SecurityType.KERBEROS));
- // Notify listeners that cluster configuration finished
- executor.submit(new Callable<Boolean>() {
- @Override
- public Boolean call() throws Exception {
- ambariEventPublisher.publish(new ClusterConfigFinishedEvent(clusterName));
- return Boolean.TRUE;
- }
- });
-
// Process the logical request
processRequest(request, topology, logicalRequest);
@@ -345,6 +354,17 @@ public class TopologyManager {
return getRequestStatus(logicalRequest.getRequestId());
}
+ @Subscribe
+ public void onClusterConfigFinishedEvent(ClusterConfigFinishedEvent event) {
+ ManagedThreadPoolExecutor taskExecutor = topologyTaskExecutorServiceMap.get(event.getClusterId());
+ if (taskExecutor == null) {
+ LOG.error("Can't find executor service taskQueue not found for cluster: {} ", event.getClusterName());
+ } else {
+ LOG.info("Starting topology task ExecutorService for cluster: {}", event.getClusterName());
+ taskExecutor.start();
+ }
+ }
+
/**
* Saves the quick links profile to the DB as an Ambari setting. Creates a new setting entity or updates the existing
@@ -941,16 +961,8 @@ public class TopologyManager {
}
LOG.info("TopologyManager.processAcceptedHostOffer: queue tasks for host = {} which responded {}", hostName, response.getAnswer());
- if (parallelTaskCreationEnabled) {
- executor.execute(new Runnable() { // do not start until cluster config done
- @Override
- public void run() {
- queueHostTasks(topology, response, hostName);
- }
- });
- } else {
- queueHostTasks(topology, response, hostName);
- }
+ queueHostTasks(topology, response, hostName);
+
}
@Transactional
@@ -959,9 +971,23 @@ public class TopologyManager {
persistedState.registerInTopologyHostInfo(host);
}
+ private ExecutorService getOrCreateTopologyTaskExecutor(Long clusterId) {
+ ManagedThreadPoolExecutor topologyTaskExecutor = this.topologyTaskExecutorServiceMap.get(clusterId);
+ if (topologyTaskExecutor == null) {
+ LOG.info("Creating TopologyTaskExecutorService for clusterId: {}", clusterId);
+
+ topologyTaskExecutor = new ManagedThreadPoolExecutor(topologyTaskExecutorThreadPoolSize,
+ topologyTaskExecutorThreadPoolSize, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ topologyTaskExecutorServiceMap.put(clusterId, topologyTaskExecutor);
+ }
+ return topologyTaskExecutor;
+ }
+
private void queueHostTasks(ClusterTopology topology, HostOfferResponse response, String hostName) {
LOG.info("TopologyManager.processAcceptedHostOffer: queueing tasks for host = {}", hostName);
- response.executeTasks(taskExecutor, hostName, topology, ambariContext);
+ ExecutorService executorService = getOrCreateTopologyTaskExecutor(topology.getClusterId());
+ response.executeTasks(executorService, hostName, topology, ambariContext);
}
private void updateHostWithRackInfo(ClusterTopology topology, HostOfferResponse response, HostImpl host) {
@@ -1108,7 +1134,7 @@ public class TopologyManager {
}
ConfigureClusterTask configureClusterTask = configureClusterTaskFactory.createConfigureClusterTask(topology,
- configurationRequest);
+ configurationRequest, ambariEventPublisher);
AsyncCallableService<Boolean> asyncCallableService = new AsyncCallableService<>(configureClusterTask, timeout, delay,
Executors.newScheduledThreadPool(1));
http://git-wip-us.apache.org/repos/asf/ambari/blob/b653ee0a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
index 0ce5982..60eaa59 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTask.java
@@ -22,6 +22,8 @@ import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Callable;
+import org.apache.ambari.server.events.ClusterConfigFinishedEvent;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.security.authorization.internal.RunWithInternalSecurityContext;
import org.apache.ambari.server.topology.ClusterConfigurationRequest;
import org.apache.ambari.server.topology.ClusterTopology;
@@ -39,11 +41,14 @@ public class ConfigureClusterTask implements Callable<Boolean> {
private ClusterConfigurationRequest configRequest;
private ClusterTopology topology;
+ private AmbariEventPublisher ambariEventPublisher;
@AssistedInject
- public ConfigureClusterTask(@Assisted ClusterTopology topology, @Assisted ClusterConfigurationRequest configRequest) {
+ public ConfigureClusterTask(@Assisted ClusterTopology topology, @Assisted ClusterConfigurationRequest configRequest,
+ @Assisted AmbariEventPublisher ambariEventPublisher) {
this.configRequest = configRequest;
this.topology = topology;
+ this.ambariEventPublisher = ambariEventPublisher;
}
@Override
@@ -72,6 +77,12 @@ public class ConfigureClusterTask implements Callable<Boolean> {
throw new Exception(e);
}
+ LOG.info("Cluster configuration finished successfully!");
+ // Notify listeners that cluster configuration finished
+ long clusterId = topology.getClusterId();
+ ambariEventPublisher.publish(new ClusterConfigFinishedEvent(clusterId,
+ topology.getAmbariContext().getClusterName(clusterId)));
+
LOG.info("TopologyManager.ConfigureClusterTask: Exiting");
return true;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b653ee0a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java
index 9e3c151..558af30 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/tasks/ConfigureClusterTaskFactory.java
@@ -18,6 +18,7 @@
package org.apache.ambari.server.topology.tasks;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.topology.ClusterConfigurationRequest;
import org.apache.ambari.server.topology.ClusterTopology;
@@ -25,6 +26,6 @@ import org.apache.ambari.server.topology.ClusterTopology;
public interface ConfigureClusterTaskFactory {
ConfigureClusterTask createConfigureClusterTask(ClusterTopology topology, ClusterConfigurationRequest
- configRequest);
+ configRequest, AmbariEventPublisher ambariEventPublisher);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b653ee0a/ambari-server/src/main/java/org/apache/ambari/server/utils/ManagedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/utils/ManagedThreadPoolExecutor.java b/ambari-server/src/main/java/org/apache/ambari/server/utils/ManagedThreadPoolExecutor.java
new file mode 100644
index 0000000..167c9cb
--- /dev/null
+++ b/ambari-server/src/main/java/org/apache/ambari/server/utils/ManagedThreadPoolExecutor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distribut
+ * ed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.server.utils;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * ThreadPoolExecutor extension which is stopped by default and can be started & stopped.
+ */
+public class ManagedThreadPoolExecutor extends ThreadPoolExecutor {
+
+ private volatile boolean isStopped;
+ private final ReentrantLock pauseLock = new ReentrantLock();
+ private final Condition unpaused = pauseLock.newCondition();
+
+ public ManagedThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
+ long keepAliveTime, TimeUnit unit,
+ BlockingQueue<Runnable> workQueue) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
+ Executors.defaultThreadFactory());
+ isStopped = true;
+ }
+
+ protected void beforeExecute(Thread t, Runnable r) {
+ super.beforeExecute(t, r);
+ pauseLock.lock();
+ try {
+ while (isStopped) {
+ unpaused.await();
+ }
+ } catch (InterruptedException ie) {
+ t.interrupt();
+ } finally {
+ pauseLock.unlock();
+ }
+ }
+
+ public void start() {
+ pauseLock.lock();
+ try {
+ isStopped = false;
+ unpaused.signalAll();
+ } finally {
+ pauseLock.unlock();
+ }
+ }
+
+ public void stop() {
+ pauseLock.lock();
+ try {
+ isStopped = true;
+ } finally {
+ pauseLock.unlock();
+ }
+ }
+
+ public boolean isRunning() {
+ return !isStopped;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/b653ee0a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
index c3248a3..0daa20f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterDeployWithStartOnlyTest.java
@@ -391,7 +391,7 @@ public class ClusterDeployWithStartOnlyTest extends EasyMockSupport {
ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION);
expectLastCall().once();
- expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(2);
+ expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(1);
persistedTopologyRequest = new PersistedTopologyRequest(1, request);
expect(persistedState.getAllRequests()).andReturn(Collections.emptyMap()).once();
http://git-wip-us.apache.org/repos/asf/ambari/blob/b653ee0a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
index 372d0a1..bbf4fdb 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartOnComponentLevelTest.java
@@ -368,7 +368,7 @@ public class ClusterInstallWithoutStartOnComponentLevelTest extends EasyMockSupp
ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION);
expectLastCall().once();
- expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(2);
+ expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(1);
persistedTopologyRequest = new PersistedTopologyRequest(1, request);
expect(persistedState.getAllRequests()).andReturn(Collections.emptyMap()).once();
http://git-wip-us.apache.org/repos/asf/ambari/blob/b653ee0a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
index 9620507..059a8be 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ClusterInstallWithoutStartTest.java
@@ -363,7 +363,7 @@ public class ClusterInstallWithoutStartTest extends EasyMockSupport {
ambariContext.persistInstallStateForUI(CLUSTER_NAME, STACK_NAME, STACK_VERSION);
expectLastCall().once();
- expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(2);
+ expect(executor.submit(anyObject(AsyncCallableService.class))).andReturn(mockFuture).times(1);
persistedTopologyRequest = new PersistedTopologyRequest(1, request);
expect(persistedState.getAllRequests()).andReturn(Collections.emptyMap()).once();
http://git-wip-us.apache.org/repos/asf/ambari/blob/b653ee0a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
index feefcab..b2dac8f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/ConfigureClusterTaskTest.java
@@ -18,6 +18,7 @@
package org.apache.ambari.server.topology;
+import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.reset;
@@ -30,6 +31,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
+import org.apache.ambari.server.events.publishers.AmbariEventPublisher;
import org.apache.ambari.server.topology.tasks.ConfigureClusterTask;
import org.easymock.EasyMockRule;
import org.easymock.Mock;
@@ -60,12 +62,18 @@ public class ConfigureClusterTaskTest {
@Mock(type = MockType.STRICT)
private ClusterTopology clusterTopology;
+ @Mock(type = MockType.STRICT)
+ private AmbariContext ambariContext;
+
+ @Mock(type = MockType.NICE)
+ private AmbariEventPublisher ambariEventPublisher;
+
private ConfigureClusterTask testSubject;
@Before
public void before() {
- reset(clusterConfigurationRequest, clusterTopology);
- testSubject = new ConfigureClusterTask(clusterTopology, clusterConfigurationRequest);
+ reset(clusterConfigurationRequest, clusterTopology, ambariContext, ambariEventPublisher);
+ testSubject = new ConfigureClusterTask(clusterTopology, clusterConfigurationRequest, ambariEventPublisher);
}
@Test
@@ -75,11 +83,15 @@ public class ConfigureClusterTaskTest {
// is it OK to handle the non existence of hostgroups as a success?!
expect(clusterConfigurationRequest.getRequiredHostGroups()).andReturn(Collections.emptyList());
expect(clusterTopology.getHostGroupInfo()).andReturn(Collections.emptyMap());
+ expect(clusterTopology.getClusterId()).andReturn(1L).anyTimes();
+ expect(clusterTopology.getAmbariContext()).andReturn(ambariContext);
+ expect(ambariContext.getClusterName(1L)).andReturn("testCluster");
// this is only called if the "prerequisites" are satisfied
clusterConfigurationRequest.process();
+ ambariEventPublisher.publish(anyObject());
- replay(clusterConfigurationRequest, clusterTopology);
+ replay(clusterConfigurationRequest, clusterTopology, ambariContext, ambariEventPublisher);
// WHEN
Boolean result = testSubject.call();
http://git-wip-us.apache.org/repos/asf/ambari/blob/b653ee0a/ambari-server/src/test/java/org/apache/ambari/server/utils/ManagedThreadPoolExecutorTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/utils/ManagedThreadPoolExecutorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/utils/ManagedThreadPoolExecutorTest.java
new file mode 100644
index 0000000..e94b25c
--- /dev/null
+++ b/ambari-server/src/test/java/org/apache/ambari/server/utils/ManagedThreadPoolExecutorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ambari.server.utils;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+import junit.framework.Assert;
+
+public class ManagedThreadPoolExecutorTest {
+
+ @Test
+ public void testGetHostAndPortFromProperty() {
+
+ ManagedThreadPoolExecutor topologyTaskExecutor = new ManagedThreadPoolExecutor(1,
+ 1, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ Future<Boolean> feature = topologyTaskExecutor.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call() {
+ return Boolean.TRUE;
+ }
+ });
+
+ Assert.assertTrue(!topologyTaskExecutor.isRunning());
+ topologyTaskExecutor.start();
+ Assert.assertTrue(topologyTaskExecutor.isRunning());
+ topologyTaskExecutor.stop();
+ Assert.assertTrue(!topologyTaskExecutor.isRunning());
+
+ }
+}