You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by igyu <ig...@21cn.com> on 2021/08/09 03:17:49 UTC

How can I read ftp

val ftpUrl = "ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/"

val schemas = StructType(List(
        new StructField("name", DataTypes.StringType, true),
        new StructField("age", DataTypes.IntegerType, true),
        new StructField("remk", DataTypes.StringType, true)))

   val DF = sparkSession.read.format("csv")
      .schema(schemas)
      .option("header","true")
      .load(ftpUrl)
//      .filter("created<=1602864000")

    DF.printSchema()
    DF.show()
I get the following error:

Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX
at org.apache.commons.lang3.time.FastDatePrinter.parsePattern(FastDatePrinter.java:282)
at org.apache.commons.lang3.time.FastDatePrinter.init(FastDatePrinter.java:149)
at org.apache.commons.lang3.time.FastDatePrinter.<init>(FastDatePrinter.java:142)
at org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:384)
at org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:369)
at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:91)
at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:88)
at org.apache.commons.lang3.time.FormatCache.getInstance(FormatCache.java:82)
at org.apache.commons.lang3.time.FastDateFormat.getInstance(FastDateFormat.java:165)
at org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:139)
at org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:41)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:105)
at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:312)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:310)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:330)
at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:615)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
at com.join.ftp.reader.FtpReader.readFrom(FtpReader.scala:40)
at com.join.synctool$.main(synctool.scala:41)
at com.join.synctool.main(synctool.scala)
21/08/09 11:15:08 INFO SparkContext: Invoking stop() from shutdown hook



igyu

Re: How can I read ftp

Posted by Паша <pa...@gmail.com>.
We solved it using an orchestrator that copied the data from FTP to
HDFS. But of course, you can also just use a Java FTP client to read the
files, put them somewhere, and then read them with Spark.

пн, 9 авг. 2021 г. в 06:39, Sean Owen <sr...@gmail.com>:

> FTP is definitely not supported. Read the files to distributed storage
> first then read from there.
>
> On Sun, Aug 8, 2021, 10:18 PM igyu <ig...@21cn.com> wrote:
>
>> val ftpUrl = "ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/"
>>
>> val schemas = StructType(List(
>>         new StructField("name", DataTypes.StringType, true),
>>         new StructField("age", DataTypes.IntegerType, true),
>>         new StructField("remk", DataTypes.StringType, true)))
>>
>>    val DF = sparkSession.read.format("csv")
>>       .schema(schemas)
>>       .option("header","true")
>>       .load(ftpUrl)
>> //      .filter("created<=1602864000")
>>
>>     DF.printSchema()
>>     DF.show()
>>
>> I get error
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Illegal
>> pattern component: XXX
>> at
>> org.apache.commons.lang3.time.FastDatePrinter.parsePattern(FastDatePrinter.java:282)
>> at
>> org.apache.commons.lang3.time.FastDatePrinter.init(FastDatePrinter.java:149)
>> at
>> org.apache.commons.lang3.time.FastDatePrinter.<init>(FastDatePrinter.java:142)
>> at
>> org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:384)
>> at
>> org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:369)
>> at
>> org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:91)
>> at
>> org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:88)
>> at
>> org.apache.commons.lang3.time.FormatCache.getInstance(FormatCache.java:82)
>> at
>> org.apache.commons.lang3.time.FastDateFormat.getInstance(FastDateFormat.java:165)
>> at
>> org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:139)
>> at
>> org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:41)
>> at
>> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:105)
>> at
>> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>> at
>> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>> at
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:312)
>> at
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:310)
>> at
>> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:330)
>> at
>> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>> at
>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:615)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>> at
>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>> at
>> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
>> at
>> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
>> at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
>> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
>> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
>> at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
>> at
>> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
>> at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
>> at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
>> at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
>> at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
>> at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
>> at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
>> at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
>> at com.join.ftp.reader.FtpReader.readFrom(FtpReader.scala:40)
>> at com.join.synctool$.main(synctool.scala:41)
>> at com.join.synctool.main(synctool.scala)
>> 21/08/09 11:15:08 INFO SparkContext: Invoking stop() from shutdown hook
>>
>> ------------------------------
>> igyu
>>
>

Re: How can I read ftp

Posted by Sean Owen <sr...@gmail.com>.
FTP is definitely not supported. Read the files to distributed storage
first, then read them from there.

On Sun, Aug 8, 2021, 10:18 PM igyu <ig...@21cn.com> wrote:

> val ftpUrl = "ftp://ftpuser:ftpuser@10.3.87.51:21/sparkftp/"
>
> val schemas = StructType(List(
>         new StructField("name", DataTypes.StringType, true),
>         new StructField("age", DataTypes.IntegerType, true),
>         new StructField("remk", DataTypes.StringType, true)))
>
>    val DF = sparkSession.read.format("csv")
>       .schema(schemas)
>       .option("header","true")
>       .load(ftpUrl)
> //      .filter("created<=1602864000")
>
>     DF.printSchema()
>     DF.show()
>
> I get error
>
> Exception in thread "main" java.lang.IllegalArgumentException: Illegal
> pattern component: XXX
> at
> org.apache.commons.lang3.time.FastDatePrinter.parsePattern(FastDatePrinter.java:282)
> at
> org.apache.commons.lang3.time.FastDatePrinter.init(FastDatePrinter.java:149)
> at
> org.apache.commons.lang3.time.FastDatePrinter.<init>(FastDatePrinter.java:142)
> at
> org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:384)
> at
> org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:369)
> at
> org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:91)
> at
> org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:88)
> at
> org.apache.commons.lang3.time.FormatCache.getInstance(FormatCache.java:82)
> at
> org.apache.commons.lang3.time.FastDateFormat.getInstance(FastDateFormat.java:165)
> at
> org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:139)
> at
> org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:41)
> at
> org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:105)
> at
> org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
> at
> org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:312)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:310)
> at
> org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:330)
> at
> org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
> at
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:615)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> at
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
> at
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
> at
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
> at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
> at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
> at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
> at
> org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> at
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> at
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
> at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
> at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
> at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
> at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
> at org.apache.spark.sql.Dataset.show(Dataset.scala:745)
> at org.apache.spark.sql.Dataset.show(Dataset.scala:704)
> at org.apache.spark.sql.Dataset.show(Dataset.scala:713)
> at com.join.ftp.reader.FtpReader.readFrom(FtpReader.scala:40)
> at com.join.synctool$.main(synctool.scala:41)
> at com.join.synctool.main(synctool.scala)
> 21/08/09 11:15:08 INFO SparkContext: Invoking stop() from shutdown hook
>
> ------------------------------
> igyu
>