You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:17 UTC

[07/49] incubator-gearpump git commit: fix #1943 allow user to config how many executors to use in an application

fix #1943 allow user to config how many executors to use in an application


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/47d867d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/47d867d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/47d867d2

Branch: refs/heads/master
Commit: 47d867d276dfa9adf3508e030bd7f3258291727b
Parents: 313b6c4
Author: huafengw <fv...@gmail.com>
Authored: Tue Feb 2 18:01:14 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:23:24 2016 +0800

----------------------------------------------------------------------
 conf/gear.conf                                  |   5 +-
 core/src/main/resources/geardefault.conf        |   9 +-
 .../appmaster/ExecutorSystemScheduler.scala     |  12 +--
 .../gearpump/cluster/client/ClientContext.scala |  43 ++++----
 .../gearpump/cluster/scheduler/Resource.scala   |   6 +-
 .../main/scala/io/gearpump/util/Constants.scala |   2 +-
 core/src/test/resources/test.conf               |   2 +
 .../io/gearpump/cluster/main/AppSubmitter.scala |   2 +
 .../cluster/scheduler/PriorityScheduler.scala   | 103 +++++++++++--------
 .../scheduler/PrioritySchedulerSpec.scala       |  44 +++++++-
 docs/commandline.md                             |   2 +-
 .../experiments/storm/main/GearpumpNimbus.scala |   3 +-
 .../checklist/ConnectorKafkaSpec.scala          |   4 +-
 .../checklist/DynamicDagSpec.scala              |   2 +-
 .../integrationtest/checklist/ExampleSpec.scala |  13 +--
 .../checklist/RestServiceSpec.scala             |  26 ++---
 .../minicluster/CommandLineClient.scala         |   4 +-
 .../minicluster/RestClient.scala                |   4 +-
 services/dashboard/services/restapi.js          |  12 +--
 .../dashboard/views/apps/submit/submit.html     |   4 +
 services/dashboard/views/apps/submit/submit.js  |   3 +-
 .../io/gearpump/services/MasterService.scala    |  10 +-
 .../streaming/appmaster/TaskSchedulerImpl.scala |   6 +-
 .../streaming/appmaster/JarSchedulerSpec.scala  |   4 +-
 .../streaming/appmaster/TaskSchedulerSpec.scala |   4 +-
 25 files changed, 192 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/conf/gear.conf
----------------------------------------------------------------------
diff --git a/conf/gear.conf b/conf/gear.conf
index 84694a8..1ab4bb6 100644
--- a/conf/gear.conf
+++ b/conf/gear.conf
@@ -62,11 +62,14 @@ gearpump {
   ### When the resource cannot be allocated in the timeout, then
   ### the appmaster will shutdown itself.
   resource-allocation-timeout-seconds = 120
-  
+
   ##
   ## Executor share same process of worker
   worker.executor-share-same-jvm-as-worker = false
 
+  ## Number of executors to launch when starting an application
+  application.executor-num = 1
+
   ###########################
   ### Change the dispather for tasks
   ### If you don't know what this is about, don't change it

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/core/src/main/resources/geardefault.conf b/core/src/main/resources/geardefault.conf
index 16bd8c0..17626a3 100644
--- a/core/src/main/resources/geardefault.conf
+++ b/core/src/main/resources/geardefault.conf
@@ -31,14 +31,8 @@ gearpump {
   ## The installation folder of gearpump
   home = ""
 
-
-
-
-
   serializer.pool = "io.gearpump.serializer.FastKryoSerializerPool"
 
-
-
   ## How many slots each worker contains
   worker.slots = 1000
 
@@ -46,6 +40,9 @@ gearpump {
   ## User can switch to "io.gearpump.cluster.worker.CGroupProcessLauncher" to enable CGroup support.
   worker.executor-process-launcher = "io.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
 
+  ## Number of executors to launch when starting an application
+  application.executor-num = 1
+
   ## To enable worker use cgroup to make resource isolation,
   ## set gearpump.worker.executor-process-launcher = "io.gearpump.cluster.worker.CGroupProcessLauncher"
   ##

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
index 0b35db9..6af68eb 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
@@ -70,20 +70,14 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
 
   def resourceAllocationMessageHandler: Receive = {
     case ResourceAllocatedForSession(allocations, session) =>
-
       if (isSessionAlive(session)) {
-        val groupedResource = allocations.groupBy(_.worker).mapValues {
-          _.reduce((resourceA, resourceB) =>
-            resourceA.copy(resource = (resourceA.resource + resourceB.resource)))
-        }.toArray
-
-        groupedResource.map((workerAndResources) => {
-          val ResourceAllocation(resource, worker, workerId) = workerAndResources._2
+        allocations.foreach { resourceAllocation =>
+          val ResourceAllocation(resource, worker, workerId) = resourceAllocation
 
           val launcher = context.actorOf(executorSystemLauncher(appId, session))
           launcher ! LaunchExecutorSystem(WorkerInfo(workerId, worker), currentSystemId, resource)
           currentSystemId = currentSystemId + 1
-        })
+        }
       }
     case ResourceAllocationTimeOut(session) =>
       if (isSessionAlive(session)) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
index 536d8c1..3a2868c 100644
--- a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
@@ -22,23 +22,21 @@ import java.util.concurrent.TimeUnit
 
 import akka.actor.{ActorRef, ActorSystem}
 import akka.util.Timeout
-import com.typesafe.config.{ConfigFactory, Config}
-import io.gearpump.cluster.MasterToAppMaster.{ReplayFromTimestampWindowTrailingEdge, AppMastersData}
+import com.typesafe.config.{ConfigValueFactory, Config}
+import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge}
 import io.gearpump.cluster.MasterToClient.ReplayApplicationResult
 import io.gearpump.cluster._
 import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.jarstore.{FilePath, JarStoreService}
+import io.gearpump.jarstore.JarStoreService
 import io.gearpump.util.Constants._
 import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
 import org.slf4j.Logger
 
 import scala.collection.JavaConversions._
-import scala.concurrent.Await
+import scala.concurrent.{Await, Future}
 import scala.concurrent.duration.Duration
-import scala.concurrent.Future
 import scala.util.Try
 
-
 /**
  * ClientContext is a user facing util to submit/manage an application.
  */
@@ -75,33 +73,30 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
    * "gearpump.app.jar" if defined. Otherwise, will assume the jar is on
    * the target runtime classpath, and will not send it.
    */
-  def submit(app : Application) : Int = {
+  def submit(app: Application): Int = {
     submit(app, System.getProperty(GEARPUMP_APP_JAR))
   }
 
+  def submit(app: Application, jar: String): Int = {
+    submit(app, jar, getExecutorNum())
+  }
 
-  def submit(app : Application, jar: String) : Int = {
-    import app.{name, appMaster, userConfig}
+  def submit(app: Application, jar: String, executorNum: Int): Int = {
+    val client = getMasterClient
+    val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
     val submissionConfig = getSubmissionConfig(config)
-    val appDescription = AppDescription(name, appMaster.getName, userConfig, submissionConfig)
-    submit(appDescription, jar)
+      .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum))
+    val appDescription = AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
+    val appJar = Option(jar).map(loadFile)
+    client.submitApplication(appDescription, appJar)
   }
 
-  import scala.collection.JavaConverters._
-  private def getSubmissionConfig(config: Config): Config = {
-    ClusterConfig.filterOutDefaultConfig(config)
+  private def getExecutorNum(): Int = {
+    Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1)
   }
 
-  private def submit(app : AppDescription, jarPath: String) : Int = {
-    val client = getMasterClient
-    val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
-    val updatedApp = AppDescription(appName, app.appMaster, app.userConfig, app.clusterConfig)
-    if (jarPath == null) {
-      client.submitApplication(updatedApp, None)
-    } else {
-      val appJar = loadFile(jarPath)
-      client.submitApplication(updatedApp, Option(appJar))
-    }
+  private def getSubmissionConfig(config: Config): Config = {
+    ClusterConfig.filterOutDefaultConfig(config)
   }
 
   def replayFromTimestampWindowTrailingEdge(appId : Int): ReplayApplicationResult = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
index 574c8bd..94c7532 100644
--- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
+++ b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
@@ -26,8 +26,12 @@ case class Resource(slots : Int) {
 
   def >(other : Resource): Boolean = slots > other.slots
 
+  def >=(other : Resource): Boolean = !(this < other)
+
   def <(other : Resource): Boolean = slots < other.slots
 
+  def <=(other : Resource): Boolean = !(this > other)
+
   def equals(other : Resource): Boolean = slots == other.slots
 
   def isEmpty: Boolean = slots == 0
@@ -47,7 +51,7 @@ object Relaxation extends Enumeration{
 
 import Relaxation._
 import Priority._
-case class ResourceRequest(resource: Resource,  workerId: Int = 0, priority: Priority = NORMAL, relaxation: Relaxation = ANY)
+case class ResourceRequest(resource: Resource,  workerId: Int = 0, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1)
 
 case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : Int)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala b/core/src/main/scala/io/gearpump/util/Constants.scala
index 084d99b..ecf09e4 100644
--- a/core/src/main/scala/io/gearpump/util/Constants.scala
+++ b/core/src/main/scala/io/gearpump/util/Constants.scala
@@ -162,5 +162,5 @@ object Constants {
 
   val PREFER_IPV4 = "java.net.preferIPv4Stack"
 
-
+  val APPLICATION_EXECUTOR_NUMBER = "gearpump.application.executor-num"
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/test/resources/test.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/test.conf b/core/src/test/resources/test.conf
index 3deb28b..324e8bd 100644
--- a/core/src/test/resources/test.conf
+++ b/core/src/test/resources/test.conf
@@ -12,6 +12,8 @@ gearpump {
   ### the appmaster will shutdown itself.
   resource-allocation-timeout-seconds = 10
 
+  application.executor-num = 1
+
   worker.executor-process-launcher = "io.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
 
   cluster {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
index acb6247..90f653c 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
@@ -37,6 +37,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser {
   override val options: Array[(String, CLIOption[Any])] = Array(
     "namePrefix" -> CLIOption[String]("<application name prefix>", required = false, defaultValue = Some("")),
     "jar" -> CLIOption("<application>.jar", required = true),
+    "executors" -> CLIOption[Int]("number of executor to launch", required = false, defaultValue = Some(1)),
     "verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)),
     // For document purpose only, OPTION_CONFIG option is not used here.
     // OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
@@ -58,6 +59,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser {
 
     // Set jar path to be submitted to cluster
     System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
+    System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
 
     val namePrefix = config.getString("namePrefix")
     if (namePrefix.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
index c3121d8..10e9dcb 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
@@ -26,14 +26,13 @@ import io.gearpump.cluster.scheduler.Scheduler.PendingRequest
 
 import scala.collection.mutable
 
-class PriorityScheduler extends Scheduler{
-
+class PriorityScheduler extends Scheduler {
   private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
 
   def requestOrdering = new Ordering[PendingRequest] {
     override def compare(x: PendingRequest, y: PendingRequest) = {
       var res = x.request.priority.id - y.request.priority.id
-      if(res == 0)
+      if (res == 0)
         res = y.timeStamp.compareTo(x.timeStamp)
       res
     }
@@ -45,23 +44,30 @@ class PriorityScheduler extends Scheduler{
     var scheduleLater = Array.empty[PendingRequest]
     val resourcesSnapShot = resources.clone()
     var allocated = Resource.empty
-    val totalResource = resourcesSnapShot.foldLeft(Resource.empty){ (totalResource, workerWithResource) =>
-      val (_, (_, resource)) = workerWithResource
-      totalResource + resource
-    }
+    val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
 
-    while(resourceRequests.nonEmpty && (allocated < totalResource)) {
+    while (resourceRequests.nonEmpty && (allocated < totalResource)) {
       val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
       request.relaxation match {
         case ANY =>
-          val newAllocated = allocateFairly(resourcesSnapShot, PendingRequest(appId, appMaster, request, timeStamp))
+          val allocations = allocateFairly(resourcesSnapShot, request)
+          val newAllocated = Resource(allocations.map(_.resource.slots).sum)
+          if (allocations.nonEmpty) {
+            appMaster ! ResourceAllocated(allocations.toArray)
+          }
+          if (newAllocated < request.resource) {
+            val remainingRequest = request.resource - newAllocated
+            val remainingExecutors = request.executorNum - allocations.length
+            val newResourceRequest = request.copy(resource = remainingRequest, executorNum = remainingExecutors)
+            scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
+          }
           allocated = allocated + newAllocated
         case ONEWORKER =>
-          val availableResource = resourcesSnapShot.find{params =>
+          val availableResource = resourcesSnapShot.find { params =>
             val (_, (_, resource)) = params
             resource > request.resource
           }
-          if(availableResource.nonEmpty){
+          if (availableResource.nonEmpty) {
             val (workerId, (worker, resource)) = availableResource.get
             allocated = allocated + request.resource
             appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, workerId)))
@@ -70,13 +76,12 @@ class PriorityScheduler extends Scheduler{
             scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
           }
         case SPECIFICWORKER =>
-          if (resourcesSnapShot.contains(request.workerId)) {
-            val (worker, availableResource) = resourcesSnapShot.get(request.workerId).get
-            if (availableResource > request.resource) {
-              appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, request.workerId)))
-              allocated = allocated  + request.resource
-              resourcesSnapShot.update(request.workerId, (worker, availableResource - request.resource))
-            }
+          val workerAndResource = resourcesSnapShot.get(request.workerId)
+          if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) {
+            val (worker, availableResource) = workerAndResource.get
+            appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, request.workerId)))
+            allocated = allocated + request.resource
+            resourcesSnapShot.update(request.workerId, (worker, availableResource - request.resource))
           } else {
             scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
           }
@@ -88,38 +93,48 @@ class PriorityScheduler extends Scheduler{
 
   def resourceRequestHandler: Receive = {
     case RequestResource(appId, request) =>
-      LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, relaxation: ${request.relaxation}")
+      LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, relaxation: ${request.relaxation}," +
+        s" executor number: ${request.executorNum}")
       val appMaster = sender()
       resourceRequests.enqueue(new PendingRequest(appId, appMaster, request, System.currentTimeMillis()))
       allocateResource()
   }
 
-  private def allocateFairly(resources : mutable.HashMap[Int, (ActorRef, Resource)], pendindRequest : PendingRequest): Resource ={
-    val length = resources.size
-    val flattenResource = resources.toArray.zipWithIndex.flatMap((workerWithIndex) => {
-      val ((workerId, (worker, resource)), index) = workerWithIndex
-      0.until(resource.slots).map((seq) => ((workerId, worker), seq * length + index))
-    }).sortBy(_._2).map(_._1)
-    val PendingRequest(appId, appMaster, request, timeStamp) = pendindRequest
-    val total = Resource(flattenResource.size)
-
-    val newAllocated = Resource.min(total, request.resource)
-    val singleAllocation = flattenResource.take(newAllocated.slots)
-      .groupBy((actor) => actor).mapValues(_.length).toArray.map((params) => {
-      val ((workerId, worker), slots) = params
-      resources.update(workerId, (worker, resources.get(workerId).get._2 - Resource(slots)))
-      ResourceAllocation(Resource(slots), worker, workerId)
-    })
-    pendindRequest.appMaster ! ResourceAllocated(singleAllocation)
-    if (pendindRequest.request.resource > newAllocated) {
-      resourceRequests.enqueue(
-        PendingRequest(appId, appMaster,
-          ResourceRequest(request.resource - newAllocated, request.workerId, request.priority), timeStamp))
-    }
-    newAllocated
-  }
-
   override def doneApplication(appId: Int): Unit = {
     resourceRequests = resourceRequests.filter(_.appId != appId)
   }
+
+  private def allocateFairly(resources: mutable.HashMap[Int, (ActorRef, Resource)], request: ResourceRequest): List[ResourceAllocation] = {
+    val workerNum = resources.size
+    var allocations = List.empty[ResourceAllocation]
+    var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
+    var remainingRequest = request.resource
+    var remainingExecutors = Math.min(request.executorNum, request.resource.slots)
+
+    while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
+      val exeutorNum = Math.min(workerNum, remainingExecutors)
+      val toRequest = Resource(remainingRequest.slots * exeutorNum / remainingExecutors)
+
+      val flattenResource = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse).take(exeutorNum).zipWithIndex.flatMap { workerWithIndex =>
+        val ((workerId, (worker, resource)), index) = workerWithIndex
+        0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
+      }.sortBy(_._2).map(_._1)
+
+      if (flattenResource.length < toRequest.slots) {
+        //Can not safisfy the user's requirements
+        totalAvailable = Resource.empty
+      } else {
+        flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
+          toArray.foreach { params =>
+          val ((workerId, worker), slots) = params
+          resources.update(workerId, (worker, resources.get(workerId).get._2 - Resource(slots)))
+          allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
+        }
+        totalAvailable -= toRequest
+        remainingRequest -= toRequest
+        remainingExecutors -= exeutorNum
+      }
+    }
+    allocations
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
index ccfd453..0a7e1f8 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -19,7 +19,6 @@ package io.gearpump.cluster.scheduler
 
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import io.gearpump.cluster.master.Master.MasterInfo
 import io.gearpump.cluster.AppMasterToMaster.RequestResource
 import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
@@ -84,12 +83,11 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
       mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1))))
       mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))))
       mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))))
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(10), mockWorker1.ref, workerId1))))
 
       scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
       scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
       scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
-      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(30), mockWorker2.ref, workerId2))))
+      mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
     }
   }
 
@@ -124,7 +122,7 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
       scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
       mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
 
-      val request3 = ResourceRequest(Resource(30), 0, Priority.NORMAL, Relaxation.ANY)
+      val request3 = ResourceRequest(Resource(30), 0, Priority.NORMAL, Relaxation.ANY, executorNum = 2)
       scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
       mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))))
 
@@ -136,4 +134,42 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
       mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))))
     }
   }
+
+  "The PriorityScheduler" should {
+    "handle the resource request with different executor number" in {
+      val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+      scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+      scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+      //By default, the request requires only one executor
+      val request2 = ResourceRequest(Resource(20))
+      scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+      val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations2.allocations.length == 1)
+      assert(allocations2.allocations.head.resource == Resource(20))
+
+      val request3 = ResourceRequest(Resource(24), executorNum = 3)
+      scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+      val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations3.allocations.length == 3)
+      assert(allocations3.allocations.forall(_.resource == Resource(8)))
+
+      //The total available resource can not satisfy the requirements with executor number
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
+      scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
+      val request4 = ResourceRequest(Resource(60), executorNum = 3)
+      scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+      val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations4.allocations.length == 2)
+      assert(allocations4.allocations.forall(_.resource == Resource(20)))
+
+      //When new resources are available, the remaining request will be satisfied
+      scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
+      val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+      assert(allocations5.allocations.length == 1)
+      assert(allocations4.allocations.forall(_.resource == Resource(20)))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/docs/commandline.md
----------------------------------------------------------------------
diff --git a/docs/commandline.md b/docs/commandline.md
index 2c9334c..7e264f9 100644
--- a/docs/commandline.md
+++ b/docs/commandline.md
@@ -18,7 +18,7 @@ If you use Maven you can have a look [here](https://maven.apache.org/plugins/mav
 You can use the command `gear` under the bin directory to submit, query and terminate an application:
 
 ```bash
-gear app [-namePrefix <application name prefix>] [-conf <custom gearpump config file>] -jar xx.jar MainClass <arg1> <arg2> ...
+gear app [-namePrefix <application name prefix>] [-executors <number of executors to launch>] [-conf <custom gearpump config file>] -jar xx.jar MainClass <arg1> <arg2> ...
 ```
 
 ### List all running applications

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
index e9973e5..c16ab41 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -139,13 +139,14 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe
     implicit val system = clientContext.system
     val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
     val stormConfig = gearpumpStormTopology.getStormConfig
+    val workerNum = StormUtil.getInt(stormConfig, Config.TOPOLOGY_WORKERS).getOrElse(1)
     val processorGraph = GraphBuilder.build(gearpumpStormTopology)
     val config = UserConfig.empty
           .withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
           .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
     val app = StreamApplication(name, processorGraph, config)
     LOG.info(s"jar file uploaded to $uploadedJarLocation")
-    val appId = clientContext.submit(app, uploadedJarLocation)
+    val appId = clientContext.submit(app, uploadedJarLocation, workerNum)
     applications += name -> appId
     topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation)
     LOG.info(s"Storm Application $appId submitted")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
index 8e294ea..59c956b 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
@@ -62,7 +62,7 @@ class ConnectorKafkaSpec extends TestSpecBase {
         "-sourceTopic", sourceTopic,
         "-sinkTopic", sinkTopic).mkString(" ")
       val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(kafkaJar, args)
+      val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
       success shouldBe true
 
       // verify
@@ -90,7 +90,7 @@ class ConnectorKafkaSpec extends TestSpecBase {
         "-sinkTopic", sinkTopic,
         "-source", sourcePartitionNum).mkString(" ")
       val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(kafkaJar, args)
+      val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
       success shouldBe true
 
       // verify #1

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
index 5a60274..44f98aa 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
@@ -121,7 +121,7 @@ class DynamicDagSpec extends TestSpecBase {
 
   private def expectSolJarSubmittedWithAppId(): Int = {
     val appId = restClient.getNextAvailableAppId()
-    val success = restClient.submitApp(solJar)
+    val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length)
     success shouldBe true
     expectAppIsRunning(appId, solName)
     Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
index ac18807..2772bc3 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
@@ -32,7 +32,7 @@ class ExampleSpec extends TestSpecBase {
       val mainClass = "io.gearpump.examples.distributedshell.DistributedShell"
       val clientClass = "io.gearpump.examples.distributedshell.DistributedShellClient"
       val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(distShellJar, mainClass)
+      val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass)
       success shouldBe true
       expectAppIsRunning(appId, "DistributedShell")
       val args = Array(
@@ -43,7 +43,8 @@ class ExampleSpec extends TestSpecBase {
       val expectedHostNames = cluster.getWorkerHosts.map(Docker.execAndCaptureOutput(_, "hostname"))
 
       def verify(): Boolean = {
-        val result = commandLineClient.submitAppAndCaptureOutput(distShellJar, args.mkString(" ")).split("\n").
+        val workerNum = cluster.getWorkerHosts.length
+        val result = commandLineClient.submitAppAndCaptureOutput(distShellJar, workerNum, args.mkString(" ")).split("\n").
           filterNot(line => line.startsWith("[INFO]") || line.isEmpty)
         expectedHostNames.forall(result.contains)
       }
@@ -59,7 +60,7 @@ class ExampleSpec extends TestSpecBase {
     "can submit immediately after killing a former one" in {
       // setup
       val formerAppId = restClient.getNextAvailableAppId()
-      val formerSubmissionSuccess = restClient.submitApp(wordCountJar)
+      val formerSubmissionSuccess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       formerSubmissionSuccess shouldBe true
       expectAppIsRunning(formerAppId, wordCountName)
       Util.retryUntil(restClient.queryStreamingAppDetail(formerAppId).clock > 0)
@@ -67,7 +68,7 @@ class ExampleSpec extends TestSpecBase {
 
       // exercise
       val appId = formerAppId + 1
-      val success = restClient.submitApp(wordCountJar)
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       success shouldBe true
       expectAppIsRunning(appId, wordCountName)
     }
@@ -97,7 +98,7 @@ class ExampleSpec extends TestSpecBase {
     "can obtain application clock and the clock will keep changing" in {
       // setup
       val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(jar)
+      val success = restClient.submitApp(jar, cluster.getWorkerHosts.length)
       success shouldBe true
       expectAppIsRunning(appId, appName)
 
@@ -110,7 +111,7 @@ class ExampleSpec extends TestSpecBase {
     "can change the parallelism and description of a processor" in {
       // setup
       val appId = restClient.getNextAvailableAppId()
-      val formerSubmissionSuccess = restClient.submitApp(jar)
+      val formerSubmissionSuccess = restClient.submitApp(jar, cluster.getWorkerHosts.length)
       formerSubmissionSuccess shouldBe true
       expectAppIsRunning(appId, appName)
       val formerProcessors = restClient.queryStreamingAppDetail(appId).processors

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
index cf2ca03..2830390 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
@@ -43,7 +43,7 @@ class RestServiceSpec extends TestSpecBase {
     "retrieve 1 application after the first application submission" in {
       // exercise
       val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar)
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       success shouldBe true
       expectAppIsRunning(appId, wordCountName)
       restClient.listRunningApps().length shouldEqual 1
@@ -54,7 +54,7 @@ class RestServiceSpec extends TestSpecBase {
     "find a running application after submission" in {
       // exercise
       val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar)
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       success shouldBe true
       expectAppIsRunning(appId, wordCountName)
     }
@@ -62,18 +62,18 @@ class RestServiceSpec extends TestSpecBase {
     "reject a repeated submission request while the application is running" in {
       // setup
       val appId = restClient.getNextAvailableAppId()
-      val formerSubmissionSuccess = restClient.submitApp(wordCountJar)
+      val formerSubmissionSuccess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       formerSubmissionSuccess shouldBe true
       expectAppIsRunning(appId, wordCountName)
 
       // exercise
-      val success = restClient.submitApp(wordCountJar)
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       success shouldBe false
     }
 
     "reject an invalid submission (the jar file path is incorrect)" in {
       // exercise
-      val success = restClient.submitApp(wordCountJar + ".missing")
+      val success = restClient.submitApp(wordCountJar + ".missing", cluster.getWorkerHosts.length)
       success shouldBe false
     }
 
@@ -84,7 +84,7 @@ class RestServiceSpec extends TestSpecBase {
       val appId = restClient.getNextAvailableAppId()
 
       // exercise
-      val success = restClient.submitApp(wordCountJar, s"-split $splitNum -sum $sumNum")
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, s"-split $splitNum -sum $sumNum")
       success shouldBe true
       expectAppIsRunning(appId, wordCountName)
       val processors = restClient.queryStreamingAppDetail(appId).processors
@@ -98,7 +98,7 @@ class RestServiceSpec extends TestSpecBase {
     "can obtain application metrics and the metrics will keep changing" in {
       // setup
       val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar)
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       success shouldBe true
       expectAppIsRunning(appId, wordCountName)
 
@@ -122,7 +122,7 @@ class RestServiceSpec extends TestSpecBase {
     "can obtain application corresponding executors' metrics and the metrics will keep changing" in {
       // setup
       val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar)
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       success shouldBe true
       expectAppIsRunning(appId, wordCountName)
 
@@ -148,7 +148,7 @@ class RestServiceSpec extends TestSpecBase {
     "a running application should be killed" in {
       // setup
       val appId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar)
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       success shouldBe true
       expectAppIsRunning(appId, wordCountName)
 
@@ -159,7 +159,7 @@ class RestServiceSpec extends TestSpecBase {
     "should fail when attempting to kill a stopped application" in {
       // setup
       val appId = restClient.getNextAvailableAppId()
-      val submissionSucess = restClient.submitApp(wordCountJar)
+      val submissionSucess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       submissionSucess shouldBe true
       expectAppIsRunning(appId, wordCountName)
       killAppAndVerify(appId)
@@ -291,7 +291,7 @@ class RestServiceSpec extends TestSpecBase {
       val appId = restClient.getNextAvailableAppId()
 
       // exercise
-      val success = restClient.submitApp(wordCountJar)
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       success shouldBe true
       restClient.queryExecutorBrief(appId).foreach { executor =>
         val executorId = executor.executorId
@@ -308,7 +308,7 @@ class RestServiceSpec extends TestSpecBase {
       val appId = restClient.getNextAvailableAppId()
 
       // exercise
-      val success = restClient.submitApp(wordCountJar)
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
       success shouldBe true
       val actual = restClient.queryAppMasterConfig(appId)
       actual.hasPath("gearpump") shouldBe true
@@ -322,7 +322,7 @@ class RestServiceSpec extends TestSpecBase {
       val originSplitNum = 4
       val originSumNum = 3
       val originAppId = restClient.getNextAvailableAppId()
-      val success = restClient.submitApp(wordCountJar, s"-split $originSplitNum -sum $originSumNum")
+      val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, s"-split $originSplitNum -sum $originSumNum")
       success shouldBe true
       expectAppIsRunning(originAppId, wordCountName)
       val originAppDetail = restClient.queryStreamingAppDetail(originAppId)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
index d923a6d..506fc67 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
@@ -49,8 +49,8 @@ class CommandLineClient(host: String) {
       ""
   }
 
-  def submitAppAndCaptureOutput(jar: String, args: String = ""): String = {
-    execAndCaptureOutput(s"gear app -verbose true -jar $jar $args")
+  def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = {
+    execAndCaptureOutput(s"gear app -verbose true -jar $jar -executors $executorNum $args")
   }
 
   def submitApp(jar: String, args: String = ""): Int = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
index 2b3003c..2657bb3 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
@@ -87,10 +87,10 @@ class RestClient(host: String, port: Int) {
     listApps().length + 1
   }
 
-  def submitApp(jar: String, args: String = "", config: String = ""): Boolean = try {
+  def submitApp(jar: String, executorNum: Int, args: String = "", config: String = ""): Boolean = try {
     var endpoint = "master/submitapp"
     if (args.length > 0) {
-      endpoint += "?args=" + Util.encodeUriComponent(args)
+      endpoint += s"?executorNum=${executorNum}&args=" + Util.encodeUriComponent(args)
     }
     var options = Seq(s"jar=@$jar")
     if (config.length > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/dashboard/services/restapi.js
----------------------------------------------------------------------
diff --git a/services/dashboard/services/restapi.js b/services/dashboard/services/restapi.js
index 386d319..d52d89d 100644
--- a/services/dashboard/services/restapi.js
+++ b/services/dashboard/services/restapi.js
@@ -118,19 +118,19 @@ angular.module('dashboard')
         },
 
         /** Submit an user defined application with user configuration */
-        submitUserApp: function(files, formFormNames, args, onComplete) {
+        submitUserApp: function(files, formFormNames, executorNum, args, onComplete) {
           return self._submitApp(restapiV1Root + 'master/submitapp',
-            files, formFormNames, args, onComplete);
+            files, formFormNames, executorNum, args, onComplete);
         },
 
         /** Submit a Storm application */
-        submitStormApp: function(files, formFormNames, args, onComplete) {
+        submitStormApp: function(files, formFormNames, executorNum, args, onComplete) {
           return self._submitApp(restapiV1Root + 'master/submitstormapp',
-            files, formFormNames, args, onComplete);
+            files, formFormNames, executorNum, args, onComplete);
         },
 
-        _submitApp: function(url, files, formFormNames, args, onComplete) {
-          var params = args ? '?args=' + encodeURIComponent(args) : '';
+        _submitApp: function(url, files, formFormNames, executorNum, args, onComplete) {
+          var params = '?executorNum=' + executorNum + '&args=' + encodeURIComponent(args);
           var upload = Upload.upload({
             url: url + params,
             method: 'POST',

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/dashboard/views/apps/submit/submit.html
----------------------------------------------------------------------
diff --git a/services/dashboard/views/apps/submit/submit.html b/services/dashboard/views/apps/submit/submit.html
index 140fe17..bc4af98 100644
--- a/services/dashboard/views/apps/submit/submit.html
+++ b/services/dashboard/views/apps/submit/submit.html
@@ -41,6 +41,10 @@
               ng-model="conf" accept-pattern="{{confFileSuffix}}"></form-control>
             <!-- input 3 -->
             <form-control
+              type="integer" min="1" label="Executor Number" ng-hide="isStormApp"
+              ng-model="executorNum"></form-control>
+            <!-- input 4 -->
+            <form-control
               type="text" label="Arguments"
               help="Application specific launch arguments (optional). E.g. WordCount can use &quot;-split 2 -sum 1&quot;"
               ng-model="launchArgs"></form-control>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/dashboard/views/apps/submit/submit.js
----------------------------------------------------------------------
diff --git a/services/dashboard/views/apps/submit/submit.js b/services/dashboard/views/apps/submit/submit.js
index 4ca1895..5e967f5 100644
--- a/services/dashboard/views/apps/submit/submit.js
+++ b/services/dashboard/views/apps/submit/submit.js
@@ -10,6 +10,7 @@ angular.module('dashboard')
 
       $scope.dialogTitle = 'Submit Gearpump Application';
       $scope.confFileSuffix = '.conf';
+      $scope.executorNum = 1;
       var submitFn = restapi.submitUserApp;
       if ($scope.isStormApp) {
         $scope.dialogTitle = 'Submit Storm Application';
@@ -29,7 +30,7 @@ angular.module('dashboard')
           fileFormNames.push('conf');
         }
         $scope.uploading = true;
-        submitFn(files, fileFormNames, $scope.launchArgs, function(response) {
+        submitFn(files, fileFormNames, $scope.executorNum, $scope.launchArgs, function(response) {
           $scope.shouldNoticeSubmitFailed = !response.success;
           $scope.uploading = false;
           if (response.success) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
index 839c0ae..dd0e719 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
@@ -109,12 +109,12 @@ class MasterService(val master: ActorRef,
     } ~
     path("submitapp") {
       post {
-        parameters("args" ? "") { args: String =>
+        parameters('executorNum.as[Int] ? 1, 'args ? "") { (executorNum, args) =>
           uploadFile { fileMap =>
             val jar = fileMap.get("jar").map(_.file)
             val userConf = fileMap.get("conf").map(_.file)
             onComplete(Future(
-              MasterService.submitGearApp(jar, args, systemConfig, userConf)
+              MasterService.submitGearApp(jar, executorNum, args, systemConfig, userConf)
             )) {
               case Success(success) =>
                 val response = MasterService.AppSubmissionResult(success)
@@ -128,7 +128,7 @@ class MasterService(val master: ActorRef,
     } ~
     path("submitstormapp") {
       post {
-        parameters("args" ? "") { args: String =>
+        parameters('executorNum.as[Int] ? 1, 'args ? "") { (executorNum, args) =>
           uploadFile { fileMap =>
             val jar = fileMap.get("jar").map(_.file)
             val stormConf = fileMap.get("conf").map(_.file)
@@ -201,10 +201,10 @@ object MasterService {
   /**
    * Submit Native Application.
    */
-  def submitGearApp(jar: Option[File], args: String, systemConfig: Config, userConfigFile: Option[File]): Boolean = {
+  def submitGearApp(jar: Option[File], executorNum: Int, args: String, systemConfig: Config, userConfigFile: Option[File]): Boolean = {
     submitAndDeleteTempFiles(
       "io.gearpump.cluster.main.AppSubmitter",
-      argsArray = spaceSeparatedArgumentsToArray(args),
+      argsArray = Array("-executors", executorNum.toString) ++ spaceSeparatedArgumentsToArray(args),
       fileMap = Map("jar" -> jar).filter(_._2.isDefined).mapValues(_.get),
       classPath = getUserApplicationClassPath,
       systemConfig,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
index 6d589f5..9510531 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
@@ -24,7 +24,7 @@ import io.gearpump.streaming.DAG
 import io.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality}
 import io.gearpump.streaming.appmaster.TaskScheduler.{Location, TaskStatus}
 import io.gearpump.streaming.task.TaskId
-import io.gearpump.util.LogUtil
+import io.gearpump.util.{Constants, LogUtil}
 import org.slf4j.Logger
 
 /**
@@ -80,7 +80,7 @@ object TaskScheduler {
 }
 
 class TaskSchedulerImpl(appId : Int, appName: String, config: Config)  extends TaskScheduler {
-  private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+  private val executorNum = config.getInt(Constants.APPLICATION_EXECUTOR_NUMBER)
 
   private var tasks = List.empty[TaskStatus]
 
@@ -121,7 +121,7 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config)  extends T
     workersResourceRequest.map {workerIdAndResource =>
       val (workerId, resource) = workerIdAndResource
       if (workerId == WORKER_NO_PREFERENCE) {
-        ResourceRequest(resource)
+        ResourceRequest(resource, executorNum = executorNum)
       } else {
         ResourceRequest(resource, workerId, relaxation = SPECIFICWORKER)
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
index bc3f265..d9ebb37 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
@@ -20,7 +20,7 @@ package io.gearpump.streaming.appmaster
 import akka.actor.ActorSystem
 import com.typesafe.config.ConfigFactory
 import io.gearpump.streaming.{ProcessorDescription, DAG}
-import io.gearpump.cluster.AppJar
+import io.gearpump.cluster.{TestUtil, AppJar}
 import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
 import io.gearpump.jarstore.FilePath
 import io.gearpump.partitioner.{HashPartitioner, Partitioner}
@@ -47,7 +47,7 @@ class JarSchedulerSpec extends WordSpec with Matchers {
     "schedule tasks depends on app jar" in {
       val system = ActorSystem("JarSchedulerSpec")
       implicit val dispatcher = system.dispatcher
-      val manager = new JarScheduler(0, "APP", ConfigFactory.empty(), system)
+      val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system)
       manager.setDag(dag, Future{0L})
       val requests = Array(ResourceRequest(Resource(2)))
       val result = Await.result(manager.getRequestDetails(), 15 seconds)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
index 78caf62..aeef61d 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -23,7 +23,7 @@ import io.gearpump.streaming.appmaster.TaskLocator.Localities
 import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
 import io.gearpump.Message
 import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
-import io.gearpump.cluster.{ClusterConfig, UserConfig}
+import io.gearpump.cluster.{TestUtil, ClusterConfig, UserConfig}
 import io.gearpump.partitioner.{HashPartitioner, Partitioner}
 import io.gearpump.streaming.appmaster.TaskLocator.Localities
 import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
@@ -41,7 +41,7 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
 
   val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
 
-  val config = ClusterConfig.default()
+  val config = TestUtil.DEFAULT_CONFIG
 
   "TaskScheduler" should {
     "schedule tasks on different workers properly according user's configuration" in {