Posted to user@spark.apache.org by qinwei <we...@dewmobile.net> on 2014/11/07 10:23:09 UTC
about write mongodb in mapPartitions
Hi, everyone

I have run into a problem writing data to MongoDB in mapPartitions. My code is as below:

import com.mongodb.{BasicDBObject, MongoClient}

val sourceRDD = sc.textFile("hdfs://host:port/sourcePath")
// some transformations
val rdd = sourceRDD.map(mapFunc).filter(filterFunc)
val newRDD = rdd.mapPartitions(args => {
  val mongoClient = new MongoClient("host", port)
  val db = mongoClient.getDB("db")
  val coll = db.getCollection("collectionA")

  args.map(arg => {
    coll.insert(new BasicDBObject("pkg", arg))
    arg
  })

  mongoClient.close()
  args
})

newRDD.saveAsTextFile("hdfs://host:port/path")

The application saved the data to HDFS correctly, but nothing reached MongoDB. Is there something wrong?
I know that collecting newRDD to the driver and then saving it to MongoDB would succeed, but would the following saveAsTextFile then read the filesystem once again?

Thanks
qinwei
Re: Re: about write mongodb in mapPartitions
Posted by qinwei <we...@dewmobile.net>.
Thanks for your reply! Following your hint, the code would look like this:
// I want to save the data in rdd to both MongoDB and HDFS
rdd.saveAsNewAPIHadoopFile()
rdd.saveAsTextFile()
But will the application read HDFS twice?
qinwei
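For reference on the question above: an RDD that is not persisted is recomputed from its source for every action, so running saveAsNewAPIHadoopFile and then saveAsTextFile on the same uncached rdd would scan sourcePath on HDFS twice. Calling rdd.cache() (shorthand for rdd.persist(StorageLevel.MEMORY_ONLY)) before the first action lets the second action reuse the stored partitions, although partitions that do not fit in memory are still recomputed. The sketch below is a stdlib-only Scala analogy, not Spark code: readSource and lineage are hypothetical stand-ins for sc.textFile and the map/filter chain, and a lazy val plays the role of the cache.

```scala
// Hypothetical stand-in for sc.textFile: counts how often the source is scanned.
var sourceReads = 0
def readSource(): Iterator[String] = { sourceReads += 1; Iterator("a", "bb", "ccc") }

// Hypothetical stand-in for the map/filter lineage built on top of the source.
def lineage(): Iterator[String] = readSource().map(_.toUpperCase).filter(_.length > 1)

// Uncached: every "action" re-evaluates the lineage from the source.
val firstAction = lineage().toList  // stands in for the MongoDB write
val secondAction = lineage().toList // stands in for saveAsTextFile
val readsWithoutCache = sourceReads // 2

// Cached: compute once on first use, then reuse the stored result.
sourceReads = 0
lazy val cached: List[String] = lineage().toList
val thirdAction = cached
val fourthAction = cached
val readsWithCache = sourceReads    // 1
```

In real Spark code the cache is likewise filled by the first action that runs, not by the cache() call itself.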
From: Akhil Das
Date: 2014-11-07 18:32
To: qinwei
CC: user
Subject: Re: about write mongodb in mapPartitions
Why not saveAsNewAPIHadoopFile?
Re: about write mongodb in mapPartitions
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Why not saveAsNewAPIHadoopFile?
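import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext._ // pair-RDD implicits (needed before Spark 1.3)

// Define your MongoDB configuration
val config = new Configuration()
config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/sigmoid.output")

// Write everything to MongoDB. saveAsNewAPIHadoopFile is only available on
// RDDs of (key, value) pairs; the path is ignored by MongoOutputFormat,
// which writes to mongo.output.uri instead.
rdd.saveAsNewAPIHadoopFile("file:///some/random", classOf[Any],
  classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]],
  config)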
Thanks
Best Regards
Re: Re: about write mongodb in mapPartitions
Posted by qinwei <we...@dewmobile.net>.
Thanks for your reply! As you mentioned, the insert was never executed because the result of args.map is never used anywhere. After I modified the code, it works.
qinwei
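The behaviour confirmed above comes from Scala's Iterator.map being lazy: it only wraps the input iterator, and the function body runs when the resulting iterator is consumed. In the original code the mapped iterator is discarded, so no insert ever runs, while the returned args was never advanced, which is why HDFS still received every record. The stdlib-only sketch below reproduces this with a hypothetical insert function that merely counts calls in place of coll.insert.

```scala
// Hypothetical stand-in for coll.insert: it only counts how often it is called.
var inserts = 0
def insert(doc: String): Unit = { inserts += 1 }

// Original pattern: map the partition, discard the result, return the input.
val partition = Iterator("x", "y", "z")
partition.map { arg => insert(arg); arg } // builds a lazy iterator nobody consumes
val insertsAfterDiscard = inserts          // 0: the map body never ran
val returned = partition.toList            // List(x, y, z): the input was never advanced

// Fix from the thread: return the mapped iterator so that whoever consumes it
// (Spark in the real job, toList here) runs the body.
val partition2 = Iterator("x", "y", "z")
val processed = partition2.map { arg => insert(arg); arg }
val consumed = processed.toList            // runs the body once per element
val insertsAfterConsuming = inserts        // 3
```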
From: Tobias Pfeiffer
Date: 2014-11-07 18:04
To: qinwei
CC: user
Subject: Re: about write mongodb in mapPartitions
Re: about write mongodb in mapPartitions
Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,
On Fri, Nov 7, 2014 at 6:23 PM, qinwei <we...@dewmobile.net> wrote:
>
> args.map(arg => {
> coll.insert(new BasicDBObject("pkg", arg))
> arg
> })
>
> mongoClient.close()
> args
>
As the results of args.map are never used anywhere, I think the loop body
is not executed at all. Maybe try:
val argsProcessed = args.map(arg => {
  coll.insert(new BasicDBObject("pkg", arg))
  arg
})
mongoClient.close()
argsProcessed
Tobias
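One caveat about the suggestion above: because Iterator.map is lazy, the inserts in argsProcessed still run only when Spark consumes the iterator (here, inside saveAsTextFile), which happens after mongoClient.close() has already returned. Two common ways to get the order right are to materialize the partition before closing (simple, but it buffers the whole partition in memory) or to close the client once the iterator reports it is exhausted (streaming, but the client leaks if the iterator is never fully consumed, for example when a task fails). The sketch below shows both with a hypothetical FakeClient that refuses inserts after close; it is not the MongoDB driver's API, and it does not claim the real driver reacts to use-after-close in the same way.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Try

// Hypothetical stand-in for a MongoDB client plus collection (not the driver API).
final class FakeClient {
  private var open = true
  val inserted: ArrayBuffer[String] = ArrayBuffer.empty[String]
  def isOpen: Boolean = open
  def insert(doc: String): Unit = {
    if (!open) throw new IllegalStateException("insert after close")
    inserted += doc
  }
  def close(): Unit = { open = false }
}

// Ordering from the suggestion above: map lazily, close, return the mapped iterator.
def closeThenReturn(client: FakeClient, args: Iterator[String]): Iterator[String] = {
  val processed = args.map { arg => client.insert(arg); arg }
  client.close()
  processed // nothing has been inserted yet
}

// Option 1: force the inserts with toList, then close (buffers the partition).
def materializeThenClose(client: FakeClient, args: Iterator[String]): Iterator[String] = {
  val processed = args.map { arg => client.insert(arg); arg }.toList
  client.close()
  processed.iterator
}

// Option 2: stay lazy and close the client when the input runs out.
def closeWhenExhausted(client: FakeClient, args: Iterator[String]): Iterator[String] =
  new Iterator[String] {
    def hasNext: Boolean = {
      val more = args.hasNext
      if (!more && client.isOpen) client.close()
      more
    }
    def next(): String = {
      val arg = args.next()
      client.insert(arg)
      arg
    }
  }

// Simulate Spark consuming each returned iterator after the function has returned.
val late = new FakeClient
val lateResult = Try(closeThenReturn(late, Iterator("a", "b")).toList)

val buffered = new FakeClient
val bufferedResult = materializeThenClose(buffered, Iterator("a", "b")).toList

val streamed = new FakeClient
val streamedIter = closeWhenExhausted(streamed, Iterator("a", "b"))
val openBeforeConsuming = streamed.isOpen
val streamedResult = streamedIter.toList
```

Each function takes the place of the body passed to mapPartitions; in the real job Spark plays the role of the final toList.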