Posted to user@spark.apache.org by Samarth Mailinglist <ma...@gmail.com> on 2014/06/04 18:57:06 UTC

Re: Using mongo with PySpark

Thanks a lot, sorry for the really late reply! (Didn't have my laptop)

This is working, but it's dreadfully slow and seems to not run in parallel?


On Mon, May 19, 2014 at 2:54 PM, Nick Pentreath <ni...@gmail.com>
wrote:

> You need to use mapPartitions (or foreachPartition) to instantiate your
> client in each partition as it is not serializable by the pickle library.
> Something like
>
> from pymongo import MongoClient
>
> def mapper(iter):
>     # one client per partition, created on the worker
>     db = MongoClient()['spark_test_db']
>     collec = db['programs']
>     for val in iter:
>         asc = val.encode('ascii', 'ignore')
>         json = convertToJSON(asc, indexMap)
>         yield collec.insert(json)
>
>
>
> def convertToJSON(string, indexMap):
>     values = string.strip().split(",")
>     json = {}
>     for i in range(len(values)):
>         json[indexMap[i]] = values[i]
>     return json
>
> doc_ids = data.mapPartitions(mapper)
>
>
>
>
> On Mon, May 19, 2014 at 8:00 AM, Samarth Mailinglist <
> mailinglistsamarth@gmail.com> wrote:
>
>> db = MongoClient()['spark_test_db']
>> collec = db['programs']
>>
>> def mapper(val):
>>     asc = val.encode('ascii','ignore')
>>     json = convertToJSON(asc, indexMap)
>>     collec.insert(json)  # this is not working
>>
>> def convertToJSON(string, indexMap):
>>     values = string.strip().split(",")
>>     json = {}
>>     for i in range(len(values)):
>>         json[indexMap[i]] = values[i]
>>     return json
>>
>> jsons = data.map(mapper)
>>
>>
>>
>> The last line does the mapping. I am very new to Spark; can you explain
>> what explicit serialization, etc. means in the context of Spark? The error I
>> am getting is:
>> Traceback (most recent call last):
>>   File "<stdin>", line 1, in <module>
>>   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 712, in saveAsTextFile
>>     keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
>>   File "/usr/local/spark-0.9.1/python/pyspark/rdd.py", line 1178, in _jrdd
>>     pickled_command = CloudPickleSerializer().dumps(command)
>>   File "/usr/local/spark-0.9.1/python/pyspark/serializers.py", line 275, in dumps
>>     def dumps(self, obj): return cloudpickle.dumps(obj, 2)
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 801, in dumps
>>     cp.dump(obj)
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 140, in dump
>>     return pickle.Pickler.dump(self, obj)
>>   File "/usr/lib/python2.7/pickle.py", line 224, in dump
>>     self.save(obj)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib/python2.7/pickle.py", line 548, in save_tuple
>>     save(element)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>>     self.save_function_tuple(obj, [themodule])
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>>     save(closure)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>>     self._batch_appends(iter(obj))
>>   File "/usr/lib/python2.7/pickle.py", line 633, in _batch_appends
>>     save(x)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 259, in save_function
>>     self.save_function_tuple(obj, [themodule])
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 316, in save_function_tuple
>>     save(closure)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/lib/python2.7/pickle.py", line 600, in save_list
>>     self._batch_appends(iter(obj))
>>   File "/usr/lib/python2.7/pickle.py", line 636, in _batch_appends
>>     save(tmp[0])
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 254, in save_function
>>     self.save_function_tuple(obj, modList)
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 314, in save_function_tuple
>>     save(f_globals)
>>   File "/usr/lib/python2.7/pickle.py", line 286, in save
>>     f(self, obj) # Call unbound method with explicit self
>>   File "/usr/local/spark-0.9.1/python/pyspark/cloudpickle.py", line 181, in save_dict
>>     pickle.Pickler.save_dict(self, obj)
>>   File "/usr/lib/python2.7/pickle.py", line 649, in save_dict
>>     self._batch_setitems(obj.iteritems())
>>   File "/usr/lib/python2.7/pickle.py", line 681, in _batch_setitems
>>     save(v)
>>   File "/usr/lib/python2.7/pickle.py", line 306, in save
>>     rv = reduce(self.proto)
>>   File "/usr/local/lib/python2.7/dist-packages/pymongo/collection.py", line 1489, in __call__
>>     self.__name.split(".")[-1])
>> TypeError: 'Collection' object is not callable. If you meant to call the
>> '__getnewargs__' method on a 'Collection' object it is failing because no
>> such method exists.
>>
>>
>> On Sat, May 17, 2014 at 9:30 PM, Mayur Rustagi <ma...@gmail.com>
>> wrote:
>>
>>> Ideally you have to pass the mongoclient object along with your data into
>>> the mapper (Python should try to serialize your mongoclient, but explicit
>>> is better)...
>>> If the client is serializable then all should end well. If not, then you are
>>> better off using mapPartitions, initializing the Mongo driver in each
>>> partition, and loading the data of each partition. There is a similar
>>> discussion in the list in the past.
>>> Regards
>>> Mayur
>>>
>>> Mayur Rustagi
>>> Ph: +1 (760) 203 3257
>>> http://www.sigmoidanalytics.com
>>>  @mayur_rustagi <https://twitter.com/mayur_rustagi>
>>>
>>>
>>>
>>> On Sat, May 17, 2014 at 8:58 PM, Nicholas Chammas <
>>> nicholas.chammas@gmail.com> wrote:
>>>
>>>> Where's your driver code (the code interacting with the RDDs)? Are you
>>>> getting serialization errors?
>>>>
>>>> On Saturday, May 17, 2014, Samarth Mailinglist <ma...@gmail.com>
>>>> wrote:
>>>>
>>>> Hi all,
>>>>>
>>>>> I am trying to store the results of a reduce into mongo.
>>>>> I want to share the variable "collection" in the mappers.
>>>>>
>>>>>
>>>>> Here's what I have so far (I'm using pymongo)
>>>>>
>>>>> db = MongoClient()['spark_test_db']
>>>>> collec = db['programs']
>>>>>
>>>>> def mapper(val):
>>>>>     asc = val.encode('ascii','ignore')
>>>>>     json = convertToJSON(asc, indexMap)
>>>>>     collec.insert(json)  # this is not working
>>>>>
>>>>> def convertToJSON(string, indexMap):
>>>>>     values = string.strip().split(",")
>>>>>     json = {}
>>>>>     for i in range(len(values)):
>>>>>         json[indexMap[i]] = values[i]
>>>>>     return json
>>>>>
>>>>> How do I do this?
>>>>>
>>>>
>>>
>>
>
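
Note that mapPartitions is lazy, so nothing is written to Mongo until an action is
run on doc_ids. A minimal end-to-end sketch of the approach suggested above,
assuming sc is the SparkContext, the input is a CSV file at a made-up path, and
indexMap is a hypothetical list of field names:

from pymongo import MongoClient

indexMap = ['program_id', 'name', 'duration']   # assumed field names

def convertToJSON(string, indexMap):
    values = string.strip().split(",")
    json = {}
    for i in range(len(values)):
        json[indexMap[i]] = values[i]
    return json

def mapper(iter):
    # one MongoClient per partition, created on the worker
    collec = MongoClient()['spark_test_db']['programs']
    for val in iter:
        asc = val.encode('ascii', 'ignore')
        yield collec.insert(convertToJSON(asc, indexMap))

data = sc.textFile('/path/to/programs.csv')     # assumed input path
doc_ids = data.mapPartitions(mapper)
print(doc_ids.count())   # the action forces the job to run and the inserts to happen

Parallelism here is bounded by the number of partitions of data, so a small input
may only produce a handful of concurrent writers.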

Re: Using mongo with PySpark

Posted by Mayur Rustagi <ma...@gmail.com>.
Yes, initializing on every turn is costly. You seem to be using Python. Another,
riskier thing you can try is to serialize the mongoclient object using some
serializer (like kryo wrappers in Python) and pass it on to the mappers; then in
each mapper you'll just have to deserialize it and use it directly. This may or
may not work for you, depending on the internals of the MongoDB client.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>
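
In practice a MongoClient usually cannot be pickled at all, so a less risky
variant of the same idea is to ship only plain connection settings to the
workers, rebuild the client once per partition, and batch the writes, which also
helps with the slowness mentioned earlier in the thread. A rough sketch, where
the host, port, and batch size are assumed placeholder values and
convertToJSON/indexMap are the ones defined earlier:

from pymongo import MongoClient

MONGO_HOST = 'localhost'   # assumed
MONGO_PORT = 27017         # assumed
BATCH_SIZE = 500           # assumed

def insert_partition(lines):
    # rebuild the client on the worker from plain, picklable settings
    collec = MongoClient(MONGO_HOST, MONGO_PORT)['spark_test_db']['programs']
    batch = []
    for line in lines:
        batch.append(convertToJSON(line.encode('ascii', 'ignore'), indexMap))
        if len(batch) >= BATCH_SIZE:
            collec.insert(batch)   # PyMongo 2.x accepts a list for a bulk insert
            batch = []
    if batch:
        collec.insert(batch)

data.foreachPartition(insert_partition)

With PyMongo 3.x and later, collec.insert_many(batch) would replace the list
form of insert.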



On Wed, Jun 4, 2014 at 10:27 PM, Samarth Mailinglist <
mailinglistsamarth@gmail.com> wrote:

> Thanks a lot, sorry for the really late reply! (Didn't have my laptop)
>
> This is working, but it's dreadfully slow and seems to not run in
> parallel?