You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/02/27 19:00:38 UTC
[incubator-openwhisk] branch master updated: Factor PoolingRestClient out of CouchDbRestClient. (#3347)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 369517b Factor PoolingRestClient out of CouchDbRestClient. (#3347)
369517b is described below
commit 369517b369453366c3aeb095fba79d1c4411f8ea
Author: James Dubee <jw...@us.ibm.com>
AuthorDate: Tue Feb 27 14:00:35 2018 -0500
Factor PoolingRestClient out of CouchDbRestClient. (#3347)
---
.../whisk/core/database/CloudantRestClient.scala | 2 +-
.../whisk/core/database/CouchDbRestClient.scala | 160 +++------------------
.../main/scala/whisk/http/PoolingRestClient.scala | 154 ++++++++++++++++++++
.../database/test/ExtendedCouchDbRestClient.scala | 11 +-
4 files changed, 183 insertions(+), 144 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
index 29ec17e..682df26 100644
--- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
@@ -37,6 +37,6 @@ class CloudantRestClient(host: String, port: Int, username: String, password: St
// https://cloudant.com/blog/cloudant-query-grows-up-to-handle-ad-hoc-queries/#.VvllCD-0z2C
def simpleQuery(doc: JsObject): Future[Either[StatusCode, JsObject]] = {
- requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc))
+ requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc, baseHeaders))
}
}
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
index 0f791b2..e38ce6f 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
@@ -17,28 +17,22 @@
package whisk.core.database
+import scala.concurrent.Future
+
import java.net.URLEncoder
import java.nio.charset.StandardCharsets
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.util.{Failure, Success}
-
import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.Http.HostConnectionPool
-import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import akka.http.scaladsl.marshalling._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers._
-import akka.http.scaladsl.unmarshalling._
-import akka.stream.ActorMaterializer
-import akka.stream.OverflowStrategy
-import akka.stream.QueueOfferResult
import akka.stream.scaladsl._
import akka.util.ByteString
+
import spray.json._
+import spray.json.DefaultJsonProtocol._
+
import whisk.common.Logging
+import whisk.http.PoolingRestClient
/**
* This class only handles the basic communication to the proper endpoints
@@ -50,35 +44,14 @@ import whisk.common.Logging
*/
class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)(
implicit system: ActorSystem,
- logging: Logging) {
- require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
-
- private implicit val context = system.dispatcher
- private implicit val materializer = ActorMaterializer()
+ logging: Logging)
+ extends PoolingRestClient(protocol, host, port, 16 * 1024) {
- // Creates or retrieves a connection pool for the host.
- private val pool = if (protocol == "http") {
- Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port)
- } else {
- Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
- }
-
- private val poolPromise = Promise[HostConnectionPool]
+ // Headers common to all requests.
+ val baseHeaders: List[HttpHeader] =
+ List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))
- // Additional queue in case all connections are busy. Should hardly ever be
- // filled in practice but can be useful, e.g., in tests starting many
- // asynchronous requests in a very short period of time.
- private val QUEUE_SIZE = 16 * 1024;
- private val requestQueue = Source
- .queue(QUEUE_SIZE, OverflowStrategy.dropNew)
- .via(pool.mapMaterializedValue { x =>
- poolPromise.success(x); x
- })
- .toMat(Sink.foreach({
- case ((Success(response), p)) => p.success(response)
- case ((Failure(error), p)) => p.failure(error)
- }))(Keep.left)
- .run
+ def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev))))
// Properly encodes the potential slashes in each segment.
protected def uri(segments: Any*): Uri = {
@@ -86,103 +59,30 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
Uri(s"/${encodedSegments.mkString("/")}")
}
- // Headers common to all requests.
- private val baseHeaders =
- List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))
-
- // Prepares a request with the proper headers.
- private def mkRequest0(method: HttpMethod,
- uri: Uri,
- body: Future[MessageEntity],
- forRev: Option[String] = None): Future[HttpRequest] = {
- val revHeader = forRev.map(r => `If-Match`(EntityTagRange(EntityTag(r)))).toList
- val headers = revHeader ::: baseHeaders
- body.map { b =>
- HttpRequest(method = method, uri = uri, headers = headers, entity = b)
- }
- }
-
- protected def mkRequest(method: HttpMethod, uri: Uri, forRev: Option[String] = None): Future[HttpRequest] = {
- mkRequest0(method, uri, Future.successful(HttpEntity.Empty), forRev = forRev)
- }
-
- protected def mkJsonRequest(method: HttpMethod,
- uri: Uri,
- body: JsValue,
- forRev: Option[String] = None): Future[HttpRequest] = {
- val b = Marshal(body).to[MessageEntity]
- mkRequest0(method, uri, b, forRev = forRev)
- }
-
- // Enqueue a request, and return a future capturing the corresponding response.
- // WARNING: make sure that if the future response is not failed, its entity
- // be drained entirely or the connection will be kept open until timeouts kick in.
- private def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
- futureRequest flatMap { request =>
- val promise = Promise[HttpResponse]
-
- // When the future completes, we know whether the request made it
- // through the queue.
- requestQueue.offer(request -> promise).flatMap { buffered =>
- buffered match {
- case QueueOfferResult.Enqueued =>
- promise.future
-
- case QueueOfferResult.Dropped =>
- Future.failed(new Exception("DB request queue is full."))
-
- case QueueOfferResult.QueueClosed =>
- Future.failed(new Exception("DB request queue was closed."))
-
- case QueueOfferResult.Failure(f) =>
- Future.failed(f)
- }
- }
- }
- }
-
- // Runs a request and returns either a JsObject, or a StatusCode if not 2xx.
- protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
- request0(futureRequest) flatMap { response =>
- if (response.status.isSuccess()) {
- Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
- Right(o)
- }
- } else {
- // This is important, as it drains the entity stream.
- // Otherwise the connection stays open and the pool dries up.
- response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ =>
- Left(response.status)
- }
- }
- }
- }
-
- import spray.json.DefaultJsonProtocol._
-
// http://docs.couchdb.org/en/1.6.1/api/document/common.html#put--db-docid
def putDoc(id: String, doc: JsObject): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc))
+ requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, baseHeaders))
// http://docs.couchdb.org/en/1.6.1/api/document/common.html#put--db-docid
def putDoc(id: String, rev: String, doc: JsObject): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, forRev = Some(rev)))
+ requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, baseHeaders ++ revHeader(rev)))
// http://docs.couchdb.org/en/2.1.0/api/database/bulk-api.html#inserting-documents-in-bulk
def putDocs(docs: Seq[JsObject]): Future[Either[StatusCode, JsArray]] =
- requestJson[JsArray](mkJsonRequest(HttpMethods.POST, uri(db, "_bulk_docs"), JsObject("docs" -> docs.toJson)))
+ requestJson[JsArray](
+ mkJsonRequest(HttpMethods.POST, uri(db, "_bulk_docs"), JsObject("docs" -> docs.toJson), baseHeaders))
// http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
def getDoc(id: String): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id)))
+ requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders))
// http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
def getDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), forRev = Some(rev)))
+ requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders ++ revHeader(rev)))
// http://docs.couchdb.org/en/1.6.1/api/document/common.html#delete--db-docid
def deleteDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), forRev = Some(rev)))
+ requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), baseHeaders ++ revHeader(rev)))
// http://docs.couchdb.org/en/1.6.1/api/ddoc/views.html
def executeView(designDoc: String, viewName: String)(startKey: List[Any] = Nil,
@@ -238,7 +138,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
val viewUri = uri(db, "_design", designDoc, "_view", viewName).withQuery(Uri.Query(argMap))
- requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri))
+ requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, baseHeaders))
}
// Streams an attachment to the database
@@ -249,7 +149,8 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
contentType: ContentType,
source: Source[ByteString, _]): Future[Either[StatusCode, JsObject]] = {
val entity = HttpEntity.Chunked(contentType, source.map(bs => HttpEntity.ChunkStreamPart(bs)))
- val request = mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), forRev = Some(rev))
+ val request =
+ mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev))
requestJson[JsObject](request)
}
@@ -259,7 +160,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
rev: String,
attName: String,
sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = {
- val request = mkRequest(HttpMethods.GET, uri(db, id, attName), forRev = Some(rev))
+ val request = mkRequest(HttpMethods.GET, uri(db, id, attName), baseHeaders ++ revHeader(rev))
request0(request) flatMap { response =>
if (response.status.isSuccess()) {
@@ -269,19 +170,4 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
}
}
}
-
- def shutdown(): Future[Unit] = {
- materializer.shutdown()
- // The code below shuts down the pool, but is apparently not tolerant
- // to multiple clients shutting down the same pool (the second one just
- // hangs). Given that shutdown is only relevant for tests (unused pools
- // close themselves anyway after some time) and that they can call
- // Http().shutdownAllConnectionPools(), this is not a major issue.
- /* Reintroduce below if they ever make HostConnectionPool.shutdown()
- * safe to call >1x.
- * val poolOpt = poolPromise.future.value.map(_.toOption).flatten
- * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(()))
- */
- Future.successful(())
- }
}
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
new file mode 100644
index 0000000..ef09770
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.http
+
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.util.{Failure, Success}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.Http.HostConnectionPool
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.marshalling._
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.unmarshalling._
+import akka.stream.ActorMaterializer
+import akka.stream.OverflowStrategy
+import akka.stream.QueueOfferResult
+import akka.stream.scaladsl._
+
+import spray.json._
+
+/**
+ * This class only handles the basic communication to the proper endpoints.
+ * It is up to its clients to interpret the results. It is built on akka-http
+ * host-level connection pools; compared to single requests, it saves some time
+ * on each request because it doesn't need to look up the pool corresponding
+ * to the host. It is also easier to add an extra queueing mechanism.
+ */
+class PoolingRestClient(protocol: String, host: String, port: Int, queueSize: Int)(implicit system: ActorSystem) {
+ require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
+
+ implicit val context = system.dispatcher
+ implicit val materializer = ActorMaterializer()
+
+ // Creates or retrieves a connection pool for the host.
+ private val pool = if (protocol == "http") {
+ Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port)
+ } else {
+ Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
+ }
+
+ private val poolPromise = Promise[HostConnectionPool]
+
+ // Additional queue in case all connections are busy. Should hardly ever be
+ // filled in practice but can be useful, e.g., in tests starting many
+ // asynchronous requests in a very short period of time.
+ private val requestQueue = Source
+ .queue(queueSize, OverflowStrategy.dropNew)
+ .via(pool.mapMaterializedValue { x =>
+ poolPromise.success(x); x
+ })
+ .toMat(Sink.foreach({
+ case ((Success(response), p)) => p.success(response)
+ case ((Failure(error), p)) => p.failure(error)
+ }))(Keep.left)
+ .run
+
+ // Prepares a request with the proper headers.
+ def mkRequest0(method: HttpMethod,
+ uri: Uri,
+ body: Future[MessageEntity],
+ headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
+ body.map { b =>
+ HttpRequest(method, uri, headers, b)
+ }
+ }
+
+ protected def mkRequest(method: HttpMethod, uri: Uri, headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
+ mkRequest0(method, uri, Future.successful(HttpEntity.Empty), headers)
+ }
+
+ protected def mkJsonRequest(method: HttpMethod,
+ uri: Uri,
+ body: JsValue,
+ headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
+ val b = Marshal(body).to[MessageEntity]
+ mkRequest0(method, uri, b, headers)
+ }
+
+ // Enqueue a request, and return a future capturing the corresponding response.
+ // WARNING: make sure that if the future response is not failed, its entity
+ // be drained entirely or the connection will be kept open until timeouts kick in.
+ def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
+ futureRequest flatMap { request =>
+ val promise = Promise[HttpResponse]
+
+ // When the future completes, we know whether the request made it
+ // through the queue.
+ requestQueue.offer(request -> promise).flatMap { buffered =>
+ buffered match {
+ case QueueOfferResult.Enqueued =>
+ promise.future
+
+ case QueueOfferResult.Dropped =>
+ Future.failed(new Exception("DB request queue is full."))
+
+ case QueueOfferResult.QueueClosed =>
+ Future.failed(new Exception("DB request queue was closed."))
+
+ case QueueOfferResult.Failure(f) =>
+ Future.failed(f)
+ }
+ }
+ }
+ }
+
+ // Runs a request and returns either a JsObject, or a StatusCode if not 2xx.
+ protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
+ request0(futureRequest) flatMap { response =>
+ if (response.status.isSuccess()) {
+ Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
+ Right(o)
+ }
+ } else {
+ // This is important, as it drains the entity stream.
+ // Otherwise the connection stays open and the pool dries up.
+ response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ =>
+ Left(response.status)
+ }
+ }
+ }
+ }
+
+ def shutdown(): Future[Unit] = {
+ materializer.shutdown()
+ // The code below shuts down the pool, but is apparently not tolerant
+ // to multiple clients shutting down the same pool (the second one just
+ // hangs). Given that shutdown is only relevant for tests (unused pools
+ // close themselves anyway after some time) and that they can call
+ // Http().shutdownAllConnectionPools(), this is not a major issue.
+ /* Reintroduce below if they ever make HostConnectionPool.shutdown()
+ * safe to call >1x.
+ * val poolOpt = poolPromise.future.value.map(_.toOption).flatten
+ * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(()))
+ */
+ Future.successful(())
+ }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
index 3b6219b..d554e31 100644
--- a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
+++ b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
@@ -39,23 +39,22 @@ class ExtendedCouchDbRestClient(protocol: String,
// http://docs.couchdb.org/en/1.6.1/api/server/common.html#get--
def instanceInfo(): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./))
+ requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, baseHeaders))
// http://docs.couchdb.org/en/1.6.1/api/server/common.html#all-dbs
def dbs(): Future[Either[StatusCode, List[String]]] = {
- implicit val ec = system.dispatcher
- requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"))).map { either =>
+ requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), baseHeaders)).map { either =>
either.right.map(_.convertTo[List[String]])
}
}
// http://docs.couchdb.org/en/1.6.1/api/database/common.html#put--db
def createDb(): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db)))
+ requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), baseHeaders))
// http://docs.couchdb.org/en/1.6.1/api/database/common.html#delete--db
def deleteDb(): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db)))
+ requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), baseHeaders))
// http://docs.couchdb.org/en/1.6.1/api/database/bulk-api.html#get--db-_all_docs
def getAllDocs(skip: Option[Int] = None,
@@ -76,6 +75,6 @@ class ExtendedCouchDbRestClient(protocol: String,
.toMap
val url = uri(db, "_all_docs").withQuery(Uri.Query(argMap))
- requestJson[JsObject](mkRequest(HttpMethods.GET, url))
+ requestJson[JsObject](mkRequest(HttpMethods.GET, url, baseHeaders))
}
}
--
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.