You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by md...@apache.org on 2023/01/27 08:16:30 UTC
[incubator-pekko-connectors] 02/02: Revert GCS Unmarshaller PartialFunction
This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
commit 90e23ede54243eb0b8e5009d6e87446c6b367d9b
Author: Matthew de Detrich <ma...@aiven.io>
AuthorDate: Fri Jan 27 07:21:30 2023 +0100
Revert GCS Unmarshaller PartialFunction
---
.../googlecloud/storage/impl/GCStorageStream.scala | 26 +++++++++-------------
1 file changed, 11 insertions(+), 15 deletions(-)
diff --git a/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala b/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala
index 4c412a59..60c18bda 100644
--- a/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala
+++ b/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala
@@ -146,21 +146,17 @@ import scala.concurrent.Future
metadata.fold(HttpEntity.Empty)(m => HttpEntity(ContentTypes.`application/json`, m.toJson.toString))
val request = HttpRequest(POST, uri, headers, entity)
- implicit val um: Unmarshaller[HttpResponse, StorageObject] =
- Unmarshaller
- .withMaterializer[HttpResponse, StorageObject] { implicit ec => implicit mat => response =>
- {
- val status = response.status
- val entity = response.entity
- if (status.isSuccess())
- Unmarshal(entity).to[StorageObject]
- else
- Unmarshal(entity).to[String].flatMap[StorageObject] { errorString =>
- Future.failed(new RuntimeException(s"Uploading part failed with status $status: $errorString"))
- }
- }
- }
- .withDefaultRetry
+ implicit val um: Unmarshaller[HttpResponse, StorageObject] = Unmarshaller.withMaterializer {
+ implicit ec => implicit mat =>
+ {
+ case HttpResponse(status, _, entity, _) if status.isSuccess() =>
+ Unmarshal(entity).to[StorageObject]
+ case HttpResponse(status, _, entity, _) =>
+ Unmarshal(entity).to[String].flatMap { errorString =>
+ Future.failed(new RuntimeException(s"Uploading part failed with status $status: $errorString"))
+ }
+ }: PartialFunction[HttpResponse, Future[StorageObject]]
+ }.withDefaultRetry
ResumableUpload[StorageObject](request).addAttributes(GoogleAttributes.settings(settings))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org