Posted to dev@beam.apache.org by Joseph Winston <jo...@gmail.com> on 2016/03/08 20:04:07 UTC

Subclassing iobase.Source and ptransform.PTransform in the Python SDK

I would like to use Apache Beam as my dataflow engine. Unfortunately for
me, the input to the dataflow isn't a text file and the result of the
pipeline is also not a text file. To see how difficult it would be to
create the desired classes, I've subclassed ptransform.PTransform as well
as iobase.Source and started on the read side of the problem.  I've cloned
and installed https://github.com/GoogleCloudPlatform/DataflowPythonSDK.git
on my VM and I am working with the most recent commit 3a56ce7.

Next, I wrote the following code, which closely follows the class Read
in google/cloud/dataflow/io/iobase.py and TextFileSource in
google/cloud/dataflow/io/iobase.py:

import argparse
import logging
import sys

import google.cloud.dataflow as df

from google.cloud.dataflow.io import iobase

class DummyFileSource(iobase.Source):
    """A source for a GCS or local dummy file.
    """

    def __init__(self, params):
        self._params = params
        return

    @property
    def format(self):
        """Source format name required for remote execution."""
        return 'binary'

from google.cloud.dataflow import pvalue
from google.cloud.dataflow.transforms import core
from google.cloud.dataflow.transforms import ptransform
from google.cloud.dataflow.transforms import window  # needed for GlobalWindows below

class DummyRead(ptransform.PTransform):
    """A transform that reads a PCollection."""

    def __init__(self, *args, **kwargs):
        """Initializes a DummyRead transform.

        Args:
        *args: A tuple of positional arguments.
        **kwargs: A dictionary of keyword arguments.

        The *args, **kwargs are expected to be (label, source) or (source).
        """

        label, source = self.parse_label_and_arg(args, kwargs, 'source')
        super(DummyRead, self).__init__(label)
        self.source = source
        return

    def apply(self, pbegin):
        assert isinstance(pbegin, pvalue.PBegin)
        self.pipeline = pbegin.pipeline
        return pvalue.PCollection(pipeline=self.pipeline, transform=self)

    def get_windowing(self, unused_inputs):
        return core.Windowing(window.GlobalWindows())

def main(argv = None):
    if argv is None:
        argv = sys.argv

    DummyFileSource('vat')
    parser = argparse.ArgumentParser()
    parser.add_argument('--baseURI',
                        dest='baseURI',
                        default='http://localhost:3000',
                        help='Base URI.')

    parser.add_argument('--fakeData',
                        dest='fakeData',
                        default='fakeData',
                        help='Fake data')
    known_args, pipeline_args = parser.parse_known_args(argv)

    p = df.Pipeline(argv=pipeline_args)

    params = {}
    postStackDummy = p | DummyRead('read',
                                   DummyFileSource(params))

    #
    # Actually run the pipeline (all operations above are deferred).
    #

    p.run()

    return

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    sys.exit(main(sys.argv) or 0)

When I run this program, the following traceback is produced:

Traceback (most recent call last):
  File "sample.py", line 85, in <module>
    sys.exit(main(sys.argv) or 0)
  File "sample.py", line 79, in main
    p.run()
  File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 135, in run
    return self.runner.run(self)
  File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 81, in run
    pipeline.visit(RunVisitor(self), node=node)
  File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 168, in visit
    start_transform.visit(visitor, self, visited)
  File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 376, in visit
    part.visit(visitor, pipeline, visited)
  File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/pipeline.py", line 379, in visit
    visitor.visit_transform(self)
  File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 79, in visit_transform
    self.runner.run_transform(transform_node)
  File "/home/jbw/anaconda/lib/python2.7/site-packages/python_dataflow-0.2.0-py2.7-linux-x86_64.egg/google/cloud/dataflow/runners/runner.py", line 155, in run_transform
    transform_node.transform, self))
NotImplementedError: Execution of [<DummyRead(PTransform) label=[read]>] not implemented in runner <google.cloud.dataflow.runners.direct_runner.DirectPipelineRunner object at 0x7f42af19e750>.

What I am having a hard time seeing is how the label 'read' passed to the
DummyRead constructor gets associated with a handler in the runner, and where
the missing method needs to be implemented.  What am I missing?

Re: Subclassing iobase.Source and ptransform.PTransform in the Python SDK

Posted by Chamikara Jayalath <ch...@apache.org>.
Hi Joseph,

The Python SDK currently does not support creating new sources. The sources
that are currently available are backed by the Google Dataflow service.
Theoretically it should be possible to get new sources working just for the
DirectPipelineRunner by hacking the SDK, but this has not been tested
properly.
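
Purely to illustrate what such a hack might look like (untested; the names
HackedDirectRunner and read_all_from, and the _cache call below, are
assumptions about the internals rather than a documented API):

from google.cloud.dataflow.runners.direct_runner import DirectPipelineRunner

class HackedDirectRunner(DirectPipelineRunner):
    """Direct runner extended with a handler for the custom DummyRead."""

    def run_DummyRead(self, transform_node):
        # Eagerly read every record from the custom source and cache the
        # list as the output of this step, similar in spirit to what the
        # runner does for its built-in sources.
        source = transform_node.transform.source
        values = list(read_all_from(source))  # hypothetical helper over your source
        self._cache.cache_output(transform_node, values)

You would then construct the pipeline with something like
df.Pipeline(runner=HackedDirectRunner(), argv=pipeline_args). Again, none of
this is supported today; the supported path will be the source framework
mentioned below.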

Please let us know if you run into any issues when using existing sources.
We hope to add a framework for creating new sources soon.

Thanks,
Cham


On Tue, Mar 8, 2016 at 11:11 AM, Davor Bonaci <da...@google.com.invalid>
wrote:

> Adding Silviu, who can comment more.
>



-- 
Software Engineer.
Google Inc.

Re: Subclassing iobase.Source and ptransform.PTransform in the Python SDK

Posted by Davor Bonaci <da...@google.com.INVALID>.
Adding Silviu, who can comment more.
