Posted to user@spark.apache.org by Vasyl Harasymiv <va...@gmail.com> on 2018/01/23 22:58:45 UTC

S3 token times out during data frame "write.csv"

Hi Spark Community,

Saving a data frame into a file on S3 using:

*df.write.csv(s3_location)*

If the write runs for longer than 30 minutes, the following error occurs:

*The provided token has expired. (Service: Amazon S3; Status Code: 400;
Error Code: ExpiredToken;)*

Potentially, this is because there is a hard-coded session limit on the
temporary S3 connection from Spark.

One can specify the duration as described here:

https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_temp_request.html

One can, of course, chunk the data into sub-30-minute writes. However, is
there a way to change the token expiry parameter directly in Spark before
calling "write.csv"?

Thanks a lot for any help!
Vasyl





On Tue, Jan 23, 2018 at 2:46 PM, Toy <no...@gmail.com> wrote:

> Thanks, I get this error when I switched to s3a://
>
> Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
> at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
> at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
>
> On Tue, 23 Jan 2018 at 15:05 Patrick Alwell <pa...@hortonworks.com>
> wrote:
>
>> Spark cannot read locally from S3 without an S3a protocol; you’ll more
>> than likely need a local copy of the data or you’ll need to utilize the
>> proper jars to enable S3 communication from the edge to the datacenter.
>>
>>
>>
>> https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
>>
>>
>>
>> Here are the jars: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
>>
>>
>>
>> Looks like you already have them, in which case you’ll have to make
>> small configuration changes, e.g. s3 -> s3a
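>>
>> A minimal sketch of those changes (assuming the Hadoop 2.7 s3a property
>> names; sc and df come from your application, and the bucket is a
>> placeholder):
>>
>> sc.hadoopConfiguration.set("fs.s3a.access.key", "key")
>> sc.hadoopConfiguration.set("fs.s3a.secret.key", "secret")
>> df.write.format("orc").save("s3a://bucket/orc/dt=2018-01-23")   // s3a:// instead of s3://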
>>
>>
>>
>> Keep in mind: *The Amazon JARs have proven very brittle: the version of
>> the Amazon libraries must match the versions against which the Hadoop
>> binaries were built.*
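>>
>> For example, the Hadoop 2.7.x hadoop-aws artifact was built against the old
>> monolithic AWS SDK, so a matching sbt pairing would look roughly like this
>> (a sketch; treat the exact versions as an assumption to verify against your
>> Hadoop build):
>>
>>   "org.apache.hadoop" % "hadoop-aws" % "2.7.3",
>>   "com.amazonaws" % "aws-java-sdk" % "1.7.4"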
>>
>>
>>
>> https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *From: *Toy <no...@gmail.com>
>> *Date: *Tuesday, January 23, 2018 at 11:33 AM
>> *To: *"user@spark.apache.org" <us...@spark.apache.org>
>> *Subject: *I can't save DataFrame from running Spark locally
>>
>>
>>
>> Hi,
>>
>>
>>
>> First of all, my Spark application runs fine in AWS EMR. However, I'm
>> trying to run it locally to debug an issue. My application just parses log
>> files, converts them to a DataFrame, then converts that to ORC and saves it
>> to S3. However, when I run locally I get this error:
>>
>>
>>
>> java.io.IOException: /orc/dt=2018-01-23 doesn't exist
>> at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
>> at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
>> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>> at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
>> at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
>> at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
>> at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
>> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
>> at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
>> at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
>> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
>> at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
>> at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
>> at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
>> at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
>> at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
>> at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)
>>
>>
>>
>> Here's what I have in sbt
>>
>>
>>
>> scalaVersion := "2.11.8"
>>
>>
>>
>> val sparkVersion = "2.1.0"
>>
>> val hadoopVersion = "2.7.3"
>>
>> val awsVersion = "1.11.155"
>>
>>
>>
>> lazy val sparkAndDependencies = Seq(
>>
>>   "org.apache.spark" %% "spark-core" % sparkVersion,
>>
>>   "org.apache.spark" %% "spark-sql" % sparkVersion,
>>
>>   "org.apache.spark" %% "spark-hive" % sparkVersion,
>>
>>   "org.apache.spark" %% "spark-streaming" % sparkVersion,
>>
>>
>>
>>   "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
>>
>>   "org.apache.hadoop" % "hadoop-common" % hadoopVersion
>>
>> )
>>
>>
>>
>> And this is where the code failed
>>
>>
>>
>> val sparrowWriter = sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
>>
>> sparrowWriter.save(sparrowOutputPath)
>>
>>
>>
>> sparrowOutputPath is something like s3://bucket/folder, and it exists; I
>> checked it with the aws command line.
>>
>>
>>
>> I put a breakpoint there, and the full path looks like
>> s3://bucket/orc/dt=2018-01-23, which exists.
>>
>>
>>
>> I have also set up the credentials like this
>>
>>
>>
>> sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
>>
>> sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")
>>
>>
>>
>> My confusion is that this code runs fine in the cluster, but I get this
>> error when running locally.
>>
>>
>>
>>
>>
>

Re: S3 token times out during data frame "write.csv"

Posted by Jean Georges Perrin <jg...@jgp.net>.
Are you writing to S3 from an Amazon instance or from an on-premise install?
How many partitions are you writing from? Maybe you can try to “play” with repartitioning to see how it behaves?
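
For example (a minimal sketch; df and s3_location are from the original
example, and the partition count is an arbitrary placeholder):

df.repartition(200).write.csv(s3_location)   // more, smaller output files and write tasks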



Re: S3 token times out during data frame "write.csv"

Posted by Vasyl Harasymiv <va...@gmail.com>.
It is about 400 million rows. S3 automatically chunks the file on its end
while writing, so that's fine; e.g. it creates the same file name with
alphanumeric suffixes.
However, the write session still expires due to token expiration.


Re: S3 token times out during data frame "write.csv"

Posted by Jörn Franke <jo...@gmail.com>.
How large is the file?

If it is very large, then you should in any case have several partitions for the output. This is also important in case you need to read from S3 again - having several files there enables parallel reading.
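
For example (a sketch; spark is the SparkSession and the path is a
placeholder), a multi-file output is read back as a single data frame, with
the part files read in parallel:

val readBack = spark.read.csv("s3a://bucket/output/")   // picks up every part-* file in parallel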


Re: S3 token times out during data frame "write.csv"

Posted by Jörn Franke <jo...@gmail.com>.
He is using CSV, and either ORC or Parquet would be fine.


Re: S3 token times out during data frame "write.csv"

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

There is definitely a parameter, when creating a temporary security
credential, for specifying the number of minutes those credentials will
remain active. There is an upper limit, of course, which is around 3 days if
I remember correctly, and the default, as you can see, is 30 mins.
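
For illustration, that parameter is the duration requested when the temporary
credentials are issued by STS, not something Spark controls (a sketch with the
AWS Java SDK v1; the one-hour value is only an example):

import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest

val sts = AWSSecurityTokenServiceClientBuilder.defaultClient()
val creds = sts.getSessionToken(
  new GetSessionTokenRequest().withDurationSeconds(3600)   // lifetime requested up front
).getCredentials
// creds.getAccessKeyId / getSecretAccessKey / getSessionToken are then what
// the S3 connector consumes during the write.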

Can you let me know:
1. How are you generating the credentials (the exact code)?
2. Doing S3 writes from a local network is quite suboptimal anyway, given the
network latency and the cost associated with it. So why are you doing it?
3. When you port your code to EMR, do you still use access keys, or do you
have to change your code?
4. Is there any particular reason why your partition value has "-" in it? I am
trying to understand why the partition value is 2018-01-23 instead of
20180123. Are you treating the partition type as String?
5. Have you heard of, and tried using, spot instances? The cost is so
ridiculously low that there is no need to run the code locally (I expect that,
since you can run the code locally, the EMR instance size and node type would
be small).
6. Why are you not using the Parquet format instead of ORC? I think that many
more products use Parquet and only Hive uses the ORC format.

Regards,
Gourav Sengupta
