Posted to user@beam.apache.org by "giovani ....." <gb...@gmail.com> on 2022/01/17 10:12:20 UTC

Problem with windowing in python

Hello, could someone help me with this problem:
https://stackoverflow.com/questions/70644351/apache-beam-hanging-on-groupbykey-after-windowing-not-triggering
?

In short, I am having trouble getting the Python direct runner to aggregate
a data-driven window: after the GroupByKey, the data is simply never
output.

Maybe I am misunderstanding some Beam concepts, or it is a reported issue
with the direct runner. Any help would be much appreciated, thank you very
much!
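
To make this concrete, the relevant part of the pipeline looks roughly like
the sketch below (simplified: the Create input stands in for my custom file
source, and the names and counts are just placeholders):

```
import apache_beam as beam
from apache_beam.transforms import trigger, window

with beam.Pipeline() as p:
    (
        p
        # Placeholder input; the real pipeline reads from a custom file source.
        | "Read" >> beam.Create(range(100))
        | "Window" >> beam.WindowInto(
            window.GlobalWindows(),
            trigger=trigger.Repeatedly(trigger.AfterCount(10)),
            accumulation_mode=trigger.AccumulationMode.DISCARDING,
        )
        | "Key" >> beam.Map(lambda x: (None, x))
        | "Group" >> beam.GroupByKey()  # with the real source, nothing is ever emitted here
        | "Print" >> beam.Map(print)
    )
```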
-- 
Regards, Giovani Merlin
Control and Automation student
UFRGS ID: 00260676

Re: Problem with windowing in python

Posted by "giovani ....." <gb...@gmail.com>.
Hello, thank you very much for your reply. I'm reading about the
offset_range_tracker and it seems quite hard to implement, and I
can't find good tutorials that explain how to implement and understand it.
So I changed my strategy and am now reading from MongoDB using
ReadFromMongoDB, but I'm facing exactly the same problem again. If I
switch to GroupIntoBatches I get the error
"Transform node AppliedPTransform(Read
DB/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey,
_GroupByKeyOnly) was not replaced as expected.", tested with Python
3.7.9 and 3.9.

The full code is shown below:

```
import json

import regex as re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.mongodbio import ReadFromMongoDB


# Could parse the article in different ways, like getting the text per
# paragraph, ignoring lists, depending on the objective. To match
# Zeshel's and mewsli's (uses wikiextractor) we will just append all the
# texts.
class ProcessPage(beam.DoFn):
    def __init__(self, redirects_path):
        self.redirects_path = redirects_path

    def setup(self):
        with open(self.redirects_path, "r") as f:
            self.redirects_dict = json.load(f)

        self.legends = re.compile(r"\w+\|\[\[[^\\]*\]\]")

    def process(self, page):
        import unicodedata

        from scripts.wikipedia.wikipedia_articles.beam_module.documents import Mention

        article_text = ""
        mentions = []
        text_size = 0
        for section in page["sections"]:
            for paragraph in section.get("paragraphs", []):
                for sentence in paragraph["sentences"]:
                    # Fix double spaces
                    sentence_text = unicodedata.normalize("NFKD", sentence["text"])
                    sentence_text = sentence_text.replace("  ", " ")
                    if not self.legends.match(sentence_text):
                        # Adds text
                        article_text += sentence_text + " "
                        for link in sentence.get("links", []):
                            if link["type"] == "internal":
                                label = self.redirects_dict.get(link["page"], link["page"])
                                mention_text = link["text"]
                                # Or anchor text or label name
                                mention = mention_text if mention_text else link["page"]
                                # Strip bold and then italics
                                mention = mention.replace("'''", "").replace("''", "")
                                # wtf_wikipedia generates extra spaces when a link is in a new line
                                mention = mention.replace("  ", " ").strip()
                                mention = unicodedata.normalize("NFKD", mention)
                                mention_position = sentence_text.find(mention)
                                if mention_position != -1:
                                    start = mention_position + text_size
                                    end = start + len(mention)
                                    assert article_text[start:end] == mention
                                else:
                                    print("Could not find mention:", mention)
                                mentions.append(
                                    Mention(
                                        start=start,
                                        end=end,
                                        mention=mention,
                                        label=label,
                                        source_doc=page["title"],
                                    )
                                )
                    text_size = len(article_text)
        yield mentions


connection_string = "mongodb://localhost:27017"
db_name = "dewiki"
collection_name = "pages"
redirect_path = "data/wikipedia/bef_format/de/redirects_table.json"
output_connection_string = "sqlite:////home/giovani/wksp/BEF/wikipedia_cirrus.db"
_beam_pipeline_args = [
    "--runner=DirectRunner",
    "--streaming",
    # "--direct_num_workers=1",
    # "--direct_running_mode=multi_processing",
    # "--setup_file=./setup.py",
]


class BatchIndexing(beam.DoFn):
    def __init__(self, connection_string):
        self._connection_string = connection_string

    def setup(self):
        from sqlalchemy import create_engine
        from sqlalchemy.orm import sessionmaker

        from scripts.wikipedia.wikipedia_articles.beam_module.documents import Base

        engine = create_engine(self._connection_string, echo=False)
        self.session = sessionmaker(bind=engine)(autocommit=False, autoflush=False)
        self._lines = []
        Base.metadata.create_all(engine)

    def process(self, element):
        self._lines.append(element)

    def finish_bundle(self):
        self.session.add_all(self._lines)
        self.session.commit()


with beam.Pipeline(options=PipelineOptions(_beam_pipeline_args)) as pipeline:
    pipeline = (
        pipeline
        | "Read DB"
        >> ReadFromMongoDB(
            uri=connection_string,
            db=db_name,
            coll=collection_name,
            filter={"isRedirect": False},
        )
        | "Process element" >> beam.ParDo(ProcessPage(redirect_path))
        | "Filter nones" >> beam.Filter(lambda data: data != [])
        | "Unroll" >> beam.FlatMap(lambda data: data)
        | "Map to tuple" >> beam.Map(lambda data: (None, data))
        | "Group into batches" >> beam.GroupIntoBatches(30)
        # | "window"
        # >> beam.WindowInto(
        #     beam.window.FixedWindows(size=2),
        #     trigger=beam.transforms.trigger.Repeatedly(beam.transforms.trigger.AfterCount(10)),
        #     accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING,
        # )
        # | "Group all per window" >> beam.GroupByKey()
        # | "Print" >> beam.Map(lambda data: print(data))
        | "Index data" >> beam.ParDo(BatchIndexing(output_connection_string))
    )
```

Do you have an idea what the problem might be? Thank you!

On Mon, Jan 17, 2022 at 11:50 AM Jan Lukavský <je...@seznam.cz> wrote:
>
> Hi Giovanni,
>
> one thing that I overlooked when answering your Stack Overflow question is
> that your read_records method ignores the provided offset_range_tracker.
> That could be the root of the issue: FileBasedSource is based on
> splittable DoFn [1], so your logic must cooperate with the offset tracker
> to be able to split and checkpoint the reading of the source file.
>
> Regarding the GroupIntoBatches, I believe that should be the right
> solution, if your intent is simply to batch the input for optimizing
> some computation.
>
> Hope this helps, please feel free to reach out if you have any more
> questions.
>
> Best,
>
>   Jan
>
> [1]
> https://beam.apache.org/documentation/programming-guide/#splittable-dofns
>
> On 1/17/22 11:12, giovani ..... wrote:
> > Hello, could someone help me with this problem:
> > https://stackoverflow.com/questions/70644351/apache-beam-hanging-on-groupbykey-after-windowing-not-triggering
> > ?
> >
> > In short, I am having trouble getting the Python direct runner to aggregate
> > a data-driven window: after the GroupByKey, the data is simply never
> > output.
> >
> > Maybe I am misunderstanding some Beam concepts, or it is a reported issue
> > with the direct runner. Any help would be much appreciated, thank you very
> > much!



-- 
Regards, Giovani Merlin
Control and Automation student
UFRGS ID: 00260676

Re: Problem with windowing in python

Posted by Jan Lukavský <je...@seznam.cz>.
Hi Giovanni,

one thing that I overlooked when answering your Stack Overflow question is
that your read_records method ignores the provided offset_range_tracker.
That could be the root of the issue: FileBasedSource is based on splittable
DoFn [1], so your logic must cooperate with the offset tracker to be able
to split and checkpoint the reading of the source file.
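
Just to illustrate the pattern, here is a rough sketch of a source whose
read_records cooperates with the tracker (the line-per-record parsing is
made up, and it assumes an uncompressed, seekable file): every record's
start offset is claimed on the tracker before the record is emitted, and
reading stops as soon as a claim is rejected.

```
from apache_beam.io import filebasedsource


class LinePerRecordSource(filebasedsource.FileBasedSource):
    """Hypothetical source that emits one record per line."""

    def read_records(self, file_name, offset_range_tracker):
        with self.open_file(file_name) as f:
            # Start reading at the beginning of the assigned range.
            f.seek(offset_range_tracker.start_position())
            while True:
                pos = f.tell()
                # Claim the record's offset before emitting it; a failed claim
                # means the runner split the range here and reading must stop.
                if not offset_range_tracker.try_claim(pos):
                    return
                line = f.readline()
                if not line:
                    return
                yield line.decode("utf-8").rstrip("\n")
```

A real source also has to re-synchronize to the next record boundary when
the range starts in the middle of a record, but the key point is that each
record's offset goes through try_claim.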

Regarding the GroupIntoBatches, I believe that should be the right 
solution, if your intent is simply to batch the input for optimizing 
some computation.
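
For reference, a minimal self-contained GroupIntoBatches example (the key,
the values and the batch size are arbitrary); note that it expects keyed
(key, value) elements:

```
import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.Create([("page", i) for i in range(10)])
        # Emits ('page', <batch of up to 4 values>) tuples.
        | beam.GroupIntoBatches(4)
        | beam.Map(print)
    )
```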

Hope this helps, please feel free to reach out if you have any more 
questions.

Best,

  Jan

[1] 
https://beam.apache.org/documentation/programming-guide/#splittable-dofns

On 1/17/22 11:12, giovani ..... wrote:
> Hello, could someone help me with this problem:
> https://stackoverflow.com/questions/70644351/apache-beam-hanging-on-groupbykey-after-windowing-not-triggering
> ?
>
> In short, I am having trouble getting the Python direct runner to aggregate
> a data-driven window: after the GroupByKey, the data is simply never
> output.
>
> Maybe I am misunderstanding some Beam concepts, or it is a reported issue
> with the direct runner. Any help would be much appreciated, thank you very
> much!