Posted to commits@pekko.apache.org by he...@apache.org on 2023/03/23 11:31:19 UTC

[incubator-pekko-connectors] branch main updated: =s3 Clean up some code and make it compile.

This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git


The following commit(s) were added to refs/heads/main by this push:
     new c7dd80e09 =s3 Clean up some code and make it compile.
c7dd80e09 is described below

commit c7dd80e092b4cce29021f9c0990772bdcd7d67eb
Author: 虎鸣 <he...@alibaba-inc.com>
AuthorDate: Wed Mar 22 01:29:20 2023 +0800

    =s3 Clean up some code and make it compile.
---
 .../pekko/stream/connectors/s3/impl/S3Stream.scala | 12 ++--
 .../stream/connectors/s3/impl/SplitAfterSize.scala | 65 +++++++++++-----------
 2 files changed, 39 insertions(+), 38 deletions(-)

diff --git a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index 524422f7f..ec70ae8fb 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -1024,13 +1024,14 @@ import scala.util.{ Failure, Success, Try }
   /**
   * Initiates a multipart upload. Returns a source of the initiated upload with upload part indices
    */
-  private def initiateUpload(s3Location: S3Location,
+  private def initiateUpload(
+      s3Location: S3Location,
       contentType: ContentType,
       s3Headers: immutable.Seq[HttpHeader]): Source[(MultipartUpload, Int), NotUsed] =
     Source
       .single(s3Location)
       .flatMapConcat(initiateMultipartUpload(_, contentType, s3Headers))
-      .flatMapConcat(r => Source.repeat(r))
+      .mapConcat(r => Iterator.continually(r))
       .zip(Source.fromIterator(() => Iterator.from(1)))
 
   private def poolSettings(implicit settings: S3Settings, system: ActorSystem) =
@@ -1274,15 +1275,12 @@ import scala.util.{ Failure, Success, Try }
   private def requestInfoOrUploadState(s3Location: S3Location,
       contentType: ContentType,
       s3Headers: S3Headers,
-      initialUploadState: Option[(String, Int)]) = {
+      initialUploadState: Option[(String, Int)]): Source[(MultipartUpload, Int), NotUsed] = {
     initialUploadState match {
       case Some((uploadId, initialIndex)) =>
         // We are resuming from a previously aborted Multipart upload so rather than creating a new MultipartUpload
         // resource we just need to set up the initial state
-        Source
-          .single(s3Location)
-          .flatMapConcat(_ => Source.single(MultipartUpload(s3Location.bucket, s3Location.key, uploadId)))
-          .flatMapConcat(r => Source.repeat(r))
+        Source.fromIterator(() => Iterator.continually(MultipartUpload(s3Location.bucket, s3Location.key, uploadId)))
           .zip(Source.fromIterator(() => Iterator.from(initialIndex)))
       case None =>
        // First step of the multipart upload process is performed.
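
For illustration, both hunks above converge on the same repeat-and-zip pattern: lazily repeat one value with mapConcat over Iterator.continually, then zip it with an increasing index. The following minimal sketch is not the connector's code; a placeholder string stands in for the real MultipartUpload value, and take(3) keeps the otherwise infinite repetition finite:

    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.stream.scaladsl.{ Sink, Source }

    object RepeatZipSketch extends App {
      implicit val system: ActorSystem = ActorSystem("sketch")

      // mapConcat over Iterator.continually lazily repeats the single element;
      // zipping with Iterator.from(1) attaches 1-based part indices, as
      // initiateUpload now does (requestInfoOrUploadState resumes the same
      // pattern from initialIndex instead of 1).
      Source
        .single("upload-id") // placeholder for the real MultipartUpload value
        .mapConcat(r => Iterator.continually(r))
        .zip(Source.fromIterator(() => Iterator.from(1)))
        .take(3) // keep the demo finite
        .runWith(Sink.foreach(println))
        .onComplete(_ => system.terminate())(system.dispatcher)
      // prints (upload-id,1), (upload-id,2), (upload-id,3)
    }
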
diff --git a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSize.scala b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSize.scala
index 31d4782d1..5579a1fce 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSize.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSize.scala
@@ -38,43 +38,46 @@ import scala.annotation.tailrec
   def apply[I, M](minChunkSize: Int,
       maxChunkSize: Int)(in: Flow[I, ByteString, M]): SubFlow[ByteString, M, in.Repr, in.Closed] = {
     require(minChunkSize < maxChunkSize, "the min chunk size must be smaller than the max chunk size")
-    in.via(insertMarkers(minChunkSize, maxChunkSize)).splitWhen(_ == NewStream).collect { case bs: ByteString => bs }
+    in.via(insertMarkers(minChunkSize, maxChunkSize))
+      .splitWhen(_ == NewStream)
+      .collect { case bs: ByteString => bs }
   }
 
   private case object NewStream
 
-  private def insertMarkers(minChunkSize: Long, maxChunkSize: Int) = new GraphStage[FlowShape[ByteString, Any]] {
-    val in = Inlet[ByteString]("SplitAfterSize.in")
-    val out = Outlet[Any]("SplitAfterSize.out")
-    override val shape = FlowShape.of(in, out)
+  private def insertMarkers(minChunkSize: Long, maxChunkSize: Int): GraphStage[FlowShape[ByteString, Any]] =
+    new GraphStage[FlowShape[ByteString, Any]] {
+      val in = Inlet[ByteString]("SplitAfterSize.in")
+      val out = Outlet[Any]("SplitAfterSize.out")
+      override val shape = FlowShape.of(in, out)
 
-    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
-      new GraphStageLogic(shape) with OutHandler with InHandler {
-        var count: Int = 0
-        override def onPull(): Unit = pull(in)
+      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+        new GraphStageLogic(shape) with OutHandler with InHandler {
+          var count: Int = 0
+          override def onPull(): Unit = pull(in)
 
-        override def onPush(): Unit = {
-          val elem = grab(in)
-          count += elem.size
-          if (count > maxChunkSize) {
-            splitElement(elem, elem.size - (count - maxChunkSize))
-          } else if (count >= minChunkSize) {
-            count = 0
-            emitMultiple(out, elem :: NewStream :: Nil)
-          } else emit(out, elem)
-        }
-
-        @tailrec private def splitElement(elem: ByteString, splitPos: Int): Unit =
-          if (elem.size > splitPos) {
-            val (part1, rest) = elem.splitAt(splitPos)
-            emitMultiple(out, part1 :: NewStream :: Nil)
-            splitElement(rest, maxChunkSize)
-          } else {
-            count = elem.size
-            emit(out, elem)
+          override def onPush(): Unit = {
+            val elem = grab(in)
+            count += elem.size
+            if (count > maxChunkSize) {
+              splitElement(elem, elem.size - (count - maxChunkSize))
+            } else if (count >= minChunkSize) {
+              count = 0
+              emitMultiple(out, elem :: NewStream :: Nil)
+            } else emit(out, elem)
           }
 
-        setHandlers(in, out, this)
-      }
-  }
+          @tailrec private def splitElement(elem: ByteString, splitPos: Int): Unit =
+            if (elem.size > splitPos) {
+              val (part1, rest) = elem.splitAt(splitPos)
+              emitMultiple(out, part1 :: NewStream :: Nil)
+              splitElement(rest, maxChunkSize)
+            } else {
+              count = elem.size
+              emit(out, elem)
+            }
+
+          setHandlers(in, out, this)
+        }
+    }
 }
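
The SplitAfterSize hunk is an indentation-only restructuring, so the marker-based splitting behaviour is unchanged: a NewStream marker is emitted once the running byte count lands between minChunkSize and maxChunkSize, and oversized elements are split so no chunk exceeds maxChunkSize. As a rough, self-contained sketch of that behaviour (the real stage is a package-private GraphStage; here a simplified statefulMapConcat stands in for insertMarkers, with made-up thresholds and sample data):

    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.stream.scaladsl.{ Sink, Source }
    import org.apache.pekko.util.ByteString

    object SplitAfterSizeSketch extends App {
      implicit val system: ActorSystem = ActorSystem("sketch")

      case object NewStream // marker element, as in the real stage
      val minChunkSize = 10 // illustrative threshold, not the S3 minimum

      Source(List("aaaa", "bbbb", "cccc", "dddd").map(ByteString(_)))
        .statefulMapConcat { () => // simplified stand-in for insertMarkers
          var count = 0
          elem => {
            count += elem.size
            if (count >= minChunkSize) { count = 0; elem :: NewStream :: Nil }
            else elem :: Nil
          }
        }
        .splitWhen(_ == NewStream) // each marker starts a new substream
        .collect { case bs: ByteString => bs } // drop the markers again
        .fold(ByteString.empty)(_ ++ _) // reassemble each chunk for printing
        .concatSubstreams
        .runWith(Sink.foreach(chunk => println(chunk.utf8String)))
        .onComplete(_ => system.terminate())(system.dispatcher)
      // prints "aaaabbbbcccc" then "dddd": the marker fires once 12 >= 10
    }
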

