Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/12/10 21:52:40 UTC

[GitHub] [spark] hiboyang opened a new pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

hiboyang opened a new pull request #34864:
URL: https://github.com/apache/spark/pull/34864


   ### What changes were proposed in this pull request?
   This PR adds support for storing shuffle files on external storage such as S3. This helps Dynamic
   Allocation on Kubernetes: the Spark driver can release idle executors without losing shuffle data,
   because the shuffle data is stored on external storage that is separate from the executors.
   
   This can be viewed as follow-up work to https://issues.apache.org/jira/browse/SPARK-25299.
   
   ### Why are the changes needed?
   
   To better support Dynamic Allocation on Kubernetes, we need to decouple shuffle data from Spark
   executors. This PR implements a new shuffle manager that supports writing shuffle data to S3.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. This PR adds the two Spark configs below to plug in the new StarShuffleManager and store
   shuffle data at the given S3 location.
   ```
   spark.shuffle.manager=org.apache.spark.shuffle.StarShuffleManager
   spark.shuffle.star.rootDir=s3://my_bucket_name/my_shuffle_folder
   ```
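As a rough illustration of how the `spark.shuffle.star.rootDir` value becomes object locations, here is a minimal Java sketch. Its `createFile` copies the logic of `StarS3ShuffleFileManager.createFile` from the diff later in this thread. `ShufflePathSketch`, `BucketAndKey`, and `parse` are hypothetical stand-ins, because the source of the PR's `S3BucketAndKey.getFromUrl` is not shown. The sketch also assumes plain `s3://bucket/key` URLs.

```java
import java.util.UUID;

public final class ShufflePathSketch {

    /** Hypothetical stand-in for the PR's S3BucketAndKey (its source is not shown in this thread). */
    static final class BucketAndKey {
        final String bucket;
        final String key;

        BucketAndKey(String bucket, String key) {
            this.bucket = bucket;
            this.key = key;
        }
    }

    /** Same logic as createFile in the PR diff: ensure a trailing "/" and add a random file name. */
    static String createFile(String root) {
        if (!root.endsWith("/")) {
            root = root + "/";
        }
        return root + String.format("shuffle-%s.data", UUID.randomUUID());
    }

    /** Splits "s3://bucket/key" into bucket and key (assumed URL layout). */
    static BucketAndKey parse(String url) {
        String prefix = "s3://";
        if (!url.startsWith(prefix)) {
            throw new IllegalArgumentException("Expected an s3:// URL: " + url);
        }
        String rest = url.substring(prefix.length());
        int slash = rest.indexOf('/');
        if (slash <= 0) {
            // No slash at all, or an empty bucket name such as "s3:///key"
            throw new IllegalArgumentException("Missing bucket or key: " + url);
        }
        return new BucketAndKey(rest.substring(0, slash), rest.substring(slash + 1));
    }

    public static void main(String[] args) {
        String file = createFile("s3://my_bucket_name/my_shuffle_folder");
        BucketAndKey bk = parse(file);
        System.out.println(bk.bucket); // my_bucket_name
        System.out.println(bk.key);    // my_shuffle_folder/shuffle-<random UUID>.data
    }
}
```

Each call to `createFile` yields a new random object name under the configured folder, so separate shuffle writes do not overwrite each other.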
   
   ### How was this patch tested?
   
   Added a unit test for StarShuffleManager. Many classes are copied from Spark, so no tests were added
   for them. We will get feedback from the community first, then work on removing the duplicated
   code.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] steveloughran commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
steveloughran commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-1005770261


   
   Obviously I am biased, but I believe that rather than trying to use the AWS APIs yourself, you should just use the Hadoop FileSystem APIs and interact with S3 through the s3a connector.
   
   For a high-performance upload of a local file, use `FileSystem.copyFromLocalFile`. In s3a on Hadoop 3.3.2, this uses the same transfer manager class as this PR does, but adds exception handling/mapping, encryption settings, and auditing. The s3a integration tests verify that all of this works, so by the time you use it here you can assume the S3 upload works, and it becomes a matter of linking it up to Spark.
   
   As `copyFromLocalFile` is implemented for all filesystems, the component will also work with other stores, including Google Cloud and Azure ABFS, even if they do not (yet) override the base method with a high-performance implementation.
   
   This also means that you could write tests for the feature using file:// as the destination store and include them in the Spark module; if you design such tests to be overridable for other file systems, they could be picked up and reused as the actual integration test suites in an external module.
   
   And, because someone else owns the problem of the S3 connector binding, you avoid fielding support calls about configuring the AWS endpoint and region, support for third-party S3 stores, qualifying AWS SDK updates, etc.
   
   Accordingly, I would propose:
   
   * ripping out `StarS3ShuffleFileManager` and using the FileSystem APIs
   * writing tests that can be pointed at the local FS, but then targeted at real object stores in a downstream test module, as https://github.com/hortonworks-spark/cloud-integration does for committer testing.
   
   Getting integration tests set up is inevitably going to be somewhat complicated. I can provide a bit of consultation there.
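The file:// testing suggestion above can be sketched with the JDK alone. This is a minimal, hypothetical example, not code from the PR. `ShuffleFileStore` mirrors the `createFile`/`write`/`read` signatures of `StarShuffleFileManager` as they appear in the diff later in this thread. `LocalShuffleFileStore` backs those methods with `java.nio` file:// paths, standing in for the Hadoop `FileSystem` API, so a test can run against a temporary local directory.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;

public final class LocalShuffleStoreSketch {

    /** Hypothetical interface mirroring the createFile/write/read methods seen in the PR diff. */
    interface ShuffleFileStore {
        String createFile(String root);
        void write(InputStream data, long size, String file);
        InputStream read(String file, long offset, long size);
    }

    /** Local-filesystem implementation that a test could point at a file:// root. */
    static final class LocalShuffleFileStore implements ShuffleFileStore {
        @Override
        public String createFile(String root) {
            if (!root.endsWith("/")) {
                root = root + "/";
            }
            return root + String.format("shuffle-%s.data", UUID.randomUUID());
        }

        @Override
        public void write(InputStream data, long size, String file) {
            Path path = Paths.get(URI.create(file));
            byte[] buffer = new byte[8192];
            long remaining = size;
            try (OutputStream out = Files.newOutputStream(path)) {
                // Copy exactly `size` bytes and fail if the stream ends early.
                while (remaining > 0) {
                    int n = data.read(buffer, 0, (int) Math.min(buffer.length, remaining));
                    if (n < 0) {
                        throw new IOException("Stream ended " + remaining + " bytes early");
                    }
                    out.write(buffer, 0, n);
                    remaining -= n;
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @Override
        public InputStream read(String file, long offset, long size) {
            Path path = Paths.get(URI.create(file));
            byte[] bytes = new byte[Math.toIntExact(size)];
            try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "r")) {
                raf.seek(offset);
                raf.readFully(bytes); // throws EOFException if the file is too short
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return new ByteArrayInputStream(bytes);
        }
    }

    /** Writes text under a fresh temp directory (left in place), then reads back [offset, offset + length). */
    static String roundTrip(String text, long offset, long length) {
        try {
            Path dir = Files.createTempDirectory("shuffle-test");
            ShuffleFileStore store = new LocalShuffleFileStore();
            String file = store.createFile(dir.toUri().toString());
            byte[] data = text.getBytes(StandardCharsets.UTF_8);
            store.write(new ByteArrayInputStream(data), data.length, file);
            byte[] out = new byte[Math.toIntExact(length)];
            int n = store.read(file, offset, length).read(out);
            return new String(out, 0, Math.max(n, 0), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello shuffle", 6, 7)); // shuffle
    }
}
```

Because the test logic depends only on `ShuffleFileStore`, a downstream module could reuse it against an implementation backed by a real object store, which is the overridable-test design described above.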




[GitHub] [spark] hiboyang commented on a change in pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on a change in pull request #34864:
URL: https://github.com/apache/spark/pull/34864#discussion_r779213531



##########
File path: external-shuffle-storage/src/main/java/org/apache/spark/starshuffle/StarS3ShuffleFileManager.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.starshuffle;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.client.builder.ExecutorFactory;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.SparkHadoopUtil;
+import org.apache.spark.network.util.LimitedInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URI;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class read/write shuffle file on external storage like S3.
+ */
+public class StarS3ShuffleFileManager implements StarShuffleFileManager {
+    private static final Logger logger = LoggerFactory.getLogger(StarS3ShuffleFileManager.class);
+
+    // TODO make following values configurable
+    public final static int S3_PUT_TIMEOUT_MILLISEC = 180 * 1000;
+
+    // Following constants are copied from:
+    // https://github.com/apache/hadoop/blob/6c6d1b64d4a7cd5288fcded78043acaf23228f96/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+    public static final long DEFAULT_MULTIPART_SIZE = 67108864; // 64M
+    public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = 134217728; // 128M
+    public static final String MAX_THREADS = "fs.s3a.threads.max";
+    public static final int DEFAULT_MAX_THREADS = 10;
+    public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
+    public static final int DEFAULT_KEEPALIVE_TIME = 60;
+
+    public static final String AWS_REGION = "fs.s3a.endpoint.region";
+    public static final String DEFAULT_AWS_REGION = Regions.US_WEST_2.getName();
+
+    private static TransferManager transferManager;
+    private static Object transferManagerLock = new Object();
+
+    private final String awsRegion;
+    private final int maxThreads;
+    private final long keepAliveTime;
+
+    public StarS3ShuffleFileManager(SparkConf conf) {
+        Configuration hadoopConf = SparkHadoopUtil.get().newConfiguration(conf);
+
+        awsRegion = hadoopConf.get(AWS_REGION, DEFAULT_AWS_REGION);
+
+        int threads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
+        if (threads < 2) {
+            logger.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
+            threads = 2;
+        }
+        maxThreads = threads;
+
+        keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
+    }
+
+    @Override
+    public String createFile(String root) {
+        if (!root.endsWith("/")) {
+            root = root + "/";
+        }
+        String fileName = String.format("shuffle-%s.data", UUID.randomUUID());
+        return root + fileName;
+    }
+
+    @Override
+    public void write(InputStream data, long size, String file) {
+        logger.info("Writing to shuffle file: {}", file);
+        writeS3(data, size, file);
+    }
+
+    @Override
+    public InputStream read(String file, long offset, long size) {
+        logger.info("Opening shuffle file: {}, offset: {}, size: {}", file, offset, size);
+        return readS3(file, offset, size);
+    }
+
+    private void writeS3(InputStream inputStream, long size, String s3Url) {
+        logger.info("Uploading shuffle file to s3: {}, size: {}", s3Url, size);
+
+        S3BucketAndKey bucketAndKey = S3BucketAndKey.getFromUrl(s3Url);
+        String bucket = bucketAndKey.getBucket();
+        String key = bucketAndKey.getKey();
+
+        TransferManager transferManager = getTransferManager();
+
+        ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setContentType("application/octet-stream");
+        metadata.setContentLength(size);
+
+        PutObjectRequest request = new PutObjectRequest(bucket,
+                key,
+                inputStream,
+                metadata);
+
+        AtomicLong totalTransferredBytes = new AtomicLong(0);
+
+        request.setGeneralProgressListener(new ProgressListener() {
+            private long lastLogTime = 0;
+
+            @Override
+            public void progressChanged(ProgressEvent progressEvent) {
+                long count = progressEvent.getBytesTransferred();
+                long total = totalTransferredBytes.addAndGet(count);
+                long currentTime = System.currentTimeMillis();
+                long logInterval = 10000;
+                if (currentTime - lastLogTime >= logInterval) {
+                    logger.info("S3 upload progress: {}, recent transferred {} bytes, total transferred {}", key, count, total);
+                    lastLogTime = currentTime;
+                }
+            }
+        });
+
+        // https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
+        request.getRequestClientOptions().setReadLimit((int) DEFAULT_MULTIPART_SIZE + 1);
+        request.setSdkRequestTimeout(S3_PUT_TIMEOUT_MILLISEC);
+        request.setSdkClientExecutionTimeout(S3_PUT_TIMEOUT_MILLISEC);
+        try {
+            long startTime = System.currentTimeMillis();
+            transferManager.upload(request).waitForCompletion();
+            long duration = System.currentTimeMillis() - startTime;
+            double mbs = 0;
+            if (duration != 0) {
+                mbs = ((double) size) / (1000 * 1000) / ((double) duration / 1000);
+            }
+            logger.info("S3 upload finished: {}, file size: {} bytes, total transferred: {}, throughput: {} mbs",
+                    s3Url, size, totalTransferredBytes.get(), mbs);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Failed to upload to s3: " + key, e);
+        } finally {
+            transferManager.shutdownNow();
+        }
+    }
+
+    private InputStream readS3(String s3Url, long offset, long size) {
+        logger.info("Downloading shuffle file from s3: {}, size: {}", s3Url, size);
+
+        S3BucketAndKey bucketAndKey = S3BucketAndKey.getFromUrl(s3Url);
+
+        File downloadTempFile;
+        try {
+            downloadTempFile = File.createTempFile("shuffle-download", ".data");
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create temp file for downloading shuffle file");
+        }
+
+        TransferManager transferManager = getTransferManager();
+
+        GetObjectRequest getObjectRequest = new GetObjectRequest(bucketAndKey.getBucket(), bucketAndKey.getKey())
+                .withRange(offset, offset + size);
+
+        AtomicLong totalTransferredBytes = new AtomicLong(0);
+
+        getObjectRequest.setGeneralProgressListener(new ProgressListener() {
+            private long lastLogTime = 0;
+
+            @Override
+            public void progressChanged(ProgressEvent progressEvent) {
+                long count = progressEvent.getBytesTransferred();
+                long total = totalTransferredBytes.addAndGet(count);
+                long currentTime = System.currentTimeMillis();
+                long logInterval = 10000;
+                if (currentTime - lastLogTime >= logInterval) {
+                    logger.info("S3 download progress: {}, recent transferred {} bytes, total transferred {}", s3Url, count, total);
+                    lastLogTime = currentTime;
+                }
+            }
+        });
+
+        try {
+            long startTime = System.currentTimeMillis();
+            transferManager.download(getObjectRequest, downloadTempFile).waitForCompletion();
+            long duration = System.currentTimeMillis() - startTime;
+            double mbs = 0;
+            if (duration != 0) {
+                mbs = ((double) size) / (1000 * 1000) / ((double) duration / 1000);
+            }
+            logger.info("S3 download finished: {}, file size: {} bytes, total transferred: {}, throughput: {} mbs",
+                    s3Url, size, totalTransferredBytes.get(), mbs);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(String.format(
+                    "Failed to download shuffle file %s", s3Url));
+        } finally {
+            transferManager.shutdownNow();

Review comment:
       This was a coding mistake. I should not shut down the transfer manager here. I will remove this.
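Here is a minimal sketch of the lifecycle fix described in this review comment. It assumes the shared static `TransferManager` should be created once, reused across reads and writes, and shut down only when the shuffle manager stops. `SharedClientSketch` is hypothetical and uses a JDK `ExecutorService` as a stand-in for `TransferManager`. It also restores the thread's interrupt status on `InterruptedException`, which the quoted `catch` blocks do not do.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class SharedClientSketch {
    private static final Object LOCK = new Object();
    // Stand-in for the shared static TransferManager (assumption for illustration).
    private static ExecutorService client;

    /** Lazily creates the shared client once; later calls reuse the same instance. */
    static ExecutorService get() {
        synchronized (LOCK) {
            if (client == null) {
                client = Executors.newFixedThreadPool(2, r -> {
                    Thread t = new Thread(r, "shared-client");
                    t.setDaemon(true); // daemon threads do not keep the JVM alive
                    return t;
                });
            }
            return client;
        }
    }

    /** Called once when the owning manager stops, not in a per-request finally block. */
    static void stop() {
        synchronized (LOCK) {
            if (client != null) {
                client.shutdownNow();
                client = null;
            }
        }
    }

    /** A per-request operation: uses the shared client and leaves it running afterwards. */
    static int square(int x) {
        try {
            return get().submit(() -> x * x).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new RuntimeException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new RuntimeException("Task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(square(3)); // 9
        System.out.println(square(4)); // 16, same client reused
        stop();
    }
}
```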

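There is a separate issue in the quoted `readS3`: `withRange(offset, offset + size)` may request one byte more than intended. The AWS SDK for Java v1 documents the range given to `GetObjectRequest` as inclusive, the same as the HTTP `Range` header. If the intent is to read exactly `size` bytes, the end would be `offset + size - 1`. The hypothetical `ByteRangeSketch` below only illustrates that arithmetic and does not call the SDK.

```java
public final class ByteRangeSketch {

    /** Returns {start, endInclusive} covering exactly `size` bytes starting at `offset`. */
    static long[] inclusiveRange(long offset, long size) {
        if (offset < 0 || size <= 0) {
            throw new IllegalArgumentException("offset must be >= 0 and size > 0");
        }
        return new long[] {offset, offset + size - 1};
    }

    /** Formats the range the way an HTTP Range header expresses it. */
    static String rangeHeader(long offset, long size) {
        long[] r = inclusiveRange(offset, size);
        return "bytes=" + r[0] + "-" + r[1];
    }

    /** Number of bytes covered by an inclusive [start, end] range. */
    static long length(long start, long endInclusive) {
        return endInclusive - start + 1;
    }

    public static void main(String[] args) {
        System.out.println(rangeHeader(100, 50));  // bytes=100-149
        System.out.println(length(100, 100 + 50)); // 51, one byte more than size
    }
}
```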





[GitHub] [spark] SparkQA removed a comment on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-992961044


   **[Test build #146147 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146147/testReport)** for PR 34864 at commit [`8222f38`](https://github.com/apache/spark/commit/8222f383e25af2c66a14c96cb3f8153d6828cbad).





[GitHub] [spark] AmplabJenkins commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991398174


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146081/
   




[GitHub] [spark] hiboyang commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-998215993


   Added a [design doc](https://docs.google.com/document/d/10rhvjXUlbQfWg-zh02_aqRqDT_ZnwYmICPAR--aRv64) for this prototype.




[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-992987507


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50620/
   




[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-993032526


   **[Test build #146147 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146147/testReport)** for PR 34864 at commit [`8222f38`](https://github.com/apache/spark/commit/8222f383e25af2c66a14c96cb3f8153d6828cbad).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.






[GitHub] [spark] AmplabJenkins commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-996565782


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146303/
   




[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-996305332


   **[Test build #146303 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146303/testReport)** for PR 34864 at commit [`761fe2a`](https://github.com/apache/spark/commit/761fe2a850a4e3ee73caf4cf37378914c1acc8b8).




[GitHub] [spark] hiboyang commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-1006171185


   > Obviously I am biased, but I believe that rather than trying to use the AWS APIs yourself, you should just use the hadoop file system APIs and interact with S3 through the s3a connector.
   > 
   > For a high-performance upload of a local file, use `FileSystem.copyFromLocalFile` in s3a on hadoop 3.3.2 this uses the same transfer manager class as this PR does but adds: exception handling/mapping, encryption settings, auditing. And the s3a integration tests verify all this works... By the time you get to use it here you can assume the S3 upload works, and it becomes a matter of linking it up to spark.
   > 
   > As `copyFromLocalFile` is implemented for all filesystems, it means the component will also work with other stores including google cloud and azure abfs, even if they do not override the base method for a high-performance implementation -yet.
   > 
   > This also means that you could write tests for the feature using file:// as the destination store and include these in the spark module; if you design such tests to be overrideable to work with other file systems, they could be picked up and reused as the actual integration test suites in an external module.
   > 
   > And, because someone else owns the problem of the s3 connector binding, you get to avoid fielding support calls related to configuring of AWS endpoint, region, support for third-party s3 stores, qualifying AWS SDK updates, etc.
   > 
   > Accordingly, I would propose
   > 
   > * ripping out `StarS3ShuffleFileManager` and using FileSystem APIs
   > * write tests which can be pointed at the local FS, but then targeted at real object stores in a downstream test module, as https://github.com/hortonworks-spark/cloud-integration does for committer testing.
   > 
   > Getting integration tests set up is inevitably going to be somewhat complicated. I can provide a bit of consultation there.
   
   Yes, these are great suggestions! Thanks again! I will find time to make these changes, and may also reach out to you for consultation when adding integration tests :)
   
   
   





[GitHub] [spark] hiboyang commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991391300


   > @hiboyang, thanks for the work here! Could you create a design doc for this? That might help get more people's attention and make it easier for them to understand.
   
   Yes, good suggestion. I will create a design doc.
   




[GitHub] [spark] dongjoon-hyun commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
dongjoon-hyun commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-992740381


   You are completely wrong because you already know the worker decommission feature.
   >  but it will not work well when there is shuffle data distributed on many executors (those executors cannot be released).
   
   You should mention this in the PR description explicitly instead of misleading the users.
   > The work here (storing shuffle data on S3) does not conflict with worker decommission feature. The eventual goal is to store shuffle data on S3 or other external storage directly. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-993027904


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50620/
   




[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991371738


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50556/
   




[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991394024


   **[Test build #146081 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146081/testReport)** for PR 34864 at commit [`4b67c8a`](https://github.com/apache/spark/commit/4b67c8a193b3389ca400e276f0dcea2ba4749d26).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds the following public classes _(experimental)_:
     * `public class ByteBufUtils `
     * `public class StarBlockStoreClient extends BlockStoreClient `
     * `public class StarBypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> `
     * `public class StarLocalFileShuffleFileManager implements StarShuffleFileManager `
     * `public class StarMapResultFileInfo `
     * `public class StarS3ShuffleFileManager implements StarShuffleFileManager `
     * `    public static class S3BucketAndKey `
     * `public class StarUtils `
     * `public class StartFileSegmentWriter `
     * `class StarShuffleManager(conf: SparkConf) extends ShuffleManager with Logging `
     * `final class StarShuffleBlockFetcherIterator(`
     * `  case class FetchRequest(`
     * `  case class DeferFetchRequestResult(fetchRequest: FetchRequest) extends FetchResult`




[GitHub] [spark] c21 commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991390684


   @hiboyang, thanks for the work here! Could you create a design doc for this? That might help get more people's attention and make it easier for them to understand.




[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-996540236


   **[Test build #146303 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146303/testReport)** for PR 34864 at commit [`761fe2a`](https://github.com/apache/spark/commit/761fe2a850a4e3ee73caf4cf37378914c1acc8b8).
    * This patch **fails from timeout after a configured wait of `500m`**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] steveloughran commented on a change in pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
steveloughran commented on a change in pull request #34864:
URL: https://github.com/apache/spark/pull/34864#discussion_r778705352



##########
File path: external-shuffle-storage/src/main/java/org/apache/spark/starshuffle/StarBlockStoreClient.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.starshuffle;
+
+import org.apache.spark.SparkEnv;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.netty.SparkTransportConf;
+import org.apache.spark.network.shuffle.*;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.ShuffleBlockId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This class fetches shuffle blocks from external storage like S3
+ */
+public class StarBlockStoreClient extends BlockStoreClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(StarBlockStoreClient.class);
+
+    // Fetch shuffle blocks from external shuffle storage.
+    // The shuffle location is encoded in the host argument. In the future, we should enhance
+    // Spark internal code to support abstraction of shuffle storage location.
+    @Override
+    public void fetchBlocks(String host, int port, String execId, String[] blockIds, BlockFetchingListener listener, DownloadFileManager downloadFileManager) {
+        for (int i = 0; i < blockIds.length; i++) {
+            String blockId = blockIds[i];
+            CompletableFuture.runAsync(() -> fetchBlock(host, execId, blockId, listener, downloadFileManager));
+        }
+    }
+
+    private void fetchBlock(String host, String execId, String blockIdStr, BlockFetchingListener listener, DownloadFileManager downloadFileManager) {
+        BlockId blockId = BlockId.apply(blockIdStr);
+        if (blockId instanceof ShuffleBlockId) {
+            ShuffleBlockId shuffleBlockId = (ShuffleBlockId)blockId;
+            StarMapResultFileInfo mapResultFileInfo = StarMapResultFileInfo.deserializeFromString(host);
+            long offset = 0;
+            for (int i = 0; i < shuffleBlockId.reduceId(); i++) {
+                offset += mapResultFileInfo.getPartitionLengths()[i];
+            }
+            long size = mapResultFileInfo.getPartitionLengths()[shuffleBlockId.reduceId()];
+            StarShuffleFileManager streamProvider = StarUtils.createShuffleFileManager(SparkEnv.get().conf(),
+                    mapResultFileInfo.getLocation());
+            if (downloadFileManager != null) {
+                try (InputStream inputStream = streamProvider.read(mapResultFileInfo.getLocation(), offset, size)) {
+                    TransportConf transportConf = SparkTransportConf.fromSparkConf(
+                            SparkEnv.get().conf(), "starShuffle", 1, Option.empty());
+                    DownloadFile downloadFile = downloadFileManager.createTempFile(transportConf);
+                    downloadFileManager.registerTempFileToClean(downloadFile);
+                    DownloadFileWritableChannel downloadFileWritableChannel = downloadFile.openForWriting();
+
+                    int bufferSize = 64 * 1024;
+                    byte[] bytes = new byte[bufferSize];
+                    int readBytes = 0;
+                    while (readBytes < size) {
+                        int toReadBytes = Math.min((int)size - readBytes, bufferSize);
+                        int n = inputStream.read(bytes, 0, toReadBytes);
+                        if (n == -1) {
+                            throw new RuntimeException(String.format(
+                                    "Failed to read file %s for shuffle block %s, hit end with remaining %s bytes",
+                                    mapResultFileInfo.getLocation(),
+                                    blockId,
+                                    size - readBytes));
+                        }
+                        readBytes += n;
+                        downloadFileWritableChannel.write(ByteBuffer.wrap(bytes, 0, n));
+                    }
+                    ManagedBuffer managedBuffer = downloadFileWritableChannel.closeAndRead();
+                    listener.onBlockFetchSuccess(blockIdStr, managedBuffer);
+                } catch (IOException e) {
+                    throw new RuntimeException(String.format(

Review comment:
       Include the inner exception text in the message, and supply the exception as the inner exception (cause) in the constructor.
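This is a minimal sketch of what the reviewer is asking for. It assumes the PR keeps throwing an unchecked exception from `fetchBlock`. The `IOException` text goes into the message, and the exception itself is passed as the cause. `ShuffleReadErrors`, `wrapReadFailure`, and `readFirstByte` are hypothetical names used only for illustration; they are not part of the PR.

```java
import java.io.IOException;
import java.io.InputStream;

public class ShuffleReadErrors {
    // Hypothetical helper: wraps an IOException raised while reading a shuffle block.
    // The inner exception's text is kept in the message, and the exception itself is the cause.
    static RuntimeException wrapReadFailure(String location, String blockId, IOException e) {
        return new RuntimeException(String.format(
                "Failed to read file %s for shuffle block %s: %s", location, blockId, e), e);
    }

    // Hypothetical reader, used only to show the wrapping at the catch site.
    static int readFirstByte(InputStream in, String location, String blockId) {
        try (InputStream stream = in) {
            return stream.read();
        } catch (IOException e) {
            throw wrapReadFailure(location, blockId, e);
        }
    }

    public static void main(String[] args) {
        // A stream that always fails, standing in for a broken S3 read.
        InputStream failing = new InputStream() {
            @Override
            public int read() throws IOException {
                throw new IOException("connection reset");
            }
        };
        try {
            readFirstByte(failing, "s3://bucket/shuffle-1.data", "shuffle_0_1_2");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
            System.out.println(e.getCause() instanceof IOException);
        }
    }
}
```

Passing `e` as the second constructor argument keeps the original stack trace reachable through `getCause()`. `java.io.UncheckedIOException` would be an alternative wrapper type with the same `(String, IOException)` constructor shape.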

##########
File path: external-shuffle-storage/pom.xml
##########
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-parent_2.12</artifactId>
+        <version>3.3.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>external-shuffle-storage_2.12</artifactId>
+    <packaging>jar</packaging>
+    <name>External Shuffle Storage</name>
+    <url>http://spark.apache.org/</url>
+
+    <properties>
+        <sbt.project.name>external-shuffle-storage</sbt.project.name>
+        <build.testJarPhase>none</build.testJarPhase>
+        <build.copyDependenciesPhase>package</build.copyDependenciesPhase>
+        <hadoop.deps.scope>provided</hadoop.deps.scope>
+        <hive.deps.scope>provided</hive.deps.scope>
+        <parquet.deps.scope>provided</parquet.deps.scope>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalacheck</groupId>
+            <artifactId>scalacheck_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>

Review comment:
       You should pull in spark-hadoop-cloud and so indirectly get its shaded full AWS SDK. Yes, it's big, but it guarantees a consistent set of its own dependencies (HTTP client, Jackson, etc.). Because it includes support for services like STS and S3 events, it also lets you add new features with guaranteed consistency of AWS artifacts.

##########
File path: external-shuffle-storage/src/main/java/org/apache/spark/starshuffle/StarS3ShuffleFileManager.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.starshuffle;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.client.builder.ExecutorFactory;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.SparkHadoopUtil;
+import org.apache.spark.network.util.LimitedInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URI;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class read/write shuffle file on external storage like S3.
+ */
+public class StarS3ShuffleFileManager implements StarShuffleFileManager {
+    private static final Logger logger = LoggerFactory.getLogger(StarS3ShuffleFileManager.class);
+
+    // TODO make following values configurable
+    public final static int S3_PUT_TIMEOUT_MILLISEC = 180 * 1000;
+
+    // Following constants are copied from:
+    // https://github.com/apache/hadoop/blob/6c6d1b64d4a7cd5288fcded78043acaf23228f96/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+    public static final long DEFAULT_MULTIPART_SIZE = 67108864; // 64M
+    public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = 134217728; // 128M
+    public static final String MAX_THREADS = "fs.s3a.threads.max";
+    public static final int DEFAULT_MAX_THREADS = 10;
+    public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
+    public static final int DEFAULT_KEEPALIVE_TIME = 60;
+
+    public static final String AWS_REGION = "fs.s3a.endpoint.region";
+    public static final String DEFAULT_AWS_REGION = Regions.US_WEST_2.getName();
+
+    private static TransferManager transferManager;
+    private static Object transferManagerLock = new Object();

Review comment:
       final
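Here is one reading of the `final` comment, sketched under stated assumptions. A lock object that guards lazy creation of a shared static client should itself be `final`, so that no code path can swap the monitor out from under concurrent callers. To run without the AWS SDK, the sketch uses a JDK `ExecutorService` as a stand-in for `TransferManager`. `SharedResourceHolder` and `getShared` are hypothetical names, not the PR's `getTransferManager()`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SharedResourceHolder {
    // final: every thread is guaranteed to synchronize on the same monitor.
    // A non-final static lock could be reassigned, silently breaking mutual exclusion.
    private static final Object LOCK = new Object();

    // volatile so the unsynchronized first read below sees a fully constructed instance.
    private static volatile ExecutorService shared;

    // Stand-in for a lazily created shared client (double-checked locking).
    static ExecutorService getShared() {
        ExecutorService result = shared;
        if (result == null) {
            synchronized (LOCK) {
                result = shared;
                if (result == null) {
                    result = Executors.newFixedThreadPool(2);
                    shared = result;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ExecutorService a = getShared();
        ExecutorService b = getShared();
        System.out.println(a == b); // true: both calls return the same instance
        a.shutdown();
    }
}
```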

##########
File path: external-shuffle-storage/src/main/java/org/apache/spark/starshuffle/StarBypassMergeSortShuffleWriter.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.starshuffle;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.Partitioner;
+import org.apache.spark.ShuffleDependency;
+import org.apache.spark.SparkConf;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.scheduler.MapStatus;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
+import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.shuffle.StarBypassMergeSortShuffleHandle;
+import org.apache.spark.shuffle.StarOpts;
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
+import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
+import org.apache.spark.shuffle.sort.SortShuffleManager;
+import org.apache.spark.storage.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.None$;
+import scala.Option;
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.Iterator;
+
+import javax.annotation.Nullable;
+import java.io.*;

Review comment:
       A bit brittle against JVM releases adding new classes here.

##########
File path: external-shuffle-storage/src/main/java/org/apache/spark/starshuffle/StarS3ShuffleFileManager.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.starshuffle;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.client.builder.ExecutorFactory;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.SparkHadoopUtil;
+import org.apache.spark.network.util.LimitedInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URI;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class read/write shuffle file on external storage like S3.
+ */
+public class StarS3ShuffleFileManager implements StarShuffleFileManager {
+    private static final Logger logger = LoggerFactory.getLogger(StarS3ShuffleFileManager.class);
+
+    // TODO make following values configurable
+    public final static int S3_PUT_TIMEOUT_MILLISEC = 180 * 1000;
+
+    // Following constants are copied from:
+    // https://github.com/apache/hadoop/blob/6c6d1b64d4a7cd5288fcded78043acaf23228f96/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+    public static final long DEFAULT_MULTIPART_SIZE = 67108864; // 64M
+    public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = 134217728; // 128M
+    public static final String MAX_THREADS = "fs.s3a.threads.max";
+    public static final int DEFAULT_MAX_THREADS = 10;
+    public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
+    public static final int DEFAULT_KEEPALIVE_TIME = 60;
+
+    public static final String AWS_REGION = "fs.s3a.endpoint.region";
+    public static final String DEFAULT_AWS_REGION = Regions.US_WEST_2.getName();
+
+    private static TransferManager transferManager;
+    private static Object transferManagerLock = new Object();
+
+    private final String awsRegion;
+    private final int maxThreads;
+    private final long keepAliveTime;
+
+    public StarS3ShuffleFileManager(SparkConf conf) {
+        Configuration hadoopConf = SparkHadoopUtil.get().newConfiguration(conf);
+
+        awsRegion = hadoopConf.get(AWS_REGION, DEFAULT_AWS_REGION);
+
+        int threads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
+        if (threads < 2) {
+            logger.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
+            threads = 2;
+        }
+        maxThreads = threads;
+
+        keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
+    }
+
+    @Override
+    public String createFile(String root) {
+        if (!root.endsWith("/")) {
+            root = root + "/";
+        }
+        String fileName = String.format("shuffle-%s.data", UUID.randomUUID());
+        return root + fileName;
+    }
+
+    @Override
+    public void write(InputStream data, long size, String file) {
+        logger.info("Writing to shuffle file: {}", file);
+        writeS3(data, size, file);
+    }
+
+    @Override
+    public InputStream read(String file, long offset, long size) {
+        logger.info("Opening shuffle file: {}, offset: {}, size: {}", file, offset, size);
+        return readS3(file, offset, size);
+    }
+
+    private void writeS3(InputStream inputStream, long size, String s3Url) {
+        logger.info("Uploading shuffle file to s3: {}, size: {}", s3Url, size);
+
+        S3BucketAndKey bucketAndKey = S3BucketAndKey.getFromUrl(s3Url);
+        String bucket = bucketAndKey.getBucket();
+        String key = bucketAndKey.getKey();
+
+        TransferManager transferManager = getTransferManager();
+
+        ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setContentType("application/octet-stream");
+        metadata.setContentLength(size);
+
+        PutObjectRequest request = new PutObjectRequest(bucket,
+                key,
+                inputStream,
+                metadata);
+
+        AtomicLong totalTransferredBytes = new AtomicLong(0);
+
+        request.setGeneralProgressListener(new ProgressListener() {
+            private long lastLogTime = 0;
+
+            @Override
+            public void progressChanged(ProgressEvent progressEvent) {
+                long count = progressEvent.getBytesTransferred();
+                long total = totalTransferredBytes.addAndGet(count);
+                long currentTime = System.currentTimeMillis();
+                long logInterval = 10000;
+                if (currentTime - lastLogTime >= logInterval) {
+                    logger.info("S3 upload progress: {}, recent transferred {} bytes, total transferred {}", key, count, total);
+                    lastLogTime = currentTime;
+                }
+            }
+        });
+
+        // https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/best-practices.html
+        request.getRequestClientOptions().setReadLimit((int) DEFAULT_MULTIPART_SIZE + 1);
+        request.setSdkRequestTimeout(S3_PUT_TIMEOUT_MILLISEC);
+        request.setSdkClientExecutionTimeout(S3_PUT_TIMEOUT_MILLISEC);
+        try {
+            long startTime = System.currentTimeMillis();
+            transferManager.upload(request).waitForCompletion();
+            long duration = System.currentTimeMillis() - startTime;
+            double mbs = 0;
+            if (duration != 0) {
+                mbs = ((double) size) / (1000 * 1000) / ((double) duration / 1000);
+            }
+            logger.info("S3 upload finished: {}, file size: {} bytes, total transferred: {}, throughput: {} mbs",
+                    s3Url, size, totalTransferredBytes.get(), mbs);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Failed to upload to s3: " + key, e);
+        } finally {
+            transferManager.shutdownNow();
+        }
+    }
+
+    private InputStream readS3(String s3Url, long offset, long size) {
+        logger.info("Downloading shuffle file from s3: {}, size: {}", s3Url, size);
+
+        S3BucketAndKey bucketAndKey = S3BucketAndKey.getFromUrl(s3Url);
+
+        File downloadTempFile;
+        try {
+            downloadTempFile = File.createTempFile("shuffle-download", ".data");
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create temp file for downloading shuffle file");
+        }
+
+        TransferManager transferManager = getTransferManager();
+
+        GetObjectRequest getObjectRequest = new GetObjectRequest(bucketAndKey.getBucket(), bucketAndKey.getKey())
+                .withRange(offset, offset + size);
+
+        AtomicLong totalTransferredBytes = new AtomicLong(0);
+
+        getObjectRequest.setGeneralProgressListener(new ProgressListener() {
+            private long lastLogTime = 0;
+
+            @Override
+            public void progressChanged(ProgressEvent progressEvent) {
+                long count = progressEvent.getBytesTransferred();
+                long total = totalTransferredBytes.addAndGet(count);
+                long currentTime = System.currentTimeMillis();
+                long logInterval = 10000;
+                if (currentTime - lastLogTime >= logInterval) {
+                    logger.info("S3 download progress: {}, recent transferred {} bytes, total transferred {}", s3Url, count, total);
+                    lastLogTime = currentTime;
+                }
+            }
+        });
+
+        try {
+            long startTime = System.currentTimeMillis();
+            transferManager.download(getObjectRequest, downloadTempFile).waitForCompletion();
+            long duration = System.currentTimeMillis() - startTime;
+            double mbs = 0;
+            if (duration != 0) {
+                mbs = ((double) size) / (1000 * 1000) / ((double) duration / 1000);
+            }
+            logger.info("S3 download finished: {}, file size: {} bytes, total transferred: {}, throughput: {} mbs",
+                    s3Url, size, totalTransferredBytes.get(), mbs);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(String.format(
+                    "Failed to download shuffle file %s", s3Url));
+        } finally {
+            transferManager.shutdownNow();

Review comment:
       What if the transfer manager is reused?
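The reuse question can be illustrated with a JDK thread pool standing in for the static `TransferManager`. This is an assumption: we treat a shut-down `TransferManager` like a shut-down executor, which rejects further work. If each transfer calls `shutdownNow()` in `finally`, the first call leaves the shared instance unusable for every later call. All names below are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class SharedManagerReuse {
    // Anti-pattern: the per-call finally shuts down a pool that later calls still need.
    static String transferThenShutdown(ExecutorService pool, String file) throws Exception {
        try {
            return pool.submit(() -> "transferred " + file).get();
        } finally {
            pool.shutdownNow();
        }
    }

    // Alternative: per-call code only uses the shared pool; its owner shuts it down once.
    static String transfer(ExecutorService pool, String file) throws Exception {
        return pool.submit(() -> "transferred " + file).get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService broken = Executors.newFixedThreadPool(2);
        System.out.println(transferThenShutdown(broken, "a.data"));
        try {
            transferThenShutdown(broken, "b.data");
        } catch (RejectedExecutionException e) {
            System.out.println("second transfer rejected: pool already shut down");
        }

        ExecutorService shared = Executors.newFixedThreadPool(2);
        try {
            System.out.println(transfer(shared, "a.data"));
            System.out.println(transfer(shared, "b.data"));
        } finally {
            shared.shutdown(); // closed once by its owner, after all transfers
        }
    }
}
```

Under this reading, the shared manager would be closed once by its owner (for example, when the shuffle manager stops) rather than once per transfer.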

##########
File path: external-shuffle-storage/src/main/java/org/apache/spark/starshuffle/StarBlockStoreClient.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.starshuffle;
+
+import org.apache.spark.SparkEnv;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.netty.SparkTransportConf;
+import org.apache.spark.network.shuffle.*;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.ShuffleBlockId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This class fetches shuffle blocks from external storage like S3
+ */
+public class StarBlockStoreClient extends BlockStoreClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(StarBlockStoreClient.class);
+
+    // Fetch shuffle blocks from external shuffle storage.
+    // The shuffle location is encoded in the host argument. In the future, we should enhance
+    // Spark internal code to support abstraction of shuffle storage location.
+    @Override
+    public void fetchBlocks(String host, int port, String execId, String[] blockIds, BlockFetchingListener listener, DownloadFileManager downloadFileManager) {
+        for (int i = 0; i < blockIds.length; i++) {
+            String blockId = blockIds[i];
+            CompletableFuture.runAsync(() -> fetchBlock(host, execId, blockId, listener, downloadFileManager));
+        }
+    }
+
+    private void fetchBlock(String host, String execId, String blockIdStr, BlockFetchingListener listener, DownloadFileManager downloadFileManager) {
+        BlockId blockId = BlockId.apply(blockIdStr);
+        if (blockId instanceof ShuffleBlockId) {
+            ShuffleBlockId shuffleBlockId = (ShuffleBlockId)blockId;
+            StarMapResultFileInfo mapResultFileInfo = StarMapResultFileInfo.deserializeFromString(host);
+            long offset = 0;
+            for (int i = 0; i < shuffleBlockId.reduceId(); i++) {
+                offset += mapResultFileInfo.getPartitionLengths()[i];
+            }
+            long size = mapResultFileInfo.getPartitionLengths()[shuffleBlockId.reduceId()];
+            StarShuffleFileManager streamProvider = StarUtils.createShuffleFileManager(SparkEnv.get().conf(),
+                    mapResultFileInfo.getLocation());
+            if (downloadFileManager != null) {
+                try (InputStream inputStream = streamProvider.read(mapResultFileInfo.getLocation(), offset, size)) {
+                    TransportConf transportConf = SparkTransportConf.fromSparkConf(
+                            SparkEnv.get().conf(), "starShuffle", 1, Option.empty());
+                    DownloadFile downloadFile = downloadFileManager.createTempFile(transportConf);
+                    downloadFileManager.registerTempFileToClean(downloadFile);
+                    DownloadFileWritableChannel downloadFileWritableChannel = downloadFile.openForWriting();
+
+                    int bufferSize = 64 * 1024;
+                    byte[] bytes = new byte[bufferSize];
+                    int readBytes = 0;
+                    while (readBytes < size) {
+                        int toReadBytes = Math.min((int)size - readBytes, bufferSize);
+                        int n = inputStream.read(bytes, 0, toReadBytes);
+                        if (n == -1) {
+                            throw new RuntimeException(String.format(
+                                    "Failed to read file %s for shuffle block %s, hit end with remaining %s bytes",
+                                    mapResultFileInfo.getLocation(),
+                                    blockId,
+                                    size - readBytes));
+                        }
+                        readBytes += n;
+                        downloadFileWritableChannel.write(ByteBuffer.wrap(bytes, 0, n));
+                    }
+                    ManagedBuffer managedBuffer = downloadFileWritableChannel.closeAndRead();
+                    listener.onBlockFetchSuccess(blockIdStr, managedBuffer);
+                } catch (IOException e) {
+                    throw new RuntimeException(String.format(
+                            "Failed to read file %s for shuffle block %s",
+                            mapResultFileInfo.getLocation(),
+                            blockId));
+                }
+            } else {
+                try (InputStream inputStream = streamProvider.read(mapResultFileInfo.getLocation(), offset, size)) {
+                    ByteBuffer byteBuffer = ByteBuffer.allocate((int)size);
+                    int b = inputStream.read();
+                    while (b != -1) {
+                        byteBuffer.put((byte)b);
+                        if (byteBuffer.position() == size) {
+                            break;
+                        }
+                        b = inputStream.read();
+                    }
+                    byteBuffer.flip();
+                    NioManagedBuffer managedBuffer = new NioManagedBuffer(byteBuffer);
+                    listener.onBlockFetchSuccess(blockIdStr, managedBuffer);
+                } catch (IOException e) {
+                    throw new RuntimeException(String.format(

Review comment:
       Again, pass on inner exception details.
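Passing the caught exception as the cause keeps the root error in the stack trace. A related gap: `fetchBlocks` runs `fetchBlock` through `CompletableFuture.runAsync` and discards the returned future, so an exception thrown there only completes that future exceptionally and no caller ever sees it. A minimal stdlib-only sketch, assuming a hypothetical `readBlock` that always fails and a `BiConsumer` standing in for `BlockFetchingListener.onBlockFetchFailure(String, Throwable)`:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;

public class FetchErrorPropagation {
    // Hypothetical stand-in for the ranged read; always fails to exercise the error path.
    static byte[] readBlock(String location) throws IOException {
        throw new IOException("connection reset while reading " + location);
    }

    // Wraps the IOException as the cause instead of dropping it.
    static byte[] fetchBlock(String location, String blockId) {
        try {
            return readBlock(location);
        } catch (IOException e) {
            throw new UncheckedIOException(String.format(
                    "Failed to read file %s for shuffle block %s", location, blockId), e);
        }
    }

    // Forwards any failure of the async fetch to the listener-like callback.
    static CompletableFuture<Void> fetchAsync(String location, String blockId,
                                              BiConsumer<String, Throwable> onFailure) {
        return CompletableFuture.runAsync(() -> fetchBlock(location, blockId))
                .handle((ignored, error) -> {
                    if (error != null) {
                        // runAsync wraps the thrown exception in a CompletionException.
                        Throwable cause = error instanceof CompletionException && error.getCause() != null
                                ? error.getCause() : error;
                        onFailure.accept(blockId, cause);
                    }
                    return null;
                });
    }

    public static void main(String[] args) {
        fetchAsync("example-location", "shuffle_0_0_1",
                (id, err) -> System.out.println(id + " failed: " + err.getMessage()
                        + " (cause: " + err.getCause().getMessage() + ")"))
                .join();
    }
}
```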




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org



[GitHub] [spark] AmplabJenkins commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991382857


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50556/
   




[GitHub] [spark] hiboyang commented on a change in pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on a change in pull request #34864:
URL: https://github.com/apache/spark/pull/34864#discussion_r779211288



##########
File path: external-shuffle-storage/pom.xml
##########
@@ -0,0 +1,143 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.spark</groupId>
+        <artifactId>spark-parent_2.12</artifactId>
+        <version>3.3.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>external-shuffle-storage_2.12</artifactId>
+    <packaging>jar</packaging>
+    <name>External Shuffle Storage</name>
+    <url>http://spark.apache.org/</url>
+
+    <properties>
+        <sbt.project.name>external-shuffle-storage</sbt.project.name>
+        <build.testJarPhase>none</build.testJarPhase>
+        <build.copyDependenciesPhase>package</build.copyDependenciesPhase>
+        <hadoop.deps.scope>provided</hadoop.deps.scope>
+        <hive.deps.scope>provided</hive.deps.scope>
+        <parquet.deps.scope>provided</parquet.deps.scope>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math3</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalacheck</groupId>
+            <artifactId>scalacheck_${scala.binary.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>

Review comment:
       Thanks for the suggestion! Yes, I was also thinking of using the Hadoop library, but held off because I wanted to start small with this prototype. Switching to the Hadoop library sounds like a good idea.






[GitHub] [spark] hiboyang closed pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang closed pull request #34864:
URL: https://github.com/apache/spark/pull/34864


   




[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-996347796


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50775/
   




[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-996382121


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50775/
   




[GitHub] [spark] hiboyang commented on a change in pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on a change in pull request #34864:
URL: https://github.com/apache/spark/pull/34864#discussion_r779213004



##########
File path: external-shuffle-storage/src/main/java/org/apache/spark/starshuffle/StarS3ShuffleFileManager.java
##########
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.starshuffle;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.client.builder.ExecutorFactory;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.regions.Regions;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.SparkHadoopUtil;
+import org.apache.spark.network.util.LimitedInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.net.URI;
+import java.util.UUID;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * This class read/write shuffle file on external storage like S3.
+ */
+public class StarS3ShuffleFileManager implements StarShuffleFileManager {
+    private static final Logger logger = LoggerFactory.getLogger(StarS3ShuffleFileManager.class);
+
+    // TODO make following values configurable
+    public final static int S3_PUT_TIMEOUT_MILLISEC = 180 * 1000;
+
+    // Following constants are copied from:
+    // https://github.com/apache/hadoop/blob/6c6d1b64d4a7cd5288fcded78043acaf23228f96/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+    public static final long DEFAULT_MULTIPART_SIZE = 67108864; // 64M
+    public static final long DEFAULT_MIN_MULTIPART_THRESHOLD = 134217728; // 128M
+    public static final String MAX_THREADS = "fs.s3a.threads.max";
+    public static final int DEFAULT_MAX_THREADS = 10;
+    public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
+    public static final int DEFAULT_KEEPALIVE_TIME = 60;
+
+    public static final String AWS_REGION = "fs.s3a.endpoint.region";
+    public static final String DEFAULT_AWS_REGION = Regions.US_WEST_2.getName();
+
+    private static TransferManager transferManager;
+    private static Object transferManagerLock = new Object();

Review comment:
       yes!
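The quoted fields share one lazily created `TransferManager` per JVM behind a lock object. One common way to make that pattern safe is double-checked locking with a `volatile` field and a `final` lock, so the lock can never be reassigned while a thread waits on it. A minimal sketch, with a hypothetical `ExpensiveClient` standing in for `TransferManager`:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazySharedClient {
    // Counts how many instances get built, to show only one is ever created.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Hypothetical stand-in for TransferManager.
    static final class ExpensiveClient {
        ExpensiveClient() { CREATED.incrementAndGet(); }
    }

    // volatile: a fully constructed instance is safely published to other threads.
    private static volatile ExpensiveClient client;
    // final: the lock object can never be swapped out.
    private static final Object LOCK = new Object();

    static ExpensiveClient get() {
        ExpensiveClient local = client;
        if (local == null) {
            synchronized (LOCK) {
                local = client;
                if (local == null) {
                    local = new ExpensiveClient();
                    client = local;
                }
            }
        }
        return local;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(LazySharedClient::get);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("instances created: " + CREATED.get()); // instances created: 1
    }
}
```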






[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-992961044


   **[Test build #146147 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146147/testReport)** for PR 34864 at commit [`8222f38`](https://github.com/apache/spark/commit/8222f383e25af2c66a14c96cb3f8153d6828cbad).




[GitHub] [spark] AmplabJenkins commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-993027904


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50620/
   




[GitHub] [spark] AmplabJenkins commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-993032992


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146147/
   




[GitHub] [spark] c21 commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991390684


   @hiboyang, thanks for the work here! Could you create a design doc for this? That might help attract more people's attention and make the proposal easier to understand.




[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991354018


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50556/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991382857


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50556/
   




[GitHub] [spark] hiboyang commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-992727322


   > Hi, All. Thank you!
   > 
   > BTW, for the record, Apache Spark 3.1+ already stores its shuffle files in external storage such as S3 and reads them back from it.
   > 
   > * [[SPARK-33545](https://issues.apache.org/jira/browse/SPARK-33545)][CORE] Support Fallback Storage during Worker decommission (Apache Spark 3.1.0)
   > * [[SPARK-34142](https://issues.apache.org/jira/browse/SPARK-34142)][CORE] Support Fallback Storage Cleanup during stopping SparkContext (Apache Spark 3.2.0)
   > * [[SPARK-37509](https://issues.apache.org/jira/browse/SPARK-37509)][CORE] Improve Fallback Storage upload speed by avoiding S3 rate limiter (Apache Spark 3.3.0)
   > 
   > It would be great not to ignore the existing Spark feature and avoid over-claiming.
   > 
   > Dynamic allocation is the same. Apache Spark has been supporting Dynamic Allocation in K8s too.
   
   Right, Spark has shuffle tracking to support Dynamic Allocation on Kubernetes, but it does not work well when shuffle data is distributed across many executors (those executors cannot be released).
   
   The work here (storing shuffle data on S3) does not conflict with the worker decommission feature. The eventual goal is to store shuffle data directly on S3 or other external storage. Until then, people can still use the worker decommission feature.
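To make the release argument concrete: with `spark.dynamicAllocation.shuffleTracking.enabled`, an idle executor is kept while it still stores shuffle output that may be needed, so spreading shuffle data over many executors pins all of them. The sketch below is a deliberately simplified, hypothetical model (not Spark's `ExecutorMonitor`; it ignores `spark.dynamicAllocation.shuffleTracking.timeout`) contrasting that rule with shuffle data kept on external storage.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

public class ReleasableExecutors {
    // An idle executor may be released only if it stores no map output for a live
    // shuffle, unless shuffle data lives on external storage, in which case releasing
    // the executor loses nothing.
    static List<String> releasable(Set<String> idleExecutors,
                                   Map<String, Set<Integer>> liveShufflesByExecutor,
                                   boolean shuffleOnExternalStorage) {
        return idleExecutors.stream()
                .filter(id -> shuffleOnExternalStorage
                        || liveShufflesByExecutor.getOrDefault(id, Collections.emptySet()).isEmpty())
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Set<String> idle = new TreeSet<>(Arrays.asList("exec-1", "exec-2", "exec-3"));
        Map<String, Set<Integer>> live = new HashMap<>();
        live.put("exec-1", new HashSet<>(Arrays.asList(0)));
        live.put("exec-2", new HashSet<>(Arrays.asList(0, 1)));
        System.out.println(releasable(idle, live, false)); // [exec-3]
        System.out.println(releasable(idle, live, true));  // [exec-1, exec-2, exec-3]
    }
}
```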
   
   
   




[GitHub] [spark] hiboyang commented on a change in pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on a change in pull request #34864:
URL: https://github.com/apache/spark/pull/34864#discussion_r779212003



##########
File path: external-shuffle-storage/src/main/java/org/apache/spark/starshuffle/StarBlockStoreClient.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.starshuffle;
+
+import org.apache.spark.SparkEnv;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.netty.SparkTransportConf;
+import org.apache.spark.network.shuffle.*;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.ShuffleBlockId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This class fetches shuffle blocks from external storage like S3
+ */
+public class StarBlockStoreClient extends BlockStoreClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(StarBlockStoreClient.class);
+
+    // Fetch shuffle blocks from external shuffle storage.
+    // The shuffle location is encoded in the host argument. In the future, we should enhance
+    // Spark internal code to support abstraction of shuffle storage location.
+    @Override
+    public void fetchBlocks(String host, int port, String execId, String[] blockIds, BlockFetchingListener listener, DownloadFileManager downloadFileManager) {
+        for (int i = 0; i < blockIds.length; i++) {
+            String blockId = blockIds[i];
+            CompletableFuture.runAsync(() -> fetchBlock(host, execId, blockId, listener, downloadFileManager));
+        }
+    }
+
+    private void fetchBlock(String host, String execId, String blockIdStr, BlockFetchingListener listener, DownloadFileManager downloadFileManager) {
+        BlockId blockId = BlockId.apply(blockIdStr);
+        if (blockId instanceof ShuffleBlockId) {
+            ShuffleBlockId shuffleBlockId = (ShuffleBlockId)blockId;
+            StarMapResultFileInfo mapResultFileInfo = StarMapResultFileInfo.deserializeFromString(host);
+            long offset = 0;
+            for (int i = 0; i < shuffleBlockId.reduceId(); i++) {
+                offset += mapResultFileInfo.getPartitionLengths()[i];
+            }
+            long size = mapResultFileInfo.getPartitionLengths()[shuffleBlockId.reduceId()];
+            StarShuffleFileManager streamProvider = StarUtils.createShuffleFileManager(SparkEnv.get().conf(),
+                    mapResultFileInfo.getLocation());
+            if (downloadFileManager != null) {
+                try (InputStream inputStream = streamProvider.read(mapResultFileInfo.getLocation(), offset, size)) {
+                    TransportConf transportConf = SparkTransportConf.fromSparkConf(
+                            SparkEnv.get().conf(), "starShuffle", 1, Option.empty());
+                    DownloadFile downloadFile = downloadFileManager.createTempFile(transportConf);
+                    downloadFileManager.registerTempFileToClean(downloadFile);
+                    DownloadFileWritableChannel downloadFileWritableChannel = downloadFile.openForWriting();
+
+                    int bufferSize = 64 * 1024;
+                    byte[] bytes = new byte[bufferSize];
+                    int readBytes = 0;
+                    while (readBytes < size) {
+                        int toReadBytes = Math.min((int)size - readBytes, bufferSize);
+                        int n = inputStream.read(bytes, 0, toReadBytes);
+                        if (n == -1) {
+                            throw new RuntimeException(String.format(
+                                    "Failed to read file %s for shuffle block %s, hit end with remaining %s bytes",
+                                    mapResultFileInfo.getLocation(),
+                                    blockId,
+                                    size - readBytes));
+                        }
+                        readBytes += n;
+                        downloadFileWritableChannel.write(ByteBuffer.wrap(bytes, 0, n));
+                    }
+                    ManagedBuffer managedBuffer = downloadFileWritableChannel.closeAndRead();
+                    listener.onBlockFetchSuccess(blockIdStr, managedBuffer);
+                } catch (IOException e) {
+                    throw new RuntimeException(String.format(
+                            "Failed to read file %s for shuffle block %s",
+                            mapResultFileInfo.getLocation(),
+                            blockId));
+                }
+            } else {
+                try (InputStream inputStream = streamProvider.read(mapResultFileInfo.getLocation(), offset, size)) {
+                    ByteBuffer byteBuffer = ByteBuffer.allocate((int)size);
+                    int b = inputStream.read();
+                    while (b != -1) {
+                        byteBuffer.put((byte)b);
+                        if (byteBuffer.position() == size) {
+                            break;
+                        }
+                        b = inputStream.read();
+                    }
+                    byteBuffer.flip();
+                    NioManagedBuffer managedBuffer = new NioManagedBuffer(byteBuffer);
+                    listener.onBlockFetchSuccess(blockIdStr, managedBuffer);
+                } catch (IOException e) {
+                    throw new RuntimeException(String.format(

Review comment:
       Yes, good catch! Will add inner exception!
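For readers following the diff above: `fetchBlock` locates a reduce partition inside a map output file by summing the lengths of every partition before it, then reads `getPartitionLengths()[reduceId]` bytes starting at that offset. Below is a minimal sketch of that offset arithmetic. It assumes the partition lengths are available as a plain `long[]`, and `PartitionRange` is a hypothetical helper, not a class from the PR or from Spark:

```java
/** Hypothetical helper illustrating how a reduce partition's byte range is derived. */
final class PartitionRange {
    final long offset; // start position of the partition inside the map output file
    final long size;   // number of bytes that belong to the partition

    private PartitionRange(long offset, long size) {
        this.offset = offset;
        this.size = size;
    }

    /** Sums the lengths of partitions 0..reduceId-1 to find where reduceId starts. */
    static PartitionRange of(long[] partitionLengths, int reduceId) {
        if (reduceId < 0 || reduceId >= partitionLengths.length) {
            throw new IllegalArgumentException("reduceId out of range: " + reduceId);
        }
        long offset = 0;
        for (int i = 0; i < reduceId; i++) {
            offset += partitionLengths[i];
        }
        return new PartitionRange(offset, partitionLengths[reduceId]);
    }
}
```

For example, with partition lengths `{10, 0, 25, 5}`, reduce partition 2 starts at offset 10 and spans 25 bytes, and the empty partition 1 also starts at offset 10 with size 0.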




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-993032992


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146147/
   


[GitHub] [spark] hiboyang commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-992832070


   Hi Dongjoon, I think there are some misunderstandings here. I am writing a design doc for this PR; I hope it will help clarify the approach and address your questions.
   
   > You are completely wrong because you already know the worker decommission feature.
   > 
   > > but it will not work well when there is shuffle data distributed on many executors (those executors cannot be released).
   > 
   > You should mention this in the PR description explicitly instead of misleading the users.
   > 
   > > The work here (storing shuffle data on S3) does not conflict with worker decommission feature. The eventual goal is to store shuffle data on S3 or other external storage directly.
   
   


[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-993013404


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50620/
   


[GitHub] [spark] hiboyang commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-992722920


   > I quickly glanced through the code. It seems that for writing shuffle data we write locally first and then upload to S3; similarly, for reading shuffle data we download the data to a local temp file first and then read it.
   > 
   > We should be able to write/read directly to/from S3, right?
   
   Thanks for looking! Yes, we should be able to write/read directly to/from S3. This PR is a prototype; the code and the performance of writing/reading shuffle data on S3 still need improvement.
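As a rough illustration of reading a partition without staging it in a local temp file: the in-memory branch of `fetchBlock` reads one byte at a time, while a bulk loop can fill a buffer straight from the stream. This sketch assumes the stream returned by `streamProvider.read(location, offset, size)` is already positioned at the partition's first byte; `DirectRangeReader` is hypothetical and not part of the PR:

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

/** Hypothetical helper: reads one shuffle partition straight into memory, no temp file. */
final class DirectRangeReader {
    private static final int BUFFER_SIZE = 64 * 1024;

    private DirectRangeReader() {}

    /** Reads exactly {@code size} bytes from {@code in}, failing if the stream ends early. */
    static ByteBuffer readExactly(InputStream in, long size) throws IOException {
        if (size < 0 || size > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("Unsupported block size: " + size);
        }
        byte[] data = new byte[(int) size];
        int readBytes = 0;
        while (readBytes < data.length) {
            // Bulk read of up to BUFFER_SIZE bytes instead of one InputStream.read() call per byte.
            int n = in.read(data, readBytes, Math.min(data.length - readBytes, BUFFER_SIZE));
            if (n == -1) {
                throw new EOFException(
                        "Stream ended with " + (data.length - readBytes) + " bytes remaining");
            }
            readBytes += n;
        }
        // wrap() leaves position at 0 and limit at size, ready for a consumer to read.
        return ByteBuffer.wrap(data);
    }
}
```

Throwing `EOFException` on a short stream mirrors the diff's existing check for hitting the end with bytes remaining, while keeping the failure an `IOException` that callers already handle.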
   


[GitHub] [spark] hiboyang commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991391300


   > @hiboyang, thanks for the work here! Could you create a design doc for this? That might help get more people's attention and make it easier for them to understand.
   
   Yes, good suggestion. I will create a design doc.
   


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-996384036


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50775/
   


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-996565782


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/146303/
   


[GitHub] [spark] hiboyang commented on a change in pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on a change in pull request #34864:
URL: https://github.com/apache/spark/pull/34864#discussion_r779212003



##########
File path: external-shuffle-storage/src/main/java/org/apache/spark/starshuffle/StarBlockStoreClient.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.starshuffle;
+
+import org.apache.spark.SparkEnv;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.netty.SparkTransportConf;
+import org.apache.spark.network.shuffle.*;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.storage.BlockId;
+import org.apache.spark.storage.ShuffleBlockId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This class fetches shuffle blocks from external storage like S3
+ */
+public class StarBlockStoreClient extends BlockStoreClient {
+
+    private static final Logger logger = LoggerFactory.getLogger(StarBlockStoreClient.class);
+
+    // Fetch shuffle blocks from external shuffle storage.
+    // The shuffle location is encoded in the host argument. In the future, we should enhance
+    // Spark internal code to support abstraction of shuffle storage location.
+    @Override
+    public void fetchBlocks(String host, int port, String execId, String[] blockIds, BlockFetchingListener listener, DownloadFileManager downloadFileManager) {
+        for (int i = 0; i < blockIds.length; i++) {
+            String blockId = blockIds[i];
+            CompletableFuture.runAsync(() -> fetchBlock(host, execId, blockId, listener, downloadFileManager));
+        }
+    }
+
+    private void fetchBlock(String host, String execId, String blockIdStr, BlockFetchingListener listener, DownloadFileManager downloadFileManager) {
+        BlockId blockId = BlockId.apply(blockIdStr);
+        if (blockId instanceof ShuffleBlockId) {
+            ShuffleBlockId shuffleBlockId = (ShuffleBlockId)blockId;
+            StarMapResultFileInfo mapResultFileInfo = StarMapResultFileInfo.deserializeFromString(host);
+            long offset = 0;
+            for (int i = 0; i < shuffleBlockId.reduceId(); i++) {
+                offset += mapResultFileInfo.getPartitionLengths()[i];
+            }
+            long size = mapResultFileInfo.getPartitionLengths()[shuffleBlockId.reduceId()];
+            StarShuffleFileManager streamProvider = StarUtils.createShuffleFileManager(SparkEnv.get().conf(),
+                    mapResultFileInfo.getLocation());
+            if (downloadFileManager != null) {
+                try (InputStream inputStream = streamProvider.read(mapResultFileInfo.getLocation(), offset, size)) {
+                    TransportConf transportConf = SparkTransportConf.fromSparkConf(
+                            SparkEnv.get().conf(), "starShuffle", 1, Option.empty());
+                    DownloadFile downloadFile = downloadFileManager.createTempFile(transportConf);
+                    downloadFileManager.registerTempFileToClean(downloadFile);
+                    DownloadFileWritableChannel downloadFileWritableChannel = downloadFile.openForWriting();
+
+                    int bufferSize = 64 * 1024;
+                    byte[] bytes = new byte[bufferSize];
+                    int readBytes = 0;
+                    while (readBytes < size) {
+                        int toReadBytes = Math.min((int)size - readBytes, bufferSize);
+                        int n = inputStream.read(bytes, 0, toReadBytes);
+                        if (n == -1) {
+                            throw new RuntimeException(String.format(
+                                    "Failed to read file %s for shuffle block %s, hit end with remaining %s bytes",
+                                    mapResultFileInfo.getLocation(),
+                                    blockId,
+                                    size - readBytes));
+                        }
+                        readBytes += n;
+                        downloadFileWritableChannel.write(ByteBuffer.wrap(bytes, 0, n));
+                    }
+                    ManagedBuffer managedBuffer = downloadFileWritableChannel.closeAndRead();
+                    listener.onBlockFetchSuccess(blockIdStr, managedBuffer);
+                } catch (IOException e) {
+                    throw new RuntimeException(String.format(
+                            "Failed to read file %s for shuffle block %s",
+                            mapResultFileInfo.getLocation(),
+                            blockId));
+                }
+            } else {
+                try (InputStream inputStream = streamProvider.read(mapResultFileInfo.getLocation(), offset, size)) {
+                    ByteBuffer byteBuffer = ByteBuffer.allocate((int)size);
+                    int b = inputStream.read();
+                    while (b != -1) {
+                        byteBuffer.put((byte)b);
+                        if (byteBuffer.position() == size) {
+                            break;
+                        }
+                        b = inputStream.read();
+                    }
+                    byteBuffer.flip();
+                    NioManagedBuffer managedBuffer = new NioManagedBuffer(byteBuffer);
+                    listener.onBlockFetchSuccess(blockIdStr, managedBuffer);
+                } catch (IOException e) {
+                    throw new RuntimeException(String.format(

Review comment:
       Yes, good catch! Will add inner exception!
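A sketch of what adding the inner exception could look like, under two stated assumptions: `FetchListener` and `BlockReader` are hypothetical stand-ins loosely modeled on Spark's `BlockFetchingListener` (which has `onBlockFetchSuccess` and `onBlockFetchFailure`), and failures are reported to the listener rather than thrown. In the diff, a `RuntimeException` thrown inside `CompletableFuture.runAsync` only completes the discarded future exceptionally, so the caller may never learn that the fetch failed.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** Hypothetical listener, loosely modeled on Spark's BlockFetchingListener. */
interface FetchListener {
    void onSuccess(String blockId, byte[] data);

    void onFailure(String blockId, Throwable cause);
}

/** Hypothetical blocking read of a single shuffle block. */
interface BlockReader {
    byte[] read(String blockId) throws IOException;
}

final class AsyncFetcher {
    private AsyncFetcher() {}

    /** Fetches one block asynchronously and always reports the outcome to the listener. */
    static CompletableFuture<Void> fetchAsync(
            String blockId, String location, BlockReader reader, FetchListener listener) {
        return CompletableFuture.runAsync(() -> {
            byte[] data;
            try {
                data = reader.read(blockId);
            } catch (IOException e) {
                // Chain the original IOException as the cause so its stack trace is preserved.
                listener.onFailure(blockId, new IOException(String.format(
                        "Failed to read file %s for shuffle block %s", location, blockId), e));
                return;
            } catch (RuntimeException e) {
                // Report unexpected errors too, instead of letting them vanish into the future.
                listener.onFailure(blockId, e);
                return;
            }
            listener.onSuccess(blockId, data);
        });
    }
}
```

Returning the future instead of discarding it also lets callers (or tests) wait for completion with `join()`.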




[GitHub] [spark] linzebing commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
linzebing commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991397006


   I quickly glanced through the code. It seems that for writing shuffle data we write locally first and then upload to S3; similarly, for reading shuffle data we download the data to a local temp file first and then read it.
   
   We should be able to write/read directly to/from S3, right?


[GitHub] [spark] SparkQA removed a comment on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991336315


   **[Test build #146081 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146081/testReport)** for PR 34864 at commit [`4b67c8a`](https://github.com/apache/spark/commit/4b67c8a193b3389ca400e276f0dcea2ba4749d26).


[GitHub] [spark] SparkQA commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-991336315


   **[Test build #146081 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/146081/testReport)** for PR 34864 at commit [`4b67c8a`](https://github.com/apache/spark/commit/4b67c8a193b3389ca400e276f0dcea2ba4749d26).


[GitHub] [spark] AmplabJenkins commented on pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34864:
URL: https://github.com/apache/spark/pull/34864#issuecomment-996384036


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/50775/
   


[GitHub] [spark] hiboyang commented on a change in pull request #34864: [SHUFFLE] [WIP] Prototype: store shuffle file on external storage like S3

Posted by GitBox <gi...@apache.org>.
hiboyang commented on a change in pull request #34864:
URL: https://github.com/apache/spark/pull/34864#discussion_r779212802



##########
File path: external-shuffle-storage/src/main/java/org/apache/spark/starshuffle/StarBypassMergeSortShuffleWriter.java
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.starshuffle;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.Partitioner;
+import org.apache.spark.ShuffleDependency;
+import org.apache.spark.SparkConf;
+import org.apache.spark.internal.config.package$;
+import org.apache.spark.scheduler.MapStatus;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
+import org.apache.spark.shuffle.ShuffleWriter;
+import org.apache.spark.shuffle.StarBypassMergeSortShuffleHandle;
+import org.apache.spark.shuffle.StarOpts;
+import org.apache.spark.shuffle.api.ShuffleExecutorComponents;
+import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
+import org.apache.spark.shuffle.sort.SortShuffleManager;
+import org.apache.spark.storage.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.None$;
+import scala.Option;
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.Iterator;
+
+import javax.annotation.Nullable;
+import java.io.*;

Review comment:
       Good point, let me remove the .* wildcard import here.



