Posted to user@spark.apache.org by Deepesh Maheshwari <de...@gmail.com> on 2015/08/11 08:14:24 UTC
Error while writing JavaDStream output to disk and MongoDB
Hi,
I have successfully reduced my data and stored it in a JavaDStream<BSONObject>.
Now I want to save this data to MongoDB, which is why I used the BSONObject type.
But when I try to save it, an exception is thrown.
I also tried saving it with plain *saveAsTextFile*, but I get the same exception.
Error log: the full log file is attached; an excerpt follows.
2015-08-11 11:18:52,663 INFO (org.apache.spark.storage.BlockManagerMaster:59) - Updated info of block broadcast_4_piece0
2015-08-11 11:18:52,664 INFO (org.apache.spark.SparkContext:59) - Created broadcast 4 from broadcast at DAGScheduler.scala:839
2015-08-11 11:18:52,664 INFO (org.apache.spark.scheduler.DAGScheduler:59) - Submitting 2 missing tasks from Stage 7 (MapPartitionsRDD[5] at foreach at DirectStream.java:167)
2015-08-11 11:18:52,664 INFO (org.apache.spark.scheduler.TaskSchedulerImpl:59) - Adding task set 7.0 with 2 tasks
2015-08-11 11:18:52,665 INFO (org.apache.spark.scheduler.TaskSetManager:59) - Starting task 0.0 in stage 7.0 (TID 5, localhost, PROCESS_LOCAL, 1056 bytes)
2015-08-11 11:18:52,666 INFO (org.apache.spark.scheduler.TaskSetManager:59) - Starting task 1.0 in stage 7.0 (TID 6, localhost, PROCESS_LOCAL, 1056 bytes)
2015-08-11 11:18:52,666 INFO (org.apache.spark.executor.Executor:59) - Running task 0.0 in stage 7.0 (TID 5)
2015-08-11 11:18:52,666 INFO (org.apache.spark.executor.Executor:59) - Running task 1.0 in stage 7.0 (TID 6)
2015-08-11 11:18:52,827 INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Getting 2 non-empty blocks out of 2 blocks
2015-08-11 11:18:52,828 INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Started 0 remote fetches in 1 ms
2015-08-11 11:18:52,846 INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Getting 2 non-empty blocks out of 2 blocks
2015-08-11 11:18:52,847 INFO (org.apache.spark.storage.ShuffleBlockFetcherIterator:59) - Started 0 remote fetches in 1 ms
2015-08-11 11:18:52,965 ERROR (org.apache.spark.executor.Executor:96) - Exception in task 1.0 in stage 7.0 (TID 6)
java.lang.NullPointerException
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:445)
        at org.apache.hadoop.util.Shell.run(Shell.java:418)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:739)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:722)
        at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:633)
        at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:467)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:799)
        at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
        at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
2015-08-11 11:18:52,965 ERROR (org.apache.spark.executor.Executor:96) - Exception in task 0.0 in stage 7.0 (TID 5)
java.lang.NullPointerException
Code for saving the output:

// MongoDB output configuration
Configuration outputConfig = new Configuration();
outputConfig.set("mongo.output.uri",
        "mongodb://localhost:27017/test.spark");
outputConfig.set("mongo.output.format",
        "com.mongodb.hadoop.MongoOutputFormat");

// suspectedStream holds the reduced data from the earlier step
JavaDStream<BSONObject> suspectedStream;

suspectedStream.foreach(new Function<JavaRDD<BSONObject>, Void>() {
    private static final long serialVersionUID = 4414703053334523053L;

    @Override
    public Void call(JavaRDD<BSONObject> rdd) throws Exception {
        logger.info(rdd.first());
        // Both of the following calls fail with the NullPointerException above
        rdd.saveAsTextFile("E://");
        rdd.saveAsNewAPIHadoopFile("", Object.class,
                BSONObject.class, MongoOutputFormat.class, outputConfig);
        return null;
    }
});
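In case it clarifies what I am attempting: since saveAsNewAPIHadoopFile is defined on JavaPairRDD rather than JavaRDD, the variant I am planning to try next maps each record to a key/value pair first. This is only an untested sketch of that idea; the null key and the placeholder output path are my own assumptions, not something from the mongo-hadoop docs.

```java
// Untested sketch: wrap each BSONObject in a (key, value) pair so that
// JavaPairRDD.saveAsNewAPIHadoopFile can hand it to MongoOutputFormat.
// The null key and the placeholder path are assumptions on my side;
// MongoOutputFormat takes the real destination from mongo.output.uri.
JavaPairRDD<Object, BSONObject> pairs = rdd.mapToPair(
        new PairFunction<BSONObject, Object, BSONObject>() {
            @Override
            public Tuple2<Object, BSONObject> call(BSONObject doc) {
                return new Tuple2<Object, BSONObject>(null, doc);
            }
        });
pairs.saveAsNewAPIHadoopFile("file:///placeholder-unused",
        Object.class, BSONObject.class, MongoOutputFormat.class, outputConfig);
```

I have not been able to verify this yet because the same NullPointerException is thrown before any output is written.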
Regards,
Deepesh