You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:17 UTC
[07/49] incubator-gearpump git commit: fix #1943 allow user to config
how many executors to use in an application
fix #1943 allow user to config how many executors to use in an application
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/47d867d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/47d867d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/47d867d2
Branch: refs/heads/master
Commit: 47d867d276dfa9adf3508e030bd7f3258291727b
Parents: 313b6c4
Author: huafengw <fv...@gmail.com>
Authored: Tue Feb 2 18:01:14 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:23:24 2016 +0800
----------------------------------------------------------------------
conf/gear.conf | 5 +-
core/src/main/resources/geardefault.conf | 9 +-
.../appmaster/ExecutorSystemScheduler.scala | 12 +--
.../gearpump/cluster/client/ClientContext.scala | 43 ++++----
.../gearpump/cluster/scheduler/Resource.scala | 6 +-
.../main/scala/io/gearpump/util/Constants.scala | 2 +-
core/src/test/resources/test.conf | 2 +
.../io/gearpump/cluster/main/AppSubmitter.scala | 2 +
.../cluster/scheduler/PriorityScheduler.scala | 103 +++++++++++--------
.../scheduler/PrioritySchedulerSpec.scala | 44 +++++++-
docs/commandline.md | 2 +-
.../experiments/storm/main/GearpumpNimbus.scala | 3 +-
.../checklist/ConnectorKafkaSpec.scala | 4 +-
.../checklist/DynamicDagSpec.scala | 2 +-
.../integrationtest/checklist/ExampleSpec.scala | 13 +--
.../checklist/RestServiceSpec.scala | 26 ++---
.../minicluster/CommandLineClient.scala | 4 +-
.../minicluster/RestClient.scala | 4 +-
services/dashboard/services/restapi.js | 12 +--
.../dashboard/views/apps/submit/submit.html | 4 +
services/dashboard/views/apps/submit/submit.js | 3 +-
.../io/gearpump/services/MasterService.scala | 10 +-
.../streaming/appmaster/TaskSchedulerImpl.scala | 6 +-
.../streaming/appmaster/JarSchedulerSpec.scala | 4 +-
.../streaming/appmaster/TaskSchedulerSpec.scala | 4 +-
25 files changed, 192 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/conf/gear.conf
----------------------------------------------------------------------
diff --git a/conf/gear.conf b/conf/gear.conf
index 84694a8..1ab4bb6 100644
--- a/conf/gear.conf
+++ b/conf/gear.conf
@@ -62,11 +62,14 @@ gearpump {
### When the resource cannot be allocated in the timeout, then
### the appmaster will shutdown itself.
resource-allocation-timeout-seconds = 120
-
+
##
## Executor share same process of worker
worker.executor-share-same-jvm-as-worker = false
+ ## Number of executors to launch when starting an application
+ application.executor-num = 1
+
###########################
### Change the dispather for tasks
### If you don't know what this is about, don't change it
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/resources/geardefault.conf
----------------------------------------------------------------------
diff --git a/core/src/main/resources/geardefault.conf b/core/src/main/resources/geardefault.conf
index 16bd8c0..17626a3 100644
--- a/core/src/main/resources/geardefault.conf
+++ b/core/src/main/resources/geardefault.conf
@@ -31,14 +31,8 @@ gearpump {
## The installation folder of gearpump
home = ""
-
-
-
-
serializer.pool = "io.gearpump.serializer.FastKryoSerializerPool"
-
-
## How many slots each worker contains
worker.slots = 1000
@@ -46,6 +40,9 @@ gearpump {
## User can switch to "io.gearpump.cluster.worker.CGroupProcessLauncher" to enable CGroup support.
worker.executor-process-launcher = "io.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
+ ## Number of executors to launch when starting an application
+ application.executor-num = 1
+
## To enable worker use cgroup to make resource isolation,
## set gearpump.worker.executor-process-launcher = "io.gearpump.cluster.worker.CGroupProcessLauncher"
##
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
index 0b35db9..6af68eb 100644
--- a/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
+++ b/core/src/main/scala/io/gearpump/cluster/appmaster/ExecutorSystemScheduler.scala
@@ -70,20 +70,14 @@ class ExecutorSystemScheduler (appId: Int, masterProxy: ActorRef,
def resourceAllocationMessageHandler: Receive = {
case ResourceAllocatedForSession(allocations, session) =>
-
if (isSessionAlive(session)) {
- val groupedResource = allocations.groupBy(_.worker).mapValues {
- _.reduce((resourceA, resourceB) =>
- resourceA.copy(resource = (resourceA.resource + resourceB.resource)))
- }.toArray
-
- groupedResource.map((workerAndResources) => {
- val ResourceAllocation(resource, worker, workerId) = workerAndResources._2
+ allocations.foreach { resourceAllocation =>
+ val ResourceAllocation(resource, worker, workerId) = resourceAllocation
val launcher = context.actorOf(executorSystemLauncher(appId, session))
launcher ! LaunchExecutorSystem(WorkerInfo(workerId, worker), currentSystemId, resource)
currentSystemId = currentSystemId + 1
- })
+ }
}
case ResourceAllocationTimeOut(session) =>
if (isSessionAlive(session)) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
index 536d8c1..3a2868c 100644
--- a/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
+++ b/core/src/main/scala/io/gearpump/cluster/client/ClientContext.scala
@@ -22,23 +22,21 @@ import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, ActorSystem}
import akka.util.Timeout
-import com.typesafe.config.{ConfigFactory, Config}
-import io.gearpump.cluster.MasterToAppMaster.{ReplayFromTimestampWindowTrailingEdge, AppMastersData}
+import com.typesafe.config.{ConfigValueFactory, Config}
+import io.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge}
import io.gearpump.cluster.MasterToClient.ReplayApplicationResult
import io.gearpump.cluster._
import io.gearpump.cluster.master.MasterProxy
-import io.gearpump.jarstore.{FilePath, JarStoreService}
+import io.gearpump.jarstore.JarStoreService
import io.gearpump.util.Constants._
import io.gearpump.util.{ActorUtil, Constants, LogUtil, Util}
import org.slf4j.Logger
import scala.collection.JavaConversions._
-import scala.concurrent.Await
+import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
-import scala.concurrent.Future
import scala.util.Try
-
/**
* ClientContext is a user facing util to submit/manage an application.
*/
@@ -75,33 +73,30 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) {
* "gearpump.app.jar" if defined. Otherwise, will assume the jar is on
* the target runtime classpath, and will not send it.
*/
- def submit(app : Application) : Int = {
+ def submit(app: Application): Int = {
submit(app, System.getProperty(GEARPUMP_APP_JAR))
}
+ def submit(app: Application, jar: String): Int = {
+ submit(app, jar, getExecutorNum())
+ }
- def submit(app : Application, jar: String) : Int = {
- import app.{name, appMaster, userConfig}
+ def submit(app: Application, jar: String, executorNum: Int): Int = {
+ val client = getMasterClient
+ val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
val submissionConfig = getSubmissionConfig(config)
- val appDescription = AppDescription(name, appMaster.getName, userConfig, submissionConfig)
- submit(appDescription, jar)
+ .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum))
+ val appDescription = AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig)
+ val appJar = Option(jar).map(loadFile)
+ client.submitApplication(appDescription, appJar)
}
- import scala.collection.JavaConverters._
- private def getSubmissionConfig(config: Config): Config = {
- ClusterConfig.filterOutDefaultConfig(config)
+ private def getExecutorNum(): Int = {
+ Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1)
}
- private def submit(app : AppDescription, jarPath: String) : Int = {
- val client = getMasterClient
- val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX))
- val updatedApp = AppDescription(appName, app.appMaster, app.userConfig, app.clusterConfig)
- if (jarPath == null) {
- client.submitApplication(updatedApp, None)
- } else {
- val appJar = loadFile(jarPath)
- client.submitApplication(updatedApp, Option(appJar))
- }
+ private def getSubmissionConfig(config: Config): Config = {
+ ClusterConfig.filterOutDefaultConfig(config)
}
def replayFromTimestampWindowTrailingEdge(appId : Int): ReplayApplicationResult = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
index 574c8bd..94c7532 100644
--- a/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
+++ b/core/src/main/scala/io/gearpump/cluster/scheduler/Resource.scala
@@ -26,8 +26,12 @@ case class Resource(slots : Int) {
def >(other : Resource): Boolean = slots > other.slots
+ def >=(other : Resource): Boolean = !(this < other)
+
def <(other : Resource): Boolean = slots < other.slots
+ def <=(other : Resource): Boolean = !(this > other)
+
def equals(other : Resource): Boolean = slots == other.slots
def isEmpty: Boolean = slots == 0
@@ -47,7 +51,7 @@ object Relaxation extends Enumeration{
import Relaxation._
import Priority._
-case class ResourceRequest(resource: Resource, workerId: Int = 0, priority: Priority = NORMAL, relaxation: Relaxation = ANY)
+case class ResourceRequest(resource: Resource, workerId: Int = 0, priority: Priority = NORMAL, relaxation: Relaxation = ANY, executorNum: Int = 1)
case class ResourceAllocation(resource : Resource, worker : ActorRef, workerId : Int)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/main/scala/io/gearpump/util/Constants.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/io/gearpump/util/Constants.scala b/core/src/main/scala/io/gearpump/util/Constants.scala
index 084d99b..ecf09e4 100644
--- a/core/src/main/scala/io/gearpump/util/Constants.scala
+++ b/core/src/main/scala/io/gearpump/util/Constants.scala
@@ -162,5 +162,5 @@ object Constants {
val PREFER_IPV4 = "java.net.preferIPv4Stack"
-
+ val APPLICATION_EXECUTOR_NUMBER = "gearpump.application.executor-num"
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/core/src/test/resources/test.conf
----------------------------------------------------------------------
diff --git a/core/src/test/resources/test.conf b/core/src/test/resources/test.conf
index 3deb28b..324e8bd 100644
--- a/core/src/test/resources/test.conf
+++ b/core/src/test/resources/test.conf
@@ -12,6 +12,8 @@ gearpump {
### the appmaster will shutdown itself.
resource-allocation-timeout-seconds = 10
+ application.executor-num = 1
+
worker.executor-process-launcher = "io.gearpump.cluster.worker.DefaultExecutorProcessLauncher"
cluster {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
index acb6247..90f653c 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/main/AppSubmitter.scala
@@ -37,6 +37,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser {
override val options: Array[(String, CLIOption[Any])] = Array(
"namePrefix" -> CLIOption[String]("<application name prefix>", required = false, defaultValue = Some("")),
"jar" -> CLIOption("<application>.jar", required = true),
+ "executors" -> CLIOption[Int]("number of executor to launch", required = false, defaultValue = Some(1)),
"verbose" -> CLIOption("<print verbose log on console>", required = false, defaultValue = Some(false)),
// For document purpose only, OPTION_CONFIG option is not used here.
// OPTION_CONFIG is parsed by parent shell command "Gear" transparently.
@@ -58,6 +59,7 @@ object AppSubmitter extends AkkaApp with ArgumentsParser {
// Set jar path to be submitted to cluster
System.setProperty(Constants.GEARPUMP_APP_JAR, jar)
+ System.setProperty(Constants.APPLICATION_EXECUTOR_NUMBER, config.getInt("executors").toString)
val namePrefix = config.getString("namePrefix")
if (namePrefix.nonEmpty) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
----------------------------------------------------------------------
diff --git a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
index c3121d8..10e9dcb 100644
--- a/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
+++ b/daemon/src/main/scala/io/gearpump/cluster/scheduler/PriorityScheduler.scala
@@ -26,14 +26,13 @@ import io.gearpump.cluster.scheduler.Scheduler.PendingRequest
import scala.collection.mutable
-class PriorityScheduler extends Scheduler{
-
+class PriorityScheduler extends Scheduler {
private var resourceRequests = new mutable.PriorityQueue[PendingRequest]()(requestOrdering)
def requestOrdering = new Ordering[PendingRequest] {
override def compare(x: PendingRequest, y: PendingRequest) = {
var res = x.request.priority.id - y.request.priority.id
- if(res == 0)
+ if (res == 0)
res = y.timeStamp.compareTo(x.timeStamp)
res
}
@@ -45,23 +44,30 @@ class PriorityScheduler extends Scheduler{
var scheduleLater = Array.empty[PendingRequest]
val resourcesSnapShot = resources.clone()
var allocated = Resource.empty
- val totalResource = resourcesSnapShot.foldLeft(Resource.empty){ (totalResource, workerWithResource) =>
- val (_, (_, resource)) = workerWithResource
- totalResource + resource
- }
+ val totalResource = Resource(resourcesSnapShot.values.map(_._2.slots).sum)
- while(resourceRequests.nonEmpty && (allocated < totalResource)) {
+ while (resourceRequests.nonEmpty && (allocated < totalResource)) {
val PendingRequest(appId, appMaster, request, timeStamp) = resourceRequests.dequeue()
request.relaxation match {
case ANY =>
- val newAllocated = allocateFairly(resourcesSnapShot, PendingRequest(appId, appMaster, request, timeStamp))
+ val allocations = allocateFairly(resourcesSnapShot, request)
+ val newAllocated = Resource(allocations.map(_.resource.slots).sum)
+ if (allocations.nonEmpty) {
+ appMaster ! ResourceAllocated(allocations.toArray)
+ }
+ if (newAllocated < request.resource) {
+ val remainingRequest = request.resource - newAllocated
+ val remainingExecutors = request.executorNum - allocations.length
+ val newResourceRequest = request.copy(resource = remainingRequest, executorNum = remainingExecutors)
+ scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, newResourceRequest, timeStamp)
+ }
allocated = allocated + newAllocated
case ONEWORKER =>
- val availableResource = resourcesSnapShot.find{params =>
+ val availableResource = resourcesSnapShot.find { params =>
val (_, (_, resource)) = params
resource > request.resource
}
- if(availableResource.nonEmpty){
+ if (availableResource.nonEmpty) {
val (workerId, (worker, resource)) = availableResource.get
allocated = allocated + request.resource
appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, workerId)))
@@ -70,13 +76,12 @@ class PriorityScheduler extends Scheduler{
scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
}
case SPECIFICWORKER =>
- if (resourcesSnapShot.contains(request.workerId)) {
- val (worker, availableResource) = resourcesSnapShot.get(request.workerId).get
- if (availableResource > request.resource) {
- appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, request.workerId)))
- allocated = allocated + request.resource
- resourcesSnapShot.update(request.workerId, (worker, availableResource - request.resource))
- }
+ val workerAndResource = resourcesSnapShot.get(request.workerId)
+ if (workerAndResource.nonEmpty && workerAndResource.get._2 > request.resource) {
+ val (worker, availableResource) = workerAndResource.get
+ appMaster ! ResourceAllocated(Array(ResourceAllocation(request.resource, worker, request.workerId)))
+ allocated = allocated + request.resource
+ resourcesSnapShot.update(request.workerId, (worker, availableResource - request.resource))
} else {
scheduleLater = scheduleLater :+ PendingRequest(appId, appMaster, request, timeStamp)
}
@@ -88,38 +93,48 @@ class PriorityScheduler extends Scheduler{
def resourceRequestHandler: Receive = {
case RequestResource(appId, request) =>
- LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, relaxation: ${request.relaxation}")
+ LOG.info(s"Request resource: appId: $appId, slots: ${request.resource.slots}, relaxation: ${request.relaxation}," +
+ s" executor number: ${request.executorNum}")
val appMaster = sender()
resourceRequests.enqueue(new PendingRequest(appId, appMaster, request, System.currentTimeMillis()))
allocateResource()
}
- private def allocateFairly(resources : mutable.HashMap[Int, (ActorRef, Resource)], pendindRequest : PendingRequest): Resource ={
- val length = resources.size
- val flattenResource = resources.toArray.zipWithIndex.flatMap((workerWithIndex) => {
- val ((workerId, (worker, resource)), index) = workerWithIndex
- 0.until(resource.slots).map((seq) => ((workerId, worker), seq * length + index))
- }).sortBy(_._2).map(_._1)
- val PendingRequest(appId, appMaster, request, timeStamp) = pendindRequest
- val total = Resource(flattenResource.size)
-
- val newAllocated = Resource.min(total, request.resource)
- val singleAllocation = flattenResource.take(newAllocated.slots)
- .groupBy((actor) => actor).mapValues(_.length).toArray.map((params) => {
- val ((workerId, worker), slots) = params
- resources.update(workerId, (worker, resources.get(workerId).get._2 - Resource(slots)))
- ResourceAllocation(Resource(slots), worker, workerId)
- })
- pendindRequest.appMaster ! ResourceAllocated(singleAllocation)
- if (pendindRequest.request.resource > newAllocated) {
- resourceRequests.enqueue(
- PendingRequest(appId, appMaster,
- ResourceRequest(request.resource - newAllocated, request.workerId, request.priority), timeStamp))
- }
- newAllocated
- }
-
override def doneApplication(appId: Int): Unit = {
resourceRequests = resourceRequests.filter(_.appId != appId)
}
+
+ private def allocateFairly(resources: mutable.HashMap[Int, (ActorRef, Resource)], request: ResourceRequest): List[ResourceAllocation] = {
+ val workerNum = resources.size
+ var allocations = List.empty[ResourceAllocation]
+ var totalAvailable = Resource(resources.values.map(_._2.slots).sum)
+ var remainingRequest = request.resource
+ var remainingExecutors = Math.min(request.executorNum, request.resource.slots)
+
+ while (remainingExecutors > 0 && !totalAvailable.isEmpty) {
+ val exeutorNum = Math.min(workerNum, remainingExecutors)
+ val toRequest = Resource(remainingRequest.slots * exeutorNum / remainingExecutors)
+
+ val flattenResource = resources.toArray.sortBy(_._2._2.slots)(Ordering[Int].reverse).take(exeutorNum).zipWithIndex.flatMap { workerWithIndex =>
+ val ((workerId, (worker, resource)), index) = workerWithIndex
+ 0.until(resource.slots).map(seq => ((workerId, worker), seq * workerNum + index))
+ }.sortBy(_._2).map(_._1)
+
+ if (flattenResource.length < toRequest.slots) {
+ //Can not safisfy the user's requirements
+ totalAvailable = Resource.empty
+ } else {
+ flattenResource.take(toRequest.slots).groupBy(actor => actor).mapValues(_.length).
+ toArray.foreach { params =>
+ val ((workerId, worker), slots) = params
+ resources.update(workerId, (worker, resources.get(workerId).get._2 - Resource(slots)))
+ allocations :+= ResourceAllocation(Resource(slots), worker, workerId)
+ }
+ totalAvailable -= toRequest
+ remainingRequest -= toRequest
+ remainingExecutors -= exeutorNum
+ }
+ }
+ allocations
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
index ccfd453..0a7e1f8 100644
--- a/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
+++ b/daemon/src/test/scala/io/gearpump/cluster/scheduler/PrioritySchedulerSpec.scala
@@ -19,7 +19,6 @@ package io.gearpump.cluster.scheduler
import akka.actor.{ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
-import io.gearpump.cluster.master.Master.MasterInfo
import io.gearpump.cluster.AppMasterToMaster.RequestResource
import io.gearpump.cluster.MasterToAppMaster.ResourceAllocated
import io.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, WorkerRegistered}
@@ -84,12 +83,11 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(30), mockWorker1.ref, workerId1))))
mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(20), mockWorker1.ref, workerId1))))
mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker1.ref, workerId1))))
- mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(10), mockWorker1.ref, workerId1))))
scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource.empty), mockWorker1.ref)
scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
- mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(30), mockWorker2.ref, workerId2))))
+ mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
}
}
@@ -124,7 +122,7 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(40), mockWorker2.ref, workerId2))))
- val request3 = ResourceRequest(Resource(30), 0, Priority.NORMAL, Relaxation.ANY)
+ val request3 = ResourceRequest(Resource(30), 0, Priority.NORMAL, Relaxation.ANY, executorNum = 2)
scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(15), mockWorker1.ref, workerId1), ResourceAllocation(Resource(15), mockWorker2.ref, workerId2))))
@@ -136,4 +134,42 @@ class PrioritySchedulerSpec(_system: ActorSystem) extends TestKit(_system) with
mockAppMaster.expectMsg(5 seconds, ResourceAllocated(Array(ResourceAllocation(Resource(60), mockWorker1.ref, workerId1))))
}
}
+
+ "The PriorityScheduler" should {
+ "handle the resource request with different executor number" in {
+ val scheduler = system.actorOf(Props(classOf[PriorityScheduler]))
+ scheduler.tell(WorkerRegistered(workerId1, MasterInfo.empty), mockWorker1.ref)
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(100)), mockWorker1.ref)
+ scheduler.tell(WorkerRegistered(workerId2, MasterInfo.empty), mockWorker2.ref)
+ scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(100)), mockWorker2.ref)
+
+ //By default, the request requires only one executor
+ val request2 = ResourceRequest(Resource(20))
+ scheduler.tell(RequestResource(appId, request2), mockAppMaster.ref)
+ val allocations2 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+ assert(allocations2.allocations.length == 1)
+ assert(allocations2.allocations.head.resource == Resource(20))
+
+ val request3 = ResourceRequest(Resource(24), executorNum = 3)
+ scheduler.tell(RequestResource(appId, request3), mockAppMaster.ref)
+ val allocations3 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+ assert(allocations3.allocations.length == 3)
+ assert(allocations3.allocations.forall(_.resource == Resource(8)))
+
+ //The total available resource can not satisfy the requirements with executor number
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(30)), mockWorker1.ref)
+ scheduler.tell(ResourceUpdate(mockWorker2.ref, workerId2, Resource(30)), mockWorker2.ref)
+ val request4 = ResourceRequest(Resource(60), executorNum = 3)
+ scheduler.tell(RequestResource(appId, request4), mockAppMaster.ref)
+ val allocations4 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+ assert(allocations4.allocations.length == 2)
+ assert(allocations4.allocations.forall(_.resource == Resource(20)))
+
+ //When new resources are available, the remaining request will be satisfied
+ scheduler.tell(ResourceUpdate(mockWorker1.ref, workerId1, Resource(40)), mockWorker1.ref)
+ val allocations5 = mockAppMaster.receiveN(1).head.asInstanceOf[ResourceAllocated]
+ assert(allocations5.allocations.length == 1)
+ assert(allocations4.allocations.forall(_.resource == Resource(20)))
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/docs/commandline.md
----------------------------------------------------------------------
diff --git a/docs/commandline.md b/docs/commandline.md
index 2c9334c..7e264f9 100644
--- a/docs/commandline.md
+++ b/docs/commandline.md
@@ -18,7 +18,7 @@ If you use Maven you can have a look [here](https://maven.apache.org/plugins/mav
You can use the command `gear` under the bin directory to submit, query and terminate an application:
```bash
-gear app [-namePrefix <application name prefix>] [-conf <custom gearpump config file>] -jar xx.jar MainClass <arg1> <arg2> ...
+gear app [-namePrefix <application name prefix>] [-executors <number of executors to launch>] [-conf <custom gearpump config file>] -jar xx.jar MainClass <arg1> <arg2> ...
```
### List all running applications
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
index e9973e5..c16ab41 100644
--- a/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ b/experiments/storm/src/main/scala/io/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -139,13 +139,14 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe
implicit val system = clientContext.system
val gearpumpStormTopology = GearpumpStormTopology(name, topology, jsonConf)
val stormConfig = gearpumpStormTopology.getStormConfig
+ val workerNum = StormUtil.getInt(stormConfig, Config.TOPOLOGY_WORKERS).getOrElse(1)
val processorGraph = GraphBuilder.build(gearpumpStormTopology)
val config = UserConfig.empty
.withValue[StormTopology](StormConstants.STORM_TOPOLOGY, topology)
.withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig)
val app = StreamApplication(name, processorGraph, config)
LOG.info(s"jar file uploaded to $uploadedJarLocation")
- val appId = clientContext.submit(app, uploadedJarLocation)
+ val appId = clientContext.submit(app, uploadedJarLocation, workerNum)
applications += name -> appId
topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation)
LOG.info(s"Storm Application $appId submitted")
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
index 8e294ea..59c956b 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ConnectorKafkaSpec.scala
@@ -62,7 +62,7 @@ class ConnectorKafkaSpec extends TestSpecBase {
"-sourceTopic", sourceTopic,
"-sinkTopic", sinkTopic).mkString(" ")
val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(kafkaJar, args)
+ val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
success shouldBe true
// verify
@@ -90,7 +90,7 @@ class ConnectorKafkaSpec extends TestSpecBase {
"-sinkTopic", sinkTopic,
"-source", sourcePartitionNum).mkString(" ")
val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(kafkaJar, args)
+ val success = restClient.submitApp(kafkaJar, cluster.getWorkerHosts.length, args)
success shouldBe true
// verify #1
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
index 5a60274..44f98aa 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/DynamicDagSpec.scala
@@ -121,7 +121,7 @@ class DynamicDagSpec extends TestSpecBase {
private def expectSolJarSubmittedWithAppId(): Int = {
val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(solJar)
+ val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length)
success shouldBe true
expectAppIsRunning(appId, solName)
Util.retryUntil(restClient.queryStreamingAppDetail(appId).clock > 0)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
index ac18807..2772bc3 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/ExampleSpec.scala
@@ -32,7 +32,7 @@ class ExampleSpec extends TestSpecBase {
val mainClass = "io.gearpump.examples.distributedshell.DistributedShell"
val clientClass = "io.gearpump.examples.distributedshell.DistributedShellClient"
val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(distShellJar, mainClass)
+ val success = restClient.submitApp(distShellJar, cluster.getWorkerHosts.length, mainClass)
success shouldBe true
expectAppIsRunning(appId, "DistributedShell")
val args = Array(
@@ -43,7 +43,8 @@ class ExampleSpec extends TestSpecBase {
val expectedHostNames = cluster.getWorkerHosts.map(Docker.execAndCaptureOutput(_, "hostname"))
def verify(): Boolean = {
- val result = commandLineClient.submitAppAndCaptureOutput(distShellJar, args.mkString(" ")).split("\n").
+ val workerNum = cluster.getWorkerHosts.length
+ val result = commandLineClient.submitAppAndCaptureOutput(distShellJar, workerNum, args.mkString(" ")).split("\n").
filterNot(line => line.startsWith("[INFO]") || line.isEmpty)
expectedHostNames.forall(result.contains)
}
@@ -59,7 +60,7 @@ class ExampleSpec extends TestSpecBase {
"can submit immediately after killing a former one" in {
// setup
val formerAppId = restClient.getNextAvailableAppId()
- val formerSubmissionSuccess = restClient.submitApp(wordCountJar)
+ val formerSubmissionSuccess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
formerSubmissionSuccess shouldBe true
expectAppIsRunning(formerAppId, wordCountName)
Util.retryUntil(restClient.queryStreamingAppDetail(formerAppId).clock > 0)
@@ -67,7 +68,7 @@ class ExampleSpec extends TestSpecBase {
// exercise
val appId = formerAppId + 1
- val success = restClient.submitApp(wordCountJar)
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
success shouldBe true
expectAppIsRunning(appId, wordCountName)
}
@@ -97,7 +98,7 @@ class ExampleSpec extends TestSpecBase {
"can obtain application clock and the clock will keep changing" in {
// setup
val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(jar)
+ val success = restClient.submitApp(jar, cluster.getWorkerHosts.length)
success shouldBe true
expectAppIsRunning(appId, appName)
@@ -110,7 +111,7 @@ class ExampleSpec extends TestSpecBase {
"can change the parallelism and description of a processor" in {
// setup
val appId = restClient.getNextAvailableAppId()
- val formerSubmissionSuccess = restClient.submitApp(jar)
+ val formerSubmissionSuccess = restClient.submitApp(jar, cluster.getWorkerHosts.length)
formerSubmissionSuccess shouldBe true
expectAppIsRunning(appId, appName)
val formerProcessors = restClient.queryStreamingAppDetail(appId).processors
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
index cf2ca03..2830390 100644
--- a/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
+++ b/integrationtest/core/src/it/scala/io/gearpump/integrationtest/checklist/RestServiceSpec.scala
@@ -43,7 +43,7 @@ class RestServiceSpec extends TestSpecBase {
"retrieve 1 application after the first application submission" in {
// exercise
val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar)
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
success shouldBe true
expectAppIsRunning(appId, wordCountName)
restClient.listRunningApps().length shouldEqual 1
@@ -54,7 +54,7 @@ class RestServiceSpec extends TestSpecBase {
"find a running application after submission" in {
// exercise
val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar)
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
success shouldBe true
expectAppIsRunning(appId, wordCountName)
}
@@ -62,18 +62,18 @@ class RestServiceSpec extends TestSpecBase {
"reject a repeated submission request while the application is running" in {
// setup
val appId = restClient.getNextAvailableAppId()
- val formerSubmissionSuccess = restClient.submitApp(wordCountJar)
+ val formerSubmissionSuccess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
formerSubmissionSuccess shouldBe true
expectAppIsRunning(appId, wordCountName)
// exercise
- val success = restClient.submitApp(wordCountJar)
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
success shouldBe false
}
"reject an invalid submission (the jar file path is incorrect)" in {
// exercise
- val success = restClient.submitApp(wordCountJar + ".missing")
+ val success = restClient.submitApp(wordCountJar + ".missing", cluster.getWorkerHosts.length)
success shouldBe false
}
@@ -84,7 +84,7 @@ class RestServiceSpec extends TestSpecBase {
val appId = restClient.getNextAvailableAppId()
// exercise
- val success = restClient.submitApp(wordCountJar, s"-split $splitNum -sum $sumNum")
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, s"-split $splitNum -sum $sumNum")
success shouldBe true
expectAppIsRunning(appId, wordCountName)
val processors = restClient.queryStreamingAppDetail(appId).processors
@@ -98,7 +98,7 @@ class RestServiceSpec extends TestSpecBase {
"can obtain application metrics and the metrics will keep changing" in {
// setup
val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar)
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
success shouldBe true
expectAppIsRunning(appId, wordCountName)
@@ -122,7 +122,7 @@ class RestServiceSpec extends TestSpecBase {
"can obtain application corresponding executors' metrics and the metrics will keep changing" in {
// setup
val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar)
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
success shouldBe true
expectAppIsRunning(appId, wordCountName)
@@ -148,7 +148,7 @@ class RestServiceSpec extends TestSpecBase {
"a running application should be killed" in {
// setup
val appId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar)
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
success shouldBe true
expectAppIsRunning(appId, wordCountName)
@@ -159,7 +159,7 @@ class RestServiceSpec extends TestSpecBase {
"should fail when attempting to kill a stopped application" in {
// setup
val appId = restClient.getNextAvailableAppId()
- val submissionSucess = restClient.submitApp(wordCountJar)
+ val submissionSucess = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
submissionSucess shouldBe true
expectAppIsRunning(appId, wordCountName)
killAppAndVerify(appId)
@@ -291,7 +291,7 @@ class RestServiceSpec extends TestSpecBase {
val appId = restClient.getNextAvailableAppId()
// exercise
- val success = restClient.submitApp(wordCountJar)
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
success shouldBe true
restClient.queryExecutorBrief(appId).foreach { executor =>
val executorId = executor.executorId
@@ -308,7 +308,7 @@ class RestServiceSpec extends TestSpecBase {
val appId = restClient.getNextAvailableAppId()
// exercise
- val success = restClient.submitApp(wordCountJar)
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length)
success shouldBe true
val actual = restClient.queryAppMasterConfig(appId)
actual.hasPath("gearpump") shouldBe true
@@ -322,7 +322,7 @@ class RestServiceSpec extends TestSpecBase {
val originSplitNum = 4
val originSumNum = 3
val originAppId = restClient.getNextAvailableAppId()
- val success = restClient.submitApp(wordCountJar, s"-split $originSplitNum -sum $originSumNum")
+ val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length, s"-split $originSplitNum -sum $originSumNum")
success shouldBe true
expectAppIsRunning(originAppId, wordCountName)
val originAppDetail = restClient.queryStreamingAppDetail(originAppId)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
index d923a6d..506fc67 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/CommandLineClient.scala
@@ -49,8 +49,8 @@ class CommandLineClient(host: String) {
""
}
- def submitAppAndCaptureOutput(jar: String, args: String = ""): String = {
- execAndCaptureOutput(s"gear app -verbose true -jar $jar $args")
+ def submitAppAndCaptureOutput(jar: String, executorNum: Int, args: String = ""): String = {
+ execAndCaptureOutput(s"gear app -verbose true -jar $jar -executors $executorNum $args")
}
def submitApp(jar: String, args: String = ""): Int = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
index 2b3003c..2657bb3 100644
--- a/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
+++ b/integrationtest/core/src/main/scala/io/gearpump/integrationtest/minicluster/RestClient.scala
@@ -87,10 +87,10 @@ class RestClient(host: String, port: Int) {
listApps().length + 1
}
- def submitApp(jar: String, args: String = "", config: String = ""): Boolean = try {
+ def submitApp(jar: String, executorNum: Int, args: String = "", config: String = ""): Boolean = try {
var endpoint = "master/submitapp"
if (args.length > 0) {
- endpoint += "?args=" + Util.encodeUriComponent(args)
+ endpoint += s"?executorNum=${executorNum}&args=" + Util.encodeUriComponent(args)
}
var options = Seq(s"jar=@$jar")
if (config.length > 0) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/dashboard/services/restapi.js
----------------------------------------------------------------------
diff --git a/services/dashboard/services/restapi.js b/services/dashboard/services/restapi.js
index 386d319..d52d89d 100644
--- a/services/dashboard/services/restapi.js
+++ b/services/dashboard/services/restapi.js
@@ -118,19 +118,19 @@ angular.module('dashboard')
},
/** Submit an user defined application with user configuration */
- submitUserApp: function(files, formFormNames, args, onComplete) {
+ submitUserApp: function(files, formFormNames, executorNum, args, onComplete) {
return self._submitApp(restapiV1Root + 'master/submitapp',
- files, formFormNames, args, onComplete);
+ files, formFormNames, executorNum, args, onComplete);
},
/** Submit a Storm application */
- submitStormApp: function(files, formFormNames, args, onComplete) {
+ submitStormApp: function(files, formFormNames, executorNum, args, onComplete) {
return self._submitApp(restapiV1Root + 'master/submitstormapp',
- files, formFormNames, args, onComplete);
+ files, formFormNames, executorNum, args, onComplete);
},
- _submitApp: function(url, files, formFormNames, args, onComplete) {
- var params = args ? '?args=' + encodeURIComponent(args) : '';
+ _submitApp: function(url, files, formFormNames, executorNum, args, onComplete) {
+ var params = '?executorNum=' + executorNum + '&args=' + encodeURIComponent(args);
var upload = Upload.upload({
url: url + params,
method: 'POST',
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/dashboard/views/apps/submit/submit.html
----------------------------------------------------------------------
diff --git a/services/dashboard/views/apps/submit/submit.html b/services/dashboard/views/apps/submit/submit.html
index 140fe17..bc4af98 100644
--- a/services/dashboard/views/apps/submit/submit.html
+++ b/services/dashboard/views/apps/submit/submit.html
@@ -41,6 +41,10 @@
ng-model="conf" accept-pattern="{{confFileSuffix}}"></form-control>
<!-- input 3 -->
<form-control
+ type="integer" min="1" label="Executor Number" ng-hide="isStormApp"
+ ng-model="executorNum"></form-control>
+ <!-- input 4 -->
+ <form-control
type="text" label="Arguments"
help="Application specific launch arguments (optional). E.g. WordCount can use "-split 2 -sum 1""
ng-model="launchArgs"></form-control>
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/dashboard/views/apps/submit/submit.js
----------------------------------------------------------------------
diff --git a/services/dashboard/views/apps/submit/submit.js b/services/dashboard/views/apps/submit/submit.js
index 4ca1895..5e967f5 100644
--- a/services/dashboard/views/apps/submit/submit.js
+++ b/services/dashboard/views/apps/submit/submit.js
@@ -10,6 +10,7 @@ angular.module('dashboard')
$scope.dialogTitle = 'Submit Gearpump Application';
$scope.confFileSuffix = '.conf';
+ $scope.executorNum = 1;
var submitFn = restapi.submitUserApp;
if ($scope.isStormApp) {
$scope.dialogTitle = 'Submit Storm Application';
@@ -29,7 +30,7 @@ angular.module('dashboard')
fileFormNames.push('conf');
}
$scope.uploading = true;
- submitFn(files, fileFormNames, $scope.launchArgs, function(response) {
+ submitFn(files, fileFormNames, $scope.executorNum, $scope.launchArgs, function(response) {
$scope.shouldNoticeSubmitFailed = !response.success;
$scope.uploading = false;
if (response.success) {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
index 839c0ae..dd0e719 100644
--- a/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/io/gearpump/services/MasterService.scala
@@ -109,12 +109,12 @@ class MasterService(val master: ActorRef,
} ~
path("submitapp") {
post {
- parameters("args" ? "") { args: String =>
+ parameters('executorNum.as[Int] ? 1, 'args ? "") { (executorNum, args) =>
uploadFile { fileMap =>
val jar = fileMap.get("jar").map(_.file)
val userConf = fileMap.get("conf").map(_.file)
onComplete(Future(
- MasterService.submitGearApp(jar, args, systemConfig, userConf)
+ MasterService.submitGearApp(jar, executorNum, args, systemConfig, userConf)
)) {
case Success(success) =>
val response = MasterService.AppSubmissionResult(success)
@@ -128,7 +128,7 @@ class MasterService(val master: ActorRef,
} ~
path("submitstormapp") {
post {
- parameters("args" ? "") { args: String =>
+ parameters('executorNum.as[Int] ? 1, 'args ? "") { (executorNum, args) =>
uploadFile { fileMap =>
val jar = fileMap.get("jar").map(_.file)
val stormConf = fileMap.get("conf").map(_.file)
@@ -201,10 +201,10 @@ object MasterService {
/**
* Submit Native Application.
*/
- def submitGearApp(jar: Option[File], args: String, systemConfig: Config, userConfigFile: Option[File]): Boolean = {
+ def submitGearApp(jar: Option[File], executorNum: Int, args: String, systemConfig: Config, userConfigFile: Option[File]): Boolean = {
submitAndDeleteTempFiles(
"io.gearpump.cluster.main.AppSubmitter",
- argsArray = spaceSeparatedArgumentsToArray(args),
+ argsArray = Array("-executors", executorNum.toString) ++ spaceSeparatedArgumentsToArray(args),
fileMap = Map("jar" -> jar).filter(_._2.isDefined).mapValues(_.get),
classPath = getUserApplicationClassPath,
systemConfig,
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
index 6d589f5..9510531 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/TaskSchedulerImpl.scala
@@ -24,7 +24,7 @@ import io.gearpump.streaming.DAG
import io.gearpump.streaming.appmaster.TaskLocator.{Locality, WorkerLocality}
import io.gearpump.streaming.appmaster.TaskScheduler.{Location, TaskStatus}
import io.gearpump.streaming.task.TaskId
-import io.gearpump.util.LogUtil
+import io.gearpump.util.{Constants, LogUtil}
import org.slf4j.Logger
/**
@@ -80,7 +80,7 @@ object TaskScheduler {
}
class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends TaskScheduler {
- private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
+ private val executorNum = config.getInt(Constants.APPLICATION_EXECUTOR_NUMBER)
private var tasks = List.empty[TaskStatus]
@@ -121,7 +121,7 @@ class TaskSchedulerImpl(appId : Int, appName: String, config: Config) extends T
workersResourceRequest.map {workerIdAndResource =>
val (workerId, resource) = workerIdAndResource
if (workerId == WORKER_NO_PREFERENCE) {
- ResourceRequest(resource)
+ ResourceRequest(resource, executorNum = executorNum)
} else {
ResourceRequest(resource, workerId, relaxation = SPECIFICWORKER)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
index bc3f265..d9ebb37 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/JarSchedulerSpec.scala
@@ -20,7 +20,7 @@ package io.gearpump.streaming.appmaster
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import io.gearpump.streaming.{ProcessorDescription, DAG}
-import io.gearpump.cluster.AppJar
+import io.gearpump.cluster.{TestUtil, AppJar}
import io.gearpump.cluster.scheduler.{Resource, ResourceRequest}
import io.gearpump.jarstore.FilePath
import io.gearpump.partitioner.{HashPartitioner, Partitioner}
@@ -47,7 +47,7 @@ class JarSchedulerSpec extends WordSpec with Matchers {
"schedule tasks depends on app jar" in {
val system = ActorSystem("JarSchedulerSpec")
implicit val dispatcher = system.dispatcher
- val manager = new JarScheduler(0, "APP", ConfigFactory.empty(), system)
+ val manager = new JarScheduler(0, "APP", TestUtil.DEFAULT_CONFIG, system)
manager.setDag(dag, Future{0L})
val requests = Array(ResourceRequest(Resource(2)))
val result = Await.result(manager.getRequestDetails(), 15 seconds)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/47d867d2/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
index 78caf62..aeef61d 100644
--- a/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ b/streaming/src/test/scala/io/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -23,7 +23,7 @@ import io.gearpump.streaming.appmaster.TaskLocator.Localities
import io.gearpump.streaming.task.{StartTime, TaskContext, TaskId}
import io.gearpump.Message
import io.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
-import io.gearpump.cluster.{ClusterConfig, UserConfig}
+import io.gearpump.cluster.{TestUtil, ClusterConfig, UserConfig}
import io.gearpump.partitioner.{HashPartitioner, Partitioner}
import io.gearpump.streaming.appmaster.TaskLocator.Localities
import io.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
@@ -41,7 +41,7 @@ class TaskSchedulerSpec extends WordSpec with Matchers {
val dag = DAG(Graph(task1 ~ Partitioner[HashPartitioner] ~> task2))
- val config = ClusterConfig.default()
+ val config = TestUtil.DEFAULT_CONFIG
"TaskScheduler" should {
"schedule tasks on different workers properly according user's configuration" in {