You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/06/18 06:21:17 UTC
[incubator-openwhisk] branch master updated: Refactor and unit test
PoolingRestClient. (#3416)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new b99aaa3 Refactor and unit test PoolingRestClient. (#3416)
b99aaa3 is described below
commit b99aaa3c8b4e6e7dac167f7b6f9a5e9ed13989d5
Author: James Dubee <jw...@us.ibm.com>
AuthorDate: Mon Jun 18 02:21:14 2018 -0400
Refactor and unit test PoolingRestClient. (#3416)
---
.../logging/ElasticSearchRestClient.scala | 5 +-
.../whisk/core/database/CloudantRestClient.scala | 3 +
.../whisk/core/database/CouchDbRestClient.scala | 15 +-
.../main/scala/whisk/http/PoolingRestClient.scala | 53 +++---
.../database/test/ExtendedCouchDbRestClient.scala | 13 +-
.../scala/whisk/http/PoolingRestClientTests.scala | 181 +++++++++++++++++++++
6 files changed, 227 insertions(+), 43 deletions(-)
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
index 2f34c26..1597ecc 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/ElasticSearchRestClient.scala
@@ -32,6 +32,7 @@ import scala.util.Try
import spray.json._
import whisk.http.PoolingRestClient
+import whisk.http.PoolingRestClient._
trait EsQueryMethod
trait EsOrder
@@ -164,11 +165,11 @@ class ElasticSearchRestClient(
private val baseHeaders: List[HttpHeader] = List(Accept(MediaTypes.`application/json`))
def info(headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] = {
- requestJson[JsObject](mkRequest(GET, Uri./, baseHeaders ++ headers))
+ requestJson[JsObject](mkRequest(GET, Uri./, headers = baseHeaders ++ headers))
}
def index(index: String, headers: List[HttpHeader] = List.empty): Future[Either[StatusCode, JsObject]] = {
- requestJson[JsObject](mkRequest(GET, Uri(index), baseHeaders ++ headers))
+ requestJson[JsObject](mkRequest(GET, Uri(index), headers = baseHeaders ++ headers))
}
def search[T: RootJsonReader](index: String,
diff --git a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
index 682df26..ce6a9a9 100644
--- a/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CloudantRestClient.scala
@@ -22,9 +22,12 @@ import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.model.StatusCode
+
import spray.json._
import spray.json.DefaultJsonProtocol._
+
import whisk.common.Logging
+import whisk.http.PoolingRestClient._
/**
* This class only handles the basic communication to the proper endpoints
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
index 3e8143a..16e6cfa 100644
--- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
+++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestClient.scala
@@ -30,6 +30,7 @@ import spray.json._
import spray.json.DefaultJsonProtocol._
import whisk.common.Logging
import whisk.http.PoolingRestClient
+import whisk.http.PoolingRestClient._
/**
* This class only handles the basic communication to the proper endpoints
@@ -73,15 +74,15 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
// http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
def getDoc(id: String): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders))
+ requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), headers = baseHeaders))
// http://docs.couchdb.org/en/1.6.1/api/document/common.html#get--db-docid
def getDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), baseHeaders ++ revHeader(rev)))
+ requestJson[JsObject](mkRequest(HttpMethods.GET, uri(db, id), headers = baseHeaders ++ revHeader(rev)))
// http://docs.couchdb.org/en/1.6.1/api/document/common.html#delete--db-docid
def deleteDoc(id: String, rev: String): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), baseHeaders ++ revHeader(rev)))
+ requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db, id), headers = baseHeaders ++ revHeader(rev)))
// http://docs.couchdb.org/en/1.6.1/api/ddoc/views.html
def executeView(designDoc: String, viewName: String)(startKey: List[Any] = Nil,
@@ -137,7 +138,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
val viewUri = uri(db, "_design", designDoc, "_view", viewName).withQuery(Uri.Query(argMap))
- requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, baseHeaders))
+ requestJson[JsObject](mkRequest(HttpMethods.GET, viewUri, headers = baseHeaders))
}
// Streams an attachment to the database
@@ -149,7 +150,7 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
source: Source[ByteString, _]): Future[Either[StatusCode, JsObject]] = {
val entity = HttpEntity.Chunked(contentType, source.map(bs => HttpEntity.ChunkStreamPart(bs)))
val request =
- mkRequest0(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev))
+ mkRequest(HttpMethods.PUT, uri(db, id, attName), Future.successful(entity), baseHeaders ++ revHeader(rev))
requestJson[JsObject](request)
}
@@ -159,9 +160,9 @@ class CouchDbRestClient(protocol: String, host: String, port: Int, username: Str
rev: String,
attName: String,
sink: Sink[ByteString, Future[T]]): Future[Either[StatusCode, (ContentType, T)]] = {
- val request = mkRequest(HttpMethods.GET, uri(db, id, attName), baseHeaders ++ revHeader(rev))
+ val httpRequest = mkRequest(HttpMethods.GET, uri(db, id, attName), headers = baseHeaders ++ revHeader(rev))
- request0(request) flatMap { response =>
+ request(httpRequest) flatMap { response =>
if (response.status.isSuccess()) {
response.entity.withoutSizeLimit().dataBytes.runWith(sink).map(r => Right(response.entity.contentType, r))
} else {
diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
index 0da6630..b842e75 100644
--- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
+++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala
@@ -17,10 +17,9 @@
package whisk.http
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.util.{Failure, Success}
-import scala.util.Try
+import scala.concurrent.{Future, Promise}
+import scala.util.{Failure, Success, Try}
+import scala.concurrent.ExecutionContext
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
@@ -81,32 +80,10 @@ class PoolingRestClient(
}))(Keep.left)
.run
- // Prepares a request with the proper headers.
- def mkRequest0(method: HttpMethod,
- uri: Uri,
- body: Future[MessageEntity],
- headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
- body.map { b =>
- HttpRequest(method, uri, headers, b)
- }
- }
-
- protected def mkRequest(method: HttpMethod, uri: Uri, headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
- mkRequest0(method, uri, Future.successful(HttpEntity.Empty), headers)
- }
-
- protected def mkJsonRequest(method: HttpMethod,
- uri: Uri,
- body: JsValue,
- headers: List[HttpHeader] = List.empty): Future[HttpRequest] = {
- val b = Marshal(body).to[MessageEntity]
- mkRequest0(method, uri, b, headers)
- }
-
// Enqueue a request, and return a future capturing the corresponding response.
// WARNING: make sure that if the future response is not failed, its entity
// be drained entirely or the connection will be kept open until timeouts kick in.
- def request0(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
+ def request(futureRequest: Future[HttpRequest]): Future[HttpResponse] = {
futureRequest flatMap { request =>
val promise = Promise[HttpResponse]
@@ -131,8 +108,8 @@ class PoolingRestClient(
}
// Runs a request and returns either a JsObject, or a StatusCode if not 2xx.
- protected def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
- request0(futureRequest) flatMap { response =>
+ def requestJson[T: RootJsonReader](futureRequest: Future[HttpRequest]): Future[Either[StatusCode, T]] = {
+ request(futureRequest) flatMap { response =>
if (response.status.isSuccess()) {
Unmarshal(response.entity.withoutSizeLimit()).to[T].map { o =>
Right(o)
@@ -162,3 +139,21 @@ class PoolingRestClient(
Future.successful(())
}
}
+
+object PoolingRestClient {
+
+ def mkRequest(method: HttpMethod,
+ uri: Uri,
+ body: Future[MessageEntity] = Future.successful(HttpEntity.Empty),
+ headers: List[HttpHeader] = List.empty)(implicit ec: ExecutionContext): Future[HttpRequest] = {
+ body.map { b =>
+ HttpRequest(method, uri, headers, b)
+ }
+ }
+
+ def mkJsonRequest(method: HttpMethod, uri: Uri, body: JsValue, headers: List[HttpHeader] = List.empty)(
+ implicit ec: ExecutionContext): Future[HttpRequest] = {
+ val b = Marshal(body).to[MessageEntity]
+ mkRequest(method, uri, b, headers)
+ }
+}
diff --git a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
index d554e31..a4019c9 100644
--- a/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
+++ b/tests/src/test/scala/whisk/core/database/test/ExtendedCouchDbRestClient.scala
@@ -21,10 +21,13 @@ import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.http.scaladsl.model._
+
import spray.json._
import spray.json.DefaultJsonProtocol._
+
import whisk.common.Logging
import whisk.core.database.CouchDbRestClient
+import whisk.http.PoolingRestClient._
/**
* Implementation of additional endpoints that should only be used in testing.
@@ -39,22 +42,22 @@ class ExtendedCouchDbRestClient(protocol: String,
// http://docs.couchdb.org/en/1.6.1/api/server/common.html#get--
def instanceInfo(): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, baseHeaders))
+ requestJson[JsObject](mkRequest(HttpMethods.GET, Uri./, headers = baseHeaders))
// http://docs.couchdb.org/en/1.6.1/api/server/common.html#all-dbs
def dbs(): Future[Either[StatusCode, List[String]]] = {
- requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), baseHeaders)).map { either =>
+ requestJson[JsArray](mkRequest(HttpMethods.GET, uri("_all_dbs"), headers = baseHeaders)).map { either =>
either.right.map(_.convertTo[List[String]])
}
}
// http://docs.couchdb.org/en/1.6.1/api/database/common.html#put--db
def createDb(): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), baseHeaders))
+ requestJson[JsObject](mkRequest(HttpMethods.PUT, uri(db), headers = baseHeaders))
// http://docs.couchdb.org/en/1.6.1/api/database/common.html#delete--db
def deleteDb(): Future[Either[StatusCode, JsObject]] =
- requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), baseHeaders))
+ requestJson[JsObject](mkRequest(HttpMethods.DELETE, uri(db), headers = baseHeaders))
// http://docs.couchdb.org/en/1.6.1/api/database/bulk-api.html#get--db-_all_docs
def getAllDocs(skip: Option[Int] = None,
@@ -75,6 +78,6 @@ class ExtendedCouchDbRestClient(protocol: String,
.toMap
val url = uri(db, "_all_docs").withQuery(Uri.Query(argMap))
- requestJson[JsObject](mkRequest(HttpMethods.GET, url, baseHeaders))
+ requestJson[JsObject](mkRequest(HttpMethods.GET, url, headers = baseHeaders))
}
}
diff --git a/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala b/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala
new file mode 100644
index 0000000..7213e4a
--- /dev/null
+++ b/tests/src/test/scala/whisk/http/PoolingRestClientTests.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.http
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpecLike, Matchers}
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.junit.JUnitRunner
+
+import akka.NotUsed
+import akka.actor.ActorSystem
+import akka.stream.scaladsl.Flow
+import akka.stream.ActorMaterializer
+import akka.testkit.TestKit
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.HttpMethods.{GET, POST}
+import akka.http.scaladsl.model.StatusCodes.{InternalServerError, NotFound}
+import akka.http.scaladsl.model.headers.RawHeader
+import akka.http.scaladsl.unmarshalling.Unmarshaller.UnsupportedContentTypeException
+
+import common.StreamLogging
+
+import spray.json.JsObject
+import spray.json.DefaultJsonProtocol._
+
+import scala.concurrent.duration._
+import scala.concurrent.{Await, ExecutionContext, Future, Promise, TimeoutException}
+import scala.util.{Success, Try}
+
+import whisk.http.PoolingRestClient._
+
+@RunWith(classOf[JUnitRunner])
+class PoolingRestClientTests
+ extends TestKit(ActorSystem("PoolingRestClientTests"))
+ with FlatSpecLike
+ with Matchers
+ with ScalaFutures
+ with StreamLogging {
+ implicit val ec: ExecutionContext = system.dispatcher
+ implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+ def testFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+ : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+ Flow[(HttpRequest, Promise[HttpResponse])]
+ .mapAsyncUnordered(1) {
+ case (request, userContext) =>
+ request shouldBe httpRequest
+ Future.successful((Success(httpResponse), userContext))
+ }
+
+ def failFlow(httpResponse: HttpResponse = HttpResponse(), httpRequest: HttpRequest = HttpRequest())
+ : Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), NotUsed] =
+ Flow[(HttpRequest, Promise[HttpResponse])]
+ .mapAsyncUnordered(1) {
+ case (request, userContext) =>
+ Future.failed(new Exception)
+ }
+
+ def await[T](awaitable: Future[T], timeout: FiniteDuration = 10.seconds) = Await.result(awaitable, timeout)
+
+ behavior of "Pooling REST Client"
+
+ it should "error when configuration protocol is invalid" in {
+ a[IllegalArgumentException] should be thrownBy new PoolingRestClient("invalid", "host", 443, 1)
+ }
+
+ it should "get a non-200 status code when performing a request" in {
+ val httpResponse = HttpResponse(InternalServerError)
+ val httpRequest = HttpRequest()
+ val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse)))
+
+ await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse
+ }
+
+ it should "return payload from a request" in {
+ val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+ val httpRequest = HttpRequest()
+ val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse)))
+
+ await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse
+ }
+
+ it should "send headers when making a request" in {
+ val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+ val httpRequest = HttpRequest(headers = List(RawHeader("key", "value")))
+ val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+
+ await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse
+ }
+
+ it should "send uri when making a request" in {
+ val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+ val httpRequest = HttpRequest(uri = Uri("/some/where"))
+ val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+
+ await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse
+ }
+
+ it should "send a payload when making a request" in {
+ val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+ val httpRequest = HttpRequest(POST, entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, "payload"))
+ val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+
+ await(poolingRestClient.request(Future.successful(httpRequest))) shouldBe httpResponse
+ }
+
+ it should "return JSON when making a request" in {
+ val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+ val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+ val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+ val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+
+ await(poolingRestClient.requestJson[JsObject](request)) shouldBe Right(JsObject())
+ }
+
+ it should "throw timeout exception when Future fails in httpFlow" in {
+ val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+ val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+ val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(failFlow(httpResponse, httpRequest)))
+ val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+
+ a[TimeoutException] should be thrownBy await(poolingRestClient.requestJson[JsObject](request))
+ }
+
+ it should "return a status code on request failure" in {
+ val httpResponse = HttpResponse(NotFound)
+ val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+ val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+ val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+
+ await(poolingRestClient.requestJson[JsObject](request)) shouldBe Left(NotFound)
+ }
+
+ it should "throw an unsupported content-type exception when unexpected content-type is returned" in {
+ val httpResponse = HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, "plain text"))
+ val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+ val poolingRestClient = new PoolingRestClient("https", "host", 443, 1, Some(testFlow(httpResponse, httpRequest)))
+ val request = mkJsonRequest(GET, Uri./, JsObject(), List.empty)
+
+ a[UnsupportedContentTypeException] should be thrownBy await(poolingRestClient.requestJson[JsObject](request))
+ }
+
+ it should "create an HttpRequest without a payload" in {
+ val httpRequest = HttpRequest()
+
+ await(mkRequest(GET, Uri./)) shouldBe httpRequest
+ }
+
+ it should "create an HttpRequest with a JSON payload" in {
+ val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+
+ await(mkJsonRequest(GET, Uri./, JsObject(), List.empty)) shouldBe httpRequest
+ }
+
+ it should "create an HttpRequest with a payload" in {
+ val httpRequest = HttpRequest(entity = HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint))
+ val request = mkRequest(
+ GET,
+ Uri./,
+ Future.successful(HttpEntity(ContentTypes.`application/json`, JsObject().compactPrint)),
+ List.empty)
+
+ await(request) shouldBe httpRequest
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.