You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/06/18 06:21:16 UTC

[GitHub] markusthoemmes closed pull request #3416: Refactor and unit test PoolingRestClient

markusthoemmes closed pull request #3416: Refactor and unit test PoolingRestClient
URL: https://github.com/apache/incubator-openwhisk/pull/3416
 
 
   

This pull request was merged from a forked repository.
Because GitHub does not display the original diff of a fork-based
pull request once it has been merged, the diff is reproduced below
for the sake of provenance:

diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
index 2f34c26da3..1597eccadc 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
@@ -32,6 +32,7 @@ import scala.util.Try
 import spray.json._
 
 import whisk.http.PoolingRestClient
+import whisk.http.PoolingRestClient._
 
 trait EsQueryMethod
 trait EsOrder
@@ -164,11 +165,11 @@ class ElasticSearchRestClient(
   private val baseHeaders: List[HttpHeader] = List(Accept(MediaTypes.`application/json`))
 
   def info(headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] = {
-    requestJson[JsObject](mkRequest(GET, Uri./, baseHeaders ++ headers))
+    requestJson[JsObject](mkRequest(GET, Uri./, headers = baseHeaders ++ headers))
   }
 
   def index(index: String, headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] = {
-    requestJson[JsObject](mkRequest(GET, Uri(index), baseHeaders ++ headers))
+    requestJson[JsObject](mkRequest(GET, Uri(index), headers = baseHeaders ++ headers))
   }
 
   def search[T: RootJsonReader](index: String,
diff --git a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
index 682df26228..ce6a9a9a88 100644
--- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
@@ -22,9 +22,12 @@ import scala.concurrent.Future
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.HttpMethods
 import akka.http.scaladsl.model.StatusCode
+
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+
 import whisk.common.Logging
+import whisk.http.PoolingRestClient._
 
 /**
  * This class only handles the basic communication to the proper endpoints
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
index e38ce6f69f..f3d25a5286 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
@@ -33,6 +33,7 @@ import spray.json.DefaultJsonProtocol._
 
 import whisk.common.Logging
 import whisk.http.PoolingRestClient
+import whisk.http.PoolingRestClient._
 
 /**
  * This class only handles the basic communication to the proper endpoints
@@ -74,15 +75,15 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
   def getDoc(id: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), headers = baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
   def getDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders ++ revHeader(rev)))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), headers = baseHeaders ++ revHeader(rev)))
 
   // http://docs.couchdb.org/en/1.6.1/api/document/common.html#delete--db-docid
   def deleteDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), baseHeaders ++ revHeader(rev)))
+    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), headers = baseHeaders ++ revHeader(rev)))
 
   // http://docs.couchdb.org/en/1.6.1/api/ddoc/views.html
   def executeView(designDoc: String, viewName: String)(startKey: List[Any] = Nil,
@@ -138,7 +139,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
 
     val viewUri = uri(db, "_design", designDoc, "_view", viewName).withQuery(Uri.Query(argMap))
 
-    requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, headers = baseHeaders))
   }
 
   // Streams an attachment to the database
@@ -150,7 +151,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
                     source: Source[ByteString, _]): Future[Either[StatusCode, JsObject]] = {
     val entity = HttpEntity.Chunked(contentType, source.map(bs => HttpEntity.ChunkStreamPart(bs)))
     val request =
-      mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev))
+      mkRequest(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev))
     requestJson[JsObject](request)
   }
 
@@ -160,9 +161,9 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
                        rev: String,
                        attName: String,
                        sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = {
-    val request = mkRequest(HttpMethods.GET, uri(db, id, attName), baseHeaders ++ revHeader(rev))
+    val httpRequest = mkRequest(HttpMethods.GET, uri(db, id, attName), headers = baseHeaders ++ revHeader(rev))
 
-    request0(request) flatMap { response =>
+    request(httpRequest) flatMap { response =>
       if (response.status.isSuccess()) {
         response.entity.withoutSizeLimit().dataBytes.runWith(sink).map(r => Right(response.entity.contentType, r))
       } else {
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
index 0da66305dc..b842e759ba 100644
--- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -17,10 +17,9 @@
 
 package whisk.http
 
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.util.{Failure, Success}
-import scala.util.Try
+import scala.concurrent.{Future, Promise}
+import scala.util.{Failure, Success, Try}
+import scala.concurrent.ExecutionContext
 
 import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
@@ -81,32 +80,10 @@ class PoolingRestClient(
     }))(Keep.left)
     .run
 
-  // Prepares a request with the proper headers.
-  def mkRequest0(method: HttpMethod,
-                 uri: Uri,
-                 body: Future[MessageEntity],
-                 headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
-    body.map { b =>
-      HttpRequest(method, uri, headers, b)
-    }
-  }
-
-  protected def mkRequest(method: HttpMethod, uri: Uri, headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
-    mkRequest0(method, uri, Future.successful(HttpEntity.Empty), headers)
-  }
-
-  protected def mkJsonRequest(method: HttpMethod,
-                              uri: Uri,
-                              body: JsValue,
-                              headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
-    val b = Marshal(body).to[MessageEntity]
-    mkRequest0(method, uri, b, headers)
-  }
-
   // Enqueue a request, and return a future capturing the corresponding response.
   // WARNING: make sure that if the future response is not failed, its entity
   // be drained entirely or the connection will be kept open until timeouts kick in.
-  def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
+  def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
     futureRequest flatMap { request =>
       val promise = Promise[HttpResponse]
 
@@ -131,8 +108,8 @@ class PoolingRestClient(
   }
 
   // Runs a request and returns either a JsObject, or a StatusCode if not 2xx.
-  protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
-    request0(futureRequest) flatMap { response =>
+  def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
+    request(futureRequest) flatMap { response =>
       if (response.status.isSuccess()) {
         Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
           Right(o)
@@ -162,3 +139,21 @@ class PoolingRestClient(
     Future.successful(())
   }
 }
+
+object PoolingRestClient {
+
+  def mkRequest(method: HttpMethod,
+                uri: Uri,
+                body: Future[MessageEntity] = Future.successful(HttpEntity.Empty),
+                headers: List[HttpHeader] = List.empty)(implicit ec: ExecutionContext): Future[HttpRequest] = {
+    body.map { b =>
+      HttpRequest(method, uri, headers, b)
+    }
+  }
+
+  def mkJsonRequest(method: HttpMethod, uri: Uri, body: JsValue, headers: List[HttpHeader] = List.empty)(
+    implicit ec: ExecutionContext): Future[HttpRequest] = {
+    val b = Marshal(body).to[MessageEntity]
+    mkRequest(method, uri, b, headers)
+  }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
index d554e31846..a4019c9bf0 100644
--- a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
+++ b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
@@ -21,10 +21,13 @@ import scala.concurrent.Future
 
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model._
+
 import spray.json._
 import spray.json.DefaultJsonProtocol._
+
 import whisk.common.Logging
 import whisk.core.database.CouchDbRestClient
+import whisk.http.PoolingRestClient._
 
 /**
  * Implementation of additional endpoints that should only be used in testing.
@@ -39,22 +42,22 @@ class ExtendedCouchDbRestClient(protocol: String,
 
   // http://docs.couchdb.org/en/1.6.1/api/server/common.html#get--
   def instanceInfo(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, headers = baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/server/common.html#all-dbs
   def dbs(): Future[Either[StatusCode, List[String]]] = {
-    requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), baseHeaders)).map { either =>
+    requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), headers = baseHeaders)).map { either =>
       either.right.map(_.convertTo[List[String]])
     }
   }
 
   // http://docs.couchdb.org/en/1.6.1/api/database/common.html#put--db
   def createDb(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), headers = baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/database/common.html#delete--db
   def deleteDb(): Future[Either[StatusCode, JsObject]] =
-    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), headers = baseHeaders))
 
   // http://docs.couchdb.org/en/1.6.1/api/database/bulk-api.html#get--db-_all_docs
   def getAllDocs(skip: Option[Int] = None,
@@ -75,6 +78,6 @@ class ExtendedCouchDbRestClient(protocol: String,
       .toMap
 
     val url = uri(db, "_all_docs").withQuery(Uri.Query(argMap))
-    requestJson[JsObject](mkRequest(HttpMethods.GET, url, baseHeaders))
+    requestJson[JsObject](mkRequest(HttpMethods.GET, url, headers = baseHeaders))
   }
 }
diff --git a/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala b/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala
new file mode 100644
index 0000000000..7213e4ae4b
--- /dev/null
+++ b/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.http
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Flow
+import akka.stream.ActorMaterializer
+import akka.testkit.TestKit
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.HttpMethods.{GET, POST}
+import akka.http.scaladsl.model.StatusCodes.{InternalServerError, NotFound}
+import akka.http.scaladsl.model.headers.RawHeader
+import akka.http.scaladsl.unmarshalling.Unmarshaller.UnsupportedContentTypeException
+
+import common.StreamLogging
+
+import spray.json.JsObject
+import spray.json.DefaultJsonProtocol._
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future, Promise, TimeoutException}
+import scala.util.{Success, Try}
+
+import whisk.http.PoolingRestClient._
+
+@RunWith(classOf[JUnitRunner])
+class PoolingRestClientTests
+    extends TestKit(ActorSystem("PoolingRestClientTests"))
+    with FlatSpecLike
+    with Matchers
+    with ScalaFutures
+    with StreamLogging {
+  implicit val ec: ExecutionContext = system.dispatcher
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+    : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])]
+      .mapAsyncUnordered(1) {
+        case (request, userContext) =>
+          request shouldBe httpRequest
+          Future.successful((Success(httpResponse), userContext))
+      }
+
+  def failFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+    : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+    Flow[(HttpRequest, Promise[HttpResponse])]
+      .mapAsyncUnordered(1) {
+        case (request, userContext) =>
+          Future.failed(new Exception)
+      }
+
+  def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout)
+
+  behavior of "Pooling REST Client"
+
+  it should "error when configuration protocol is invalid" in {
+    a[IllegalArgumentException] should be thrownBy new PoolingRestClient("invalid", "host", 443, 1)
+  }
+
+  it should "get a non-200 status code when performing a request" in {
+    val httpResponse = HttpResponse(InternalServerError)
+    val httpRequest = HttpRequest()
+    val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse)))
+
+    await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse
+  }
+
+  it should "return payload from a request" in {
+    val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val httpRequest = HttpRequest()
+    val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse)))
+
+    await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse
+  }
+
+  it should "send headers when making a request" in {
+    val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val httpRequest = HttpRequest(headers = List(RawHeader("key", "value")))
+    val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+
+    await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse
+  }
+
+  it should "send uri when making a request" in {
+    val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val httpRequest = HttpRequest(uri = Uri("/some/where"))
+    val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+
+    await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse
+  }
+
+  it should "send a payload when making a request" in {
+    val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val httpRequest = HttpRequest(POST, entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, "payload"))
+    val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+
+    await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse
+  }
+
+  it should "return JSON when making a request" in {
+    val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+    val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+
+    await(poolingRestClient.requestJson[JsObject](request)) shouldBe Right(JsObject())
+  }
+
+  it should "throw timeout exception when Future fails in httpFlow" in {
+    val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(failFlow(httpResponse, httpRequest)))
+    val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+
+    a[TimeoutException] should be thrownBy await(poolingRestClient.requestJson[JsObject](request))
+  }
+
+  it should "return a status code on request failure" in {
+    val httpResponse = HttpResponse(NotFound)
+    val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+    val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+
+    await(poolingRestClient.requestJson[JsObject](request)) shouldBe Left(NotFound)
+  }
+
+  it should "throw an unsupported content-type exception when unexpected content-type is returned" in {
+    val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, "plain text"))
+    val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+    val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+
+    a[UnsupportedContentTypeException] should be thrownBy await(poolingRestClient.requestJson[JsObject](request))
+  }
+
+  it should "create an HttpRequest without a payload" in {
+    val httpRequest = HttpRequest()
+
+    await(mkRequest(GET, Uri./)) shouldBe httpRequest
+  }
+
+  it should "create an HttpRequest with a JSON payload" in {
+    val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+
+    await(mkJsonRequest(GET, Uri./, JsObject(), List.empty)) shouldBe httpRequest
+  }
+
+  it should "create an HttpRequest with a payload" in {
+    val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+    val request = mkRequest(
+      GET,
+      Uri./,
+      Future.successful(HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)),
+      List.empty)
+
+    await(request) shouldBe httpRequest
+  }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services