You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by dm...@apache.org on 2015/04/28 15:43:10 UTC

ambari git commit: AMBARI-10731. RU - Install repo should batch the distribution of bits to prevent a Denial-of-Service attack (dlysnichenko)

Repository: ambari
Updated Branches:
  refs/heads/trunk f254d2e55 -> 5314c6904


AMBARI-10731. RU - Install repo should batch the distribution of bits to prevent a Denial-of-Service attack (dlysnichenko)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5314c690
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5314c690
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5314c690

Branch: refs/heads/trunk
Commit: 5314c6904e1eb5de2f70548ddc2a344e26d3a1b8
Parents: f254d2e
Author: Lisnichenko Dmitro <dl...@hortonworks.com>
Authored: Tue Apr 28 16:42:21 2015 +0300
Committer: Lisnichenko Dmitro <dl...@hortonworks.com>
Committed: Tue Apr 28 16:42:21 2015 +0300

----------------------------------------------------------------------
 .../server/configuration/Configuration.java     |  21 +++
 .../ClusterStackVersionResourceProvider.java    | 156 +++++++++++--------
 .../DistributeRepositoriesActionListener.java   |   2 +-
 .../server/configuration/ConfigurationTest.java |  14 ++
 ...ClusterStackVersionResourceProviderTest.java |  80 +++++++---
 5 files changed, 187 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/5314c690/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
index bbeca38..ce6d3cd 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/configuration/Configuration.java
@@ -264,6 +264,15 @@ public class Configuration {
   public static final String PARALLEL_STAGE_EXECUTION_KEY = "server.stages.parallel";
   public static final String AGENT_TASK_TIMEOUT_KEY = "agent.task.timeout";
   public static final String AGENT_PACKAGE_INSTALL_TASK_TIMEOUT_KEY = "agent.package.install.task.timeout";
+
+  /**
+   * Max number of tasks that may be executed within a single stage.
+   * This limitation is used for tasks that when executed in a 1000+ node cluster,
+   * may DDOS servers providing downloadable resources
+   */
+  public static final String AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_KEY = "agent.package.parallel.commands.limit";
+  public static final String AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_DEFAULT = "100";
+
   public static final String AGENT_TASK_TIMEOUT_DEFAULT = "900";
   public static final String AGENT_PACKAGE_INSTALL_TASK_TIMEOUT_DEFAULT = "1800";
 
@@ -535,6 +544,8 @@ public class Configuration {
     configsMap.put(KDC_PORT_KEY, properties.getProperty(
         KDC_PORT_KEY, KDC_PORT_KEY_DEFAULT));
 
+    configsMap.put(AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_KEY, properties.getProperty(
+            AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_KEY, AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_DEFAULT));
 
     File passFile = new File(configsMap.get(SRVR_KSTR_DIR_KEY) + File.separator
         + configsMap.get(SRVR_CRT_PASS_FILE_KEY));
@@ -1277,6 +1288,16 @@ public class Configuration {
                                   CUSTOM_ACTION_DEFINITION_DEF_VALUE);
   }
 
+  public int getAgentPackageParallelCommandsLimit() {
+    int value = Integer.parseInt(properties.getProperty(
+            AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_KEY,
+            AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_DEFAULT));
+    if (value < 1) {
+      value = 1;
+    }
+    return value;
+  }
+  
   /**
    * @param isPackageInstallationTask true, if task is for installing packages
    * @return default task timeout in seconds (string representation). This value

http://git-wip-us.apache.org/repos/asf/ambari/blob/5314c690/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java
index 4e6877e..6c8b733 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProvider.java
@@ -20,6 +20,7 @@ package org.apache.ambari.server.controller.internal;
 import static org.apache.ambari.server.agent.ExecutionCommand.KeyNames.JDK_LOCATION;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -327,82 +328,51 @@ public class ClusterStackVersionResourceProvider extends AbstractControllerResou
     }
 
     RequestStageContainer req = createRequest();
-    String stageName = String.format(INSTALL_PACKAGES_FULL_NAME);
 
+    Iterator<Host> hostsForClusterIter = hostsForCluster.values().iterator();
     Map<String, String> hostLevelParams = new HashMap<String, String>();
     hostLevelParams.put(JDK_LOCATION, getManagementController().getJdkResourceUrl());
+    String hostParamsJson = StageUtils.getGson().toJson(hostLevelParams);
 
-    Stage stage = stageFactory.createNew(req.getId(),
-            "/tmp/ambari",
-            cluster.getClusterName(),
-            cluster.getClusterId(),
-            stageName,
-            "{}",
-            "{}",
-            StageUtils.getGson().toJson(hostLevelParams));
+    int maxTasks = configuration.getAgentPackageParallelCommandsLimit();
+    int hostCount = hostsForCluster.size();
+    int batchCount = (int) (Math.ceil((double)hostCount / maxTasks));
 
     long stageId = req.getLastStageId() + 1;
     if (0L == stageId) {
       stageId = 1L;
     }
-    stage.setStageId(stageId);
-    req.addStages(Collections.singletonList(stage));
-
-    for (Host host : hostsForCluster.values()) {
-      // Determine repositories for host
-      final List<RepositoryEntity> repoInfo = perOsRepos.get(host.getOsFamily());
-      if (repoInfo == null) {
-        throw new SystemException(String.format("Repositories for os type %s are " +
-                        "not defined. Repo version=%s, stackId=%s",
-                        host.getOsFamily(), desiredRepoVersion, stackId));
-      }
-      // For every host at cluster, determine packages for all installed services
-      List<ServiceOsSpecific.Package> packages = new ArrayList<ServiceOsSpecific.Package>();
-      Set<String> servicesOnHost = new HashSet<String>();
-      List<ServiceComponentHost> components = cluster.getServiceComponentHosts(host.getHostName());
-      for (ServiceComponentHost component : components) {
-        servicesOnHost.add(component.getServiceName());
-      }
-
-      for (String serviceName : servicesOnHost) {
-        ServiceInfo info;
-        try {
-          info = ami.getService(stackName, stackVersion, serviceName);
-        } catch (AmbariException e) {
-          throw new SystemException("Cannot enumerate services", e);
-        }
 
-        List<ServiceOsSpecific.Package> packagesForService = managementController.getPackagesForServiceHost(info,
-                new HashMap<String, String>(), // Contents are ignored
-                host.getOsFamily());
-        packages.addAll(packagesForService);
+    ArrayList<Stage> stages = new ArrayList<Stage>(batchCount);
+    for (int batchId = 1; batchId <= batchCount ; batchId++) {
+      // Create next stage
+      String stageName;
+      if (batchCount > 1) {
+        stageName = INSTALL_PACKAGES_FULL_NAME;
+      } else {
+        stageName = String.format(INSTALL_PACKAGES_FULL_NAME +
+                ". Batch %d of %d", batchId, batchCount);
       }
-      final String packageList = gson.toJson(packages);
-      final String repoList = gson.toJson(repoInfo);
-
-      Map<String, String> params = new HashMap<String, String>() {{
-        put("stack_id", stackId.getStackId());
-        put("repository_version", desiredRepoVersion);
-        put("base_urls", repoList);
-        put("package_list", packageList);
-      }};
-
-      // add host to this stage
-      RequestResourceFilter filter = new RequestResourceFilter(null, null,
-              Collections.singletonList(host.getHostName()));
-
-      ActionExecutionContext actionContext = new ActionExecutionContext(
-              cluster.getClusterName(), INSTALL_PACKAGES_ACTION,
-              Collections.singletonList(filter),
-              params);
-      actionContext.setTimeout(Short.valueOf(configuration.getDefaultAgentTaskTimeout(true)));
-
-      try {
-        actionExecutionHelper.get().addExecutionCommandsToStage(actionContext, stage, false);
-      } catch (AmbariException e) {
-        throw new SystemException("Can not modify stage", e);
+      Stage stage = stageFactory.createNew(req.getId(),
+              "/tmp/ambari",
+              cluster.getClusterName(),
+              cluster.getClusterId(),
+              stageName,
+              "{}",
+              "{}",
+              hostParamsJson
+      );
+      stage.setStageId(stageId);
+      stages.add(stage);
+      stageId++;
+      // Populate with commands for host
+      for (int i = 0; i < maxTasks && hostsForClusterIter.hasNext(); i++) {
+        Host host = hostsForClusterIter.next();
+        addHostVersionInstallCommandsToStage(desiredRepoVersion,
+                cluster, managementController, ami, stackId, perOsRepos, stage, host);
       }
     }
+    req.addStages(stages);
 
     try {
       ClusterVersionEntity existingCSVer = clusterVersionDAO.findByClusterAndStackAndVersion(
@@ -438,6 +408,66 @@ public class ClusterStackVersionResourceProvider extends AbstractControllerResou
     return getRequestStatus(req.getRequestStatusResponse());
   }
 
+  private void addHostVersionInstallCommandsToStage(final String desiredRepoVersion,
+                                                    Cluster cluster, AmbariManagementController managementController,
+                                                    AmbariMetaInfo ami,
+                                                    final StackId stackId,
+                                                    Map<String, List<RepositoryEntity>> perOsRepos,
+                                                    Stage stage, Host host) throws SystemException {
+    // Determine repositories for host
+    final List<RepositoryEntity> repoInfo = perOsRepos.get(host.getOsFamily());
+    if (repoInfo == null) {
+      throw new SystemException(String.format("Repositories for os type %s are " +
+                      "not defined. Repo version=%s, stackId=%s",
+              host.getOsFamily(), desiredRepoVersion, stackId));
+    }
+    // For every host at cluster, determine packages for all installed services
+    List<ServiceOsSpecific.Package> packages = new ArrayList<ServiceOsSpecific.Package>();
+    Set<String> servicesOnHost = new HashSet<String>();
+    List<ServiceComponentHost> components = cluster.getServiceComponentHosts(host.getHostName());
+    for (ServiceComponentHost component : components) {
+      servicesOnHost.add(component.getServiceName());
+    }
+
+    for (String serviceName : servicesOnHost) {
+      ServiceInfo info;
+      try {
+        info = ami.getService(stackId.getStackName(), stackId.getStackVersion(), serviceName);
+      } catch (AmbariException e) {
+        throw new SystemException("Cannot enumerate services", e);
+      }
+
+      List<ServiceOsSpecific.Package> packagesForService = managementController.getPackagesForServiceHost(info,
+              new HashMap<String, String>(), // Contents are ignored
+              host.getOsFamily());
+      packages.addAll(packagesForService);
+    }
+    final String packageList = gson.toJson(packages);
+    final String repoList = gson.toJson(repoInfo);
+
+    Map<String, String> params = new HashMap<String, String>() {{
+      put("stack_id", stackId.getStackId());
+      put("repository_version", desiredRepoVersion);
+      put("base_urls", repoList);
+      put("package_list", packageList);
+    }};
+
+    // add host to this stage
+    RequestResourceFilter filter = new RequestResourceFilter(null, null,
+            Collections.singletonList(host.getHostName()));
+
+    ActionExecutionContext actionContext = new ActionExecutionContext(
+            cluster.getClusterName(), INSTALL_PACKAGES_ACTION,
+            Collections.singletonList(filter),
+            params);
+    actionContext.setTimeout(Short.valueOf(configuration.getDefaultAgentTaskTimeout(true)));
+
+    try {
+      actionExecutionHelper.get().addExecutionCommandsToStage(actionContext, stage, false);
+    } catch (AmbariException e) {
+      throw new SystemException("Can not modify stage", e);
+    }
+  }
 
 
   private RequestStageContainer createRequest() {

http://git-wip-us.apache.org/repos/asf/ambari/blob/5314c690/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/DistributeRepositoriesActionListener.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/DistributeRepositoriesActionListener.java b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/DistributeRepositoriesActionListener.java
index 5a32a82..5600ef1 100644
--- a/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/DistributeRepositoriesActionListener.java
+++ b/ambari-server/src/main/java/org/apache/ambari/server/events/listeners/upgrade/DistributeRepositoriesActionListener.java
@@ -164,7 +164,7 @@ public class DistributeRepositoriesActionListener {
       // If we know exact host stack version, there will be single execution of a code below
       if (hostVersion.getState() == RepositoryVersionState.INSTALLING) {
         hostVersion.setState(newHostState);
-
+        hostVersionDAO.get().merge(hostVersion);
         // Update state of a cluster stack version
         try {
           Cluster cluster = clusters.get().getClusterById(clusterId);

http://git-wip-us.apache.org/repos/asf/ambari/blob/5314c690/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
index 63447ca..0823f99 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/configuration/ConfigurationTest.java
@@ -441,4 +441,18 @@ public class ConfigurationTest {
     ambariProperties.setProperty(Configuration.SERVER_JDBC_URL_KEY, "jdbc:sqlserver://server");
     Assert.assertEquals( DatabaseType.SQL_SERVER, configuration.getDatabaseType() );
   }
+
+  @Test
+  public void testGetAgentPackageParallelCommandsLimit() throws Exception {
+    final Properties ambariProperties = new Properties();
+    final Configuration configuration = new Configuration(ambariProperties);
+
+    Assert.assertEquals(100, configuration.getAgentPackageParallelCommandsLimit());
+
+    ambariProperties.setProperty(Configuration.AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_KEY, "5");
+    Assert.assertEquals(5, configuration.getAgentPackageParallelCommandsLimit());
+
+    ambariProperties.setProperty(Configuration.AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_KEY, "0");
+    Assert.assertEquals(1, configuration.getAgentPackageParallelCommandsLimit());
+  }
 }

http://git-wip-us.apache.org/repos/asf/ambari/blob/5314c690/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProviderTest.java
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProviderTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProviderTest.java
index 8aeda4f..7ff7c7f 100644
--- a/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProviderTest.java
+++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/internal/ClusterStackVersionResourceProviderTest.java
@@ -36,21 +36,29 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.ambari.server.actionmanager.ActionManager;
+import org.apache.ambari.server.actionmanager.ExecutionCommandWrapper;
 import org.apache.ambari.server.actionmanager.HostRoleCommand;
+import org.apache.ambari.server.actionmanager.Stage;
+import org.apache.ambari.server.actionmanager.StageFactory;
 import org.apache.ambari.server.agent.CommandReport;
+import org.apache.ambari.server.agent.ExecutionCommand;
 import org.apache.ambari.server.api.services.AmbariMetaInfo;
+import org.apache.ambari.server.configuration.Configuration;
 import org.apache.ambari.server.controller.AmbariManagementController;
 import org.apache.ambari.server.controller.RequestStatusResponse;
 import org.apache.ambari.server.controller.ResourceProviderFactory;
 import org.apache.ambari.server.controller.spi.Request;
+import org.apache.ambari.server.controller.spi.RequestStatus;
 import org.apache.ambari.server.controller.spi.Resource;
 import org.apache.ambari.server.controller.spi.ResourceProvider;
 import org.apache.ambari.server.controller.utilities.PropertyHelper;
 import org.apache.ambari.server.orm.GuiceJpaInitializer;
 import org.apache.ambari.server.orm.InMemoryDefaultTestModule;
+import org.apache.ambari.server.orm.PersistenceType;
 import org.apache.ambari.server.orm.dao.ClusterDAO;
 import org.apache.ambari.server.orm.dao.HostDAO;
 import org.apache.ambari.server.orm.dao.RepositoryVersionDAO;
@@ -88,6 +96,7 @@ import com.google.inject.util.Modules;
  */
 public class ClusterStackVersionResourceProviderTest {
 
+  public static final int MAX_TASKS_PER_STAGE = 2;
   private Injector injector;
   private AmbariMetaInfo ambariMetaInfo;
   private RepositoryVersionDAO repositoryVersionDAOMock;
@@ -96,6 +105,8 @@ public class ClusterStackVersionResourceProviderTest {
   private ClusterDAO clusterDAO;
   private HostDAO hostDAO;
   private ConfigHelper configHelper;
+  private Configuration configuration;
+  private StageFactory stageFactory;
 
   private String operatingSystemsJson = "[\n" +
           "   {\n" +
@@ -115,15 +126,21 @@ public class ClusterStackVersionResourceProviderTest {
           "   }\n" +
           "]";
 
+
   @Before
   public void setup() throws Exception {
     // Create instances of mocks
     repositoryVersionDAOMock = createNiceMock(RepositoryVersionDAO.class);
     configHelper = createNiceMock(ConfigHelper.class);
+    InMemoryDefaultTestModule inMemoryModule = new InMemoryDefaultTestModule();
+    Properties properties = inMemoryModule.getProperties();
+    properties.setProperty(Configuration.AGENT_PACKAGE_PARALLEL_COMMANDS_LIMIT_KEY,
+            String.valueOf(MAX_TASKS_PER_STAGE));
+    configuration = new Configuration(properties);
+    stageFactory = createNiceMock(StageFactory.class);
 
     // Initialize injector
-    InMemoryDefaultTestModule module = new InMemoryDefaultTestModule();
-    injector = Guice.createInjector(Modules.override(module).with(new MockModule()));
+    injector = Guice.createInjector(Modules.override(inMemoryModule).with(new MockModule()));
     injector.getInstance(GuiceJpaInitializer.class);
     ambariMetaInfo = injector.getInstance(AmbariMetaInfo.class);
     resourceTypeDAO = injector.getInstance(ResourceTypeDAO.class);
@@ -146,17 +163,16 @@ public class ClusterStackVersionResourceProviderTest {
     Cluster cluster = createNiceMock(Cluster.class);
     StackId stackId = new StackId("HDP", "2.0.1");
 
-    final Host host1 = createNiceMock("host1", Host.class);
-    final Host host2 = createNiceMock("host2", Host.class);
-    expect(host1.getHostName()).andReturn("host1").anyTimes();
-    expect(host1.getOsFamily()).andReturn("redhat6").anyTimes();
-    expect(host2.getHostName()).andReturn("host2").anyTimes();
-    expect(host2.getOsFamily()).andReturn("redhat6").anyTimes();
-    replay(host1, host2);
-    Map<String, Host> hostsForCluster = new HashMap<String, Host>() {{
-      put(host1.getHostName(), host1);
-      put(host2.getHostName(), host2);
-    }};
+    Map<String, Host> hostsForCluster = new HashMap<String, Host>();
+    int hostCount = 10;
+    for (int i = 0; i < hostCount; i++) {
+      String hostname = "host" + i;
+      Host host = createNiceMock(hostname, Host.class);
+      expect(host.getHostName()).andReturn(hostname).anyTimes();
+      expect(host.getOsFamily()).andReturn("redhat6").anyTimes();
+      replay(host);
+      hostsForCluster.put(hostname, host);
+    }
 
     ServiceComponentHost sch = createMock(ServiceComponentHost.class);
     List<ServiceComponentHost> schs = Collections.singletonList(sch);
@@ -188,7 +204,7 @@ public class ClusterStackVersionResourceProviderTest {
             (Map<String, String>) anyObject(List.class), anyObject(String.class))).andReturn(packages).anyTimes();
 
     expect(resourceProviderFactory.getHostResourceProvider(anyObject(Set.class), anyObject(Map.class),
-        eq(managementController))).andReturn(csvResourceProvider).anyTimes();
+            eq(managementController))).andReturn(csvResourceProvider).anyTimes();
 
     expect(clusters.getCluster(anyObject(String.class))).andReturn(cluster);
     expect(clusters.getHostsForCluster(anyObject(String.class))).andReturn(hostsForCluster);
@@ -198,16 +214,33 @@ public class ClusterStackVersionResourceProviderTest {
 
     expect(sch.getServiceName()).andReturn("HIVE").anyTimes();
 
+    ExecutionCommand executionCommand = createNiceMock(ExecutionCommand.class);
+    ExecutionCommandWrapper executionCommandWrapper = createNiceMock(ExecutionCommandWrapper.class);
+
+    expect(executionCommandWrapper.getExecutionCommand()).andReturn(executionCommand).anyTimes();
+
+    Stage stage = createNiceMock(Stage.class);
+    expect(stage.getExecutionCommandWrapper(anyObject(String.class), anyObject(String.class))).
+            andReturn(executionCommandWrapper).anyTimes();
+
+    // Check that we create proper stage count
+    expect(stageFactory.createNew(anyLong(), anyObject(String.class),
+            anyObject(String.class), anyLong(),
+            anyObject(String.class), anyObject(String.class), anyObject(String.class),
+            anyObject(String.class))).andReturn(stage).
+            times((int) Math.ceil(hostCount / MAX_TASKS_PER_STAGE));
+
     expect(
-        repositoryVersionDAOMock.findByStackAndVersion(
-            anyObject(StackId.class),
-            anyObject(String.class))).andReturn(repoVersion);
+            repositoryVersionDAOMock.findByStackAndVersion(
+                    anyObject(StackId.class),
+                    anyObject(String.class))).andReturn(repoVersion);
 
     expect(actionManager.getRequestTasks(anyLong())).andReturn(Collections.<HostRoleCommand>emptyList()).anyTimes();
 
     // replay
     replay(managementController, response, clusters, resourceProviderFactory, csvResourceProvider,
-        cluster, repositoryVersionDAOMock, configHelper, sch, actionManager);
+        cluster, repositoryVersionDAOMock, configHelper, sch, actionManager,
+            executionCommand, executionCommandWrapper,stage, stageFactory);
 
     ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
         type,
@@ -233,10 +266,11 @@ public class ClusterStackVersionResourceProviderTest {
     // create the request
     Request request = PropertyHelper.getCreateRequest(propertySet, null);
 
-    provider.createResources(request);
+    RequestStatus status = provider.createResources(request);
 
     // verify
-    verify(managementController, response, clusters);
+    verify(managementController, response, clusters, stageFactory);
+
   }
 
 
@@ -356,7 +390,8 @@ public class ClusterStackVersionResourceProviderTest {
 
     // replay
     replay(managementController, response, clusters, resourceProviderFactory, csvResourceProvider,
-            cluster, repositoryVersionDAOMock, configHelper, sch, actionManager, finalizeUpgradeAction, report);
+            cluster, repositoryVersionDAOMock, configHelper, sch, actionManager, finalizeUpgradeAction, report,
+            stageFactory);
 
     ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
             type,
@@ -394,7 +429,8 @@ public class ClusterStackVersionResourceProviderTest {
     protected void configure() {
       bind(RepositoryVersionDAO.class).toInstance(repositoryVersionDAOMock);
       bind(ConfigHelper.class).toInstance(configHelper);
-      //bind(FinalizeUpgradeAction.class).toInstance(finalizeUpgradeAction);
+      bind(Configuration.class).toInstance(configuration);
+      bind(StageFactory.class).toInstance(stageFactory);
     }
   }
 }