Posted to common-dev@hadoop.apache.org by "Balaji Ganesan (Jira)" <ji...@apache.org> on 2022/01/28 00:01:00 UTC

[jira] [Created] (HADOOP-18097) StagingCommitter getFinalKey method can add an extra / if getS3KeyPrefix returns ""

Balaji Ganesan created HADOOP-18097:
---------------------------------------

             Summary: StagingCommitter getFinalKey method can add an extra / if getS3KeyPrefix returns ""
                 Key: HADOOP-18097
                 URL: https://issues.apache.org/jira/browse/HADOOP-18097
             Project: Hadoop Common
          Issue Type: Bug
          Components: fs/s3
    Affects Versions: 3.3.1
         Environment: apache-spark 3.2 with hadoop 3.3.1 on Ubuntu 20.04

 

My spark-defaults.conf file:

 

--

spark.driver.extraClassPath=/home/bganesan//spark/dist/stocator/jars/*
spark.executor.extraClassPath=/home/bganesan/spark/dist/stocator/jars/*
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored=true
spark.hadoop.fs.s3a.path.style.access=true
spark.hadoop.fs.s3a.fast.upload=true
spark.hadoop.fs.s3a.committer.name=directory
spark.hadoop.fs.s3a.committer.magic.enabled=false
spark.hadoop.fs.s3a.commiter.staging.conflict-mode=replace
spark.hadoop.fs.s3a.committer.staging.unique-filenames=true
spark.hadoop.fs.s3a.committer.abort.pending.uploads=false
spark.hadoop.fs.s3a.committer.tmp.path=tmp/staging
spark.hadoop.fs.s3a.buffer.dir=/tmp/buffer
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a=org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory
spark.sql.sources.commitProtocolClass=org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class=org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter

--

 

I run terasort as follows:

 

--

export SPARK_HOME=$HOME/spark/dist
    rm -rf /tmp/staging
    rm -rf /tmp/buffer
    mkdir /tmp/staging
    mkdir /tmp/buffer
    ./bin/spark-submit \
    --master local \
    --driver-memory 2g \
    --num-executors 2 \
    --executor-cores 3 \
    --executor-memory 2G \
    --conf spark.default.parallelism=2 \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.hadoop.fs.s3a.endpoint="https://s3store.io" \
    --conf spark.hadoop.fs.s3a.access.key=$ACCESS_KEY_ID \
    --conf spark.hadoop.fs.s3a.secret.key=$SECRET_ACCESS_KEY \
    --conf spark.eventLog.dir=s3a://spark/spark-events/ \
    --conf spark.driver.extraJavaOptions='-Dcom.amazonaws.services.s3.enableV4' \
    --class com.github.ehiggs.spark.terasort.TeraGen \
    ./terasort/jars/spark-terasort-1.2-SNAPSHOT-jar-with-dependencies.jar \
    200m \
    s3a://terasort-s3-in/

 
            Reporter: Balaji Ganesan


I am testing the staging committer against an on-prem object store using Spark terasort and ran into this issue. All of my initiate multipart upload (MPU) requests were failing with an S3 "key not found" error. This object store doesn't support virtual-host-style requests, so I had path-style access enabled. After adding some extra debug logging and building hadoop-aws locally, I found that the staging committer was always adding a '/' prefix to my key.

 

So instead of part-r-00000-4ead11c8-bc20-4dee-9753-1b1f1ae4e578 I would end up with /part-r-00000-4ead11c8-bc20-4dee-9753-1b1f1ae4e578. I traced it to getFinalKey in StagingCommitter.java, which had the following code:

 
      return getS3KeyPrefix(context) + "/"
          + Paths.addUUID(relative, getUUID());

If getS3KeyPrefix(context) is "", then we end up with /part-r... as the key. 
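
To make the failure mode concrete, here is a minimal, self-contained sketch of the concatenation pattern. It is not the Hadoop code: the class KeyPrefixBugSketch and the method joinAlways are hypothetical names made up for illustration, and only the prefix + "/" + name pattern is taken from the snippet above. The file name is passed in as-is, so the UUID handling done by Paths.addUUID is left out.

```java
/** Hypothetical illustration only; not part of hadoop-aws. */
final class KeyPrefixBugSketch {

  /** Mirrors the quoted pattern: the separator is appended unconditionally. */
  static String joinAlways(String prefix, String relative) {
    return prefix + "/" + relative;
  }

  public static void main(String[] args) {
    // Non-empty prefix: the resulting key has no leading separator.
    System.out.println(joinAlways("output/dir", "part-r-00000"));
    // Empty prefix: the unconditional "/" becomes the first character of the key.
    System.out.println(joinAlways("", "part-r-00000"));
  }
}
```

With an empty prefix the second call yields a key that starts with '/', which matches the key I saw in the failing requests.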

 

I made the following change locally, which resolved the issue:

 

 

---

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 59114f7ab73..6d76cf2d419 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -365,11 +365,16 @@ public Path getTempTaskAttemptPath(TaskAttemptContext context) {
    * @return the S3 key where the file will be uploaded
    */
   protected String getFinalKey(String relative, JobContext context) {
+    StringBuilder sb = new StringBuilder();
+    final String pfx = getS3KeyPrefix(context);
+    if (!pfx.isEmpty()) {
+        sb.append(pfx).append('/');
+    }
+
     if (uniqueFilenames) {
-      return getS3KeyPrefix(context) + "/"
-          + Paths.addUUID(relative, getUUID());
+        return sb.append(Paths.addUUID(relative, getUUID())).toString(); 
     } else {
-      return getS3KeyPrefix(context) + "/" + relative;
+        return sb.append(relative).toString();
     }
   }
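
For anyone who wants to check the intended behavior without building hadoop-aws, the logic of the patch above can be sketched in isolation. This is only an approximation under stated assumptions: FinalKeySketch and finalKey are hypothetical names, the prefix is passed in directly instead of coming from getS3KeyPrefix(context), and the uniqueFilenames / Paths.addUUID branch is omitted because the caller supplies the final file name.

```java
/** Hypothetical standalone sketch of the patched logic; not part of hadoop-aws. */
final class FinalKeySketch {

  /** Appends "prefix/" only when the prefix is non-empty, then the relative name. */
  static String finalKey(String prefix, String relative) {
    StringBuilder sb = new StringBuilder();
    if (!prefix.isEmpty()) {
      sb.append(prefix).append('/');
    }
    return sb.append(relative).toString();
  }

  public static void main(String[] args) {
    // Empty prefix: the key is just the relative name, with no leading '/'.
    System.out.println(finalKey("", "part-r-00000"));
    // Non-empty prefix: behavior is unchanged from the original code.
    System.out.println(finalKey("output/dir", "part-r-00000"));
  }
}
```

The sketch only covers the empty-prefix case described in this report; it does not try to normalize a prefix that itself ends in '/'.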



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
