Posted to issues@spark.apache.org by "Steve Loughran (JIRA)" <ji...@apache.org> on 2017/01/05 11:26:58 UTC

[jira] [Comment Edited] (SPARK-19013) java.util.ConcurrentModificationException when using s3 path as checkpointLocation

    [ https://issues.apache.org/jira/browse/SPARK-19013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15801059#comment-15801059 ] 

Steve Loughran edited comment on SPARK-19013 at 1/5/17 11:26 AM:
-----------------------------------------------------------------

OK, that is potentially the problem. One thing here: this is using s3://, which implies to me that this is EMR. A couple of thoughts:

# I think someone could rework S3A create() so that on overwrite=true the check for the file already existing is skipped: we only need to make sure the destination isn't a directory. HADOOP-13950 covers that. As usual, code with tests welcome.
# For object stores where close() does the PUT, and the PUT is atomic, Spark could bypass the write + rename strategy altogether. It could just do a direct write, knowing that the output will atomically overwrite the destination when it has finished. This actually delivers better atomicity/consistency than rename, as well as being faster (sketched below).
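
To make #2 concrete, here's a rough sketch of the two commit strategies against the Hadoop {{FileSystem}} API. This is illustrative only, not actual Spark code; {{fs}}, {{dest}} and {{bytes}} are placeholders:
{code}
import java.util.UUID

import org.apache.hadoop.fs.{FileSystem, Path}

// Direct write: rely on the store's atomic PUT happening in close().
def commitDirect(fs: FileSystem, dest: Path, bytes: Array[Byte]): Unit = {
  val out = fs.create(dest, true)            // overwrite = true
  try out.write(bytes) finally out.close()   // object becomes visible, atomically, when close() returns
}

// Classic write + rename: write to a temporary file, then rename into place.
def commitViaRename(fs: FileSystem, dest: Path, bytes: Array[Byte]): Unit = {
  val tmp = new Path(dest.getParent, s".${dest.getName}.${UUID.randomUUID()}.tmp")
  val out = fs.create(tmp, true)
  try out.write(bytes) finally out.close()
  // on S3 this rename is a server-side COPY + DELETE of the data, not a metadata-only move
  if (!fs.rename(tmp, dest)) {
    throw new java.io.IOException(s"Failed to rename $tmp to $dest")
  }
}
{code}
On a real filesystem that rename is a cheap metadata operation; on S3 it's a copy of the whole object, which is where the time goes.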

For point #1, this'd be moot for Tim if he's using AWS S3. Even on ASF S3A, he'd have to wait until we shipped it in Apache Hadoop 2.8.1 or later (i.e. it hasn't made the 2.8.0 cut).

For point #2, the hard part is having Spark know that the operation is going to have that characteristic. I could put it in the FS API. HADOOP-9565 has long discussed this, but it's been hard to come up with a good strategy which supports very different behaviours, not just across store schemas, but potentially against different endpoints. For example, enterprise S3 endpoints may have different semantics than AWS S3.

How about if Hadoop added a configuration option listing all the object store schemas with this behaviour, such as {{fs.schemas.object-stores.atomic-put-on-close}}, defaulting to "s3n, s3a, wasb, gcs"? On EMR, Amazon could add s3 there (the ASF's deprecated s3 filesystem is a different thing). Custom deployments could say different things, just through reconfiguration. While this doesn't address different semantics on different buckets, it would let Spark, Hive, etc. see that they are working with such an object store *without needing any changes in the FS API*.
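
As a sketch of what that reconfiguration might look like (a hypothetical property; it doesn't exist today, and in practice it would live in core-site.xml rather than being set in code):
{code}
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()
// EMR could ship the proposed default list plus its own s3 scheme
conf.set("fs.schemas.object-stores.atomic-put-on-close", "s3n, s3a, wasb, gcs, s3")
{code}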

This could be probed, and a different write policy chosen accordingly:
{code}
// dest: the destination Path; use the trimmed variant to cope with spaces in the comma-separated list
val putDirect = conf.getTrimmedStringCollection("fs.schemas.object-stores.atomic-put-on-close")
  .contains(dest.toUri.getScheme)
if (putDirect) { /* do a direct create + write */ } else { /* do write + rename */ }
{code}


This would make a big difference on large file checkpoints. Currently the time for close() is data / upload-bandwidth; the rename then adds data / store-copy-bandwidth (~6 MB/s on S3).
With the latest S3A block upload you get the uploads happening during the write, but the final PUT is still atomic in close(). Combine that with skipping the rename and it's way faster.
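
To put rough numbers on it: at that ~6 MB/s copy rate, the rename of, say, a 1 GB checkpoint file adds on the order of 1024 / 6 ≈ 170 seconds of server-side copying, all of which the direct-PUT path avoids.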




> java.util.ConcurrentModificationException when using s3 path as checkpointLocation 
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-19013
>                 URL: https://issues.apache.org/jira/browse/SPARK-19013
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.0.2
>            Reporter: Tim Chan
>
> I have a structured stream job running on EMR. The job will fail due to this
> {code}
> Multiple HDFSMetadataLog are using s3://mybucket/myapp org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatch(HDFSMetadataLog.scala:162)
> {code}
> There is only one instance of this stream job running.


