Posted to user@spark.apache.org by Deepesh Maheshwari <de...@gmail.com> on 2015/08/11 08:14:24 UTC

Error while writing JavaDStream output to disk and MongoDB

Hi,

I have successfully reduced my data and stored it in a JavaDStream<BSONObject>.

Now I want to save this data in MongoDB; for this I have used the BSONObject
type.

But when I try to save it, it throws an exception. I also tried saving it
with just *saveAsTextFile*, but I get the same exception.
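
For context, the stream is produced along these lines (a simplified sketch
only; the variable pairs, the field names, and the reduce logic are
placeholders, not the actual job code):

import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;
import scala.Tuple2;

// pairs is assumed to be a JavaPairDStream<String, Integer> built earlier.
// Reduce the keyed counts, then map each result into a BSONObject so it
// can be written out.
JavaDStream<BSONObject> suspectedStream = pairs
        .reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer a, Integer b) {
                return a + b;
            }
        })
        .map(new Function<Tuple2<String, Integer>, BSONObject>() {
            @Override
            public BSONObject call(Tuple2<String, Integer> t) {
                BSONObject doc = new BasicBSONObject();
                doc.put("key", t._1());   // placeholder field names
                doc.put("count", t._2());
                return doc;
            }
        });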

Error log: the full log file is attached.

Excerpt from the log file:

2015-08-11 11:18:52,663  INFO (org.apache.spark.storage.BlockManagerMaster:59) - Updated info of block broadcast_4_piece0
2015-08-11 11:18:52,664  INFO (org.apache.spark.SparkContext:59) - Created broadcast 4 from broadcast at DAGScheduler.scala:839
2015-08-11 11:18:52,664  INFO (org.apache.spark.scheduler.DAGScheduler:59) - Submitting 2 missing tasks from Stage 7 (MapPartitionsRDD[5] at foreach at DirectStream.java:167)
2015-08-11 11:18:52,664  INFO (org.apache.spark.scheduler.TaskSchedulerImpl:59) - Adding task set 7.0 with 2 tasks
2015-08-11 11:18:52,665  INFO (org.apache.spark.scheduler.TaskSetManager:59) - Starting task 0.0 in stage 7.0 (TID 5, localhost, PROCESS_LOCAL, 1056 bytes)
2015-08-11 11:18:52,666  INFO (org.apache.spark.scheduler.TaskSetManager:59) - Starting task 1.0 in stage 7.0 (TID 6, localhost, PROCESS_LOCAL, 1056 bytes)
2015-08-11 11:18:52,666  INFO (org.apache.spark.executor.Executor:59) - Running task 0.0 in stage 7.0 (TID 5)
2015-08-11 11:18:52,666  INFO (org.apache.spark.executor.Executor:59) - Running task 1.0 in stage 7.0 (TID 6)
2015-08-11 11:18:52,827  INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Getting 2 non-empty blocks out of 2 blocks
2015-08-11 11:18:52,828  INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Started 0 remote fetches in 1 ms
2015-08-11 11:18:52,846  INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Getting 2 non-empty blocks out of 2 blocks
2015-08-11 11:18:52,847  INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Started 0 remote fetches in 1 ms
2015-08-11 11:18:52,965 ERROR (org.apache.spark.executor.Executor:96) - Exception in task 1.0 in stage 7.0 (TID 6)
java.lang.NullPointerException
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
    at org.apache.hadoop.util.Shell.runCommand(Shell.java:445)
    at org.apache.hadoop.util.Shell.run(Shell.java:418)
    at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:739)
    at org.apache.hadoop.util.Shell.execCommand(Shell.java:722)
    at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:633)
    at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:467)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
    at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799)
    at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
    at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2015-08-11 11:18:52,965 ERROR (org.apache.spark.executor.Executor:96) - Exception in task 0.0 in stage 7.0 (TID 5)
java.lang.NullPointerException


Code for saving the output:

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoOutputFormat;

// Hadoop configuration for the MongoDB output (mongo-hadoop connector)
Configuration outputConfig = new Configuration();
outputConfig.set("mongo.output.uri",
        "mongodb://localhost:27017/test.spark");
outputConfig.set("mongo.output.format",
        "com.mongodb.hadoop.MongoOutputFormat");

// suspectedStream holds the reduced data described above
JavaDStream<BSONObject> suspectedStream;

suspectedStream.foreach(new Function<JavaRDD<BSONObject>, Void>() {

    private static final long serialVersionUID = 4414703053334523053L;

    @Override
    public Void call(JavaRDD<BSONObject> rdd) throws Exception {

        logger.info(rdd.first());

        // This save is where the NullPointerException above is thrown
        // (TextOutputFormat.getRecordWriter in the stack trace)
        rdd.saveAsTextFile("E://");

        rdd.saveAsNewAPIHadoopFile("", Object.class,
                BSONObject.class, MongoOutputFormat.class, outputConfig);

        return null;
    }
});
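
(Side note: saveAsNewAPIHadoopFile is defined on JavaPairRDD in the Spark
Java API rather than on JavaRDD, so the RDD may need converting to key/value
pairs first. A sketch of that variant, untested; the null key and the dummy
path follow the mongo-hadoop examples, where the path is reportedly ignored
by MongoOutputFormat:)

// additional imports for this variant:
// import org.apache.spark.api.java.JavaPairRDD;
// import org.apache.spark.api.java.function.PairFunction;
// import scala.Tuple2;

JavaPairRDD<Object, BSONObject> docPairs = rdd.mapToPair(
        new PairFunction<BSONObject, Object, BSONObject>() {
            @Override
            public Tuple2<Object, BSONObject> call(BSONObject doc) {
                // null key lets MongoDB generate the _id
                return new Tuple2<Object, BSONObject>(null, doc);
            }
        });
docPairs.saveAsNewAPIHadoopFile("file:///unused", Object.class,
        BSONObject.class, MongoOutputFormat.class, outputConfig);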


Regards,
Deepesh