Posted to dev@beam.apache.org by Niklas Hansson <ni...@gmail.com> on 2019/08/07 20:19:11 UTC

Python DoFn.setup()

Hi!

I have a streaming (unbounded) pipeline in which I would like to use an sklearn
(http://scikit-learn.github.io/stable) model to make predictions based on
input from a Pub/Sub stream. To avoid loading the model multiple times, I
would like to use the DoFn.setup() feature that was implemented in 2.14 (at
least as far as I understand), PR[562](https://github.com/apache/beam/pull/7994).

My DoFn:

import apache_beam as beam
import joblib


class PredictSklearn(beam.DoFn):
    """Add an sklearn prediction to each formatted element."""

    def __init__(self):
        self._model = None

    def setup(self):
        # Expected to run once per DoFn instance, before any bundles are
        # processed. download_blob is a helper defined in the linked repo.
        model_name = "model.joblib"
        download_blob(bucket_name="dataflowsklearnstreaming",
                      source_blob_name=model_name)
        self._model = joblib.load(model_name)

    def process(self, element):
        element["prediction"] = self._model.predict(element["data"])
        return [element]
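
For context on what setup() is meant to buy here: Beam calls setup() once per DoFn instance before that instance processes any bundles, then start_bundle(), process() for each element and finish_bundle() around every bundle, and (best-effort) teardown() when the instance is discarded. The sketch below is a Beam-free toy, assuming a hypothetical PredictFn and run_bundles driver (neither is Beam's API, and the driver is not how a real runner schedules work); it only shows why an expensive load placed in setup() runs once per instance rather than once per element.

```python
class PredictFn:
    """Beam-free stand-in for PredictSklearn, using the same method names."""

    def __init__(self):
        self._model = None
        self.loads = 0  # how many times the "expensive" load ran

    def setup(self):
        # In the real DoFn this is download_blob(...) + joblib.load(...).
        self.loads += 1
        self._model = sum  # toy "model": predicts the sum of the inputs

    def process(self, element):
        element["prediction"] = self._model(element["data"])
        return [element]

    def teardown(self):
        self._model = None


def run_bundles(dofn, bundles):
    """Toy driver (not Beam's runner): setup() once for the instance,
    process() for every element of every bundle, then teardown().
    start_bundle()/finish_bundle() are omitted for brevity."""
    dofn.setup()
    outputs = []
    for bundle in bundles:
        for element in bundle:
            outputs.extend(dofn.process(element))
    dofn.teardown()
    return outputs


fn = PredictFn()
results = run_bundles(fn, [[{"data": [1, 2]}], [{"data": [3, 4]}, {"data": [5]}]])
print([r["prediction"] for r in results])  # [3, 7, 5]
print(fn.loads)  # 1
```

Three elements across two bundles still trigger a single load, which is the behaviour the post is relying on setup() to provide.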

My pipeline:

input_pubsub = (p | 'Read from PubSub 2' >> beam.io.gcp.pubsub.ReadFromPubSub(
    topic=known_args.topic, with_attributes=True))
_ = (input_pubsub
     | "format the data correctly" >> beam.ParDo(FormatInput())
     | "transform the data" >> beam.ParDo(PredictSklearn())
     | "print the data" >> beam.Map(printy))
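
FormatInput is not shown in the post, so the following is only a hypothetical sketch of what it has to produce for PredictSklearn to work. With with_attributes=True, ReadFromPubSub emits PubsubMessage objects (apache_beam.io.gcp.pubsub.PubsubMessage, payload as bytes in .data, attributes in .attributes) rather than raw bytes. The sketch assumes a JSON payload shaped like {"data": [[...], ...]}; format_input and FakePubsubMessage are illustrative names, and the namedtuple only stands in for PubsubMessage so this runs without Beam.

```python
import json
from collections import namedtuple

# Stand-in for apache_beam.io.gcp.pubsub.PubsubMessage (same field names).
FakePubsubMessage = namedtuple("FakePubsubMessage", ["data", "attributes"])


def format_input(message):
    """Hypothetical body for FormatInput.process: decode the (assumed) JSON
    payload into the dict shape PredictSklearn reads via element["data"]."""
    payload = json.loads(message.data.decode("utf-8"))
    return [{"data": payload["data"], "attributes": dict(message.attributes)}]


msg = FakePubsubMessage(data=b'{"data": [[1.0, 2.0]]}', attributes={"source": "test"})
print(format_input(msg))
# [{'data': [[1.0, 2.0]], 'attributes': {'source': 'test'}}]
```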

However, I get the error message "AttributeError: 'NoneType' object has no
attribute 'predict' [while running 'transform the data']", because the model
is not loaded: self._model is still None when process() runs.
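
The error can be reproduced without Beam by calling process() on an instance whose setup() never ran: self._model keeps the None assigned in __init__. A common defensive pattern, sketched below under stated assumptions, is to also load lazily on first use in process(), so the DoFn works whether or not the runner invokes setup(). Here load_model is a hypothetical zero-argument callable standing in for download_blob(...) plus joblib.load(...), and ToyModel stands in for a fitted sklearn estimator. In a real DoFn the callable is serialized to workers with the DoFn, so a module-level function or class is the safest choice.

```python
class ToyModel:
    """Stand-in for a fitted sklearn estimator: 'predicts' each row's sum."""

    def predict(self, rows):
        return [sum(row) for row in rows]


class PredictSklearnUnguarded:
    """Same shape as PredictSklearn, minus Beam and the GCS download."""

    def __init__(self):
        self._model = None

    def process(self, element):
        element["prediction"] = self._model.predict(element["data"])
        return [element]


class PredictSklearnGuarded:
    """Loads the model in setup(), and lazily in process() if setup() never ran."""

    def __init__(self, load_model):
        # load_model: hypothetical zero-argument callable standing in for
        # download_blob(...) followed by joblib.load(...).
        self._load_model = load_model
        self._model = None

    def setup(self):
        self._model = self._load_model()

    def process(self, element):
        if self._model is None:  # setup() was not called on this instance
            self._model = self._load_model()
        element["prediction"] = self._model.predict(element["data"])
        return [element]


# Calling process() without setup() reproduces the reported error.
error_message = None
try:
    PredictSklearnUnguarded().process({"data": [[1, 2]]})
except AttributeError as err:
    error_message = str(err)
print(error_message)  # 'NoneType' object has no attribute 'predict'

# The guarded version works whether or not setup() ran first.
print(PredictSklearnGuarded(ToyModel).process({"data": [[1, 2], [3, 4]]}))
# [{'data': [[1, 2], [3, 4]], 'prediction': [3, 7]}]
```

The guard only masks a missing setup() call; it does not explain why setup() was not invoked, which is a question for the runner.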

For the full code: https://github.com/NikeNano/DataflowSklearnStreaming

I have filed an issue in Jira but realised that I probably should have asked
here first. I would be super happy to get some help with this :)

Thanks!

Re: Python DoFn.setup()

Posted by Ahmet Altay <al...@google.com>.
I replied on the JIRA issue [1] you filed. Let's keep the conversation in one
place, in JIRA.

[1] https://issues.apache.org/jira/browse/BEAM-7885

Ahmet
