You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/13 17:13:58 UTC

[GitHub] rabbah closed pull request #3515: Introduce separate Akka dispatchers for CouchDB and Kafka Clients (#2954)

rabbah closed pull request #3515: Introduce separate Akka dispatchers for CouchDB and Kafka Clients (#2954)
URL: https://github.com/apache/incubator-openwhisk/pull/3515
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf
index 35cf4f09a5..df50702885 100644
--- a/common/scala/src/main/resources/reference.conf
+++ b/common/scala/src/main/resources/reference.conf
@@ -8,3 +8,56 @@ whisk.spi {
   LogStoreProvider = whisk.core.containerpool.logging.DockerToActivationLogStoreProvider
   LoadBalancerProvider = whisk.core.loadBalancer.ShardingContainerPoolBalancer
 }
+
+dispatchers {
+  # Custom dispatcher for CouchDB Client. Tune as needed.
+  couch-dispatcher {
+    type = Dispatcher
+    executor = "thread-pool-executor"
+
+    # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
+    thread-pool-executor {
+      # Min number of threads to cap factor-based corePoolSize number to
+      core-pool-size-min = 2
+
+      # The core-pool-size-factor is used to determine corePoolSize of the
+      # ThreadPoolExecutor using the following formula:
+      # ceil(available processors * factor).
+      # Resulting size is then bounded by the core-pool-size-min and
+      # core-pool-size-max values.
+      core-pool-size-factor = 2.0
+
+      # Max number of threads to cap factor-based corePoolSize number to
+      core-pool-size-max = 32
+    }
+    # Throughput defines the number of messages that are processed in a batch
+    # before the thread is returned to the pool. Set to 1 for as fair as possible.
+    throughput = 5
+  }
+
+  # Custom dispatcher for Kafka client. Tune as needed.
+  kafka-dispatcher {
+    type = Dispatcher
+    executor = "thread-pool-executor"
+
+    # Underlying thread pool implementation is java.util.concurrent.ThreadPoolExecutor
+    thread-pool-executor {
+      # Min number of threads to cap factor-based corePoolSize number to
+      core-pool-size-min = 2
+
+      # The core-pool-size-factor is used to determine corePoolSize of the
+      # ThreadPoolExecutor using the following formula:
+      # ceil(available processors * factor).
+      # Resulting size is then bounded by the core-pool-size-min and
+      # core-pool-size-max values.
+      core-pool-size-factor = 2.0
+
+      # Max number of threads to cap factor-based corePoolSize number to
+      core-pool-size-max = 32
+    }
+
+    # Throughput defines the number of messages that are processed in a batch
+    # before the thread is returned to the pool. Set to 1 for as fair as possible.
+    throughput = 5
+  }
+}
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 14af69e4a9..4c21545f41 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -170,7 +170,7 @@ class MessageFeed(description: String,
   startWith(Idle, MessageFeed.NoData)
   initialize()
 
-  private implicit val ec = context.system.dispatcher
+  private implicit val ec = context.system.dispatchers.lookup("dispatchers.kafka-dispatcher")
 
   private def fillPipeline(): Unit = {
     if (outstandingMessages.size <= pipelineFillThreshold) {
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
index e38ce6f69f..3e8143a7fa 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
@@ -18,7 +18,6 @@
 package whisk.core.database
 
 import scala.concurrent.Future
-
 import java.net.URLEncoder
 import java.nio.charset.StandardCharsets
 
@@ -27,10 +26,8 @@ import akka.http.scaladsl.model._
 import akka.http.scaladsl.model.headers._
 import akka.stream.scaladsl._
 import akka.util.ByteString
-
 import spray.json._
 import spray.json.DefaultJsonProtocol._
-
 import whisk.common.Logging
 import whisk.http.PoolingRestClient
 
@@ -47,6 +44,8 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
   logging: Logging)
     extends PoolingRestClient(protocol, host, port, 16 * 1024) {
 
+  implicit override val context = system.dispatchers.lookup("dispatchers.couch-dispatcher")
+
   // Headers common to all requests.
   val baseHeaders: List[HttpHeader] =
     List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
index d0f13c9958..aff7e85e75 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala
@@ -62,7 +62,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St
     with DefaultJsonProtocol
     with AttachmentInliner {
 
-  protected[core] implicit val executionContext = system.dispatcher
+  protected[core] implicit val executionContext = system.dispatchers.lookup("dispatchers.couch-dispatcher")
 
   val attachmentScheme: String = "couch"
 
diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
index c632c8303c..417a8b9213 100644
--- a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
+++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
@@ -54,7 +54,7 @@ object CacheInvalidationMessage extends DefaultJsonProtocol {
 class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance: InstanceId)(implicit logging: Logging,
                                                                                             as: ActorSystem) {
 
-  implicit private val ec = as.dispatcher
+  implicit private val ec = as.dispatchers.lookup("dispatchers.kafka-dispatcher")
 
   private val topic = "cacheInvalidation"
   private val instanceId = s"$component${instance.toInt}"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services