You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by se...@apache.org on 2023/01/05 03:08:28 UTC
[incubator-pekko-connectors] 01/01: update pf
This is an automated email from the ASF dual-hosted git repository.
seanglover pushed a commit to branch seglo/update-gcs-mat
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
commit 85f9e8a852ef5a2d68f044812e29ec9b2f5757e9
Author: Sean Glover <se...@seanglover.com>
AuthorDate: Wed Jan 4 22:07:25 2023 -0500
update pf
---
.../googlecloud/storage/impl/GCStorageStream.scala | 26 +++++++++++++---------
1 file changed, 15 insertions(+), 11 deletions(-)
diff --git a/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala b/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala
index c17861a2..6cfe79bb 100644
--- a/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala
+++ b/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala
@@ -146,17 +146,21 @@ import scala.concurrent.Future
metadata.fold(HttpEntity.Empty)(m => HttpEntity(ContentTypes.`application/json`, m.toJson.toString))
val request = HttpRequest(POST, uri, headers, entity)
- implicit val um: Unmarshaller[HttpResponse, StorageObject] = Unmarshaller.withMaterializer {
- implicit ec => implicit mat =>
- {
- case HttpResponse(status, _, entity, _) if status.isSuccess() =>
- Unmarshal(entity).to[StorageObject]
- case HttpResponse(status, _, entity, _) =>
- Unmarshal(entity).to[String].flatMap { errorString =>
- Future.failed(new RuntimeException(s"Uploading part failed with status $status: $errorString"))
- }
- }: PartialFunction[HttpResponse, Future[StorageObject]]
- }.withDefaultRetry
+ implicit val um: Unmarshaller[HttpResponse, StorageObject] =
+ Unmarshaller
+ .withMaterializer[HttpResponse, StorageObject] { implicit ec => implicit mat => response =>
+ {
+ val status = response.status
+ val entity = response.entity
+ if (status.isSuccess())
+ Unmarshal(entity).to[StorageObject]
+ else
+ Unmarshal(entity).to[String].flatMap[StorageObject] { errorString =>
+ Future.failed(new RuntimeException(s"Uploading part failed with status $status: $errorString"))
+ }
+ }
+ }
+ .withDefaultRetry
ResumableUpload[StorageObject](request).addAttributes(GoogleAttributes.settings(settings))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org