You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by Igor Morgado <> on 2020/03/22 16:50:38 UTC

Code improvement in apache-beam.

I have an input with some transaction data in JSON format (in this case a file):

    { "timestamp": "2019-01-01 12:00:00", "customer": "customer-1", "page":
"product", "product": "product-1" }
    { "timestamp": "2019-01-01 12:02:00", "customer": "customer-1", "page":
"basket", "product": "product-1" }
    { "timestamp": "2019-01-01 12:04:00", "customer": "customer-1", "page":
"checkout" }
    { "timestamp": "2019-01-01 13:00:00", "customer": "customer-2", "page":
"product", "product": "product-2" }
    { "timestamp": "2019-01-01 13:02:00", "customer": "customer-2", "page":
"basket", "product": "product-2" }
    { "timestamp": "2019-01-01 14:00:00", "customer": "customer-3", "page":
"product", "product": "product-3" }
    { "timestamp": "2019-01-01 14:05:00", "customer": "customer-3", "page":
"basket", "product": "product-3" }
    { "timestamp": "2019-01-01 14:10:00", "customer": "customer-3", "page":
"product", "product": "product-4" }
    { "timestamp": "2019-01-01 14:16:00", "customer": "customer-3", "page":
"product", "product": "product-5" }
    { "timestamp": "2019-01-01 14:20:00", "customer": "customer-3", "page":
"basket", "product": "product-4" }
    { "timestamp": "2019-01-01 14:21:00", "customer": "customer-3", "page":
"checkout" }
    { "timestamp": "2019-01-01 15:00:00", "customer": "customer-9", "page":
"product", "product": "product-3" }
    { "timestamp": "2019-01-01 15:11:00", "customer": "customer-9", "page":
"basket", "product": "product-3" }

The idea is to be able to identify abandoned sessions, for example those with
more than 10 minutes idle. Transactions that were **checked out** cannot be
considered abandoned. The code to process it is below:

    import argparse
    import json
    import logging
    import os

    import arrow

    import apache_beam as beam
    from apache_beam.io import ReadFromText, WriteToText
    from apache_beam import ParDo, Pipeline, Map, GroupByKey, Partition
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    class JsonCoder:
        """A simple encoder/decoder for the JSON Lines format (one object per line).

        Both methods are static so that they work whether they are called on
        the class itself or on an instance (the pipeline passes
        ``coder=JsonCoder()``). Without that, the instance would be bound to
        the only positional parameter and the call would raise ``TypeError``.
        """

        @staticmethod
        def encode(stream):
            """Serialize ``stream`` to UTF-8 encoded JSON.

            Args:
                stream: Object exposing ``as_dict()`` that returns a
                    JSON-serializable dict.

            Returns:
                bytes: The JSON document encoded as UTF-8.
            """
            return json.dumps(stream.as_dict()).encode('utf8')

        @staticmethod
        def decode(stream):
            """Parse one JSON document (``str`` or ``bytes``) into Python objects."""
            return json.loads(stream)

    class PageViews:
        """Data structure to hold, store and render a single page-view entry."""

        def __init__(self, timestamp, customer, page, product=None):
            """Build an entry.

            Args:
                timestamp: Value passed to ``arrow.get`` (the sample input
                    uses 'YYYY-MM-DD HH:mm:ss' strings).
                customer: Customer identifier, e.g. 'customer-1'.
                page: Page name, e.g. 'product', 'basket' or 'checkout'.
                product: Optional product identifier.
            """
            self.timestamp = arrow.get(timestamp)
            self.customer = customer
            self.page = page
            self.product = product

        def arrow_to_json(self):
            """Return the timestamp formatted as 'YYYY-MM-DD HH:mm:ss'."""
            return self.timestamp.format('YYYY-MM-DD HH:mm:ss')

        def as_dict(self):
            """Return the entry as a JSON-serializable dict.

            'product' is only included when it is set. The page is not part
            of the dict.
            """
            json_dict = {'customer': self.customer,
                         'timestamp': self.arrow_to_json()}

            if self.product is not None:
                json_dict['product'] = self.product

            return json_dict

        def __str__(self):
            """Return the string form of ``as_dict()``."""
            return str(self.as_dict())

        def __repr__(self):
            """Return a constructor-like representation with all four fields."""
            repr_fmt = 'PageViews(customer={}, timestamp={}, page={}, product={})'
            return repr_fmt.format(self.customer, self.arrow_to_json(),
                                   self.page, self.product)

    class JsonToPageViews(beam.DoFn):
        """A simple processor that converts decoded JSON entries to PageViews."""

        def process(self, element, **kwargs):
            """Yield one PageViews object built from ``element``.

            Args:
                element: Dict with 'timestamp', 'customer' and 'page' keys and
                    an optional 'product' key.
                **kwargs: Unused.

            Yields:
                PageViews: The converted entry.

            Raises:
                KeyError: If a mandatory key is missing from ``element``.
            """
            # 'product' is optional (the sample checkout entries omit it).
            # .get() normalizes a missing key to None without mutating the
            # input element.
            yield PageViews(timestamp=element['timestamp'],
                            customer=element['customer'],
                            page=element['page'],
                            product=element.get('product'))

    class OnlyExpired(beam.DoFn):
        """Emit the last entry of every customer group considered abandoned.

        A group is emitted when two consecutive entries are more than
        ``IDLE_LIMIT_SECONDS`` apart, or when it has no 'checkout' page.
        """

        # Maximum idle time between two consecutive entries, in seconds.
        IDLE_LIMIT_SECONDS = 60 * 10

        def process(self, element, **kwargs):
            """Yield the chronologically last entry of an abandoned group.

            Args:
                element: ``(customer, entries)`` pair where ``entries`` is an
                    iterable of objects exposing ``timestamp`` and ``page``.
                **kwargs: Unused.

            Yields:
                The last entry of the group, if the group is abandoned.
            """
            _customer, entries = element

            # Grouped values may be a lazy iterable and are not guaranteed to
            # arrive in timestamp order, so materialize and sort them before
            # measuring gaps. sorted() is stable, so ties keep arrival order.
            entries = sorted(entries, key=lambda entry: entry.timestamp)
            if not entries:
                return

            # Subtracting the two timestamps yields the gap as a timedelta.
            expired = any(
                (later.timestamp - earlier.timestamp).total_seconds()
                > self.IDLE_LIMIT_SECONDS
                for earlier, later in zip(entries, entries[1:]))
            checkout = any(entry.page == 'checkout' for entry in entries)

            # NOTE(review): a group that contains a checkout is still emitted
            # when it also contains an idle gap; this is the original rule.
            # Confirm whether checked-out groups should always be skipped.
            if expired or not checkout:
                yield entries[-1]

    class GetTimestamp(beam.DoFn):
        """A processor just to check if timestamps were correctly assigned."""

        def process(self, element, timestamp=beam.DoFn.TimestampParam, **kwargs):
            """Yield '<UTC datetime> - <element>' for the given element.

            Args:
                element: The value being processed.
                timestamp: Defaults to ``beam.DoFn.TimestampParam``; the code
                    calls ``to_utc_datetime()`` on the value it receives.
                **kwargs: Unused.

            Yields:
                str: The timestamp and the element joined by ' - '.
            """
            yield '{} - {}'.format(timestamp.to_utc_datetime(), element)

    def partition_fn(pageview, num_partitions=1):
        """Pick a partition index from the number embedded in the customer name.

        The customer name is split on '-' and its second field is read as an
        integer (e.g. 'customer-3' -> 3); the result is that number modulo
        ``num_partitions``.
        """
        name_fields = pageview.customer.split('-')
        return int(name_fields[1]) % num_partitions

    def run(argv=None, save_main_session=True):
        """Entry point for main code.

        Builds and runs the pipeline: read JSON lines, convert them to
        PageViews, group them by customer, keep only the abandoned groups and
        write them out split in two partitions.

        Args:
            argv: Command line arguments; ``None`` lets argparse read
                ``sys.argv``. Unknown arguments are forwarded to Beam.
            save_main_session: Value assigned to the ``save_main_session``
                setup option.
        """
        num_partitions = 2

        # Processing Command Line Arguments
        parser = argparse.ArgumentParser()
        # The remainder of this call is missing from the archived post;
        # requiring the argument avoids reading from an undefined location.
        parser.add_argument('--input', help='Input files', required=True)
        parser.add_argument('--output', help='Output dir', default='output')
        known_args, pipeline_args = parser.parse_known_args(argv)
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(SetupOptions).save_main_session = save_main_session

        # Create the pipeline. Used as a context manager so the pipeline is
        # actually executed when the block exits.
        with Pipeline(options=pipeline_options) as p:
            # NOTE(review): `entry.timestamp.timestamp` relies on Arrow
            # exposing `timestamp` as a property; confirm against the
            # installed Arrow version.
            res = (p
                   | 'Read JSON files' >> ReadFromText(known_args.input,
                                                       coder=JsonCoder())
                   | 'Convert JSON to Pageview' >> ParDo(JsonToPageViews())
                   | 'Add Timestamps' >> Map(
                       lambda entry: beam.window.TimestampedValue(
                           entry, entry.timestamp.timestamp))
                   | 'Create Group Key' >> Map(
                       lambda entry: (entry.customer, entry))
                   | GroupByKey()
                   | 'Only Expired' >> ParDo(OnlyExpired())
                   | 'Partition' >> Partition(partition_fn, num_partitions))

            # One output file set per partition, e.g. 'Save Part0' writes to
            # '<output>/abandoned-carts-part0*.json'.
            for index in range(num_partitions):
                _ = (res[index]
                     | 'Save Part{}'.format(index) >> WriteToText(
                         os.path.join(known_args.output,
                                      'abandoned-carts-part{}'.format(index)),
                         file_name_suffix='.json',
                         coder=JsonCoder()))

    if __name__ == '__main__':
        # Show INFO-level log messages when executed as a script, then start
        # the pipeline with the command line arguments.
        logging.getLogger().setLevel(logging.INFO)
        run()

I personally didn't like the way I have handled the idle sessions with a
custom class `OnlyExpired`, since I need to save a session variable. I
think it is a bottleneck in the case of streaming. How can I improve this code?

I also added a partition to split the data when needed (in this case by
customer name).