You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/02/27 19:00:38 UTC

[incubator-openwhisk] branch master updated: Factor PoolingRestClient out of CouchDbRestClient. (#3347)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 369517b  Factor PoolingRestClient out of CouchDbRestClient. (#3347)
369517b is described below

commit 369517b369453366c3aeb095fba79d1c4411f8ea
Author: James Dubee <jw...@us.ibm.com>
AuthorDate: Tue Feb 27 14:00:35 2018 -0500

    Factor PoolingRestClient out of CouchDbRestClient. (#3347)
---
 .../whisk/core/database/CloudantRestClient.scala   |   2 +-
 .../whisk/core/database/CouchDbRestClient.scala    | 160 +++------------------
 .../main/scala/whisk/http/PoolingRestClient.scala  | 154 ++++++++++++++++++++
 .../database/test/ExtendedCouchDbRestClient.scala  |  11 +-
 4 files changed, 183 insertions(+), 144 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
index 29ec17e..682df26 100644
--- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
@@ -37,6 +37,6 @@ class CloudantRestClient(host: String, port: Int, username: String, password: St
 
   // https://cloudant.com/blog/cloudant-query-grows-up-to-handle-ad-hoc-queries/#.VvllCD-0z2C
   def simpleQuery(doc: JsObject): Future[Either[StatusCode, JsObject]] = {
-    requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc))
+    requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc, baseHeaders))
   }
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
index 0f791b2..e38ce6f 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
@@ -17,28 +17,22 @@
 
 package whisk.core.database
 
+import scala.concurrent.Future
+
 import java.net.URLEncoder
 import java.nio.charset.StandardCharsets
 
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.util.{Failure, Success}
-
 import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.Http.HostConnectionPool
-import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import akka.http.scaladsl.marshalling._
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.model.headers._
-import akka.http.scaladsl.unmarshalling._
-import akka.stream.ActorMaterializer
-import akka.stream.OverflowStrategy
-import akka.stream.QueueOfferResult
 import akka.stream.scaladsl._
 import akka.util.ByteString
+
 import spray.json._
+import spray.json.DefaultJsonProtocol._
+
 import whisk.common.Logging
+import whisk.http.PoolingRestClient
 
 /**
  * This class only handles the basic communication to the proper endpoints
@@ -50,35 +44,14 @@ import whisk.common.Logging
  */
 class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)(
   implicit system: ActorSystem,
-  logging: Logging) {
-  require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
-
-  private implicit val context = system.dispatcher
-  private implicit val materializer = ActorMaterializer()
+  logging: Logging)
+    extends PoolingRestClient(protocol, host, port, 16 * 1024) {
 
-  // Creates or retrieves a connection pool for the host.
-  private val pool = if (protocol == "http") {
-    Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port)
-  } else {
-    Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
-  }
-
-  private val poolPromise = Promise[HostConnectionPool]
+  // Headers common to all requests.
+  val baseHeaders: List[HttpHeader] =
+    List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))
 
-  // Additional queue in case all connections are busy. Should hardly ever be
-  // filled in practice but can be useful, e.g., in tests starting many
-  // asynchronous requests in a very short period of time.
-  private val QUEUE_SIZE = 16 * 1024;
-  private val requestQueue = Source
-    .queue(QUEUE_SIZE, OverflowStrategy.dropNew)
-    .via(pool.mapMaterializedValue { x =>
-      poolPromise.success(x); x
-    })
-    .toMat(Sink.foreach({
-      case ((Success(response), p)) => p.success(response)
-      case ((Failure(error), p))    => p.failure(error)
-    }))(Keep.left)
-    .run
+  def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev))))
 
   // Properly encodes the potential slashes in each segment.
   protected def uri(segments: Any*): Uri = {
@@ -86,103 +59,30 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
     Uri(s"/${encodedSegments.mkString("/")}")
   }
 
-  // Headers common to all requests.
-  private val baseHeaders =
-    List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))
-
-  // Prepares a request with the proper headers.
-  private def mkRequest0(method: HttpMethod,
-                         uri: Uri,
-                         body: Future[MessageEntity],
-                         forRev: Option[String] = None): Future[HttpRequest] = {
-    val revHeader = forRev.map(r => `If-Match`(EntityTagRange(EntityTag(r)))).toList
-    val headers = revHeader ::: baseHeaders
-    body.map { b =>
-      HttpRequest(method = method, uri = uri, headers = headers, entity = b)
-    }
-  }
-
-  protected def mkRequest(method: HttpMethod, uri: Uri, forRev: Option[String] = None): Future[HttpRequest] = {
-    mkRequest0(method, uri, Future.successful(HttpEntity.Empty), forRev = forRev)
-  }
-
-  protected def mkJsonRequest(method: HttpMethod,
-                              uri: Uri,
-                              body: JsValue,
-                              forRev: Option[String] = None): Future[HttpRequest] = {
-    val b = Marshal(body).to[MessageEntity]
-    mkRequest0(method, uri, b, forRev = forRev)
-  }
-
-  // Enqueue a request, and return a future capturing the corresponding response.
-  // WARNING: make sure that if the future response is not failed, its entity
-  // be drained entirely or the connection will be kept open until timeouts kick in.
-  private def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
-    futureRequest flatMap { request =>
-      val promise = Promise[HttpResponse]
-
-      // When the future completes, we know whether the request made it
-      // through the queue.
-      requestQueue.offer(request -> promise).flatMap { buffered =>
-        buffered match {
-          case QueueOfferResult.Enqueued =>
-            promise.future
-
-          case QueueOfferResult.Dropped =>
-            Future.failed(new Exception("DB request queue is full."))
-
-          case QueueOfferResult.QueueClosed =>
-            Future.failed(new Exception("DB request queue was closed."))
-
-          case QueueOfferResult.Failure(f) =>
-            Future.failed(f)
-        }
-      }
-    }
-  }
-
-  // Runs a request and returns either a JsObject, or a StatusCode if not 2xx.
-  protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
-    request0(futureRequest) flatMap { response =>
-      if (response.status.isSuccess()) {
-        Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
-          Right(o)
-        }
-      } else {
-        // This is important, as it drains the entity stream.
-        // Otherwise the connection stays open and the pool dries up.
-        response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ =>
-          Left(response.status)
-        }
-      }
-    }
-  }
-
-  import spray.json.DefaultJsonProtocol._
-
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#put--db-docid
   def putDoc(id: String, doc: JsObject): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc))
+    requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#put--db-docid
   def putDoc(id: String, rev: String, doc: JsObject): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, forRev = Some(rev)))
+    requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, baseHeaders ++ revHeader(rev)))
 
   // http://docs.couchdb.org/en/2.1.0/api/database/bulk-api.html#inserting-documents-in-bulk
   def putDocs(docs: Seq[JsObject]): Future[Either[StatusCode, JsArray]] =
-    requestJson[JsArray](mkJsonRequest(HttpMethods.POST, uri(db, "_bulk_docs"), JsObject("docs" -> docs.toJson)))
+    requestJson[JsArray](
+      mkJsonRequest(HttpMethods.POST, uri(db, "_bulk_docs"), JsObject("docs" -> docs.toJson), baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
   def getDoc(id: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id)))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
   def getDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), forRev = Some(rev)))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders ++ revHeader(rev)))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#delete--db-docid
   def deleteDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), forRev = Some(rev)))
+    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), baseHeaders ++ revHeader(rev)))
 
   // http://docs.couchdb.org/en/1.6.1/api/ddoc/views.html
   def executeView(designDoc: String, viewName: String)(startKey: List[Any] = Nil,
@@ -238,7 +138,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
 
     val viewUri = uri(db, "_design", designDoc, "_view", viewName).withQuery(Uri.Query(argMap))
 
-    requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, baseHeaders))
   }
 
   // Streams an attachment to the database
@@ -249,7 +149,8 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
                     contentType: ContentType,
                     source: Source[ByteString, _]): Future[Either[StatusCode, JsObject]] = {
     val entity = HttpEntity.Chunked(contentType, source.map(bs => HttpEntity.ChunkStreamPart(bs)))
-    val request = mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), forRev = Some(rev))
+    val request =
+      mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev))
     requestJson[JsObject](request)
   }
 
@@ -259,7 +160,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
                        rev: String,
                        attName: String,
                        sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = {
-    val request = mkRequest(HttpMethods.GET, uri(db, id, attName), forRev = Some(rev))
+    val request = mkRequest(HttpMethods.GET, uri(db, id, attName), baseHeaders ++ revHeader(rev))
 
     request0(request) flatMap { response =>
       if (response.status.isSuccess()) {
@@ -269,19 +170,4 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
       }
     }
   }
-
-  def shutdown(): Future[Unit] = {
-    materializer.shutdown()
-    // The code below shuts down the pool, but is apparently not tolerant
-    // to multiple clients shutting down the same pool (the second one just
-    // hangs). Given that shutdown is only relevant for tests (unused pools
-    // close themselves anyway after some time) and that they can call
-    // Http().shutdownAllConnectionPools(), this is not a major issue.
-    /* Reintroduce below if they ever make HostConnectionPool.shutdown()
-     * safe to call >1x.
-     * val poolOpt = poolPromise.future.value.map(_.toOption).flatten
-     * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(()))
-     */
-    Future.successful(())
-  }
 }
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
new file mode 100644
index 0000000..ef09770
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.http
+
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.util.{Failure, Success}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.Http.HostConnectionPool
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.marshalling._
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.unmarshalling._
+import akka.stream.ActorMaterializer
+import akka.stream.OverflowStrategy
+import akka.stream.QueueOfferResult
+import akka.stream.scaladsl._
+
+import spray.json._
+
+/**
+ * This class only handles the basic communication to the proper endpoints.
+ *  It is up to its clients to interpret the results. It is built on akka-http
+ *  host-level connection pools; compared to single requests, it saves some time
+ *  on each request because it doesn't need to look up the pool corresponding
+ *  to the host. It is also easier to add an extra queueing mechanism.
+ */
+class PoolingRestClient(protocol: String, host: String, port: Int, queueSize: Int)(implicit system: ActorSystem) {
+  require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
+
+  implicit val context = system.dispatcher
+  implicit val materializer = ActorMaterializer()
+
+  // Creates or retrieves a connection pool for the host.
+  private val pool = if (protocol == "http") {
+    Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port)
+  } else {
+    Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
+  }
+
+  private val poolPromise = Promise[HostConnectionPool]
+
+  // Additional queue in case all connections are busy. Should hardly ever be
+  // filled in practice but can be useful, e.g., in tests starting many
+  // asynchronous requests in a very short period of time.
+  private val requestQueue = Source
+    .queue(queueSize, OverflowStrategy.dropNew)
+    .via(pool.mapMaterializedValue { x =>
+      poolPromise.success(x); x
+    })
+    .toMat(Sink.foreach({
+      case ((Success(response), p)) => p.success(response)
+      case ((Failure(error), p))    => p.failure(error)
+    }))(Keep.left)
+    .run
+
+  // Prepares a request carrying the caller-supplied headers once the body entity is available.
+  def mkRequest0(method: HttpMethod,
+                 uri: Uri,
+                 body: Future[MessageEntity],
+                 headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
+    body.map { b =>
+      HttpRequest(method, uri, headers, b)
+    }
+  }
+
+  protected def mkRequest(method: HttpMethod, uri: Uri, headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
+    mkRequest0(method, uri, Future.successful(HttpEntity.Empty), headers)
+  }
+
+  protected def mkJsonRequest(method: HttpMethod,
+                              uri: Uri,
+                              body: JsValue,
+                              headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
+    val b = Marshal(body).to[MessageEntity]
+    mkRequest0(method, uri, b, headers)
+  }
+
+  // Enqueue a request, and return a future capturing the corresponding response.
+  // WARNING: make sure that if the future response is not failed, its entity
+  // is drained entirely, or the connection will be kept open until timeouts kick in.
+  def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
+    futureRequest flatMap { request =>
+      val promise = Promise[HttpResponse]
+
+      // When the future completes, we know whether the request made it
+      // through the queue.
+      requestQueue.offer(request -> promise).flatMap { buffered =>
+        buffered match {
+          case QueueOfferResult.Enqueued =>
+            promise.future
+
+          case QueueOfferResult.Dropped =>
+            Future.failed(new Exception("DB request queue is full."))
+
+          case QueueOfferResult.QueueClosed =>
+            Future.failed(new Exception("DB request queue was closed."))
+
+          case QueueOfferResult.Failure(f) =>
+            Future.failed(f)
+        }
+      }
+    }
+  }
+
+  // Runs a request and returns either the response body unmarshalled as T, or the StatusCode if not 2xx.
+  protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
+    request0(futureRequest) flatMap { response =>
+      if (response.status.isSuccess()) {
+        Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
+          Right(o)
+        }
+      } else {
+        // This is important, as it drains the entity stream.
+        // Otherwise the connection stays open and the pool dries up.
+        response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ =>
+          Left(response.status)
+        }
+      }
+    }
+  }
+
+  def shutdown(): Future[Unit] = {
+    materializer.shutdown()
+    // The code below shuts down the pool, but is apparently not tolerant
+    // to multiple clients shutting down the same pool (the second one just
+    // hangs). Given that shutdown is only relevant for tests (unused pools
+    // close themselves anyway after some time) and that they can call
+    // Http().shutdownAllConnectionPools(), this is not a major issue.
+    /* Reintroduce below if they ever make HostConnectionPool.shutdown()
+     * safe to call >1x.
+     * val poolOpt = poolPromise.future.value.map(_.toOption).flatten
+     * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(()))
+     */
+    Future.successful(())
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
index 3b6219b..d554e31 100644
--- a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
+++ b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
@@ -39,23 +39,22 @@ class ExtendedCouchDbRestClient(protocol: String,
 
   // http://docs.couchdb.org/en/1.6.1/api/server/common.html#get--
   def instanceInfo(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/server/common.html#all-dbs
   def dbs(): Future[Either[StatusCode, List[String]]] = {
-    implicit val ec = system.dispatcher
-    requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"))).map { either =>
+    requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), baseHeaders)).map { either =>
       either.right.map(_.convertTo[List[String]])
     }
   }
 
   // http://docs.couchdb.org/en/1.6.1/api/database/common.html#put--db
   def createDb(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db)))
+    requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/database/common.html#delete--db
   def deleteDb(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db)))
+    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/database/bulk-api.html#get--db-_all_docs
   def getAllDocs(skip: Option[Int] = None,
@@ -76,6 +75,6 @@ class ExtendedCouchDbRestClient(protocol: String,
       .toMap
 
     val url = uri(db, "_all_docs").withQuery(Uri.Query(argMap))
-    requestJson[JsObject](mkRequest(HttpMethods.GET, url))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, url, baseHeaders))
   }
 }

-- 
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.