You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pekko.apache.org by he...@apache.org on 2023/03/23 11:31:19 UTC
[incubator-pekko-connectors] branch main updated: =s3 Clean up some code and make it compiles.
This is an automated email from the ASF dual-hosted git repository.
hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-connectors.git
The following commit(s) were added to refs/heads/main by this push:
new c7dd80e09 =s3 Clean up some code and make it compiles.
c7dd80e09 is described below
commit c7dd80e092b4cce29021f9c0990772bdcd7d67eb
Author: 虎鸣 <he...@alibaba-inc.com>
AuthorDate: Wed Mar 22 01:29:20 2023 +0800
=s3 Clean up some code and make it compiles.
---
.../pekko/stream/connectors/s3/impl/S3Stream.scala | 12 ++--
.../stream/connectors/s3/impl/SplitAfterSize.scala | 65 +++++++++++-----------
2 files changed, 39 insertions(+), 38 deletions(-)
diff --git a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
index 524422f7f..ec70ae8fb 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/S3Stream.scala
@@ -1024,13 +1024,14 @@ import scala.util.{ Failure, Success, Try }
/**
* Initiates a multipart upload. Returns a source of the initiated upload with upload part indicess
*/
- private def initiateUpload(s3Location: S3Location,
+ private def initiateUpload(
+ s3Location: S3Location,
contentType: ContentType,
s3Headers: immutable.Seq[HttpHeader]): Source[(MultipartUpload, Int), NotUsed] =
Source
.single(s3Location)
.flatMapConcat(initiateMultipartUpload(_, contentType, s3Headers))
- .flatMapConcat(r => Source.repeat(r))
+ .mapConcat(r => Iterator.continually(r))
.zip(Source.fromIterator(() => Iterator.from(1)))
private def poolSettings(implicit settings: S3Settings, system: ActorSystem) =
@@ -1274,15 +1275,12 @@ import scala.util.{ Failure, Success, Try }
private def requestInfoOrUploadState(s3Location: S3Location,
contentType: ContentType,
s3Headers: S3Headers,
- initialUploadState: Option[(String, Int)]) = {
+ initialUploadState: Option[(String, Int)]): Source[(MultipartUpload, Int), NotUsed] = {
initialUploadState match {
case Some((uploadId, initialIndex)) =>
// We are resuming from a previously aborted Multipart upload so rather than creating a new MultipartUpload
// resource we just need to set up the initial state
- Source
- .single(s3Location)
- .flatMapConcat(_ => Source.single(MultipartUpload(s3Location.bucket, s3Location.key, uploadId)))
- .flatMapConcat(r => Source.repeat(r))
+ Source.fromIterator(() => Iterator.continually(MultipartUpload(s3Location.bucket, s3Location.key, uploadId)))
.zip(Source.fromIterator(() => Iterator.from(initialIndex)))
case None =>
// First step of the multi part upload process is made.
diff --git a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSize.scala b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSize.scala
index 31d4782d1..5579a1fce 100644
--- a/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSize.scala
+++ b/s3/src/main/scala/org/apache/pekko/stream/connectors/s3/impl/SplitAfterSize.scala
@@ -38,43 +38,46 @@ import scala.annotation.tailrec
def apply[I, M](minChunkSize: Int,
maxChunkSize: Int)(in: Flow[I, ByteString, M]): SubFlow[ByteString, M, in.Repr, in.Closed] = {
require(minChunkSize < maxChunkSize, "the min chunk size must be smaller than the max chunk size")
- in.via(insertMarkers(minChunkSize, maxChunkSize)).splitWhen(_ == NewStream).collect { case bs: ByteString => bs }
+ in.via(insertMarkers(minChunkSize, maxChunkSize))
+ .splitWhen(_ == NewStream)
+ .collect { case bs: ByteString => bs }
}
private case object NewStream
- private def insertMarkers(minChunkSize: Long, maxChunkSize: Int) = new GraphStage[FlowShape[ByteString, Any]] {
- val in = Inlet[ByteString]("SplitAfterSize.in")
- val out = Outlet[Any]("SplitAfterSize.out")
- override val shape = FlowShape.of(in, out)
+ private def insertMarkers(minChunkSize: Long, maxChunkSize: Int): GraphStage[FlowShape[ByteString, Any]] =
+ new GraphStage[FlowShape[ByteString, Any]] {
+ val in = Inlet[ByteString]("SplitAfterSize.in")
+ val out = Outlet[Any]("SplitAfterSize.out")
+ override val shape = FlowShape.of(in, out)
- override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
- new GraphStageLogic(shape) with OutHandler with InHandler {
- var count: Int = 0
- override def onPull(): Unit = pull(in)
+ override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+ new GraphStageLogic(shape) with OutHandler with InHandler {
+ var count: Int = 0
+ override def onPull(): Unit = pull(in)
- override def onPush(): Unit = {
- val elem = grab(in)
- count += elem.size
- if (count > maxChunkSize) {
- splitElement(elem, elem.size - (count - maxChunkSize))
- } else if (count >= minChunkSize) {
- count = 0
- emitMultiple(out, elem :: NewStream :: Nil)
- } else emit(out, elem)
- }
-
- @tailrec private def splitElement(elem: ByteString, splitPos: Int): Unit =
- if (elem.size > splitPos) {
- val (part1, rest) = elem.splitAt(splitPos)
- emitMultiple(out, part1 :: NewStream :: Nil)
- splitElement(rest, maxChunkSize)
- } else {
- count = elem.size
- emit(out, elem)
+ override def onPush(): Unit = {
+ val elem = grab(in)
+ count += elem.size
+ if (count > maxChunkSize) {
+ splitElement(elem, elem.size - (count - maxChunkSize))
+ } else if (count >= minChunkSize) {
+ count = 0
+ emitMultiple(out, elem :: NewStream :: Nil)
+ } else emit(out, elem)
}
- setHandlers(in, out, this)
- }
- }
+ @tailrec private def splitElement(elem: ByteString, splitPos: Int): Unit =
+ if (elem.size > splitPos) {
+ val (part1, rest) = elem.splitAt(splitPos)
+ emitMultiple(out, part1 :: NewStream :: Nil)
+ splitElement(rest, maxChunkSize)
+ } else {
+ count = elem.size
+ emit(out, elem)
+ }
+
+ setHandlers(in, out, this)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pekko.apache.org
For additional commands, e-mail: commits-help@pekko.apache.org