Posted to issues@spark.apache.org by "Nick Hryhoriev (Jira)" <ji...@apache.org> on 2019/11/06 09:38:00 UTC

[jira] [Comment Edited] (SPARK-19407) defaultFS is used FileSystem.get instead of getting it from uri scheme

    [ https://issues.apache.org/jira/browse/SPARK-19407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16968221#comment-16968221 ] 

Nick Hryhoriev edited comment on SPARK-19407 at 11/6/19 9:37 AM:
-----------------------------------------------------------------

I have the same issue with Spark 2.4.4 when I use Spark on YARN in client mode.
{code:scala}
// Imports needed to compile this reproduction (not shown in the original snippet):
import java.util.UUID
import scala.concurrent.duration._
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Encoder, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

val sparkConf = SparkOnYarnAppController.sparkHadoopKeys(configuration)
  .foldLeft(new SparkConf()
    .setAppName(appName)
    .setIfMissing("spark.ui.enabled", "false")
    .setMaster("yarn")
    .setIfMissing("spark.hadoop.yarn.resourcemanager.hostname", s"hadoop-$cluster.com")
    .setIfMissing("spark.yarn.archive", s"hdfs:///sparkDistributions/$distribution.tgz")
    .setIfMissing("spark.dynamicAllocation.enabled", "false")
    .setIfMissing("spark.driver.memory", "1g")
    .setIfMissing("spark.driver.cores", "1")
    .setIfMissing("spark.executor.memory", "1g")
    .setIfMissing("spark.executor.instances", "1")
    .setIfMissing("spark.executor.cores", "1")
    .setIfMissing("spark.yarn.maxAppAttempts", "1")
  )((sparkConf, hadoopProps) => sparkConf.set(hadoopProps._1, hadoopProps._2))

SparkSession.builder().config(sparkConf).getOrCreate()

case class TestRecord(partition: Long, value: String)

object CommonTools {

  implicit class MockDataframe(session: SparkSession) {

    implicit val sqlContext: SQLContext = session.sqlContext

    // Builds a streaming DataFrame backed by an in-memory source.
    def mockDataFrame[T: Encoder](mockData: Seq[T]): DataFrame = {
      val mockStream = MemoryStream[T]
      mockStream.addData(mockData)
      mockStream.toDF()
    }
  }

  implicit class StreamSinkToHadoopFileSystem(dataFrame: DataFrame) {

    // Starts a file-sink streaming query; the later .format(format) call overrides .format("parquet").
    def sinkToS3(s3path: String, format: String, checkpointDir: String, trigger: Trigger): StreamingQuery = {
      dataFrame.writeStream
        .format("parquet")
        .queryName("Test-TooManyVersionPerRootPrefixInS3")
        .trigger(trigger)
        .option("checkpointLocation", checkpointDir)
        .format(format)
        .partitionBy("partition")
        .option("path", s3path)
        .start()
    }
  }
}

val stream = sparkSession
  .mockDataFrame[TestRecord]((0 to 100).map { i => TestRecord(i, s"i-${UUID.randomUUID().toString}") })
  .sinkToS3(
    s3path = s"$outputDir/TooManyVersionPerRootPrefixInS3/",
    format = "parquet",
    checkpointDir = s"$checkpointDir/TooManyVersionPerRootPrefixInS3-checkpoint",
    trigger = Trigger.ProcessingTime(5.seconds)
  )
{code}
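
For reference, the workaround described in the issue below (pointing fs.defaultFS at the checkpoint's filesystem) could be applied to the SparkConf built above. This is a minimal sketch, not a tested fix; the s3a bucket name is the placeholder from the issue description, not a real bucket:
{code:scala}
// Hypothetical workaround sketch: make fs.defaultFS match the s3a:// checkpoint
// location so StreamMetadata.read resolves the checkpoint path against the right
// filesystem. "s3a://somebucket" is a placeholder taken from the issue description.
val patchedConf = sparkConf.setIfMissing("spark.hadoop.fs.defaultFS", "s3a://somebucket")
val session = SparkSession.builder().config(patchedConf).getOrCreate()
{code}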
 



> defaultFS is used FileSystem.get instead of getting it from uri scheme
> ----------------------------------------------------------------------
>
>                 Key: SPARK-19407
>                 URL: https://issues.apache.org/jira/browse/SPARK-19407
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>            Reporter: Amit Assudani
>            Assignee: Genmao Yu
>            Priority: Major
>              Labels: checkpoint, filesystem, starter, streaming
>             Fix For: 2.1.1, 2.2.0
>
>
> Caused by: java.lang.IllegalArgumentException: Wrong FS: s3a://**************/checkpoint/7b2231a3-d845-4740-bfa3-681850e5987f/metadata, expected: file:///
> 	at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:649)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.pathToFile(RawLocalFileSystem.java:82)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:606)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
> 	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
> 	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
> 	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
> 	at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
> 	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
> 	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
> 	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
> 	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
> Easily replicated on a Spark standalone cluster by providing a checkpoint location whose URI scheme is anything other than "file://" and not overriding fs.defaultFS in the config.
> Workaround: --conf spark.hadoop.fs.defaultFS=s3a://somebucket, or set it in SparkConf or spark-defaults.conf.
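
For context, a minimal sketch of the pattern the issue title refers to, using the Hadoop FileSystem APIs that appear in the stack trace above. This is illustrative only, not the actual Spark patch, and the bucket and path are placeholders:
{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val hadoopConf = new Configuration()
val checkpointPath = new Path("s3a://somebucket/checkpoint/metadata") // placeholder path

// FileSystem.get(conf) returns the filesystem for fs.defaultFS (e.g. file:///),
// which is what triggers the "Wrong FS" error when the checkpoint is on s3a://.
val defaultFs: FileSystem = FileSystem.get(hadoopConf)

// Resolving from the path's own URI scheme yields the s3a filesystem instead.
val schemeFs: FileSystem = checkpointPath.getFileSystem(hadoopConf)
{code}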



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org