You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by md...@apache.org on 2023/01/27 08:16:30 UTC

[incubator-pekko-connectors] 02/02: Revert GCS Unmarshaller PartialFunction

This is an automated email from the ASF dual-hosted git repository.

mdedetrich pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git

commit 90e23ede54243eb0b8e5009d6e87446c6b367d9b
Author: Matthew de Detrich <ma...@aiven.io>
AuthorDate: Fri Jan 27 07:21:30 2023 +0100

    Revert GCS Unmarshaller PartialFunction
---
 .../googlecloud/storage/impl/GCStorageStream.scala | 26 +++++++++-------------
 1 file changed, 11 insertions(+), 15 deletions(-)

diff --git a/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala b/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala
index 4c412a59..60c18bda 100644
--- a/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala
+++ b/google-cloud-storage/src/main/scala/akka/stream/alpakka/googlecloud/storage/impl/GCStorageStream.scala
@@ -146,21 +146,17 @@ import scala.concurrent.Future
           metadata.fold(HttpEntity.Empty)(m => HttpEntity(ContentTypes.`application/json`, m.toJson.toString))
         val request = HttpRequest(POST, uri, headers, entity)
 
-        implicit val um: Unmarshaller[HttpResponse, StorageObject] =
-          Unmarshaller
-            .withMaterializer[HttpResponse, StorageObject] { implicit ec => implicit mat => response =>
-              {
-                val status = response.status
-                val entity = response.entity
-                if (status.isSuccess())
-                  Unmarshal(entity).to[StorageObject]
-                else
-                  Unmarshal(entity).to[String].flatMap[StorageObject] { errorString =>
-                    Future.failed(new RuntimeException(s"Uploading part failed with status $status: $errorString"))
-                  }
-              }
-            }
-            .withDefaultRetry
+        implicit val um: Unmarshaller[HttpResponse, StorageObject] = Unmarshaller.withMaterializer {
+          implicit ec => implicit mat =>
+            {
+              case HttpResponse(status, _, entity, _) if status.isSuccess() =>
+                Unmarshal(entity).to[StorageObject]
+              case HttpResponse(status, _, entity, _) =>
+                Unmarshal(entity).to[String].flatMap { errorString =>
+                  Future.failed(new RuntimeException(s"Uploading part failed with status $status: $errorString"))
+                }
+            }: PartialFunction[HttpResponse, Future[StorageObject]]
+        }.withDefaultRetry
 
         ResumableUpload[StorageObject](request).addAttributes(GoogleAttributes.settings(settings))
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org