You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/27 19:00:37 UTC

[GitHub] markusthoemmes closed pull request #3347: Generic Pooling REST Client

markusthoemmes closed pull request #3347: Generic Pooling REST Client
URL: https://github.com/apache/incubator-openwhisk/pull/3347
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub will not show
the diff once it is merged, so the diff is supplied below:

diff --git a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
index 29ec17e922..682df26228 100644
--- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
@@ -37,6 +37,6 @@ class CloudantRestClient(host: String, port: Int, username: String, password: St
 
   // https://cloudant.com/blog/cloudant-query-grows-up-to-handle-ad-hoc-queries/#.VvllCD-0z2C
   def simpleQuery(doc: JsObject): Future[Either[StatusCode, JsObject]] = {
-    requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc))
+    requestJson[JsObject](mkJsonRequest(HttpMethods.POST, uri(db, "_find"), doc, baseHeaders))
   }
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
index 0f791b29f1..e38ce6f69f 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
@@ -17,28 +17,22 @@
 
 package whisk.core.database
 
+import scala.concurrent.Future
+
 import java.net.URLEncoder
 import java.nio.charset.StandardCharsets
 
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.util.{Failure, Success}
-
 import akka.actor.ActorSystem
-import akka.http.scaladsl.Http
-import akka.http.scaladsl.Http.HostConnectionPool
-import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
-import akka.http.scaladsl.marshalling._
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.model.headers._
-import akka.http.scaladsl.unmarshalling._
-import akka.stream.ActorMaterializer
-import akka.stream.OverflowStrategy
-import akka.stream.QueueOfferResult
 import akka.stream.scaladsl._
 import akka.util.ByteString
+
 import spray.json._
+import spray.json.DefaultJsonProtocol._
+
 import whisk.common.Logging
+import whisk.http.PoolingRestClient
 
 /**
  * This class only handles the basic communication to the proper endpoints
@@ -50,35 +44,14 @@ import whisk.common.Logging
  */
 class CouchDbRestClient(protocol: String, host: String, port: Int, username: String, password: String, db: String)(
   implicit system: ActorSystem,
-  logging: Logging) {
-  require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
-
-  private implicit val context = system.dispatcher
-  private implicit val materializer = ActorMaterializer()
+  logging: Logging)
+    extends PoolingRestClient(protocol, host, port, 16 * 1024) {
 
-  // Creates or retrieves a connection pool for the host.
-  private val pool = if (protocol == "http") {
-    Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port)
-  } else {
-    Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
-  }
-
-  private val poolPromise = Promise[HostConnectionPool]
+  // Headers common to all requests.
+  val baseHeaders: List[HttpHeader] =
+    List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))
 
-  // Additional queue in case all connections are busy. Should hardly ever be
-  // filled in practice but can be useful, e.g., in tests starting many
-  // asynchronous requests in a very short period of time.
-  private val QUEUE_SIZE = 16 * 1024;
-  private val requestQueue = Source
-    .queue(QUEUE_SIZE, OverflowStrategy.dropNew)
-    .via(pool.mapMaterializedValue { x =>
-      poolPromise.success(x); x
-    })
-    .toMat(Sink.foreach({
-      case ((Success(response), p)) => p.success(response)
-      case ((Failure(error), p))    => p.failure(error)
-    }))(Keep.left)
-    .run
+  def revHeader(forRev: String) = List(`If-Match`(EntityTagRange(EntityTag(forRev))))
 
   // Properly encodes the potential slashes in each segment.
   protected def uri(segments: Any*): Uri = {
@@ -86,103 +59,30 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
     Uri(s"/${encodedSegments.mkString("/")}")
   }
 
-  // Headers common to all requests.
-  private val baseHeaders =
-    List(Authorization(BasicHttpCredentials(username, password)), Accept(MediaTypes.`application/json`))
-
-  // Prepares a request with the proper headers.
-  private def mkRequest0(method: HttpMethod,
-                         uri: Uri,
-                         body: Future[MessageEntity],
-                         forRev: Option[String] = None): Future[HttpRequest] = {
-    val revHeader = forRev.map(r => `If-Match`(EntityTagRange(EntityTag(r)))).toList
-    val headers = revHeader ::: baseHeaders
-    body.map { b =>
-      HttpRequest(method = method, uri = uri, headers = headers, entity = b)
-    }
-  }
-
-  protected def mkRequest(method: HttpMethod, uri: Uri, forRev: Option[String] = None): Future[HttpRequest] = {
-    mkRequest0(method, uri, Future.successful(HttpEntity.Empty), forRev = forRev)
-  }
-
-  protected def mkJsonRequest(method: HttpMethod,
-                              uri: Uri,
-                              body: JsValue,
-                              forRev: Option[String] = None): Future[HttpRequest] = {
-    val b = Marshal(body).to[MessageEntity]
-    mkRequest0(method, uri, b, forRev = forRev)
-  }
-
-  // Enqueue a request, and return a future capturing the corresponding response.
-  // WARNING: make sure that if the future response is not failed, its entity
-  // be drained entirely or the connection will be kept open until timeouts kick in.
-  private def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
-    futureRequest flatMap { request =>
-      val promise = Promise[HttpResponse]
-
-      // When the future completes, we know whether the request made it
-      // through the queue.
-      requestQueue.offer(request -> promise).flatMap { buffered =>
-        buffered match {
-          case QueueOfferResult.Enqueued =>
-            promise.future
-
-          case QueueOfferResult.Dropped =>
-            Future.failed(new Exception("DB request queue is full."))
-
-          case QueueOfferResult.QueueClosed =>
-            Future.failed(new Exception("DB request queue was closed."))
-
-          case QueueOfferResult.Failure(f) =>
-            Future.failed(f)
-        }
-      }
-    }
-  }
-
-  // Runs a request and returns either a JsObject, or a StatusCode if not 2xx.
-  protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
-    request0(futureRequest) flatMap { response =>
-      if (response.status.isSuccess()) {
-        Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
-          Right(o)
-        }
-      } else {
-        // This is important, as it drains the entity stream.
-        // Otherwise the connection stays open and the pool dries up.
-        response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ =>
-          Left(response.status)
-        }
-      }
-    }
-  }
-
-  import spray.json.DefaultJsonProtocol._
-
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#put--db-docid
   def putDoc(id: String, doc: JsObject): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc))
+    requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#put--db-docid
   def putDoc(id: String, rev: String, doc: JsObject): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, forRev = Some(rev)))
+    requestJson[JsObject](mkJsonRequest(HttpMethods.PUT, uri(db, id), doc, baseHeaders ++ revHeader(rev)))
 
   // http://docs.couchdb.org/en/2.1.0/api/database/bulk-api.html#inserting-documents-in-bulk
   def putDocs(docs: Seq[JsObject]): Future[Either[StatusCode, JsArray]] =
-    requestJson[JsArray](mkJsonRequest(HttpMethods.POST, uri(db, "_bulk_docs"), JsObject("docs" -> docs.toJson)))
+    requestJson[JsArray](
+      mkJsonRequest(HttpMethods.POST, uri(db, "_bulk_docs"), JsObject("docs" -> docs.toJson), baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
   def getDoc(id: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id)))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
   def getDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), forRev = Some(rev)))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders ++ revHeader(rev)))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#delete--db-docid
   def deleteDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), forRev = Some(rev)))
+    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), baseHeaders ++ revHeader(rev)))
 
   // http://docs.couchdb.org/en/1.6.1/api/ddoc/views.html
   def executeView(designDoc: String, viewName: String)(startKey: List[Any] = Nil,
@@ -238,7 +138,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
 
     val viewUri = uri(db, "_design", designDoc, "_view", viewName).withQuery(Uri.Query(argMap))
 
-    requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, baseHeaders))
   }
 
   // Streams an attachment to the database
@@ -249,7 +149,8 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
                     contentType: ContentType,
                     source: Source[ByteString, _]): Future[Either[StatusCode, JsObject]] = {
     val entity = HttpEntity.Chunked(contentType, source.map(bs => HttpEntity.ChunkStreamPart(bs)))
-    val request = mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), forRev = Some(rev))
+    val request =
+      mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev))
     requestJson[JsObject](request)
   }
 
@@ -259,7 +160,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
                        rev: String,
                        attName: String,
                        sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = {
-    val request = mkRequest(HttpMethods.GET, uri(db, id, attName), forRev = Some(rev))
+    val request = mkRequest(HttpMethods.GET, uri(db, id, attName), baseHeaders ++ revHeader(rev))
 
     request0(request) flatMap { response =>
       if (response.status.isSuccess()) {
@@ -269,19 +170,4 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
       }
     }
   }
-
-  def shutdown(): Future[Unit] = {
-    materializer.shutdown()
-    // The code below shuts down the pool, but is apparently not tolerant
-    // to multiple clients shutting down the same pool (the second one just
-    // hangs). Given that shutdown is only relevant for tests (unused pools
-    // close themselves anyway after some time) and that they can call
-    // Http().shutdownAllConnectionPools(), this is not a major issue.
-    /* Reintroduce below if they ever make HostConnectionPool.shutdown()
-     * safe to call >1x.
-     * val poolOpt = poolPromise.future.value.map(_.toOption).flatten
-     * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(()))
-     */
-    Future.successful(())
-  }
 }
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
new file mode 100644
index 0000000000..ef097703ad
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.http
+
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.util.{Failure, Success}
+
+import akka.actor.ActorSystem
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.Http.HostConnectionPool
+import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
+import akka.http.scaladsl.marshalling._
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.unmarshalling._
+import akka.stream.ActorMaterializer
+import akka.stream.OverflowStrategy
+import akka.stream.QueueOfferResult
+import akka.stream.scaladsl._
+
+import spray.json._
+
+/**
+ * This class only handles the basic communication to the proper endpoints.
+ *  It is up to its clients to interpret the results. It is built on akka-http
+ *  host-level connection pools; compared to single requests, it saves some time
+ *  on each request because it doesn't need to look up the pool corresponding
+ *  to the host. It is also easier to add an extra queueing mechanism.
+ */
+class PoolingRestClient(protocol: String, host: String, port: Int, queueSize: Int)(implicit system: ActorSystem) {
+  require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.")
+
+  implicit val context = system.dispatcher
+  implicit val materializer = ActorMaterializer()
+
+  // Creates or retrieves a connection pool for the host.
+  private val pool = if (protocol == "http") {
+    Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port)
+  } else {
+    Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port)
+  }
+
+  private val poolPromise = Promise[HostConnectionPool]
+
+  // Additional queue in case all connections are busy. Should hardly ever be
+  // filled in practice but can be useful, e.g., in tests starting many
+  // asynchronous requests in a very short period of time.
+  private val requestQueue = Source
+    .queue(queueSize, OverflowStrategy.dropNew)
+    .via(pool.mapMaterializedValue { x =>
+      poolPromise.success(x); x
+    })
+    .toMat(Sink.foreach({
+      case ((Success(response), p)) => p.success(response)
+      case ((Failure(error), p))    => p.failure(error)
+    }))(Keep.left)
+    .run
+
+  // Builds a request from the given method, URI and headers once the entity body future completes.
+  def mkRequest0(method: HttpMethod,
+                 uri: Uri,
+                 body: Future[MessageEntity],
+                 headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
+    body.map { b =>
+      HttpRequest(method, uri, headers, b)
+    }
+  }
+
+  protected def mkRequest(method: HttpMethod, uri: Uri, headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
+    mkRequest0(method, uri, Future.successful(HttpEntity.Empty), headers)
+  }
+
+  protected def mkJsonRequest(method: HttpMethod,
+                              uri: Uri,
+                              body: JsValue,
+                              headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
+    val b = Marshal(body).to[MessageEntity]
+    mkRequest0(method, uri, b, headers)
+  }
+
+  // Enqueues a request, and returns a future capturing the corresponding response.
+  // WARNING: if the returned future does not fail, the caller must drain the response
+  // entity entirely; otherwise the connection is kept open until timeouts kick in.
+  def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
+    futureRequest flatMap { request =>
+      val promise = Promise[HttpResponse]
+
+      // Once the offer completes, we know whether the request was
+      // accepted by the queue.
+      requestQueue.offer(request -> promise).flatMap { buffered =>
+        buffered match {
+          case QueueOfferResult.Enqueued =>
+            promise.future
+
+          case QueueOfferResult.Dropped =>
+            Future.failed(new Exception("DB request queue is full."))
+
+          case QueueOfferResult.QueueClosed =>
+            Future.failed(new Exception("DB request queue was closed."))
+
+          case QueueOfferResult.Failure(f) =>
+            Future.failed(f)
+        }
+      }
+    }
+  }
+
+  // Runs a request and returns either the entity unmarshalled to T (Right), or the StatusCode (Left) if not 2xx.
+  protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
+    request0(futureRequest) flatMap { response =>
+      if (response.status.isSuccess()) {
+        Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
+          Right(o)
+        }
+      } else {
+        // This is important, as it drains the entity stream.
+        // Otherwise the connection stays open and the pool dries up.
+        response.entity.withoutSizeLimit().dataBytes.runWith(Sink.ignore).map { _ =>
+          Left(response.status)
+        }
+      }
+    }
+  }
+
+  def shutdown(): Future[Unit] = {
+    materializer.shutdown()
+    // The code below shuts down the pool, but is apparently not tolerant
+    // to multiple clients shutting down the same pool (the second one just
+    // hangs). Given that shutdown is only relevant for tests (unused pools
+    // close themselves anyway after some time) and that they can call
+    // Http().shutdownAllConnectionPools(), this is not a major issue.
+    /* Reintroduce below if they ever make HostConnectionPool.shutdown()
+     * safe to call >1x.
+     * val poolOpt = poolPromise.future.value.map(_.toOption).flatten
+     * poolOpt.map(_.shutdown().map(_ => ())).getOrElse(Future.successful(()))
+     */
+    Future.successful(())
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
index 3b6219ba69..d554e31846 100644
--- a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
+++ b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
@@ -39,23 +39,22 @@ class ExtendedCouchDbRestClient(protocol: String,
 
   // http://docs.couchdb.org/en/1.6.1/api/server/common.html#get--
   def instanceInfo(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/server/common.html#all-dbs
   def dbs(): Future[Either[StatusCode, List[String]]] = {
-    implicit val ec = system.dispatcher
-    requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"))).map { either =>
+    requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), baseHeaders)).map { either =>
       either.right.map(_.convertTo[List[String]])
     }
   }
 
   // http://docs.couchdb.org/en/1.6.1/api/database/common.html#put--db
   def createDb(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db)))
+    requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/database/common.html#delete--db
   def deleteDb(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db)))
+    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/database/bulk-api.html#get--db-_all_docs
   def getAllDocs(skip: Option[Int] = None,
@@ -76,6 +75,6 @@ class ExtendedCouchDbRestClient(protocol: String,
       .toMap
 
     val url = uri(db, "_all_docs").withQuery(Uri.Query(argMap))
-    requestJson[JsObject](mkRequest(HttpMethods.GET, url))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, url, baseHeaders))
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services