Posted to user@spark.apache.org by Emre Sevinc <em...@gmail.com> on 2015/03/03 15:36:03 UTC

Why can't Spark Streaming recover from the checkpoint directory when using a third-party library for processing multi-line JSON?

Hello,

I have a Spark Streaming application (using Spark 1.2.1) that listens to an
input directory; when new JSON files are copied into that directory, it
processes them and writes the results to an output directory.

It uses a third-party library, https://github.com/alexholmes/json-mapreduce,
to process the multi-line JSON files. You can see the relevant part of the
streaming application at:

  https://gist.github.com/emres/ec18ee264e4eb0dd8f1a
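
Roughly, the wiring is as follows (a simplified sketch, not the gist
verbatim: the variable names are illustrative, and the classed fileStream
overload shown here comes from later Spark releases, so the actual 1.2.1
code differs slightly):

    import com.alexholmes.json.mapreduce.MultiLineJsonInputFormat;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    SparkConf sparkConf = new SparkConf().setAppName("SchemaValidatorDriver");
    JavaStreamingContext ssc =
        new JavaStreamingContext(sparkConf, new Duration(30000));
    ssc.checkpoint(checkpointDirectory);

    // MultiLineJsonInputFormat reads this value from the task-side Hadoop
    // Configuration to know which JSON member delimits individual records.
    ssc.ssc().sc().hadoopConfiguration()
       .set("multilinejsoninputformat.member", "itemSet");

    // Each new file in the input directory is split into
    // (byte offset, JSON record) pairs by the custom input format.
    JavaPairInputDStream<LongWritable, Text> jsonRecords =
        ssc.fileStream(inputDirectory, LongWritable.class, Text.class,
            MultiLineJsonInputFormat.class);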

When I run this application locally, it works perfectly fine. But then I
wanted to test whether it could recover from failure, e.g. if I stopped it
right in the middle of processing some files. I started the streaming
application, copied 100 files to the input directory, and hit Ctrl+C when
it had already processed about 50 of them:

...
2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:20 INFO  FileInputFormat:280 - Total input paths to
process : 1
[Stage
0:==========================================================================================================================>
(65 + 4) / 100]
^C

Then I started the application again, expecting it to recover from the
checkpoint. It read files again for a while, and then threw an exception:

...
2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:39 INFO  FileInputFormat:280 - Total input paths to
process : 1
2015-03-03 15:06:39 WARN  SchemaValidatorDriver:145 -
 * * * hadoopConfiguration: itemSet
2015-03-03 15:06:40 ERROR Executor:96 - Exception in task 0.0 in stage 0.0
(TID 0)
java.io.IOException: Missing configuration value for
multilinejsoninputformat.member
    at
com.alexholmes.json.mapreduce.MultiLineJsonInputFormat.createRecordReader(MultiLineJsonInputFormat.java:30)
    at
org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:107)
    at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:69)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:245)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Since the exception refers to the missing configuration value
"multilinejsoninputformat.member", I think it is related to the following
line:

   ssc.ssc().sc().hadoopConfiguration()
      .set("multilinejsoninputformat.member", "itemSet");

That is also why I log its value, and as you can see above, just before the
exception occurs during recovery, "multilinejsoninputformat.member" is
indeed set to "itemSet". Yet somehow the value is not found during recovery.
This exception happens only when the application tries to recover from a
previously interrupted run.

I've also tried moving the above line into the "createContext" method, but
I still got the same exception.
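
For context, the recovery path follows the standard getOrCreate pattern,
roughly like this (a sketch with assumed method and variable names, not the
exact code from the gist):

    import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;

    JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
        @Override
        public JavaStreamingContext create() {
            // createContext builds the input/output streams; I also tried
            // calling hadoopConfiguration().set(...) in here.
            JavaStreamingContext ssc =
                createContext(inputDirectory, outputDirectory);
            ssc.checkpoint(checkpointDirectory);
            return ssc;
        }
    };

    // On a clean start, getOrCreate calls the factory; on restart, the
    // context is rebuilt from the checkpoint instead and create() is never
    // invoked, so nothing set only inside it runs during recovery.
    JavaStreamingContext ssc =
        JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
    ssc.start();
    ssc.awaitTermination();

If that is right, it would at least explain why moving the set(...) call
into createContext made no difference on restart.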

Why is that?

And how can I work around it?

-- 
Emre Sevinç
http://www.bigindustries.be/

Re: Why can't Spark Streaming recover from the checkpoint directory when using a third-party library for processing multi-line JSON?

Posted by Emre Sevinc <em...@gmail.com>.
I'm adding this third-party library to my Maven pom.xml so that it's
embedded in the JAR I send to spark-submit:

  <dependency>
    <groupId>json-mapreduce</groupId>
    <artifactId>json-mapreduce</artifactId>
    <version>1.0-SNAPSHOT</version>
    <exclusions>
      <exclusion>
        <groupId>javax.servlet</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>commons-io</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>commons-lang</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
      </exclusion>
    </exclusions>
  </dependency>


Then I build my über JAR and run my Spark Streaming application from the
command line:

  spark-submit --class com.example.schemavalidator.SchemaValidatorDriver \
    --master local[4] --deploy-mode client \
    target/myapp-1.0-SNAPSHOT.jar

--
Emre Sevinç



Re: Why can't Spark Streaming recover from the checkpoint directory when using a third-party library for processing multi-line JSON?

Posted by Tathagata Das <td...@databricks.com>.
That could be a corner-case bug. How do you add the third-party library to
the classpath of the driver? Through spark-submit? Could you give the
command you used?

TD


Re: Why can't Spark Streaming recover from the checkpoint directory when using a third-party library for processing multi-line JSON?

Posted by Emre Sevinc <em...@gmail.com>.
I've also tried the following:

    Configuration hadoopConfiguration = new Configuration();
    hadoopConfiguration.set("multilinejsoninputformat.member", "itemSet");

    JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(
        checkpointDirectory, hadoopConfiguration, factory, false);


but I still get the same exception.

Why does getOrCreate ignore that Hadoop configuration (which is picked up
normally, e.g. when not recovering)?
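
One more thing I might try (an assumption on my part; I haven't verified
that it survives checkpoint recovery): as far as I know, Spark copies any
"spark.hadoop.*" property from the SparkConf into the Hadoop Configurations
it creates, so the value could travel with the application's SparkConf
instead of being set on the context afterwards:

    // Hypothetical workaround: the "spark.hadoop." prefix makes Spark copy
    // this property into the Hadoop Configuration it builds. Untested here.
    SparkConf sparkConf = new SparkConf()
        .setAppName("SchemaValidatorDriver")
        .set("spark.hadoop.multilinejsoninputformat.member", "itemSet");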

--
Emre

