Posted to user@spark.apache.org by qinwei <we...@dewmobile.net> on 2014/11/07 10:23:09 UTC

about write mongodb in mapPartitions






Hi, everyone

    I have run into a problem writing data to MongoDB in mapPartitions. My code is below:

        val sourceRDD = sc.textFile("hdfs://host:port/sourcePath")
        // some transformations
        val rdd = sourceRDD.map(mapFunc).filter(filterFunc)
        val newRDD = rdd.mapPartitions(args => {
            val mongoClient = new MongoClient("host", port)
            val db = mongoClient.getDB("db")
            val coll = db.getCollection("collectionA")

            args.map(arg => {
                coll.insert(new BasicDBObject("pkg", arg))
                arg
            })

            mongoClient.close()
            args
        })

        newRDD.saveAsTextFile("hdfs://host:port/path")

    The application saved the data to HDFS correctly, but not to MongoDB. Is there something wrong?
    I know that collecting newRDD to the driver and then saving it to MongoDB will succeed, but will the following saveAsTextFile read the filesystem once again?

    Thanks

qinwei


Re: Re: about write mongodb in mapPartitions

Posted by qinwei <we...@dewmobile.net>.





Thanks for your reply!
    According to your hint, the code should be like this:

        // I want to save the data in rdd to both MongoDB and HDFS
        rdd.saveAsNewAPIHadoopFile()
        rdd.saveAsTextFile()

    but will the application read HDFS twice?
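
On the read-twice question: Spark recomputes an RDD's lineage for every action unless the RDD has been persisted, so two save actions on an unpersisted rdd would each go back to the source. A minimal sketch of persisting first, under stated assumptions: `PersistBeforeTwoActions`, `saveTwice`, and `sink` are hypothetical names that do not appear in this thread, and `sink` is only a placeholder for whatever performs the MongoDB write for one partition.

```scala
import org.apache.spark.rdd.RDD

object PersistBeforeTwoActions {
  // Hypothetical helper: runs two actions over the same RDD while it is cached,
  // so the second action can reuse cached partitions instead of re-reading the source.
  // `sink` is a placeholder for the per-partition MongoDB write (an assumption).
  def saveTwice(rdd: RDD[String], hdfsPath: String)(sink: Iterator[String] => Unit): Unit = {
    rdd.cache() // mark for in-memory persistence; filled by the first action
    try {
      rdd.foreachPartition(sink)   // first action: write each partition to the sink
      rdd.saveAsTextFile(hdfsPath) // second action: served from the cache where it fits
    } finally {
      rdd.unpersist() // release the cached partitions
    }
  }
}
```

If the cached partitions do not fit in memory, Spark may still recompute some of them, so this reduces rather than strictly guarantees avoiding a second read.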



qinwei
From: Akhil Das
Date: 2014-11-07 18:32
To: qinwei
CC: user
Subject: Re: about write mongodb in mapPartitions

Why not saveAsNewAPIHadoopFile?

//Define your mongoDB confs
val config = new Configuration()
config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/sigmoid.output")

//Write everything to mongo
rdd.saveAsNewAPIHadoopFile("file:///some/random", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)

Thanks
Best Regards





Re: about write mongodb in mapPartitions

Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Why not saveAsNewAPIHadoopFile?


//Define your mongoDB confs
import org.apache.hadoop.conf.Configuration

val config = new Configuration()
config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/sigmoid.output")

//Write everything to mongo (saveAsNewAPIHadoopFile is available on RDDs of key/value pairs)
rdd.saveAsNewAPIHadoopFile("file:///some/random", classOf[Any], classOf[Any],
  classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)


Thanks
Best Regards

On Fri, Nov 7, 2014 at 2:53 PM, qinwei <we...@dewmobile.net> wrote:

> Hi, everyone
>
>     I have run into a problem writing data to MongoDB in
> mapPartitions. My code is below:
>
>          val sourceRDD = sc.textFile("hdfs://host:port/sourcePath")
>           // some transformations
>         val rdd= sourceRDD .map(mapFunc).filter(filterFunc)
>         val newRDD = rdd.mapPartitions(args => {
>             val mongoClient = new MongoClient("host", port)
>             val db = mongoClient.getDB("db")
>             val coll = db.getCollection("collectionA")
>
>             args.map(arg => {
>                 coll.insert(new BasicDBObject("pkg", arg))
>                 arg
>             })
>
>             mongoClient.close()
>             args
>         })
>
>         newRDD.saveAsTextFile("hdfs://host:port/path")
>
>     The application saved the data to HDFS correctly, but not to MongoDB.
> Is there something wrong?
>     I know that collecting newRDD to the driver and then saving it to
> MongoDB will succeed, but will the following saveAsTextFile read the
> filesystem once again?
>
>     Thanks
>
>
> ------------------------------
> qinwei
>

Re: Re: about write mongodb in mapPartitions

Posted by qinwei <we...@dewmobile.net>.





Thanks for your reply! As you mentioned, the insert call was not executed because the results of args.map were never used anywhere. After I modified the code, it works.


qinwei
From: Tobias Pfeiffer
Date: 2014-11-07 18:04
To: qinwei
CC: user
Subject: Re: about write mongodb in mapPartitions

Hi,

On Fri, Nov 7, 2014 at 6:23 PM, qinwei <we...@dewmobile.net> wrote:
>             args.map(arg => {
>                 coll.insert(new BasicDBObject("pkg", arg))
>                 arg
>             })
>
>             mongoClient.close()
>             args

As the results of args.map are never used anywhere, I think the loop body is not executed at all. Maybe try:

            val argsProcessed = args.map(arg => {
                coll.insert(new BasicDBObject("pkg", arg))
                arg
            })

            mongoClient.close()
            argsProcessed

Tobias




Re: about write mongodb in mapPartitions

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

On Fri, Nov 7, 2014 at 6:23 PM, qinwei <we...@dewmobile.net> wrote:
>
>             args.map(arg => {
>                 coll.insert(new BasicDBObject("pkg", arg))
>                 arg
>             })
>
>             mongoClient.close()
>             args
>

As the results of args.map are never used anywhere, I think the loop body
is not executed at all. Maybe try:

            val argsProcessed = args.map(arg => {
                coll.insert(new BasicDBObject("pkg", arg))
                arg
            })

            mongoClient.close()
            argsProcessed

Tobias
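
The laziness discussed in this thread can be reproduced without Spark or MongoDB, because the `args` passed to mapPartitions is a plain Scala Iterator and Iterator.map only runs its function when the result is consumed. The following is a minimal sketch under stated assumptions: `LazyIteratorDemo`, `FakeCollection`, `discardMapped`, and `insertEagerly` are hypothetical names, and `FakeCollection` is an in-memory stand-in, not the MongoDB driver API.

```scala
import scala.collection.mutable.ArrayBuffer

object LazyIteratorDemo {
  // Hypothetical in-memory stand-in for a MongoDB collection and its client.
  final class FakeCollection {
    val inserted: ArrayBuffer[String] = ArrayBuffer.empty[String]
    private var open = true
    def insert(doc: String): Unit = {
      if (!open) throw new IllegalStateException("client already closed")
      inserted += doc
    }
    def close(): Unit = { open = false }
  }

  // Pattern from the original post: the mapped iterator is discarded,
  // so the function passed to map never runs.
  def discardMapped(args: Iterator[String], coll: FakeCollection): Iterator[String] = {
    args.map { arg => coll.insert(arg); arg } // lazy: nothing is inserted here
    coll.close()
    args
  }

  // Variant that forces the inserts before closing the client,
  // at the cost of holding the whole partition in memory.
  def insertEagerly(args: Iterator[String], coll: FakeCollection): Iterator[String] = {
    val processed = args.map { arg => coll.insert(arg); arg }.toList // toList forces evaluation
    coll.close()
    processed.iterator
  }

  def main(argv: Array[String]): Unit = {
    val c1 = new FakeCollection
    val out1 = discardMapped(Iterator("a", "b"), c1).toList
    println(s"discarded map: returned ${out1.size} items, inserted ${c1.inserted.size}")

    val c2 = new FakeCollection
    val out2 = insertEagerly(Iterator("a", "b"), c2).toList
    println(s"eager map: returned ${out2.size} items, inserted ${c2.inserted.size}")
  }
}
```

Note that returning the mapped iterator, as suggested above, is still lazy: the inserts run when the downstream action consumes it, which may be after close() has been called. Whether that matters depends on how the client behaves after close(); forcing evaluation first, as in `insertEagerly`, avoids relying on it but keeps the whole partition in memory.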