You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "vicennial (via GitHub)" <gi...@apache.org> on 2023/03/02 16:49:47 UTC

[GitHub] [spark] vicennial opened a new pull request, #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

vicennial opened a new pull request, #40256:
URL: https://github.com/apache/spark/pull/40256

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   This PR introduces a mechanism to transfer artifacts (currently, local `.jar` + `.class` files) from a Spark Connect JVM/Scala client over to the server side of Spark Connect. The mechanism follows the protocol as defined in https://github.com/apache/spark/pull/40147 and supports batching (for multiple "small" artifacts) and chunking (for large artifacts).
   
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   In the decoupled client-server architecture of Spark Connect, a remote client may use a local JAR or a new class in their UDF that may not be present on the server. To handle these cases of missing "artifacts", we implement a mechanism to transfer artifacts from the client side over to the server side as per the protocol defined in https://github.com/apache/spark/pull/40147.
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   Yes, users would be able to use the `addArtifact` and `addArtifacts` methods (via a `SparkSession` instance) to transfer local files (`.jar` and `.class` extensions).
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   Unit tests - located in `ArtifactSuite`. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on PR #40256:
URL: https://github.com/apache/spark/pull/40256#issuecomment-1452899534

   Overall looks good. Thank you! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1124392305


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.TimeUnit
+
+import collection.JavaConverters._
+import com.google.protobuf.ByteString
+import io.grpc.{ManagedChannel, Server}
+import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsRequest
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+
+class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
+
+  private var client: SparkConnectClient = _
+  private var service: DummySparkConnectService = _
+  private var server: Server = _
+  private var artifactManager: ArtifactManager = _
+  private var channel: ManagedChannel = _
+
+  private def startDummyServer(): Unit = {
+    service = new DummySparkConnectService()
+    server = InProcessServerBuilder
+      .forName(getClass.getName)
+      .addService(service)
+      .build()
+    server.start()
+  }
+
+  private def createArtifactManager(): Unit = {
+    channel = InProcessChannelBuilder.forName(getClass.getName).directExecutor().build()
+    artifactManager = new ArtifactManager(proto.UserContext.newBuilder().build(), channel)
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    startDummyServer()
+    createArtifactManager()
+    client = null
+  }
+
+  override def afterEach(): Unit = {
+    if (server != null) {
+      server.shutdownNow()
+      assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to shutdown")
+    }
+
+    if (channel != null) {
+      channel.shutdownNow()
+    }
+
+    if (client != null) {
+      client.shutdown()
+    }
+  }
+
+  private val CHUNK_SIZE: Int = 32 * 1024
+  protected def artifactFilePath: Path = baseResourcePath.resolve("artifact-tests")
+  protected def artifactCrcPath: Path = artifactFilePath.resolve("crc")
+
+  private def getCrcValues(filePath: Path): Seq[Long] = {
+    val fileName = filePath.getFileName.toString
+    val crcFileName = fileName.split('.').head + ".txt"
+    Files
+      .readAllLines(artifactCrcPath.resolve(crcFileName))
+      .asScala
+      .map(_.toLong)

Review Comment:
   Scala 2.13 build is broken by this. I made a quick followup https://github.com/apache/spark/pull/40267



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123572526


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -287,6 +288,28 @@ class SparkSession private[sql] (
     client.execute(plan).asScala.foreach(_ => ())
   }
 
+  /**
+   * Add a single artifact to the client session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = client.addArtifact(path)

Review Comment:
   Can you mark these as experimental?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "vicennial (via GitHub)" <gi...@apache.org>.
vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123897616


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.TimeUnit
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import com.google.protobuf.ByteString
+import io.grpc.{ManagedChannel, Server}
+import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsRequest
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+
+class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
+
+  private var client: SparkConnectClient = _
+  private var service: DummySparkConnectService = _
+  private var server: Server = _
+  private var artifactManager: ArtifactManager = _
+  private var channel: ManagedChannel = _
+
+  private def startDummyServer(): Unit = {
+    service = new DummySparkConnectService()
+    server = InProcessServerBuilder
+      .forName(getClass.getName)
+      .addService(service)
+      .build()
+    server.start()
+  }
+
+  private def createArtifactManager(): Unit = {
+    channel = InProcessChannelBuilder.forName(getClass.getName).directExecutor().build()
+    artifactManager = new ArtifactManager(proto.UserContext.newBuilder().build(), channel)
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    startDummyServer()
+    createArtifactManager()
+    client = null
+  }
+
+  override def afterEach(): Unit = {
+    if (server != null) {
+      server.shutdownNow()
+      assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to shutdown")
+    }
+
+    if (channel != null) {
+      channel.shutdownNow()
+    }
+
+    if (client != null) {
+      client.shutdown()
+    }
+  }
+
+  private val CHUNK_SIZE: Int = 32 * 1024
+  protected def artifactFilePath: Path = baseResourcePath.resolve("artifact-tests")
+
+  /**
+   * Check if the data sent to the server (stored in `artifactChunk`) is equivalent to the local
+   * data at `localPath`.
+   * @param artifactChunk
+   * @param localPath
+   */
+  private def assertFileDataEquality(
+      artifactChunk: AddArtifactsRequest.ArtifactChunk,
+      localPath: Path): Unit = {
+    val in = new CheckedInputStream(Files.newInputStream(localPath), new CRC32)
+    val localData = ByteString.readFrom(in)
+    assert(artifactChunk.getData == localData)
+    assert(artifactChunk.getCrc == in.getChecksum.getValue)
+  }
+
+  private def singleChunkArtifactTest(path: String): Unit = {
+    test(s"Single Chunk Artifact - $path") {
+      val artifactPath = artifactFilePath.resolve(path)
+      artifactManager.addArtifact(artifactPath.toString)
+
+      val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+      // Single `AddArtifactRequest`
+      assert(receivedRequests.size == 1)
+
+      val request = receivedRequests.head
+      assert(request.hasBatch)
+
+      val batch = request.getBatch
+      // Single artifact in batch
+      assert(batch.getArtifactsList.size() == 1)
+
+      val singleChunkArtifact = batch.getArtifacts(0)
+      val namePrefix = artifactPath.getFileName.toString match {
+        case jar if jar.endsWith(".jar") => "jars"
+        case cf if cf.endsWith(".class") => "classes"
+      }
+      assert(singleChunkArtifact.getName.equals(namePrefix + "/" + path))
+      assertFileDataEquality(singleChunkArtifact.getData, artifactPath)
+    }
+  }
+
+  singleChunkArtifactTest("smallClassFile.class")
+
+  singleChunkArtifactTest("smallJar.jar")
+
+  private def readNextChunk(in: InputStream): ByteString = {
+    val buf = new Array[Byte](CHUNK_SIZE)
+    var bytesRead = 0
+    var count = 0
+    while (count != -1 && bytesRead < CHUNK_SIZE) {
+      count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+      if (count != -1) {
+        bytesRead += count
+      }
+    }
+    if (bytesRead == 0) ByteString.empty()
+    else ByteString.copyFrom(buf, 0, bytesRead)
+  }
+
+  /**
+   * Read data in a chunk of `CHUNK_SIZE` bytes from `in` and verify equality with server-side
+   * data stored in `chunk`.
+   * @param in
+   * @param chunk
+   * @return
+   */
+  private def checkChunkDataAndCrc(

Review Comment:
   Added truth/golden files 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123591424


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+    addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage.asInstanceOf[LocalData]
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO: Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)
+
+        builder.getBatchBuilder
+          .addArtifactsBuilder()
+          .setName(artifact.path.toString)
+          .setData(data)
+          .build()
+      } catch {
+        case NonFatal(e) =>
+          stream.onError(e)
+          throw e
+      } finally {
+        in.close()
+      }
+    }
+    stream.onNext(builder.build())
+  }
+
+  /**
+   * Read data from an [[InputStream]] in pieces of `chunkSize` bytes and convert to
+   * protobuf-compatible [[ByteString]].
+   * @param in
+   * @return
+   */
+  private def readNextChunk(in: InputStream): ByteString = {
+    val buf = new Array[Byte](CHUNK_SIZE)
+    var bytesRead = 0
+    var count = 0
+    while (count != -1 && bytesRead < CHUNK_SIZE) {
+      count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+      if (count != -1) {
+        bytesRead += count
+      }
+    }
+    if (bytesRead == 0) ByteString.empty()
+    else ByteString.copyFrom(buf, 0, bytesRead)
+  }
+
+  /**
+   * Add a artifact in chunks to the stream. The artifact's data is spread out over multiple
+   * [[proto.AddArtifactsRequest requests]].
+   */
+  private def addChunkedArtifact(
+      artifact: Artifact,
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+
+    val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+    try {
+      // First RPC contains the `BeginChunkedArtifact` payload (`begin_chunk`).
+      // Subsequent RPCs contains the `ArtifactChunk` payload (`chunk`).
+      val artifactChunkBuilder = proto.AddArtifactsRequest.ArtifactChunk.newBuilder()
+      var dataChunk = readNextChunk(in)
+      // Integer division that rounds up to the nearest whole number.
+      def getNumChunks(size: Long): Long = (size + (CHUNK_SIZE - 1)) / CHUNK_SIZE
+
+      builder.getBeginChunkBuilder
+        .setName(artifact.path.toString)
+        .setTotalBytes(artifact.size)
+        .setNumChunks(getNumChunks(artifact.size))
+        .setInitialChunk(
+          artifactChunkBuilder
+            .setData(dataChunk)
+            .setCrc(in.getChecksum.getValue))
+      stream.onNext(builder.build())
+      builder.clearBeginChunk()
+
+      dataChunk = readNextChunk(in)
+      // Consume stream in chunks until there is no data left to read.
+      while (!dataChunk.isEmpty) {
+        artifactChunkBuilder.setData(dataChunk).setCrc(in.getChecksum.getValue)
+        builder.setChunk(artifactChunkBuilder.build())
+        stream.onNext(builder.build())
+
+        builder.clearChunk()
+        dataChunk = readNextChunk(in)
+      }
+    } catch {
+      case NonFatal(e) =>
+        stream.onError(e)
+        throw e
+    } finally {
+      in.close()
+    }
+  }
+}
+
+trait ClassFinder {
+  def findClasses(): Iterator[Artifact]
+}
+
+class Artifact private (val path: Path, val storage: Storage) {
+  require(!path.isAbsolute, s"Bad path: $path")
+
+  lazy val size: Long = storage match {
+    case localData: LocalData => localData.size
+  }
+}
+
+object Artifact {
+  val CLASS_PREFIX: Path = Paths.get("classes")
+  val JAR_PREFIX: Path = Paths.get("jars")
+
+  def newJarArtifact(fileName: Path, storage: Storage): Artifact = {
+    newArtifact(JAR_PREFIX, ".jar", fileName, storage)
+  }
+
+  def newClassArtifact(fileName: Path, storage: Storage): Artifact = {
+    newArtifact(CLASS_PREFIX, ".class", fileName, storage)
+  }
+
+  private def newArtifact(
+      prefix: Path,
+      requiredSuffix: String,
+      fileName: Path,
+      storage: Storage): Artifact = {
+    require(!fileName.isAbsolute)
+    require(fileName.toString.endsWith(requiredSuffix))
+    new Artifact(prefix.resolve(fileName), storage)
+  }
+
+  /**
+   * A pointer to the stored bytes of an artifact.
+   */
+  sealed trait Storage
+
+  /**
+   * Payload stored on this machine.
+   */
+  sealed trait LocalData extends Storage {

Review Comment:
   I think we can flatten this hierarchy for now. There is no other data than local data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1199853838


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO(SPARK-42658): Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)
+
+        builder.getBatchBuilder
+          .addArtifactsBuilder()
+          .setName(artifact.path.toString)
+          .setData(data)
+          .build()
+      } catch {
+        case NonFatal(e) =>
+          stream.onError(e)
+          throw e
+      } finally {
+        in.close()
+      }
+    }
+    stream.onNext(builder.build())
+  }
+
+  /**
+   * Read data from an [[InputStream]] in pieces of `chunkSize` bytes and convert to
+   * protobuf-compatible [[ByteString]].
+   * @param in
+   * @return
+   */
+  private def readNextChunk(in: InputStream): ByteString = {
+    val buf = new Array[Byte](CHUNK_SIZE)
+    var bytesRead = 0
+    var count = 0
+    while (count != -1 && bytesRead < CHUNK_SIZE) {

Review Comment:
   qq: why do we need this `while` loop? Seems like:
   
   ```scala
   count = in.read(buf, 0, CHUNK_SIZE)
   if (count == 0) ByteString.empty()
   else ByteString.copyFrom(buf, 0, count)
   ```
   
   would be good enough because `read` is blocked until it meets EOF IIRC.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123590310


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+    addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage.asInstanceOf[LocalData]
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO: Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)
+
+        builder.getBatchBuilder
+          .addArtifactsBuilder()
+          .setName(artifact.path.toString)
+          .setData(data)
+          .build()
+      } catch {
+        case NonFatal(e) =>
+          stream.onError(e)
+          throw e
+      } finally {
+        in.close()
+      }
+    }
+    stream.onNext(builder.build())
+  }
+
+  /**
+   * Read data from an [[InputStream]] in pieces of `chunkSize` bytes and convert to
+   * protobuf-compatible [[ByteString]].
+   * @param in
+   * @return
+   */
+  private def readNextChunk(in: InputStream): ByteString = {
+    val buf = new Array[Byte](CHUNK_SIZE)
+    var bytesRead = 0
+    var count = 0
+    while (count != -1 && bytesRead < CHUNK_SIZE) {
+      count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+      if (count != -1) {
+        bytesRead += count
+      }
+    }
+    if (bytesRead == 0) ByteString.empty()
+    else ByteString.copyFrom(buf, 0, bytesRead)
+  }
+
+  /**
+   * Add a artifact in chunks to the stream. The artifact's data is spread out over multiple
+   * [[proto.AddArtifactsRequest requests]].
+   */
+  private def addChunkedArtifact(
+      artifact: Artifact,
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+
+    val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+    try {
+      // First RPC contains the `BeginChunkedArtifact` payload (`begin_chunk`).
+      // Subsequent RPCs contains the `ArtifactChunk` payload (`chunk`).
+      val artifactChunkBuilder = proto.AddArtifactsRequest.ArtifactChunk.newBuilder()
+      var dataChunk = readNextChunk(in)
+      // Integer division that rounds up to the nearest whole number.
+      def getNumChunks(size: Long): Long = (size + (CHUNK_SIZE - 1)) / CHUNK_SIZE
+
+      builder.getBeginChunkBuilder
+        .setName(artifact.path.toString)
+        .setTotalBytes(artifact.size)
+        .setNumChunks(getNumChunks(artifact.size))
+        .setInitialChunk(
+          artifactChunkBuilder
+            .setData(dataChunk)
+            .setCrc(in.getChecksum.getValue))
+      stream.onNext(builder.build())
+      builder.clearBeginChunk()
+
+      dataChunk = readNextChunk(in)
+      // Consume stream in chunks until there is no data left to read.
+      while (!dataChunk.isEmpty) {
+        artifactChunkBuilder.setData(dataChunk).setCrc(in.getChecksum.getValue)
+        builder.setChunk(artifactChunkBuilder.build())
+        stream.onNext(builder.build())
+
+        builder.clearChunk()
+        dataChunk = readNextChunk(in)
+      }
+    } catch {
+      case NonFatal(e) =>
+        stream.onError(e)
+        throw e
+    } finally {
+      in.close()
+    }
+  }
+}
+
+trait ClassFinder {
+  def findClasses(): Iterator[Artifact]

Review Comment:
   We should document this a bit better. For example is this method returning all REPL generated classes, or only the new ones?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "vicennial (via GitHub)" <gi...@apache.org>.
vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1124398384


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala:
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.TimeUnit
+
+import collection.JavaConverters._
+import com.google.protobuf.ByteString
+import io.grpc.{ManagedChannel, Server}
+import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsRequest
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+
+class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
+
+  private var client: SparkConnectClient = _
+  private var service: DummySparkConnectService = _
+  private var server: Server = _
+  private var artifactManager: ArtifactManager = _
+  private var channel: ManagedChannel = _
+
+  private def startDummyServer(): Unit = {
+    service = new DummySparkConnectService()
+    server = InProcessServerBuilder
+      .forName(getClass.getName)
+      .addService(service)
+      .build()
+    server.start()
+  }
+
+  private def createArtifactManager(): Unit = {
+    channel = InProcessChannelBuilder.forName(getClass.getName).directExecutor().build()
+    artifactManager = new ArtifactManager(proto.UserContext.newBuilder().build(), channel)
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    startDummyServer()
+    createArtifactManager()
+    client = null
+  }
+
+  override def afterEach(): Unit = {
+    if (server != null) {
+      server.shutdownNow()
+      assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to shutdown")
+    }
+
+    if (channel != null) {
+      channel.shutdownNow()
+    }
+
+    if (client != null) {
+      client.shutdown()
+    }
+  }
+
+  private val CHUNK_SIZE: Int = 32 * 1024
+  protected def artifactFilePath: Path = baseResourcePath.resolve("artifact-tests")
+  protected def artifactCrcPath: Path = artifactFilePath.resolve("crc")
+
+  private def getCrcValues(filePath: Path): Seq[Long] = {
+    val fileName = filePath.getFileName.toString
+    val crcFileName = fileName.split('.').head + ".txt"
+    Files
+      .readAllLines(artifactCrcPath.resolve(crcFileName))
+      .asScala
+      .map(_.toLong)

Review Comment:
   Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "vicennial (via GitHub)" <gi...@apache.org>.
vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123833306


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+    addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage.asInstanceOf[LocalData]
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)

Review Comment:
   I agree, we need a timeout policy. Handling this as part of https://issues.apache.org/jira/browse/SPARK-42658 (along with retry policy)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123965938


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO(SPARK-42658): Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)

Review Comment:
   I am not an expert on networking so just a question for my self education:
   
   so the gRPC level bytes transmission is not 100% reliable so we need another CRC to check that?    



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell closed pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell closed pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server
URL: https://github.com/apache/spark/pull/40256


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123588842


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+    addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage.asInstanceOf[LocalData]
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)

Review Comment:
   I am a bit on the fence about this one. This is fine for now, but in a not so far away future we shouldn't block indefinitely.



##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+    addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage.asInstanceOf[LocalData]
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO: Handle responses containing CRC failures.

Review Comment:
   File a ticket please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123987429


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO(SPARK-42658): Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)

Review Comment:
   Sounds good. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "vicennial (via GitHub)" <gi...@apache.org>.
vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123821569


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+    addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage.asInstanceOf[LocalData]
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO: Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)
+
+        builder.getBatchBuilder
+          .addArtifactsBuilder()
+          .setName(artifact.path.toString)
+          .setData(data)
+          .build()
+      } catch {
+        case NonFatal(e) =>
+          stream.onError(e)
+          throw e
+      } finally {
+        in.close()
+      }
+    }
+    stream.onNext(builder.build())
+  }
+
+  /**
+   * Read data from an [[InputStream]] in pieces of `chunkSize` bytes and convert to
+   * protobuf-compatible [[ByteString]].
+   * @param in
+   * @return
+   */
+  private def readNextChunk(in: InputStream): ByteString = {
+    val buf = new Array[Byte](CHUNK_SIZE)
+    var bytesRead = 0
+    var count = 0
+    while (count != -1 && bytesRead < CHUNK_SIZE) {
+      count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+      if (count != -1) {
+        bytesRead += count
+      }
+    }
+    if (bytesRead == 0) ByteString.empty()
+    else ByteString.copyFrom(buf, 0, bytesRead)
+  }
+
+  /**
+   * Add a artifact in chunks to the stream. The artifact's data is spread out over multiple
+   * [[proto.AddArtifactsRequest requests]].
+   */
+  private def addChunkedArtifact(
+      artifact: Artifact,
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+
+    val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+    try {
+      // First RPC contains the `BeginChunkedArtifact` payload (`begin_chunk`).
+      // Subsequent RPCs contains the `ArtifactChunk` payload (`chunk`).
+      val artifactChunkBuilder = proto.AddArtifactsRequest.ArtifactChunk.newBuilder()
+      var dataChunk = readNextChunk(in)
+      // Integer division that rounds up to the nearest whole number.
+      def getNumChunks(size: Long): Long = (size + (CHUNK_SIZE - 1)) / CHUNK_SIZE
+
+      builder.getBeginChunkBuilder
+        .setName(artifact.path.toString)
+        .setTotalBytes(artifact.size)
+        .setNumChunks(getNumChunks(artifact.size))
+        .setInitialChunk(
+          artifactChunkBuilder
+            .setData(dataChunk)
+            .setCrc(in.getChecksum.getValue))
+      stream.onNext(builder.build())
+      builder.clearBeginChunk()
+
+      dataChunk = readNextChunk(in)
+      // Consume stream in chunks until there is no data left to read.
+      while (!dataChunk.isEmpty) {
+        artifactChunkBuilder.setData(dataChunk).setCrc(in.getChecksum.getValue)
+        builder.setChunk(artifactChunkBuilder.build())
+        stream.onNext(builder.build())
+
+        builder.clearChunk()
+        dataChunk = readNextChunk(in)
+      }
+    } catch {
+      case NonFatal(e) =>
+        stream.onError(e)
+        throw e
+    } finally {
+      in.close()
+    }
+  }
+}
+
+trait ClassFinder {
+  def findClasses(): Iterator[Artifact]

Review Comment:
   👍 Adding this as part of https://issues.apache.org/jira/browse/SPARK-42657



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123986940


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO(SPARK-42658): Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)

Review Comment:
   I am not sure about grpc's guarantees. However I have seen network transfers go wrong, and then checksums are your friend.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1199852797


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO(SPARK-42658): Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)
+
+        builder.getBatchBuilder
+          .addArtifactsBuilder()
+          .setName(artifact.path.toString)
+          .setData(data)
+          .build()
+      } catch {
+        case NonFatal(e) =>
+          stream.onError(e)
+          throw e
+      } finally {
+        in.close()
+      }
+    }
+    stream.onNext(builder.build())
+  }
+
+  /**
+   * Read data from an [[InputStream]] in pieces of `chunkSize` bytes and convert to
+   * protobuf-compatible [[ByteString]].
+   * @param in
+   * @return
+   */
+  private def readNextChunk(in: InputStream): ByteString = {
+    val buf = new Array[Byte](CHUNK_SIZE)
+    var bytesRead = 0
+    var count = 0
+    while (count != -1 && bytesRead < CHUNK_SIZE) {

Review Comment:
   quick question. Why do we need this `while` loop? Can't we just read it via:
   
   ```scala
   count = in.read(buf, 0, CHUNK_SIZE)
   if (count == 0) ByteString.empty()
   else ByteString.copyFrom(buf, 0, count)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123590597


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+    addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage.asInstanceOf[LocalData]
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO: Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)
+
+        builder.getBatchBuilder
+          .addArtifactsBuilder()
+          .setName(artifact.path.toString)
+          .setData(data)
+          .build()
+      } catch {
+        case NonFatal(e) =>
+          stream.onError(e)
+          throw e
+      } finally {
+        in.close()
+      }
+    }
+    stream.onNext(builder.build())
+  }
+
+  /**
+   * Read data from an [[InputStream]] in pieces of `chunkSize` bytes and convert to
+   * protobuf-compatible [[ByteString]].
+   * @param in
+   * @return
+   */
+  private def readNextChunk(in: InputStream): ByteString = {
+    val buf = new Array[Byte](CHUNK_SIZE)
+    var bytesRead = 0
+    var count = 0
+    while (count != -1 && bytesRead < CHUNK_SIZE) {
+      count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+      if (count != -1) {
+        bytesRead += count
+      }
+    }
+    if (bytesRead == 0) ByteString.empty()
+    else ByteString.copyFrom(buf, 0, bytesRead)
+  }
+
+  /**
+   * Add a artifact in chunks to the stream. The artifact's data is spread out over multiple
+   * [[proto.AddArtifactsRequest requests]].
+   */
+  private def addChunkedArtifact(
+      artifact: Artifact,
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+
+    val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+    try {
+      // First RPC contains the `BeginChunkedArtifact` payload (`begin_chunk`).
+      // Subsequent RPCs contains the `ArtifactChunk` payload (`chunk`).
+      val artifactChunkBuilder = proto.AddArtifactsRequest.ArtifactChunk.newBuilder()
+      var dataChunk = readNextChunk(in)
+      // Integer division that rounds up to the nearest whole number.
+      def getNumChunks(size: Long): Long = (size + (CHUNK_SIZE - 1)) / CHUNK_SIZE
+
+      builder.getBeginChunkBuilder
+        .setName(artifact.path.toString)
+        .setTotalBytes(artifact.size)
+        .setNumChunks(getNumChunks(artifact.size))
+        .setInitialChunk(
+          artifactChunkBuilder
+            .setData(dataChunk)
+            .setCrc(in.getChecksum.getValue))
+      stream.onNext(builder.build())
+      builder.clearBeginChunk()
+
+      dataChunk = readNextChunk(in)
+      // Consume stream in chunks until there is no data left to read.
+      while (!dataChunk.isEmpty) {
+        artifactChunkBuilder.setData(dataChunk).setCrc(in.getChecksum.getValue)
+        builder.setChunk(artifactChunkBuilder.build())
+        stream.onNext(builder.build())
+
+        builder.clearChunk()
+        dataChunk = readNextChunk(in)
+      }
+    } catch {
+      case NonFatal(e) =>
+        stream.onError(e)
+        throw e
+    } finally {
+      in.close()
+    }
+  }
+}
+
+trait ClassFinder {

Review Comment:
   Move it to its own source file?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] hvanhovell commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123642955


##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/ArtifactSuite.scala:
##########
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.TimeUnit
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import com.google.protobuf.ByteString
+import io.grpc.{ManagedChannel, Server}
+import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsRequest
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+
+class ArtifactSuite extends ConnectFunSuite with BeforeAndAfterEach {
+
+  private var client: SparkConnectClient = _
+  private var service: DummySparkConnectService = _
+  private var server: Server = _
+  private var artifactManager: ArtifactManager = _
+  private var channel: ManagedChannel = _
+
+  private def startDummyServer(): Unit = {
+    service = new DummySparkConnectService()
+    server = InProcessServerBuilder
+      .forName(getClass.getName)
+      .addService(service)
+      .build()
+    server.start()
+  }
+
+  private def createArtifactManager(): Unit = {
+    channel = InProcessChannelBuilder.forName(getClass.getName).directExecutor().build()
+    artifactManager = new ArtifactManager(proto.UserContext.newBuilder().build(), channel)
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    startDummyServer()
+    createArtifactManager()
+    client = null
+  }
+
+  override def afterEach(): Unit = {
+    if (server != null) {
+      server.shutdownNow()
+      assert(server.awaitTermination(5, TimeUnit.SECONDS), "server failed to shutdown")
+    }
+
+    if (channel != null) {
+      channel.shutdownNow()
+    }
+
+    if (client != null) {
+      client.shutdown()
+    }
+  }
+
+  private val CHUNK_SIZE: Int = 32 * 1024
+  protected def artifactFilePath: Path = baseResourcePath.resolve("artifact-tests")
+
+  /**
+   * Check if the data sent to the server (stored in `artifactChunk`) is equivalent to the local
+   * data at `localPath`.
+   * @param artifactChunk
+   * @param localPath
+   */
+  private def assertFileDataEquality(
+      artifactChunk: AddArtifactsRequest.ArtifactChunk,
+      localPath: Path): Unit = {
+    val in = new CheckedInputStream(Files.newInputStream(localPath), new CRC32)
+    val localData = ByteString.readFrom(in)
+    assert(artifactChunk.getData == localData)
+    assert(artifactChunk.getCrc == in.getChecksum.getValue)
+  }
+
+  private def singleChunkArtifactTest(path: String): Unit = {
+    test(s"Single Chunk Artifact - $path") {
+      val artifactPath = artifactFilePath.resolve(path)
+      artifactManager.addArtifact(artifactPath.toString)
+
+      val receivedRequests = service.getAndClearLatestAddArtifactRequests()
+      // Single `AddArtifactRequest`
+      assert(receivedRequests.size == 1)
+
+      val request = receivedRequests.head
+      assert(request.hasBatch)
+
+      val batch = request.getBatch
+      // Single artifact in batch
+      assert(batch.getArtifactsList.size() == 1)
+
+      val singleChunkArtifact = batch.getArtifacts(0)
+      val namePrefix = artifactPath.getFileName.toString match {
+        case jar if jar.endsWith(".jar") => "jars"
+        case cf if cf.endsWith(".class") => "classes"
+      }
+      assert(singleChunkArtifact.getName.equals(namePrefix + "/" + path))
+      assertFileDataEquality(singleChunkArtifact.getData, artifactPath)
+    }
+  }
+
+  singleChunkArtifactTest("smallClassFile.class")
+
+  singleChunkArtifactTest("smallJar.jar")
+
+  private def readNextChunk(in: InputStream): ByteString = {
+    val buf = new Array[Byte](CHUNK_SIZE)
+    var bytesRead = 0
+    var count = 0
+    while (count != -1 && bytesRead < CHUNK_SIZE) {
+      count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+      if (count != -1) {
+        bytesRead += count
+      }
+    }
+    if (bytesRead == 0) ByteString.empty()
+    else ByteString.copyFrom(buf, 0, bytesRead)
+  }
+
+  /**
+   * Read data in a chunk of `CHUNK_SIZE` bytes from `in` and verify equality with server-side
+   * data stored in `chunk`.
+   * @param in
+   * @param chunk
+   * @return
+   */
+  private def checkChunkDataAndCrc(

Review Comment:
   A bit of a high level point. You are now using the same code to compute the crc, and to verify it. Is it possible to create more separation here. I would consider checking crcs in, or creating a file with known crc segments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "vicennial (via GitHub)" <gi...@apache.org>.
vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123829338


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+    addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage.asInstanceOf[LocalData]
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO: Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)
+
+        builder.getBatchBuilder
+          .addArtifactsBuilder()
+          .setName(artifact.path.toString)
+          .setData(data)
+          .build()
+      } catch {
+        case NonFatal(e) =>
+          stream.onError(e)
+          throw e
+      } finally {
+        in.close()
+      }
+    }
+    stream.onNext(builder.build())
+  }
+
+  /**
+   * Read data from an [[InputStream]] in pieces of `chunkSize` bytes and convert to
+   * protobuf-compatible [[ByteString]].
+   * @param in
+   * @return
+   */
+  private def readNextChunk(in: InputStream): ByteString = {
+    val buf = new Array[Byte](CHUNK_SIZE)
+    var bytesRead = 0
+    var count = 0
+    while (count != -1 && bytesRead < CHUNK_SIZE) {
+      count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+      if (count != -1) {
+        bytesRead += count
+      }
+    }
+    if (bytesRead == 0) ByteString.empty()
+    else ByteString.copyFrom(buf, 0, bytesRead)
+  }
+
+  /**
+   * Add a artifact in chunks to the stream. The artifact's data is spread out over multiple
+   * [[proto.AddArtifactsRequest requests]].
+   */
+  private def addChunkedArtifact(
+      artifact: Artifact,
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+
+    val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+    try {
+      // First RPC contains the `BeginChunkedArtifact` payload (`begin_chunk`).
+      // Subsequent RPCs contains the `ArtifactChunk` payload (`chunk`).
+      val artifactChunkBuilder = proto.AddArtifactsRequest.ArtifactChunk.newBuilder()
+      var dataChunk = readNextChunk(in)
+      // Integer division that rounds up to the nearest whole number.
+      def getNumChunks(size: Long): Long = (size + (CHUNK_SIZE - 1)) / CHUNK_SIZE
+
+      builder.getBeginChunkBuilder
+        .setName(artifact.path.toString)
+        .setTotalBytes(artifact.size)
+        .setNumChunks(getNumChunks(artifact.size))
+        .setInitialChunk(
+          artifactChunkBuilder
+            .setData(dataChunk)
+            .setCrc(in.getChecksum.getValue))
+      stream.onNext(builder.build())
+      builder.clearBeginChunk()
+
+      dataChunk = readNextChunk(in)
+      // Consume stream in chunks until there is no data left to read.
+      while (!dataChunk.isEmpty) {
+        artifactChunkBuilder.setData(dataChunk).setCrc(in.getChecksum.getValue)
+        builder.setChunk(artifactChunkBuilder.build())
+        stream.onNext(builder.build())
+
+        builder.clearChunk()
+        dataChunk = readNextChunk(in)
+      }
+    } catch {
+      case NonFatal(e) =>
+        stream.onError(e)
+        throw e
+    } finally {
+      in.close()
+    }
+  }
+}
+
+trait ClassFinder {
+  def findClasses(): Iterator[Artifact]
+}
+
+class Artifact private (val path: Path, val storage: Storage) {
+  require(!path.isAbsolute, s"Bad path: $path")
+
+  lazy val size: Long = storage match {
+    case localData: LocalData => localData.size
+  }
+}
+
+object Artifact {
+  val CLASS_PREFIX: Path = Paths.get("classes")
+  val JAR_PREFIX: Path = Paths.get("jars")
+
+  def newJarArtifact(fileName: Path, storage: Storage): Artifact = {
+    newArtifact(JAR_PREFIX, ".jar", fileName, storage)
+  }
+
+  def newClassArtifact(fileName: Path, storage: Storage): Artifact = {
+    newArtifact(CLASS_PREFIX, ".class", fileName, storage)
+  }
+
+  private def newArtifact(
+      prefix: Path,
+      requiredSuffix: String,
+      fileName: Path,
+      storage: Storage): Artifact = {
+    require(!fileName.isAbsolute)
+    require(fileName.toString.endsWith(requiredSuffix))
+    new Artifact(prefix.resolve(fileName), storage)
+  }
+
+  /**
+   * A pointer to the stored bytes of an artifact.
+   */
+  sealed trait Storage
+
+  /**
+   * Payload stored on this machine.
+   */
+  sealed trait LocalData extends Storage {

Review Comment:
   Yeah, makes sense. Keeping the name `LocalData` (rather than renaming it to say, `Data`) intact to make it explicit that the data needs to be present locally for the transfer to take place (for now).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] vicennial commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "vicennial (via GitHub)" <gi...@apache.org>.
vicennial commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123821210


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.concurrent.CopyOnWriteArrayList
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+  private[this] val classFinders = new CopyOnWriteArrayList[ClassFinder]
+
+  /**
+   * Register a [[ClassFinder]] for dynamically generated classes.
+   */
+  def register(finder: ClassFinder): Unit = classFinders.add(finder)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Ensure that all classfile artifacts have been uploaded to the server, and are ready for use.
+   */
+  private[client] def ensureAllClassFileArtifactsUploaded(): Unit = {
+    addArtifacts(classFinders.asScala.flatMap(_.findClasses()))
+  }
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage.asInstanceOf[LocalData]
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO: Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)
+
+        builder.getBatchBuilder
+          .addArtifactsBuilder()
+          .setName(artifact.path.toString)
+          .setData(data)
+          .build()
+      } catch {
+        case NonFatal(e) =>
+          stream.onError(e)
+          throw e
+      } finally {
+        in.close()
+      }
+    }
+    stream.onNext(builder.build())
+  }
+
+  /**
+   * Read data from an [[InputStream]] in pieces of `chunkSize` bytes and convert to
+   * protobuf-compatible [[ByteString]].
+   * @param in
+   * @return
+   */
+  private def readNextChunk(in: InputStream): ByteString = {
+    val buf = new Array[Byte](CHUNK_SIZE)
+    var bytesRead = 0
+    var count = 0
+    while (count != -1 && bytesRead < CHUNK_SIZE) {
+      count = in.read(buf, bytesRead, CHUNK_SIZE - bytesRead)
+      if (count != -1) {
+        bytesRead += count
+      }
+    }
+    if (bytesRead == 0) ByteString.empty()
+    else ByteString.copyFrom(buf, 0, bytesRead)
+  }
+
+  /**
+   * Add a artifact in chunks to the stream. The artifact's data is spread out over multiple
+   * [[proto.AddArtifactsRequest requests]].
+   */
+  private def addChunkedArtifact(
+      artifact: Artifact,
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+
+    val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+    try {
+      // First RPC contains the `BeginChunkedArtifact` payload (`begin_chunk`).
+      // Subsequent RPCs contains the `ArtifactChunk` payload (`chunk`).
+      val artifactChunkBuilder = proto.AddArtifactsRequest.ArtifactChunk.newBuilder()
+      var dataChunk = readNextChunk(in)
+      // Integer division that rounds up to the nearest whole number.
+      def getNumChunks(size: Long): Long = (size + (CHUNK_SIZE - 1)) / CHUNK_SIZE
+
+      builder.getBeginChunkBuilder
+        .setName(artifact.path.toString)
+        .setTotalBytes(artifact.size)
+        .setNumChunks(getNumChunks(artifact.size))
+        .setInitialChunk(
+          artifactChunkBuilder
+            .setData(dataChunk)
+            .setCrc(in.getChecksum.getValue))
+      stream.onNext(builder.build())
+      builder.clearBeginChunk()
+
+      dataChunk = readNextChunk(in)
+      // Consume stream in chunks until there is no data left to read.
+      while (!dataChunk.isEmpty) {
+        artifactChunkBuilder.setData(dataChunk).setCrc(in.getChecksum.getValue)
+        builder.setChunk(artifactChunkBuilder.build())
+        stream.onNext(builder.build())
+
+        builder.clearChunk()
+        dataChunk = readNextChunk(in)
+      }
+    } catch {
+      case NonFatal(e) =>
+        stream.onError(e)
+        throw e
+    } finally {
+      in.close()
+    }
+  }
+}
+
+trait ClassFinder {

Review Comment:
   Deleting the class finder related code for now, will add it as part of https://issues.apache.org/jira/browse/SPARK-42657 (since we don't use them in this PR)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] amaliujia commented on a diff in pull request #40256: [SPARK-42653][CONNECT] Artifact transfer from Scala/JVM client to Server

Posted by "amaliujia (via GitHub)" <gi...@apache.org>.
amaliujia commented on code in PR #40256:
URL: https://github.com/apache/spark/pull/40256#discussion_r1123965938


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ArtifactManager.scala:
##########
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.connect.client
+
+import java.io.InputStream
+import java.net.URI
+import java.nio.file.{Files, Path, Paths}
+import java.util.zip.{CheckedInputStream, CRC32}
+
+import scala.collection.mutable
+import scala.concurrent.Promise
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import Artifact._
+import com.google.protobuf.ByteString
+import io.grpc.ManagedChannel
+import io.grpc.stub.StreamObserver
+
+import org.apache.spark.connect.proto
+import org.apache.spark.connect.proto.AddArtifactsResponse
+import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * The Artifact Manager is responsible for handling and transferring artifacts from the local
+ * client to the server (local/remote).
+ * @param userContext
+ * @param channel
+ */
+class ArtifactManager(userContext: proto.UserContext, channel: ManagedChannel) {
+  // Using the midpoint recommendation of 32KiB for chunk size as specified in
+  // https://github.com/grpc/grpc.github.io/issues/371.
+  private val CHUNK_SIZE: Int = 32 * 1024
+
+  private[this] val stub = proto.SparkConnectServiceGrpc.newStub(channel)
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(path: String): Unit = {
+    addArtifact(Utils.resolveURI(path))
+  }
+
+  private def parseArtifacts(uri: URI): Seq[Artifact] = {
+    // Currently only local files with extensions .jar and .class are supported.
+    uri.getScheme match {
+      case "file" =>
+        val path = Paths.get(uri)
+        val artifact = path.getFileName.toString match {
+          case jar if jar.endsWith(".jar") =>
+            newJarArtifact(path.getFileName, new LocalFile(path))
+          case cf if cf.endsWith(".class") =>
+            newClassArtifact(path.getFileName, new LocalFile(path))
+          case other =>
+            throw new UnsupportedOperationException(s"Unsuppoted file format: $other")
+        }
+        Seq[Artifact](artifact)
+
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported scheme: $other")
+    }
+  }
+
+  /**
+   * Add a single artifact to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifact(uri: URI): Unit = addArtifacts(parseArtifacts(uri))
+
+  /**
+   * Add multiple artifacts to the session.
+   *
+   * Currently only local files with extensions .jar and .class are supported.
+   */
+  def addArtifacts(uris: Seq[URI]): Unit = addArtifacts(uris.flatMap(parseArtifacts))
+
+  /**
+   * Add a number of artifacts to the session.
+   */
+  private def addArtifacts(artifacts: Iterable[Artifact]): Unit = {
+    val promise = Promise[Seq[ArtifactSummary]]
+    val responseHandler = new StreamObserver[proto.AddArtifactsResponse] {
+      private val summaries = mutable.Buffer.empty[ArtifactSummary]
+      override def onNext(v: AddArtifactsResponse): Unit = {
+        v.getArtifactsList.forEach { summary =>
+          summaries += summary
+        }
+      }
+      override def onError(throwable: Throwable): Unit = {
+        promise.failure(throwable)
+      }
+      override def onCompleted(): Unit = {
+        promise.success(summaries.toSeq)
+      }
+    }
+    val stream = stub.addArtifacts(responseHandler)
+    val currentBatch = mutable.Buffer.empty[Artifact]
+    var currentBatchSize = 0L
+
+    def addToBatch(dep: Artifact, size: Long): Unit = {
+      currentBatch += dep
+      currentBatchSize += size
+    }
+
+    def writeBatch(): Unit = {
+      addBatchedArtifacts(currentBatch.toSeq, stream)
+      currentBatch.clear()
+      currentBatchSize = 0
+    }
+
+    artifacts.iterator.foreach { artifact =>
+      val data = artifact.storage
+      val size = data.size
+      if (size > CHUNK_SIZE) {
+        // Payload can either be a batch OR a single chunked artifact. Write batch if non-empty
+        // before chunking current artifact.
+        if (currentBatch.nonEmpty) {
+          writeBatch()
+        }
+        addChunkedArtifact(artifact, stream)
+      } else {
+        if (currentBatchSize + size > CHUNK_SIZE) {
+          writeBatch()
+        }
+        addToBatch(artifact, size)
+      }
+    }
+    if (currentBatch.nonEmpty) {
+      writeBatch()
+    }
+    stream.onCompleted()
+    ThreadUtils.awaitResult(promise.future, Duration.Inf)
+    // TODO(SPARK-42658): Handle responses containing CRC failures.
+  }
+
+  /**
+   * Add a batch of artifacts to the stream. All the artifacts in this call are packaged into a
+   * single [[proto.AddArtifactsRequest]].
+   */
+  private def addBatchedArtifacts(
+      artifacts: Seq[Artifact],
+      stream: StreamObserver[proto.AddArtifactsRequest]): Unit = {
+    val builder = proto.AddArtifactsRequest
+      .newBuilder()
+      .setUserContext(userContext)
+    artifacts.foreach { artifact =>
+      val in = new CheckedInputStream(artifact.storage.asInstanceOf[LocalData].stream, new CRC32)
+      try {
+        val data = proto.AddArtifactsRequest.ArtifactChunk
+          .newBuilder()
+          .setData(ByteString.readFrom(in))
+          .setCrc(in.getChecksum.getValue)

Review Comment:
   I am not an expert on networking so just a question for my self education:
   
   so the gRPC level bytes transmission is not 100% reliable so we need another CRC to check nothing is corrupted? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org