Posted to dev@spark.apache.org by Cody Koeninger <co...@koeninger.org> on 2014/11/04 18:34:30 UTC

Hadoop configuration for checkpointing

3 quick questions, then some background:

1.  Is there a reason not to document the fact that spark.hadoop.* is
copied from the spark config into the hadoop config? (Sketch after
question 3 below.)

2.  Is there a reason StreamingContext.getOrCreate defaults to a blank
hadoop configuration rather than
org.apache.spark.deploy.SparkHadoopUtil.get.conf,
which would pull values from spark config?

3.  If I submit a PR to address those issues, is it likely to be lost in
the 1.2 scramble? :)
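
(Re: question 1, a minimal sketch of the copying I mean -- the fs.s3n.*
property names are the standard ones, everything else here is made up:)

    import org.apache.spark.SparkConf

    // Any key prefixed with spark.hadoop. is copied from the spark config
    // into the hadoop Configuration with the prefix stripped, so these show
    // up as fs.s3n.awsAccessKeyId / fs.s3n.awsSecretAccessKey on the hadoop
    // side.
    val sparkConf = new SparkConf()
      .set("spark.hadoop.fs.s3n.awsAccessKeyId", "XXX")
      .set("spark.hadoop.fs.s3n.awsSecretAccessKey", "XXX")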


Background:

I have a streaming job that is not recoverable from checkpoint, because the
s3 credentials were originally set using
sparkContext.hadoopConfiguration.set.
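
Concretely, the pattern that bit me looks roughly like this (batch
interval and paths are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val ssc = new StreamingContext(new SparkConf(), Seconds(60))
    // Set directly on the (transient) spark context's hadoop config; the
    // checkpoint saves the spark config, not this, so these never make it
    // into the checkpoint:
    ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "XXX")
    ssc.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "XXX")
    ssc.checkpoint("s3n://XXX")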

Checkpointing saves the spark config, but not the transient spark context,
so does not save the s3 credentials unless they were originally present in
the spark config.

A hadoop config provided to getOrCreate is only used for CheckpointReader's
initial load of the checkpoint file.  It is not copied into the newly
created spark context, so the immediately following attempt to restore
DStreamCheckpointData fails for lack of credentials.
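
To be explicit about what I tried (the path and createContext body are
stand-ins for the real ones):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.streaming.StreamingContext

    val checkpointPath = "s3n://XXX"
    def createContext(): StreamingContext = ??? // builds the job as usual

    val hadoopConf = new Configuration()
    hadoopConf.set("fs.s3n.awsAccessKeyId", "XXX")
    hadoopConf.set("fs.s3n.awsSecretAccessKey", "XXX")

    // hadoopConf gets the checkpoint file itself read, but is not copied
    // into the recreated spark context, so the DStreamCheckpointData
    // restore that follows still fails with the trace below.
    val ssc = StreamingContext.getOrCreate(checkpointPath, createContext _, hadoopConf)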

I think the cleanest way to handle this would be to encourage people to set
hadoop configuration via spark.hadoop.* in the spark config, and for
StreamingContext.getOrCreate to use SparkHadoopUtil rather than a blank
config.
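
A sketch of how that would look (assuming a getOrCreate that defaults to
SparkHadoopUtil.get.conf, and assuming the credentials arrive via
spark-defaults.conf or --conf so SparkHadoopUtil can see them; the paths
and createContext are placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointPath = "s3n://XXX"

    // spark.hadoop.fs.s3n.awsAccessKeyId / awsSecretAccessKey are set in
    // the submitted spark config; new SparkConf() picks them up, and the
    // checkpoint saves them along with everything else.
    def createContext(): StreamingContext = {
      val ssc = new StreamingContext(new SparkConf(), Seconds(60))
      ssc.checkpoint(checkpointPath)
      // ... wire up dstreams here ...
      ssc
    }

    // With SparkHadoopUtil.get.conf as getOrCreate's default hadoop config,
    // both the initial read of the checkpoint file and the later RDD
    // restore would see the credentials.
    val ssc = StreamingContext.getOrCreate(checkpointPath, createContext _)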


Relevant stack trace:

14/11/04 15:37:30 INFO CheckpointReader: Checkpoint files found: s3n://XXX
14/11/04 15:37:30 INFO CheckpointReader: Attempting to load checkpoint from file s3n://XXX
14/11/04 15:37:30 INFO NativeS3FileSystem: Opening 's3n://XXX
14/11/04 15:37:31 INFO Checkpoint: Checkpoint for time 1415114220000 ms validated
14/11/04 15:37:31 INFO CheckpointReader: Checkpoint successfully loaded from file s3n://XXX
14/11/04 15:37:31 INFO CheckpointReader: Checkpoint was generated at time 1415114220000 ms
14/11/04 15:37:33 INFO DStreamGraph: Restoring checkpoint data
14/11/04 15:37:33 INFO ForEachDStream: Restoring checkpoint data
14/11/04 15:37:33 INFO StateDStream: Restoring checkpoint data
14/11/04 15:37:33 INFO DStreamCheckpointData: Restoring checkpointed RDD for time 1415097420000 ms from file 's3n://XXX

Exception in thread "main" java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
    at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
    at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:73)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
    at org.apache.hadoop.fs.s3native.$Proxy8.initialize(Unknown Source)
    at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:272)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2397)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.spark.rdd.CheckpointRDD.<init>(CheckpointRDD.scala:42)
    at org.apache.spark.SparkContext.checkpointFile(SparkContext.scala:824)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:112)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$restore$1.apply(DStreamCheckpointData.scala:109)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at org.apache.spark.streaming.dstream.DStreamCheckpointData.restore(DStreamCheckpointData.scala:109)
    at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:397)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:398)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:398)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:398)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.streaming.DStreamGraph.restoreCheckpointData(DStreamGraph.scala:149)
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:131)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:552)
    at org.apache.spark.streaming.StreamingContext$$anonfun$getOrCreate$1.apply(StreamingContext.scala:552)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:552)
Re: Hadoop configuration for checkpointing

Posted by Sean Owen <so...@cloudera.com>.
Let me crash this thread to suggest this *might* be related to a
problem I'm trying to solve:
https://issues.apache.org/jira/browse/SPARK-4196

Basically the question there is: a blank Configuration object gets
created on the driver in the saveAsNewAPIHadoopFiles call, and it seems
to need to be serialized so it can be used in foreachRDD. That
serialization fails for me and for at least two other users I know of.
But I feel like I am missing something.
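
For concreteness, the shape of the call in question (pairStream is a
stand-in for any stream of pairs; types and paths are made up; the conf
argument defaults to a fresh Configuration if omitted):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
    import org.apache.spark.streaming.StreamingContext._
    import org.apache.spark.streaming.dstream.DStream

    val pairStream: DStream[(Text, Text)] = ??? // any stream of pairs

    // saveAsNewAPIHadoopFiles sets up a foreachRDD internally; the
    // Configuration built here on the driver is captured by that closure
    // and so has to survive serialization along with it.
    pairStream.saveAsNewAPIHadoopFiles(
      "hdfs:///tmp/out", "txt",
      classOf[Text], classOf[Text],
      classOf[TextOutputFormat[Text, Text]],
      new Configuration())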

If you're investigating handling of Configuration when enabling
checkpointing with the getOrCreate method, have a look and see if you
have any comments vis-a-vis this JIRA.


On Tue, Nov 4, 2014 at 5:48 PM, Marcelo Vanzin <va...@cloudera.com> wrote:
> On Tue, Nov 4, 2014 at 9:34 AM, Cody Koeninger <co...@koeninger.org> wrote:
>> 2.  Is there a reason StreamingContext.getOrCreate defaults to a blank
>> hadoop configuration rather than
>> org.apache.spark.deploy.SparkHadoopUtil.get.conf,
>> which would pull values from spark config?
>
> This is probably something I overlooked when I changed the rest of the
> code to use SparkHadoopUtil.get.conf. Feel free to send a PR for it,
> we should get it into 1.2 so that all code creates configuration
> objects in a consistent way.
>
> --
> Marcelo



Re: Hadoop configuration for checkpointing

Posted by Cody Koeninger <co...@koeninger.org>.
Opened
https://issues.apache.org/jira/browse/SPARK-4229

Sent a PR
https://github.com/apache/spark/pull/3102
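
The gist is just swapping getOrCreate's default hadoop config, roughly
(modulo whatever review turns up):

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.deploy.SparkHadoopUtil
    import org.apache.spark.streaming.StreamingContext

    def getOrCreate(
        checkpointPath: String,
        creatingFunc: () => StreamingContext,
        hadoopConf: Configuration = SparkHadoopUtil.get.conf, // was: new Configuration()
        createOnError: Boolean = false): StreamingContext = ??? // body unchanged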

On Tue, Nov 4, 2014 at 11:48 AM, Marcelo Vanzin <va...@cloudera.com> wrote:

> On Tue, Nov 4, 2014 at 9:34 AM, Cody Koeninger <co...@koeninger.org> wrote:
> > 2.  Is there a reason StreamingContext.getOrCreate defaults to a blank
> > hadoop configuration rather than
> > org.apache.spark.deploy.SparkHadoopUtil.get.conf,
> > which would pull values from spark config?
>
> This is probably something I overlooked when I changed the rest of the
> code to use SparkHadoopUtil.get.conf. Feel free to send a PR for it,
> we should get it into 1.2 so that all code creates configuration
> objects in a consistent way.
>
> --
> Marcelo
>

Re: Hadoop configuration for checkpointing

Posted by Marcelo Vanzin <va...@cloudera.com>.
On Tue, Nov 4, 2014 at 9:34 AM, Cody Koeninger <co...@koeninger.org> wrote:
> 2.  Is there a reason StreamingContext.getOrCreate defaults to a blank
> hadoop configuration rather than
> org.apache.spark.deploy.SparkHadoopUtil.get.conf,
> which would pull values from spark config?

This is probably something I overlooked when I changed the rest of the
code to use SparkHadoopUtil.get.conf. Feel free to send a PR for it,
we should get it into 1.2 so that all code creates configuration
objects in a consistent way.

-- 
Marcelo
