You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Igor Morgado <mo...@gmail.com> on 2020/03/22 16:50:38 UTC
Code improvement in apache-beam.
I have a input with some transaction data in json input (in this case a
file)
{ "timestamp": "2019-01-01 12:00:00", "customer": "customer-1", "page":
"product", "product": "product-1" }
{ "timestamp": "2019-01-01 12:02:00", "customer": "customer-1", "page":
"basket", "product": "product-1" }
{ "timestamp": "2019-01-01 12:04:00", "customer": "customer-1", "page":
"checkout" }
{ "timestamp": "2019-01-01 13:00:00", "customer": "customer-2", "page":
"product", "product": "product-2" }
{ "timestamp": "2019-01-01 13:02:00", "customer": "customer-2", "page":
"basket", "product": "product-2" }
{ "timestamp": "2019-01-01 14:00:00", "customer": "customer-3", "page":
"product", "product": "product-3" }
{ "timestamp": "2019-01-01 14:05:00", "customer": "customer-3", "page":
"basket", "product": "product-3" }
{ "timestamp": "2019-01-01 14:10:00", "customer": "customer-3", "page":
"product", "product": "product-4" }
{ "timestamp": "2019-01-01 14:16:00", "customer": "customer-3", "page":
"product", "product": "product-5" }
{ "timestamp": "2019-01-01 14:20:00", "customer": "customer-3", "page":
"basket", "product": "product-4" }
{ "timestamp": "2019-01-01 14:21:00", "customer": "customer-3", "page":
"checkout" }
{ "timestamp": "2019-01-01 15:00:00", "customer": "customer-9", "page":
"product", "product": "product-3" }
{ "timestamp": "2019-01-01 15:11:00", "customer": "customer-9", "page":
"basket", "product": "product-3" }
The idea is be able to identify abandoned sessions, for example with more
than 10 minutes idle. Transactions **checkedout** cannot be considered
abandoned. The code to process is is below
import argparse
import json
import logging
import os
import arrow
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam import ParDo, Pipeline, Map, GroupByKey, Partition
from apache_beam.options.pipeline_options import PipelineOptions,
SetupOptions
class JsonCoder:
"""A simple encoder/decoder for JsonNL format"""
@staticmethod
def encode(stream):
return bytes(json.dumps(stream.as_dict()), 'utf8')
@staticmethod
def decode(stream):
return json.loads(stream)
class PageViews:
"""Datastructure to handle, store and view PageViews entries"""
def arrow_to_json(self):
return self.timestamp.format('YYYY-MM-DD HH:mm:ss')
def __init__(self, timestamp, customer, page, product=None):
self.timestamp = arrow.get(timestamp)
self.customer = customer
self.page = page
self.product = product
def as_dict(self):
json_dict = {'customer': self.customer,
'page': self.page,
'timestamp': self.arrow_to_json()}
if self.product is not None:
json_dict['product'] = self.product
return json_dict
def __str__(self):
return
f'{self.customer};{self.arrow_to_json()};{self.page};{self.product}
'
def __repr__(self):
repr_fmt = 'PageViews(customer={}, timestamp={}, page={},
product={}'
return repr_fmt.format(self.customer, self.arrow_to_json(),
self.page, self.product)
class JsonToPageViews(beam.DoFn):
"""A simple processor to be used while converting Json entries to
PageViews struct"""
def process(self, element, **kwargs):
# Necessary to "normalize" the Json readings
if 'product' not in element:
element['product'] = None
pv = PageViews(timestamp=element['timestamp'],
customer=element['customer'],
page=element['page'],
product=element['product'])
yield pv
class OnlyExpired(beam.DoFn):
def process(self, element, **kwargs):
customer, entries = element
o_timestamp = entries[0].timestamp.timestamp
checkout = False
expired = False
last_entry = None
for num, entry in enumerate(entries[0:]):
n_timestamp = entry.timestamp.timestamp
d_timestamp = n_timestamp - o_timestamp
o_timestamp = n_timestamp
if d_timestamp > 60*10:
expired = True
if entry.page == 'checkout':
checkout = True
last_entry = entry
if not checkout:
expired = True
if expired:
yield last_entry
class GetTimestamp(beam.DoFn):
"""A Process just to check if timestamps where correctly assigned"""
def process(self, element, timestamp=beam.DoFn.TimestampParam,
**kwargs):
yield '{} - {}'.format(timestamp.to_utc_datetime(), element)
def partition_fn(pageview, num_partitions=1):
"""Partition the customer based on customer_id"""
customer_id = int(pageview.customer.split('-')[1])
return customer_id % num_partitions
def run(argv=None, save_main_session=True):
"""Entry point for main code"""
# Processing Command Line Arguments
parser = argparse.ArgumentParser()
parser.add_argument('--input', help='Input files',
default='input/page-views.json')
parser.add_argument('--output', help='Output dir', default='output')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session =
save_main_session
# Create the pipeline
p = Pipeline(options=pipeline_options)
res = (p
| 'Read JSON files' >> ReadFromText(known_args.input,
coder=JsonCoder())
| 'Convert JSON to Pageview' >> ParDo(JsonToPageViews())
| 'Add Timestamps' >> Map(lambda entry:
beam.window.TimestampedValue(entry, entry.timestamp.timestamp))
| 'Create Group Key' >> Map(lambda entry: (entry.customer,
entry))
| GroupByKey()
| 'Only Expired' >> ParDo(OnlyExpired())
| 'Partition' >> Partition(partition_fn, 2)
)
res[0] | 'Save Part0' >>
WriteToText(os.path.join(known_args.output, "abandoned-carts-part0"),
file_name_suffix='.json', coder=JsonCoder())
res[1] | 'Save Part1' >>
WriteToText(os.path.join(known_args.output, 'abandoned-carts-part1'),
file_name_suffix='.json', coder=JsonCoder())
p.run()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.WARN)
run()
I personally didn't like the way I have handled the idle sessions with a
custom class `OnlyExpired`, since I need to save a session variable. I
think is a bottleneck in case of streaming. How can I improve this code?
I also added a partition to split data when needed (in this case my
customer name).