Posted to user@spark.apache.org by Toy <no...@gmail.com> on 2018/01/23 19:33:11 UTC

I can't save DataFrame from running Spark locally

Hi,

First of all, my Spark application runs fine in AWS EMR. However, I'm
trying to run it locally to debug an issue. The application just parses
log files, converts them to a DataFrame, and saves the result to S3 as
ORC. When I run it locally, I get this error:

java.io.IOException: /orc/dt=2018-01-23 doesn't exist
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.get(Jets3tFileSystemStore.java:170)
at org.apache.hadoop.fs.s3.Jets3tFileSystemStore.retrieveINode(Jets3tFileSystemStore.java:221)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy22.retrieveINode(Unknown Source)
at org.apache.hadoop.fs.s3.S3FileSystem.getFileStatus(S3FileSystem.java:340)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:77)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
at Vivace$$anonfun$processStream$1.apply(vivace.scala:193)
at Vivace$$anonfun$processStream$1.apply(vivace.scala:170)

Here's what I have in sbt

scalaVersion := "2.11.8"

val sparkVersion = "2.1.0"
val hadoopVersion = "2.7.3"
val awsVersion = "1.11.155"

lazy val sparkAndDependencies = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-hive" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion,

  "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion
)

And this is where the code failed

val sparrowWriter = sparrowCastedDf.write.mode("append").format("orc").option("compression", "zlib")
sparrowWriter.save(sparrowOutputPath)

sparrowOutputPath is something like s3://bucket/folder, and it exists; I
checked it with the AWS command line.

I put a breakpoint there, and the full path looks like this:
s3://bucket/orc/dt=2018-01-23, which exists.

I have also set up the credentials like this

sc.hadoopConfiguration.set("fs.s3.awsAccessKeyId", "key")
sc.hadoopConfiguration.set("fs.s3.awsSecretAccessKey", "secret")

What confuses me is that this code runs fine on the cluster but fails
with this error when I run it locally.

Re: I can't save DataFrame from running Spark locally

Posted by Toy <no...@gmail.com>.
Thanks. I get this error after switching to s3a://

Exception in thread "streaming-job-executor-0" java.lang.NoSuchMethodError: com.amazonaws.services.s3.transfer.TransferManager.<init>(Lcom/amazonaws/services/s3/AmazonS3;Ljava/util/concurrent/ThreadPoolExecutor;)V
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:287)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2669)
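
A NoSuchMethodError like this usually means the class was compiled against one version of a library and loaded from another. One way to narrow it down is to print which JAR each class actually came from. The sketch below is only a diagnostic aid: WhichJar and locationOf are hypothetical names, not part of Spark or Hadoop, and the two class names are taken from the stack trace above.

```scala
// Hypothetical diagnostic helper: report where a class was loaded from.
object WhichJar {
  // Returns the JAR or directory URL for the class, or None if the class
  // is not on the classpath or has no code source (e.g. JDK core classes).
  def locationOf(className: String): Option[String] =
    try {
      // initialize = false: load the class without running static initializers.
      val cls = Class.forName(className, false, getClass.getClassLoader)
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    } catch {
      case _: ClassNotFoundException | _: NoClassDefFoundError => None
    }

  def main(args: Array[String]): Unit = {
    val names = Seq(
      "com.amazonaws.services.s3.transfer.TransferManager",
      "org.apache.hadoop.fs.s3a.S3AFileSystem"
    )
    names.foreach { n =>
      println(s"$n -> ${locationOf(n).getOrElse("not found on the classpath")}")
    }
  }
}
```

Run with the same classpath as the failing job, this shows whether TransferManager comes from the AWS SDK version that hadoop-aws expects.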

On Tue, 23 Jan 2018 at 15:05 Patrick Alwell <pa...@hortonworks.com> wrote:

> Spark cannot read locally from S3 without an S3a protocol; you’ll more
> than likely need a local copy of the data or you’ll need to utilize the
> proper jars to enable S3 communication from the edge to the datacenter.
>
> https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark
>
> Here are the jars:
> https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
>
> Looks like you already have them, in which case you’ll have to make small
> configuration changes, e.g. s3 --> s3a
>
> Keep in mind: *The Amazon JARs have proven very brittle: the version of
> the Amazon libraries must match the versions against which the Hadoop
> binaries were built.*
>
> https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client

Re: I can't save DataFrame from running Spark locally

Posted by Patrick Alwell <pa...@hortonworks.com>.
Spark cannot read locally from S3 without an S3a protocol; you’ll more than likely need a local copy of the data or you’ll need to utilize the proper jars to enable S3 communication from the edge to the datacenter.

https://stackoverflow.com/questions/30385981/how-to-access-s3a-files-from-apache-spark

Here are the jars: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws

Looks like you already have them, in which case you’ll have to make small configuration changes, e.g. s3 --> s3a
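
As a rough sketch of what that change might look like: the S3A connector reads its credentials from fs.s3a.access.key and fs.s3a.secret.key rather than the fs.s3.* keys, and the output path needs the s3a:// scheme. S3aPaths, toS3a and s3aCredentials below are hypothetical names for illustration; "key" and "secret" are the placeholders from the original post.

```scala
object S3aPaths {
  // Rewrite an s3:// or s3n:// path to s3a://; any other path is returned unchanged.
  def toS3a(path: String): String =
    if (path.startsWith("s3://")) "s3a://" + path.stripPrefix("s3://")
    else if (path.startsWith("s3n://")) "s3a://" + path.stripPrefix("s3n://")
    else path

  // Credential properties used by the S3A connector (placeholder values).
  val s3aCredentials: Map[String, String] = Map(
    "fs.s3a.access.key" -> "key",
    "fs.s3a.secret.key" -> "secret"
  )
}

// In the application, assuming `sc` is the existing SparkContext and
// `sparrowWriter` / `sparrowOutputPath` are as in the original post:
//   S3aPaths.s3aCredentials.foreach { case (k, v) => sc.hadoopConfiguration.set(k, v) }
//   sparrowWriter.save(S3aPaths.toS3a(sparrowOutputPath))
```

Rewriting the scheme in one place keeps the same code usable on EMR, where s3:// paths can stay as they are.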

Keep in mind: The Amazon JARs have proven very brittle: the version of the Amazon libraries must match the versions against which the Hadoop binaries were built.
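
For the build in the original post, that probably means pinning the AWS SDK to the version hadoop-aws 2.7.3 was built against. The sbt fragment below is a sketch under that assumption: to my knowledge the hadoop-aws 2.7.x POM declares com.amazonaws:aws-java-sdk 1.7.4, but please verify it against the POM of the exact hadoop-aws version in use. awsSdkVersion is a name introduced here for illustration.

```scala
val hadoopVersion = "2.7.3"
// Assumption: hadoop-aws 2.7.x was built against the monolithic aws-java-sdk 1.7.4.
val awsSdkVersion = "1.7.4"

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-aws" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion
)

// Keep a newer aws-java-sdk from evicting the version hadoop-aws expects.
dependencyOverrides += "com.amazonaws" % "aws-java-sdk" % awsSdkVersion
```

If another dependency brings in the split 1.11.x artifacts (for example aws-java-sdk-s3), this override alone will not remove them, and they would likely need to be excluded as well.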

https://hortonworks.github.io/hdp-aws/s3-s3aclient/index.html#using-the-s3a-filesystem-client



