You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/06/18 06:21:17 UTC

[incubator-openwhisk] branch master updated: Refactor and unit test PoolingRestClient. (#3416)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new b99aaa3  Refactor and unit test PoolingRestClient. (#3416)
b99aaa3 is described below

commit b99aaa3c8b4e6e7dac167f7b6f9a5e9ed13989d5
Author: James Dubee <jw...@us.ibm.com>
AuthorDate: Mon Jun 18 02:21:14 2018 -0400

    Refactor and unit test PoolingRestClient. (#3416)
---
 .../logging/ElasticSearchRestClient.scala          |   5 +-
 .../whisk/core/database/CloudantRestClient.scala   |   3 +
 .../whisk/core/database/CouchDbRestClient.scala    |  15 +-
 .../main/scala/whisk/http/PoolingRestClient.scala  |  53 +++---
 .../database/test/ExtendedCouchDbRestClient.scala  |  13 +-
 .../scala/whisk/http/PoolingRestClientTests.scala  | 181 +++++++++++++++++++++
 6 files changed, 227 insertions(+), 43 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
index 2f34c26..1597ecc 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
@@ -32,6 +32,7 @@ import scala.util.Try
 import spray.json._
 
 import whisk.http.PoolingRestClient
+import whisk.http.PoolingRestClient._
 
 trait EsQueryMethod
 trait EsOrder
@@ -164,11 +165,11 @@ class ElasticSearchRestClient(
   private val baseHeaders: List[HttpHeader] = List(Accept(MediaTypes.`application/json`))
 
   def info(headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] = {
-    requestJson[JsObject](mkRequest(GET, Uri./, baseHeaders ++ headers))
+    requestJson[JsObject](mkRequest(GET, Uri./, headers = baseHeaders ++ headers))
   }
 
   def index(index: String, headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] = {
-    requestJson[JsObject](mkRequest(GET, Uri(index), baseHeaders ++ headers))
+    requestJson[JsObject](mkRequest(GET, Uri(index), headers = baseHeaders ++ headers))
   }
 
   def search[T: RootJsonReader](index: String,
diff --git a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
index 682df26..ce6a9a9 100644
--- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
@@ -22,9 +22,12 @@ import scala.concurrent.Future
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.HttpMethods
 import akka.http.scaladsl.model.StatusCode
+
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+
 import whisk.common.Logging
+import whisk.http.PoolingRestClient._
 
 /**
  * This class only handles the basic communication to the proper endpoints
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
index 3e8143a..16e6cfa 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
@@ -30,6 +30,7 @@ import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.common.Logging
 import whisk.http.PoolingRestClient
+import whisk.http.PoolingRestClient._
 
 /**
  * This class only handles the basic communication to the proper endpoints
@@ -73,15 +74,15 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
   def getDoc(id: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), headers = baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
   def getDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders ++ revHeader(rev)))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), headers = baseHeaders ++ revHeader(rev)))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#delete--db-docid
   def deleteDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), baseHeaders ++ revHeader(rev)))
+    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), headers = baseHeaders ++ revHeader(rev)))
 
   // http://docs.couchdb.org/en/1.6.1/api/ddoc/views.html
   def executeView(designDoc: String, viewName: String)(startKey: List[Any] = Nil,
@@ -137,7 +138,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
 
     val viewUri = uri(db, "_design", designDoc, "_view", viewName).withQuery(Uri.Query(argMap))
 
-    requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, headers = baseHeaders))
   }
 
   // Streams an attachment to the database
@@ -149,7 +150,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
                     source: Source[ByteString, _]): Future[Either[StatusCode, JsObject]] = {
     val entity = HttpEntity.Chunked(contentType, source.map(bs => HttpEntity.ChunkStreamPart(bs)))
     val request =
-      mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev))
+      mkRequest(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev))
     requestJson[JsObject](request)
   }
 
@@ -159,9 +160,9 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
                        rev: String,
                        attName: String,
                        sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = {
-    val request = mkRequest(HttpMethods.GET, uri(db, id, attName), baseHeaders ++ revHeader(rev))
+    val httpRequest = mkRequest(HttpMethods.GET, uri(db, id, attName), headers = baseHeaders ++ revHeader(rev))
 
-    request0(request) flatMap { response =>
+    request(httpRequest) flatMap { response =>
       if (response.status.isSuccess()) {
         response.entity.withoutSizeLimit().dataBytes.runWith(sink).map(r => Right(response.entity.contentType, r))
       } else {
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
index 0da6630..b842e75 100644
--- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -17,10 +17,9 @@
 
 package whisk.http
 
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.util.{Failure, Success}
-import scala.util.Try
+import scala.concurrent.{Future, Promise}
+import scala.util.{Failure, Success, Try}
+import scala.concurrent.ExecutionContext
 
 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
@@ -81,32 +80,10 @@ class PoolingRestClient(
     }))(Keep.left)
     .run
 
-  // Prepares a request with the proper headers.
-  def mkRequest0(method: HttpMethod,
-                 uri: Uri,
-                 body: Future[MessageEntity],
-                 headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
-    body.map { b =>
-      HttpRequest(method, uri, headers, b)
-    }
-  }
-
-  protected def mkRequest(method: HttpMethod, uri: Uri, headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
-    mkRequest0(method, uri, Future.successful(HttpEntity.Empty), headers)
-  }
-
-  protected def mkJsonRequest(method: HttpMethod,
-                              uri: Uri,
-                              body: JsValue,
-                              headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
-    val b = Marshal(body).to[MessageEntity]
-    mkRequest0(method, uri, b, headers)
-  }
-
   // Enqueue a request, and return a future capturing the corresponding response.
   // WARNING: make sure that if the future response is not failed, its entity
   // be drained entirely or the connection will be kept open until timeouts kick in.
-  def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
+  def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
     futureRequest flatMap { request =>
       val promise = Promise[HttpResponse]
 
@@ -131,8 +108,8 @@ class PoolingRestClient(
   }
 
   // Runs a request and returns either a JsObject, or a StatusCode if not 2xx.
-  protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
-    request0(futureRequest) flatMap { response =>
+  def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
+    request(futureRequest) flatMap { response =>
       if (response.status.isSuccess()) {
         Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
           Right(o)
@@ -162,3 +139,21 @@ class PoolingRestClient(
     Future.successful(())
   }
 }
+
+object PoolingRestClient {
+
+  // Assembles an HttpRequest as soon as the (possibly still pending) entity future completes.
+  // When no body is given, an already-completed empty entity is used.
+  def mkRequest(method: HttpMethod,
+                uri: Uri,
+                body: Future[MessageEntity] = Future.successful(HttpEntity.Empty),
+                headers: List[HttpHeader] = List.empty)(implicit ec: ExecutionContext): Future[HttpRequest] =
+    body.map(entity => HttpRequest(method, uri, headers, entity))
+
+  // Marshals the JSON value into a MessageEntity and delegates to mkRequest.
+  def mkJsonRequest(method: HttpMethod, uri: Uri, body: JsValue, headers: List[HttpHeader] = List.empty)(
+    implicit ec: ExecutionContext): Future[HttpRequest] = {
+    val jsonEntity = Marshal(body).to[MessageEntity]
+    mkRequest(method, uri, jsonEntity, headers)
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
index d554e31..a4019c9 100644
--- a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
+++ b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
@@ -21,10 +21,13 @@ import scala.concurrent.Future
 
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model._
+
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+
 import whisk.common.Logging
 import whisk.core.database.CouchDbRestClient
+import whisk.http.PoolingRestClient._
 
 /**
  * Implementation of additional endpoints that should only be used in testing.
@@ -39,22 +42,22 @@ class ExtendedCouchDbRestClient(protocol: String,
 
   // http://docs.couchdb.org/en/1.6.1/api/server/common.html#get--
   def instanceInfo(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, headers = baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/server/common.html#all-dbs
   def dbs(): Future[Either[StatusCode, List[String]]] = {
-    requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), baseHeaders)).map { either =>
+    requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), headers = baseHeaders)).map { either =>
       either.right.map(_.convertTo[List[String]])
     }
   }
 
   // http://docs.couchdb.org/en/1.6.1/api/database/common.html#put--db
   def createDb(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), headers = baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/database/common.html#delete--db
   def deleteDb(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), headers = baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/database/bulk-api.html#get--db-_all_docs
   def getAllDocs(skip: Option[Int] = None,
@@ -75,6 +78,6 @@ class ExtendedCouchDbRestClient(protocol: String,
       .toMap
 
     val url = uri(db, "_all_docs").withQuery(Uri.Query(argMap))
-    requestJson[JsObject](mkRequest(HttpMethods.GET, url, baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, url, headers = baseHeaders))
   }
 }
diff --git a/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala b/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala
new file mode 100644
index 0000000..7213e4a
--- /dev/null
+++ b/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.http
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Flow
+import akka.stream.ActorMaterializer
+import akka.testkit.TestKit
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.HttpMethods.{GET, POST}
+import akka.http.scaladsl.model.StatusCodes.{InternalServerError, NotFound}
+import akka.http.scaladsl.model.headers.RawHeader
+import akka.http.scaladsl.unmarshalling.Unmarshaller.UnsupportedContentTypeException
+
+import common.StreamLogging
+
+import spray.json.JsObject
+import spray.json.DefaultJsonProtocol._
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future, Promise, TimeoutException}
+import scala.util.{Success, Try}
+
+import whisk.http.PoolingRestClient._
+
+@RunWith(classOf[JUnitRunner])
+class PoolingRestClientTests
+    extends TestKit(ActorSystem("PoolingRestClientTests"))
+    with FlatSpecLike
+    with Matchers
+    with ScalaFutures
+    with StreamLogging {
+  implicit val ec: ExecutionContext = system.dispatcher
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  // Stub flow: asserts each request it receives equals `httpRequest`, then replies with `httpResponse`.
+  def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+    : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])]
+      .mapAsyncUnordered(1) { case (received, responsePromise) =>
+        received shouldBe httpRequest
+        Future.successful((Success(httpResponse), responsePromise))
+      }
+
+  // Stub flow whose stage yields a failed future for every element; both parameters are accepted but unused.
+  def failFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+    : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])]
+      .mapAsyncUnordered(1) { _ =>
+        Future.failed(new Exception)
+      }
+
+  def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds): T = Await.result(awaitable, timeout)
+
+  behavior of "Pooling REST Client"
+
+  it should "error when configuration protocol is invalid" in {
+    an[IllegalArgumentException] should be thrownBy new PoolingRestClient("invalid", "host", 443, 1)
+  }
+
+  it should "get a non-200 status code when performing a request" in {
+    // The stubbed flow answers with a 500 response, which must be handed back as-is.
+    val serverError = HttpResponse(InternalServerError)
+    val client = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(serverError)))
+    val reply = client.request(Future.successful(HttpRequest()))
+    await(reply) shouldBe serverError
+  }
+
+  it should "return payload from a request" in {
+    // The JSON-bearing response produced by the stubbed flow is expected back unchanged.
+    val jsonResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val client = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(jsonResponse)))
+    val reply = client.request(Future.successful(HttpRequest()))
+    await(reply) shouldBe jsonResponse
+  }
+
+  it should "send headers when making a request" in {
+    // testFlow asserts that the request reaching the flow equals the one carrying the header.
+    val jsonResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val withHeader = HttpRequest(headers = List(RawHeader("key", "value")))
+    val client = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(jsonResponse, withHeader)))
+    await(client.request(Future.successful(withHeader))) shouldBe jsonResponse
+  }
+
+  it should "send uri when making a request" in {
+    // testFlow asserts that the request reaching the flow still targets the given path.
+    val jsonResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val withPath = HttpRequest(uri = Uri("/some/where"))
+    val client = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(jsonResponse, withPath)))
+    await(client.request(Future.successful(withPath))) shouldBe jsonResponse
+  }
+
+  it should "send a payload when making a request" in {
+    // testFlow asserts that the POST request and its text entity reach the flow unchanged.
+    val jsonResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val textPost = HttpRequest(POST, entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, "payload"))
+    val client = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(jsonResponse, textPost)))
+    await(client.request(Future.successful(textPost))) shouldBe jsonResponse
+  }
+
+  it should "return JSON when making a request" in {
+    // The same empty-object JSON entity is expected on the way out and served on the way back.
+    val emptyJson = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)
+    val stub = testFlow(HttpResponse(entity = emptyJson), HttpRequest(entity = emptyJson))
+    val client = new PoolingRestClient("https", "host", 443, 1, Some(stub))
+    val jsonRequest = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+    await(client.requestJson[JsObject](jsonRequest)) shouldBe Right(JsObject())
+  }
+
+  it should "throw timeout exception when Future fails in httpFlow" in {
+    // failFlow ignores its arguments, so no request/response fixtures are built here.
+    // The flow fails instead of answering; the awaited result is expected to time out.
+    val client = new PoolingRestClient("https", "host", 443, 1, Some(failFlow()))
+    val jsonRequest = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+
+    a[TimeoutException] should be thrownBy await(client.requestJson[JsObject](jsonRequest))
+  }
+
+  it should "return a status code on request failure" in {
+    // A 404 reply from the flow is expected to surface from requestJson as Left(NotFound).
+    val expectedRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val stub = testFlow(HttpResponse(NotFound), expectedRequest)
+    val client = new PoolingRestClient("https", "host", 443, 1, Some(stub))
+    val outcome = client.requestJson[JsObject](mkJsonRequest(GET, Uri./, JsObject(), List.empty))
+    await(outcome) shouldBe Left(NotFound)
+  }
+
+  it should "throw an unsupported content-type exception when unexpected content-type is returned" in {
+    // The flow replies with text/plain although requestJson is asked to produce a JsObject.
+    val plainResponse = HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, "plain text"))
+    val expectedRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val client = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(plainResponse, expectedRequest)))
+    val jsonRequest = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+    an[UnsupportedContentTypeException] should be thrownBy await(client.requestJson[JsObject](jsonRequest))
+  }
+
+  it should "create an HttpRequest without a payload" in {
+    // mkRequest with only a method and URI is expected to equal a default-constructed HttpRequest.
+    val built = mkRequest(GET, Uri./)
+    await(built) shouldBe HttpRequest()
+  }
+
+  it should "create an HttpRequest with a JSON payload" in {
+    val expected = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val built = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+    await(built) shouldBe expected
+  }
+
+  it should "create an HttpRequest with a payload" in {
+    // The entity handed to mkRequest as an already-completed future is expected in the built request.
+    val payload = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)
+    val built = mkRequest(
+      method = GET,
+      uri = Uri./,
+      body = Future.successful(payload),
+      headers = List.empty)
+    await(built) shouldBe HttpRequest(entity = payload)
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.