Posted to user@beam.apache.org by Tucker Barbour <tu...@gmail.com> on 2021/01/18 11:26:36 UTC

Iterative algorithm in BEAM

I have a use-case where I'm extracting embedded items from
archive file formats which themselves have embedded items. For example a
zip file with emails with attachments. The goal in this example would be to
create a PCollection where each email is an element as well as each
attachment being an element. (No need to create a tree structure here.)
There are certain criteria which would prevent continuing embedded item
extraction, such as an item SHA being present in a "rejection" list. The
pipeline will perform a series of transformations on the items and then
continue to extract embedded items. This type of problem lends itself to an
iterative algorithm. My understanding is that Beam does not support
iterative algorithms to the same extent Spark does. In Beam I would
have to persist the results of each iteration and instantiate a new
pipeline for each iteration. This _works_ though isn't ideal: the
"rejection" list is a PCollection of a few million elements, and it would
have to be re-read on every iteration.
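
As a rough illustration of the iteration described above, here is a plain-Kotlin sketch with no Beam involved. It expands one nesting level per pass and drops any item whose SHA-256 is on the rejection list, so rejected items are never expanded. The Item class, sha256Hex, and extractAll are hypothetical names invented for this example, and the in-memory "embedded" list stands in for real extraction.

```kotlin
import java.security.MessageDigest

// Hypothetical item model: raw bytes plus the items embedded directly inside it.
class Item(val name: String, val bytes: ByteArray, val embedded: List<Item> = emptyList())

// Hex-encoded SHA-256 of the item content, used as the rejection-list key.
fun sha256Hex(bytes: ByteArray): String =
    MessageDigest.getInstance("SHA-256").digest(bytes).joinToString("") { "%02x".format(it) }

// Expands one nesting level per iteration until no new items appear.
// Items whose SHA is on the rejection list are dropped and never expanded.
fun extractAll(roots: List<Item>, rejected: Set<String>): List<Item> {
    val output = mutableListOf<Item>()
    var frontier = roots
    while (frontier.isNotEmpty()) {
        val accepted = frontier.filter { sha256Hex(it.bytes) !in rejected }
        output += accepted
        frontier = accepted.flatMap { it.embedded } // next level only
    }
    return output
}

fun main() {
    val doc = Item("doc", "doc".toByteArray())
    val attachment = Item("attachment", "attachment".toByteArray(), listOf(doc))
    val spam = Item("spam", "spam".toByteArray(), listOf(Item("hidden", "hidden".toByteArray())))
    val email = Item("email", "email".toByteArray(), listOf(attachment, spam))
    val rejected = setOf(sha256Hex("spam".toByteArray()))
    // "spam" is rejected, so "hidden" is never reached.
    println(extractAll(listOf(email), rejected).map { it.name })
}
```

In the multi-pipeline version, each pass of the while loop corresponds to one pipeline run, which is why the rejection set ends up being loaded once per pass.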

Is there a way to write such an iterative algorithm in Beam without having
to create a new pipeline on each iteration? Further, would something like
SplittableDoFn be a potential solution? Looking through the user's guide and
some existing implementations of SplittableDoFns I'm thinking not, but I'm
still trying to understand SplittableDoFns.

Re: Iterative algorithm in BEAM

Posted by Tucker Barbour <tu...@gmail.com>.
The initial DoFn would basically look like this, where the call to
zip.entries only reads the central directory of the zip file
instead of the entire contents of the file:

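import org.apache.beam.sdk.io.FileIO
import org.apache.beam.sdk.transforms.DoFn

// ZipEntryInfo and openZipFile are application-specific helpers (not shown).
class SplitZip : DoFn<FileIO.ReadableFile, ZipEntryInfo>() {
  @ProcessElement
  fun processElement(context: ProcessContext) {
    val element = context.element()
    // Only the central directory is read here, not the entry contents.
    val zip = openZipFile(element)
    for (entry in zip.entries) {
      val info = ZipEntryInfo.of(
        offset = entry.offset,
        compressedSize = entry.compressedSize,
        compressionMethod = entry.compressionMethod
      )
      context.output(info)
    }
  }
}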
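
For anyone wondering what "just read the central directory" could look like without a zip library, here is a minimal standalone sketch. It is not the openZipFile helper used above (that one is not shown in this thread). It parses an in-memory archive using the standard ZIP record layout: it scans backwards for the end-of-central-directory record, then walks the central file headers. The ZipEntryInfo data class and readCentralDirectory are names made up for this example, and ZIP64 archives and multi-disk archives are deliberately not handled.

```kotlin
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream

// Hypothetical stand-in for the ZipEntryInfo type used in the DoFn above.
data class ZipEntryInfo(
    val name: String,
    val offset: Long,          // offset of the entry's local file header
    val compressedSize: Long,
    val compressionMethod: Int // 0 = stored, 8 = deflated
)

// Reads only the central directory of an in-memory zip (no ZIP64 support).
fun readCentralDirectory(zip: ByteArray): List<ZipEntryInfo> {
    val buf = ByteBuffer.wrap(zip).order(ByteOrder.LITTLE_ENDIAN)
    // Scan backwards for the end-of-central-directory signature (22-byte minimum record).
    var eocd = zip.size - 22
    while (eocd >= 0 && buf.getInt(eocd) != 0x06054b50) eocd--
    require(eocd >= 0) { "no end-of-central-directory record found" }
    val count = buf.getShort(eocd + 10).toInt() and 0xFFFF
    var pos = buf.getInt(eocd + 16) // start of the central directory
    val entries = ArrayList<ZipEntryInfo>(count)
    repeat(count) {
        require(buf.getInt(pos) == 0x02014b50) { "bad central directory header at $pos" }
        val method = buf.getShort(pos + 10).toInt() and 0xFFFF
        val compressedSize = buf.getInt(pos + 20).toLong() and 0xFFFFFFFFL
        val nameLen = buf.getShort(pos + 28).toInt() and 0xFFFF
        val extraLen = buf.getShort(pos + 30).toInt() and 0xFFFF
        val commentLen = buf.getShort(pos + 32).toInt() and 0xFFFF
        val offset = buf.getInt(pos + 42).toLong() and 0xFFFFFFFFL
        val name = String(zip, pos + 46, nameLen, Charsets.UTF_8)
        entries += ZipEntryInfo(name, offset, compressedSize, method)
        pos += 46 + nameLen + extraLen + commentLen // advance to the next header
    }
    return entries
}

fun main() {
    // Build a small zip in memory so the example is self-contained.
    val bytes = ByteArrayOutputStream().also { out ->
        ZipOutputStream(out).use { zos ->
            for (name in listOf("a.eml", "b.eml")) {
                zos.putNextEntry(ZipEntry(name))
                zos.write("hello $name".toByteArray())
                zos.closeEntry()
            }
        }
    }.toByteArray()
    readCentralDirectory(bytes).forEach(::println)
}
```

Against a real file the same reads would go through a SeekableByteChannel positioned near the end of the file, so only the tail of the archive needs to be fetched.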

If I understand correctly, a downstream SDF would help to prevent issues
with fusion of the output ZipEntryInfo PCollection, though I'm not clear on
what an SDF would look like in this case. It needs the FileIO.ReadableFile
to get a SeekableByteChannel to read the zip, plus a set of ZipEntryInfo
elements from the PCollection output by the previous SplitZip DoFn. My
initial thought would be to randomly partition the PCollection of
ZipEntryInfo to avoid fusion issues: randomly generate keys, create
KV pairs, group by key, and then strip the key. Though this seems
like the kind of workaround SDF is meant to make unnecessary.
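
The random-key idea above can be modelled on an in-memory list as below. This is only a plain-Kotlin sketch of the three steps (pair with a random key, group by key, drop the key); the redistribute function and its buckets parameter are invented for the example. In an actual pipeline it is the grouping step that breaks fusion, and as far as I know Beam's built-in Reshuffle.viaRandomKey() packages the same pattern.

```kotlin
import kotlin.random.Random

// Models "assign random key -> group by key -> drop key" on an in-memory list.
// Each returned inner list corresponds to one group that could be processed
// independently of the others.
fun <T> redistribute(
    elements: List<T>,
    buckets: Int,
    random: Random = Random.Default
): List<List<T>> =
    elements
        .map { random.nextInt(buckets) to it }  // create KV pairs with a random key
        .groupBy({ it.first }, { it.second })   // group by key
        .values.toList()                        // strip the key, keep the groups

fun main() {
    val entries = (1..10).map { "entry-$it" }
    val groups = redistribute(entries, buckets = 4, random = Random(42))
    // Every input element appears in exactly one group.
    println("groups=${groups.size}, elements=${groups.sumOf { it.size }}")
}
```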

On Thu, Jan 21, 2021 at 9:00 PM Tucker Barbour <tu...@gmail.com>
wrote:

> I do have an indeterminate nested depth which has always been the source of
> issues. However, if I can use an SDF to read entries from a zip in parallel
> it might be acceptable to do the rest of the item extraction in a single
> DoFn, i.e. a call to `processElement` will recursively extract all
> attachments from an email and embedded items from within those attachments.
> I'm looking at our data but my guess is that the breadth and depth of the
> tree of items stemming from a single email is small. However, worst case,
> we get a zip file containing a bunch of large zip files.
>
> I'm looking at writing an SDF to handle splitting the zip. However, I
> don't think the provided restrictions will work in this case. The
> OffsetRange and ByteKeyRange don't seem to fit this model. Though I'm still
> thinking through the problem and trying to understand SDFs. It seems a
> ClaimedSet, as referenced in the SDF proposal in the Filepattern Watcher,
> might be closer to what I want. Though it seems like maybe what you're
> suggesting is a regular DoFn will read the central directory and generate
> the (offset, size) pairs and _then_ apply an SDF?
>
> Could you elaborate more on your last point about maximum depth? I'm not
> sure I fully understand. Is the suggestion to hard code a maximum number of
> iterations to compute rather than recursing through the indeterminate depth
> of the tree? Would this allow for doing the iteration "inside" the pipeline?
>
> And thank you for the response. This has been helpful.
>
> On Thu, Jan 21, 2021 at 7:18 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> Your idea works: first read the directory metadata and then downstream an
>> SDF that jumps to file offsets. This is very much what SDF is for.
>> Splitting within a zip entry will not be useful.
>>
>> If you have indeterminate nesting depth, you do need iterative
>> computation. Beam doesn't directly support this. Your initial idea works,
>> doing iteration outside the pipeline, but I would guess is more expensive
>> than needed. In a streaming pipeline you can also create a back edge with a
>> Pubsub or Kafka queue. We don't have any library to do it for you because
>> the details of watermarks are complex and not fully worked out in the Beam
>> context.
>>
>> But in your case, there is a certain maximum depth of nesting that makes
>> sense. For example you can unroll the loop to have 100 stages of iteration,
>> and any element that is "done" skips the rest of the stages. You can have
>> the last stage of iteration write to a special sink for unusual messages.
>>
>> Adding dev@ to fact check me :-)
>>
>> Kenn
>>
>> On Thu, Jan 21, 2021 at 10:05 AM Tucker Barbour <tu...@gmail.com>
>> wrote:
>>
>>> Getting away from BEAM a bit, but there _could_ be a way to enable
>>> parallel reads of a single zip by first reading just the central directory
>>> and generating elements in an output PCollection containing a triple of
>>> byte offset of an entry, the compressed size of the entry, and the type of
>>> compression used for the entry. This triple (offset, compressedSize,
>>> compressionType) could enable parallel reads of a single zip file. However,
>>> I'm not sure this would work with SplittableDoFn -- haven't figured that
>>> out yet.
>>>
>>> In terms of the do/while loop, that seems to be the only way to do this
>>> type of looping with BEAM. Otherwise, I'd have to consider a different
>>> algorithm to avoid a loop altogether.
>>>
>>> On Thu, Jan 21, 2021 at 5:07 PM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>>
>>>> I guess that the main issue here is that the Zip format is not splittable.
>>>> So if it’s one huge zip file, you still won’t be able to parallelise the
>>>> read. If you could unzip such a file in advance and read the paths of the
>>>> extracted files in your pipeline, then they should automatically be read
>>>> in parallel. Otherwise, one zip file (regardless of its size) can be processed
>>>> only by a single worker (instance of DoFn).
>>>>
>>>> On 21 Jan 2021, at 12:44, Tucker Barbour <tu...@gmail.com>
>>>> wrote:
>>>>
>>>> There isn't a problem with the output being a flattened PCollection. I
>>>> think that's actually what I'm intending to accomplish. A naive
>>>> implementation would have a DoFn take a zip file as input (probably as
>>>> FileIO.ReadableFile), recursively extract elements from the zip, i.e.
>>>> extract an email from the zip and then immediately extract the attachments
>>>> from an email and so on. For each item where an item is either an email or
>>>> an attachment we output each item as an individual record to the output
>>>> PCollection. This would create a flattened PCollection for all items in the
>>>> file hierarchy. However, this becomes problematic when the zip file is
>>>> large, several million files. To avoid doing all the work in a single DoFn,
>>>> I've implemented this iterative solution with a simple do/while:
>>>>
>>>> fun main(args: Array<String>) {
>>>>   do {
>>>>     val (pipeline, options) = createPipelineAndOptions(args)
>>>>
>>>>     val data = pipeline.apply(readFilesToExtract(options))
>>>>     val exclusionSet = pipeline.apply(readExclusionSet(options))
>>>>
>>>>     // Could use side input or group by key
>>>>     val inclusionSet = filter(data, exclusionSet)
>>>>
>>>>     val extractedItems =
>>>> inclusionSet.apply(ParDo.of(ExtractEmbeddedItems()))
>>>>
>>>>     extractedItems.apply(WriteExtractedItemsToDatabase())
>>>>   } while (hasExtractedItems(pipeline))
>>>> }
>>>>
>>>> Each call to ExtractEmbeddedItems only extracts a single level in the
>>>> file hierarchy. For example, the first iteration would extract all emails
>>>> in a zip but _not_ extract attachments. The second iteration would extract
>>>> all the attachments from the emails found in iteration 1. And so on. This
>>>> _works_ but isn't ideal. Especially figuring out whether to continue with
>>>> the do/while loop or terminate. It is better than extracting all
>>>> elements in a single DoFn since we gain parallelism in subsequent
>>>> iterations.
>>>>
>>>> After reading more about SplittableDoFn, it seems like a SplittableDoFn
>>>> might be a better option for solving this problem but I'm still trying to
>>>> figure that out. A DoFn can create element/restriction pairs where the
>>>> element is a FileIO.ReadableFile and the restriction is a set of items in
>>>> the Zip. I'm not sure using the OffsetRange works here because of how the
>>>> Zip format works, so I may need to explore writing a custom restriction and
>>>> tracker.
>>>>
>>>> On Wed, Jan 20, 2021 at 6:22 PM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>>
>>>>> Thank you for details! I’ll try to share some of my thoughts on this.
>>>>>
>>>>> Well, maybe I don’t understand something, but what is the problem with having
>>>>> a "flattened" PCollection as the output of your ParDo that reads the input zip
>>>>> file(s)? For example, if an input element in a file is an email, then your
>>>>> DoFn can create several outputs depending on the structure of this email.
>>>>> Of course, the order of elements in the output PCollection won’t be guaranteed,
>>>>> but all information needed later can be saved in the structure of the output
>>>>> record (like a POJO or AvroRecord).
>>>>>
>>>>> Also, if I understand correctly, on every processing step you
>>>>> need to reject some records depending on their SHA, using an already known rejection
>>>>> list. So, if it’s possible to calculate this SHA in the “Read” step for
>>>>> every record, then you can use either a SideInput or a GroupByKey transform
>>>>> (where the key is the SHA) to filter the records.
>>>>>
>>>>> Please, let me know if I missed something.
>>>>>
>>>>> On 19 Jan 2021, at 18:43, Tucker Barbour <tu...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> My initial thought is the latter -- the output PCollection would be a
>>>>> PCollection<Record> where Record can be either an email or attachment. A
>>>>> Record would still need to have an attribute referencing its "parent". For
>>>>> example an email Record would have a unique identifier, e.g. ID, and any
>>>>> attachment Record would have a reference to its parent email, e.g.
>>>>> parentID. However, outputting pairs might also work and may be a better
>>>>> option considering the need to maintain the relationship between a parent
>>>>> and child. We're basically building a tree. An additional wrinkle is that
>>>>> attachments may themselves have embedded items which would also need to be
>>>>> represented in the output PCollection as Records. For example, an email
>>>>> with an attachment which itself is a zip of Word documents. The structure
>>>>> of this file hierarchy is not known ahead of time.
>>>>>
>>>>> The input is expected to be a PCollection of one or more (though
>>>>> usually on the order of tens, not anything like millions) zip files or other
>>>>> archive file formats. The output is expected to be a PCollection whose
>>>>> elements are nodes in the file hierarchy. If a zip file were to have the
>>>>> following structure
>>>>>
>>>>> - Top Level Zip File
>>>>> `-> Email 001
>>>>>   `-> Attachment A
>>>>>     `-> Embedded Document A01
>>>>>     `-> Embedded Document A02
>>>>>   `-> Attachment B
>>>>>     `-> Embedded Document B01
>>>>> `-> Email 002
>>>>>   `-> Attachment C
>>>>>     `-> Embedded Document C01
>>>>>
>>>>> We'd expect an output PCollection whose elements are:
>>>>>
>>>>> Email 001, Attachment A, Embedded Document A01, Embedded Document A02,
>>>>> Attachment B, Embedded Document B01, Email 002, Attachment C, Embedded
>>>>> Document C01.
>>>>>
>>>>> We'd then perform further PTransforms on this "flattened" PCollection.
>>>>>
>>>>> On Tue, Jan 19, 2021 at 5:14 PM Alexey Romanenko <
>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>
>>>>>> What exactly is the output PCollection in your example? Is it just a
>>>>>> PCollection of pairs (email and attachment), or is it more like
>>>>>> PCollection<Record>, where Record can be either an email or an attachment? Or
>>>>>> is it something else?
>>>>>>
>>>>>> Could you add a simple example with expected input/output of your
>>>>>> pipeline?
>>>>>>
>>>>>> > On 18 Jan 2021, at 12:26, Tucker Barbour <tu...@gmail.com>
>>>>>> wrote:
>>>>>> >
>>>>>> > I have a use-case where I'm extracting embedded items from archive
>>>>>> file formats which themselves have embedded items. For example a zip file
>>>>>> with emails with attachments. The goal in this example would be to create a
>>>>>> PCollection where each email is an element as well as each attachment being
>>>>>> an element. (No need to create a tree structure here.) There are certain
>>>>>> criteria which would prevent continuing embedded item extraction, such as
>>>>>> an item SHA being present in a "rejection" list. The pipeline will perform
>>>>>> a series of transformations on the items and then continue to extract
>>>>>> embedded items. This type of problem lends itself to be solved with an
>>>>>> iterative algorithm. My understanding is that BEAM does not support
>>>>>> iterative algorithms to the same extent Spark does. In BEAM I would have to
>>>>>> persist the results of each iteration and instantiate a new pipeline for
>>>>>> each iteration. This _works_ though isn't ideal. The "rejection" list is a
>>>>>> PCollection of a few million elements. Re-reading this "rejection" list on
>>>>>> each iteration isn't ideal.
>>>>>> >
>>>>>> > Is there a way to write such an iterative algorithm in BEAM without
>>>>>> having to create a new pipeline on each iteration? Further, would something
>>>>>> like SplittableDoFn be a potential solution? Looking through the user's
>>>>>> guide and some existing implementations of SplittableDoFns I'm thinking not
>>>>>> but I'm still trying to understand SplittableDoFns.
>>>>>>
>>>>>>
>>>>>
>>>>

Re: Iterative algorithm in BEAM

Posted by Tucker Barbour <tu...@gmail.com>.
I do have an indeterminate nested depth which has always been the source of
issues. However, if I can use an SDF to read entries from a zip in parallel
it might be acceptable to do the rest of the item extraction in a single
DoFn, i.e. a call to `processElement` will recursively extract all
attachments from an email and embedded items from within those attachments.
I'm looking at our data but my guess is that the breadth and depth of the
tree of items stemming from a single email is small. However, worst case,
we get a zip file containing a bunch of large zip files.
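
To make the "do the rest in a single DoFn" option concrete, here is a plain-Kotlin sketch of the recursive walk. It emits every node depth-first and keeps a reference to the parent, in the spirit of the ID/parentID idea from earlier in the thread. Node, Record, and emitRecursively are hypothetical names for this example, and the emit callback stands in for an output call inside processElement.

```kotlin
// Hypothetical model of a node in the file hierarchy.
data class Node(val name: String, val children: List<Node> = emptyList())

// Flat output element; parentName keeps the link back to the containing item.
data class Record(val name: String, val parentName: String?)

// Depth-first walk: emit the node, then recurse into everything embedded in it.
fun emitRecursively(node: Node, parentName: String? = null, emit: (Record) -> Unit) {
    emit(Record(node.name, parentName))
    for (child in node.children) emitRecursively(child, node.name, emit)
}

fun main() {
    // "Email 001" subtree from the example hierarchy earlier in the thread.
    val email001 = Node("Email 001", listOf(
        Node("Attachment A", listOf(
            Node("Embedded Document A01"),
            Node("Embedded Document A02"))),
        Node("Attachment B", listOf(
            Node("Embedded Document B01")))))
    val records = mutableListOf<Record>()
    emitRecursively(email001) { records += it }
    records.forEach(::println)
}
```

The whole subtree of one email is handled by a single call here, which is exactly the trade-off: simple, but no parallelism below the element that was handed to the function.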

I'm looking at writing an SDF to handle splitting the zip. However, I don't
think the provided restrictions will work in this case. The OffsetRange and
ByteKeyRange don't seem to fit this model. Though I'm still thinking
through the problem and trying to understand SDFs. It seems a ClaimedSet,
as referenced in the SDF proposal in the Filepattern Watcher, might be
closer to what I want. Though it seems like maybe what you're suggesting is
a regular DoFn will read the central directory and generate the (offset,
size) pairs and _then_ apply an SDF?

Could you elaborate more on your last point about maximum depth? I'm not
sure I fully understand. Is the suggestion to hard code a maximum number of
iterations to compute rather than recursing through the indeterminate depth
of the tree? Would this allow for doing the iteration "inside" the pipeline?
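
My current reading of the "maximum depth" suggestion, written out as a plain-Kotlin model rather than Beam code: the number of stages is fixed when the pipeline is built, each stage extracts exactly one level, finished items leave the chain immediately, and whatever is still unexpanded after the last stage goes to a separate overflow output. Work, StageOutput, and runUnrolled are names invented for this sketch, and the repeat loop stands in for a fixed chain of transforms.

```kotlin
// Hypothetical work item: 'pending' holds embedded items not yet extracted.
data class Work(val name: String, val pending: List<Work> = emptyList())

// 'done' are fully emitted items; 'overflow' is what exceeded the stage limit.
data class StageOutput(val done: List<String>, val overflow: List<Work>)

// Unrolled iteration: the stage count is fixed up front instead of looping
// until the data runs out.
fun runUnrolled(roots: List<Work>, maxStages: Int): StageOutput {
    val done = mutableListOf<String>()
    var inFlight = roots
    repeat(maxStages) {
        val next = mutableListOf<Work>()
        for (w in inFlight) {
            done += w.name     // this item is emitted and skips later stages
            next += w.pending  // its embedded items move on to the next stage
        }
        inFlight = next
    }
    // Anything still in flight was nested deeper than maxStages.
    return StageOutput(done, overflow = inFlight)
}

fun main() {
    val tree = Work("zip", listOf(Work("email", listOf(Work("attachment")))))
    println(runUnrolled(listOf(tree), maxStages = 2))
    println(runUnrolled(listOf(tree), maxStages = 5))
}
```

With two stages the attachment ends up in overflow; with five stages everything is emitted and the extra stages simply see no input.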

And thank you for the response. This has been helpful.

On Thu, Jan 21, 2021 at 7:18 PM Kenneth Knowles <ke...@apache.org> wrote:

> Your idea works: first read the directory metadata and then downstream an
> SDF that jumps to file offsets. This is very much what SDF is for.
> Splitting within a zip entry will not be useful.
>
> If you have indeterminate nesting depth, you do need iterative
> computation. Beam doesn't directly support this. Your initial idea works,
> doing iteration outside the pipeline, but I would guess is more expensive
> than needed. In a streaming pipeline you can also create a back edge with a
> Pubsub or Kafka queue. We don't have any library to do it for you because
> the details of watermarks are complex and not fully worked out in the Beam
> context.
>
> But in your case, there is a certain maximum depth of nesting that makes
> sense. For example you can unroll the loop to have 100 stages of iteration,
> and any element that is "done" skips the rest of the stages. You can have
> the last stage of iteration write to a special sink for unusual messages.
>
> Adding dev@ to fact check me :-)
>
> Kenn
>
> On Thu, Jan 21, 2021 at 10:05 AM Tucker Barbour <tu...@gmail.com>
> wrote:
>
>> Getting away from BEAM a bit, but there _could_ be a way to enable
>> parallel reads of a single zip by first reading just the central directory
>> and generating elements in an output PCollection containing a triple of
>> byte offset of an entry, the compressed size of the entry, and the type of
>> compression used for the entry. This triple (offset, compressedSize,
>> compressionType) could enable parallel reads of a single zip file. However,
>> I'm not sure this would work with SplittableDoFn -- haven't figured that
>> out yet.
>>
>> In terms of the do/while loop, that seems to be the only way to do this
>> type of looping with BEAM. Otherwise, I'd have to consider a different
>> algorithm to avoid a loop altogether.
>>
>> On Thu, Jan 21, 2021 at 5:07 PM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>> I guess that the main issue here is that the Zip format is not splittable.
>>> So if it’s one huge zip file, you still won’t be able to parallelise the
>>> read. If you could unzip such a file in advance and read the paths of the
>>> extracted files in your pipeline, then they should automatically be read
>>> in parallel. Otherwise, one zip file (regardless of its size) can be processed
>>> only by a single worker (instance of DoFn).
>>>
>>> On 21 Jan 2021, at 12:44, Tucker Barbour <tu...@gmail.com>
>>> wrote:
>>>
>>> There isn't a problem with the output being a flattened PCollection. I
>>> think that's actually what I'm intending to accomplish. A naive
>>> implementation would have a DoFn take a zip file as input (probably as
>>> FileIO.ReadableFile), recursively extract elements from the zip, i.e.
>>> extract an email from the zip and then immediately extract the attachments
>>> from an email and so on. Each item, whether an email or an attachment,
>>> is output as an individual record to the output PCollection. This would
>>> create a flattened PCollection for all items in the file hierarchy.
>>> However, this becomes problematic when the zip file is large (several
>>> million files). To avoid doing all the work in a single DoFn, I've
>>> implemented this iterative solution with a simple do/while:
>>>
>>> fun main(args: Array<String>) {
>>>   do {
>>>     val (pipeline, options) = createPipelineAndOptions(args)
>>>
>>>     val data = pipeline.apply(readFilesToExtract(options))
>>>     val exclusionSet = pipeline.apply(readExclusionSet(options))
>>>
>>>     // Could use side input or group by key
>>>     val inclusionSet = filter(data, exclusionSet)
>>>
>>>     val extractedItems =
>>>       inclusionSet.apply(ParDo.of(ExtractEmbeddedItems()))
>>>
>>>     extractedItems.apply(WriteExtractedItemsToDatabase())
>>>   } while (hasExtractedItems(pipeline))
>>> }
>>>
>>> Each call to ExtractEmbeddedItems only extracts a single level in the
>>> file hierarchy. For example, the first iteration would extract all emails
>>> in a zip but _not_ extract attachments. The second iteration would extract
>>> all the attachments from the emails found in iteration 1. And so on. This
>>> _works_ but isn't ideal, especially when it comes to figuring out whether
>>> to continue with the do/while loop or terminate. It is better than
>>> extracting all elements in a single DoFn since we gain parallelism in
>>> subsequent iterations.
>>>
>>> After reading more about SplittableDoFn, it seems like a SplittableDoFn
>>> might be a better option for solving this problem but I'm still trying to
>>> figure that out. A DoFn can create element/restriction pairs where the
>>> element is a FileIO.ReadableFile and the restriction is a set of items in
>>> the Zip. I'm not sure OffsetRange works here because of how the Zip
>>> format works, so I may need to explore writing a custom restriction and
>>> tracker.
>>>
>>> On Wed, Jan 20, 2021 at 6:22 PM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>>
>>>> Thank you for details! I’ll try to share some of my thoughts on this.
>>>>
>>>> Well, maybe I don’t understand something, but what is the problem with
>>>> having a "flattened" PCollection as the output of your ParDo that reads
>>>> the input zip file(s)? For example, if an input element in a file is an
>>>> email, then your DoFn can create several outputs depending on the
>>>> structure of this email. Of course, the order of elements in the output
>>>> PCollection won’t be guaranteed, but all information needed later can be
>>>> saved in the structure of the output record (like a POJO or AvroRecord).
>>>>
>>>> Also, if I understand correctly, on every processing step you need to
>>>> reject some records depending on their SHA, using an already known
>>>> rejection list. So, if it’s possible to calculate this SHA on the “Read”
>>>> step for every record, then you can use either a SideInput or a
>>>> GroupByKey transform (where the key is the SHA) to filter the records.
>>>>
>>>> Please, let me know if I missed something.
>>>>
>>>> On 19 Jan 2021, at 18:43, Tucker Barbour <tu...@gmail.com>
>>>> wrote:
>>>>
>>>> My initial thought is the latter -- the output PCollection would be a
>>>> PCollection<Record> where Record can be either an email or attachment. A
>>>> Record would still need to have an attribute referencing its "parent". For
>>>> example an email Record would have a unique identifier, e.g. ID, and any
>>>> attachment Record would have a reference to its parent email, e.g.
>>>> parentID. However, outputting pairs might also work and may be a better
>>>> option considering the need to maintain the relationship between a parent
>>>> and child. We're basically building a tree. An additional wrinkle is that
>>>> attachments may themselves have embedded items which would also need to be
>>>> represented in the output PCollection as Records. For example, an email
>>>> with an attachment which itself is a zip of Word documents. The structure
>>>> of this file hierarchy is not known ahead of time.
>>>>
>>>> The input is expected to be a PCollection of one or more zip files or
>>>> other archive file formats (usually on the order of tens, nothing like
>>>> millions). The output is expected to be a PCollection whose elements are
>>>> nodes in the file hierarchy. If a zip file were to have the following
>>>> structure:
>>>>
>>>> - Top Level Zip File
>>>> `-> Email 001
>>>>   `-> Attachment A
>>>>     `-> Embedded Document A01
>>>>     `-> Embedded Document A02
>>>>   `-> Attachment B
>>>>     `-> Embedded Document B01
>>>> `-> Email 002
>>>>   `-> Attachment C
>>>>     `-> Embedded Document C01
>>>>
>>>> We'd expect an output PCollection whose elements are:
>>>>
>>>> Email 001, Attachment A, Embedded Document A01, Embedded Document A02,
>>>> Attachment B, Embedded Document B01, Email 002, Attachment C, Embedded
>>>> Document C01.
>>>>
>>>> We'd then perform further PTransforms on this "flattened" PCollection.
>>>>
>>>> On Tue, Jan 19, 2021 at 5:14 PM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>>
>>>>> What exactly is the output PCollection in your example? Is it just a
>>>>> PCollection of pairs (email and attachment), or is it more like a
>>>>> PCollection<Record>, where Record can be either an email or an
>>>>> attachment? Or is it something else?
>>>>>
>>>>> Could you add a simple example with expected input/output of your
>>>>> pipeline?
>>>>>
>>>>> > On 18 Jan 2021, at 12:26, Tucker Barbour <tu...@gmail.com> wrote:
>>>>> >
>>>>> > I have a use-case where I'm extracting embedded items from archive
>>>>> > file formats which themselves have embedded items. For example a zip
>>>>> > file with emails with attachments. The goal in this example would be
>>>>> > to create a PCollection where each email is an element as well as
>>>>> > each attachment being an element. (No need to create a tree
>>>>> > structure here.) There are certain criteria which would prevent
>>>>> > continuing embedded item extraction, such as an item SHA being
>>>>> > present in a "rejection" list. The pipeline will perform a series of
>>>>> > transformations on the items and then continue to extract embedded
>>>>> > items. This type of problem lends itself to being solved with an
>>>>> > iterative algorithm. My understanding is that BEAM does not support
>>>>> > iterative algorithms to the same extent Spark does. In BEAM I would
>>>>> > have to persist the results of each iteration and instantiate a new
>>>>> > pipeline for each iteration. This _works_ though isn't ideal. The
>>>>> > "rejection" list is a PCollection of a few million elements.
>>>>> > Re-reading this "rejection" list on each iteration isn't ideal.
>>>>> >
>>>>> > Is there a way to write such an iterative algorithm in BEAM without
>>>>> > having to create a new pipeline on each iteration? Further, would
>>>>> > something like SplittableDoFn be a potential solution? Looking
>>>>> > through the user's guide and some existing implementations of
>>>>> > SplittableDoFns I'm thinking not but I'm still trying to understand
>>>>> > SplittableDoFns.
>>>>>
>>>>>
>>>>
>>>

Re: Iterative algorithm in BEAM

Posted by Tucker Barbour <tu...@gmail.com>.
I do have an indeterminate nesting depth, which has always been the source of
issues. However, if I can use an SDF to read entries from a zip in parallel,
it might be acceptable to do the rest of the item extraction in a single
DoFn, i.e. a call to `processElement` will recursively extract all
attachments from an email and embedded items from within those attachments.
I'm looking at our data, but my guess is that the breadth and depth of the
tree of items stemming from a single email is small. However, in the worst
case, we get a zip file containing a bunch of large zip files.
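
A minimal sketch of that single-DoFn recursion, written in plain Kotlin with no
Beam types so the logic is visible on its own. `Item`, `Record`, and `flatten`
are hypothetical names, and the sketch assumes that an item whose SHA is on the
rejection list is dropped together with everything nested below it. The real
criteria may differ.

```kotlin
// Hypothetical in-memory item; in the real pipeline this would be an email,
// an attachment, or an embedded document.
data class Item(val id: String, val sha: String, val children: List<Item> = emptyList())

// Flat output record that keeps a reference to its parent, as described
// earlier in the thread (ID / parentID).
data class Record(val id: String, val parentId: String?)

// Depth-first walk: emits every accepted item as its own record and does not
// descend into items whose SHA is in the rejection set (assumed behaviour).
fun flatten(item: Item, rejected: Set<String>, parentId: String? = null): List<Record> {
    if (item.sha in rejected) return emptyList()
    val out = mutableListOf(Record(item.id, parentId))
    for (child in item.children) {
        out += flatten(child, rejected, item.id)
    }
    return out
}
```

Inside a DoFn, each `Record` would be passed to the output receiver instead of
being collected in a list, so the whole subtree of one email would still be
handled by a single `processElement` call.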

I'm looking at writing an SDF to handle splitting the zip. However, I don't
think the provided restrictions will work in this case: OffsetRange and
ByteKeyRange don't seem to fit this model, though I'm still thinking
through the problem and trying to understand SDFs. It seems a ClaimedSet,
as referenced in the SDF proposal in the Filepattern Watcher, might be
closer to what I want. Or is the suggestion that a regular DoFn reads the
central directory and generates the (offset, size) pairs, and _then_ an SDF
is applied downstream?

Could you elaborate more on your last point about maximum depth? I'm not
sure I fully understand. Is the suggestion to hard code a maximum number of
iterations to compute rather than recursing through the indeterminate depth
of the tree? Would this allow for doing the iteration "inside" the pipeline?

And thank you for the response. This has been helpful.


Re: Iterative algorithm in BEAM

Posted by Kenneth Knowles <ke...@apache.org>.
Your idea works: first read the directory metadata and then downstream an
SDF that jumps to file offsets. This is very much what SDF is for.
Splitting within a zip entry will not be useful.
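
As a rough illustration of the "read the directory metadata first" step, here
is a sketch that uses only `java.util.zip.ZipFile` from the JDK, which reads
the central directory when the file is opened and does not inflate entry data
until an entry stream is requested. The `ZipEntryInfo` fields shown here are
an assumption, not the type from the earlier message. Note that
`java.util.zip.ZipEntry` does not expose an entry's byte offset, so emitting
real (offset, size) pairs would need a different library or manual parsing of
the central directory.

```kotlin
import java.io.File
import java.util.zip.ZipFile

// Hypothetical descriptor for one zip entry; the fields are illustrative.
data class ZipEntryInfo(val name: String, val compressedSize: Long, val method: Int)

// Lists the non-directory entries recorded in the central directory without
// decompressing any entry contents.
fun listEntries(zip: File): List<ZipEntryInfo> =
    ZipFile(zip).use { zf ->
        zf.entries().asSequence()
            .filter { !it.isDirectory }
            .map { ZipEntryInfo(it.name, it.compressedSize, it.method) }
            .toList()
    }
```

Each `ZipEntryInfo` could then become one element of a PCollection, so that
the expensive per-entry reads downstream are spread across workers.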

If you have indeterminate nesting depth, you do need iterative computation.
Beam doesn't directly support this. Your initial idea works, doing
iteration outside the pipeline, but I would guess it is more expensive than
needed. In a streaming pipeline you can also create a back edge with a
Pubsub or Kafka queue. We don't have any library to do it for you because
the details of watermarks are complex and not fully worked out in the Beam
context.

But in your case, there is a certain maximum depth of nesting that makes
sense. For example you can unroll the loop to have 100 stages of iteration,
and any element that is "done" skips the rest of the stages. You can have
the last stage of iteration write to a special sink for unusual messages.
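
The unrolling idea can be sketched in plain Kotlin, with lists standing in for
PCollections. This is a simulation of the control flow only, not Beam code:
`Node`, `Unrolled`, and `unroll` are hypothetical names, and `children` stands
in for whatever one extraction step would find inside an item.

```kotlin
// Hypothetical node; `children` is what a single extraction step would yield.
data class Node(val id: String, val children: List<Node> = emptyList())

// Result of the unrolled loop: everything extracted within the stage budget,
// plus anything still unexpanded afterwards (the "special sink").
data class Unrolled(val extracted: List<Node>, val overflow: List<Node>)

fun unroll(roots: List<Node>, maxStages: Int): Unrolled {
    val extracted = mutableListOf<Node>()
    var frontier = roots
    // A fixed number of stages, known when the graph is built, replaces the
    // data-dependent loop. Items without children add nothing to later stages.
    repeat(maxStages) {
        extracted += frontier
        frontier = frontier.flatMap { it.children }
    }
    return Unrolled(extracted, frontier)
}
```

In an actual pipeline each stage would presumably be its own ParDo, with the
per-stage outputs merged by a Flatten, and the leftover `overflow` written to
the separate sink for unusually deep inputs.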

Adding dev@ to fact check me :-)

Kenn


Re: Iterative algorithm in BEAM

Posted by Kenneth Knowles <ke...@apache.org>.
Your idea works: first read the directory metadata and then downstream an
SDF that jumps to file offsets. This is very much what SDF is for.
Splitting within a zip entry will not be useful.

If you have indeterminate nesting depth, you do need iterative computation.
Beam doesn't directly support this. Your initial idea works, doing
iteration outside the pipeline, but I would guess it is more expensive than
needed. In a streaming pipeline you can also create a back edge with a
Pubsub or Kafka queue. We don't have any library to do it for you because
the details of watermarks are complex and not fully worked out in the Beam
context.

But in your case, there is a certain maximum depth of nesting that makes
sense. For example you can unroll the loop to have 100 stages of iteration,
and any element that is "done" skips the rest of the stages. You can have
the last stage of iteration write to a special sink for unusual messages.

Adding dev@ to fact check me :-)

Kenn

On Thu, Jan 21, 2021 at 10:05 AM Tucker Barbour <tu...@gmail.com>
wrote:

> Getting away from BEAM a bit, but there _could_ be a way to enable
> parallel reads of a single zip by first reading just the central directory
> and generating elements in an output PCollection containing a triple of
> byte offset of an entry, the compressed size of the entry, and the type of
> compression used for the entry. This triple (offset, compressedSize,
> compressionType) could enable parallel reads of a single zip file. However,
> I'm not sure this would work with SplittableDoFn -- haven't figured that
> out yet.
>
> In terms of the do/while loop, that seems to be the only way to do this
> type of looping with BEAM. Otherwise, I'd have to consider a different
> algorithm to avoid a loop altogether.
>
> On Thu, Jan 21, 2021 at 5:07 PM Alexey Romanenko <ar...@gmail.com>
> wrote:
>
>> I guess that the main issue here is that the Zip format is not splittable.
>> So if it’s one huge zip file, you still won’t be able to parallelise the
>> read. If you could unzip such a file in advance and read the path with
>> these extracted files in your pipeline, then they should automatically be
>> read in parallel. Otherwise, one zip file (regardless of its size) can be
>> processed only by one single worker (instance of DoFn).
>>
>> On 21 Jan 2021, at 12:44, Tucker Barbour <tu...@gmail.com>
>> wrote:
>>
>> There isn't a problem with the output being a flattened PCollection. I
>> think that's actually what I'm intending to accomplish. A naive
>> implementation would have a DoFn take a zip file as input (probably as
>> FileIO.ReadableFile), recursively extract elements from the zip, i.e.
>> extract an email from the zip and then immediately extract the attachments
>> from an email and so on. Each item, whether an email or an attachment,
>> is output as an individual record to the output PCollection. This would
>> create a flattened PCollection for all items in the file hierarchy.
>> However, this becomes problematic when the zip file is large (several
>> million files). To avoid doing all the work in a single DoFn, I've
>> implemented this iterative solution with a simple do/while:
>>
>> fun main(args: Array<String>) {
>>   do {
>>     val (pipeline, options) = createPipelineAndOptions(args)
>>
>>     val data = pipeline.apply(readFilesToExtract(options))
>>     val exclusionSet = pipeline.apply(readExclusionSet(options))
>>
>>     // Could use side input or group by key
>>     val inclusionSet = filter(data, exclusionSet)
>>
>>     val extractedItems =
>> inclusionSet.apply(ParDo.of(ExtractEmbeddedItems()))
>>
>>     extractedItems.apply(WriteExtractedItemsToDatabase())
>>   } while (hasExtractedItems(pipeline))
>> }
>>
>> Each call to ExtractEmbeddedItems only extracts a single level in the
>> file hierarchy. For example, the first iteration would extract all emails
>> in a zip but _not_ extract attachments. The second iteration would extract
>> all the attachments from the emails found in iteration 1. And so on. This
>> _works_ but isn't ideal. Especially figuring out whether to continue with
>> the do/while loop or terminate. It is better than extracting all
>> elements in a single DoFn since we gain parallelism in subsequent
>> iterations.
>>
>> After reading more about SplitableDoFn, it seems like a SplitableDoFn
>> might be a better option for solving this problem but I'm still trying to
>> figure that out. A DoFn can create element/restriction pairs where the
>> element is a FileIO.ReadableFile and the restriction is a set of items in
>> the Zip. I'm not sure using the OffsetRange works here because of how the
>> Zip format works so may need to explore writing a custom restriction and
>> tracker.
>>
>> On Wed, Jan 20, 2021 at 6:22 PM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>> Thank you for details! I’ll try to share some of my thoughts on this.
>>>
>>> Well, maybe I don’t understand something but what is a problem to have
>>> a "flattened” PCollection as an output of your ParDo that reads input zip
>>> file(s)? For example, if input element in a file is an email, then your
>>> DoFn can create several outputs depending on the structure of this email.
>>> Of course, the order of elements in output PCollection won’t be guaranteed
>>> but all needed later information can be saved in the structure of output
>>> record (like POJO or AvroRecord).
>>>
>>> Also, if I understand correctly, then on every precessing step you need
>>> to reject some records depending on SHA from already known rejection list.
>>> So, if it’s possible to calculate this SHA on the “Read” step for every
>>> record, then you can use either SideInput or GroupByKey transform (where
>>> key is SHA) to filter the records.
>>>
>>> Please, let me know if I missed something.
>>>
>>> On 19 Jan 2021, at 18:43, Tucker Barbour <tu...@gmail.com>
>>> wrote:
>>>
>>> My initial thought is the latter -- the output PCollection would be a
>>> PCollection<Record> where Record can be either an email or attachment. A
>>> Record would still need to have an attribute referencing its "parent". For
>>> example an email Record would have a unique identifier, e.g. ID, and any
>>> attachment Record would have a reference to it's parent email, e.g.
>>> parentID. However, outputting pairs might also work and may be a better
>>> option considering the need to maintain the relationship between a parent
>>> and child. We're basically building a tree. An additional wrinkle is that
>>> attachments may themselves have embedded items which would also need to be
>>> represented in the output PCollection as Records. For example, an email
>>> with an attachment which itself is a zip of Word documents. The structure
>>> of this file hierarchy is not known ahead of time.
>>>
>>> The input is expected to be a PCollection of one or more (though usually
>>> in the order of 10s not anything like millions) zip files or other archive
>>> file formats. The output is expected to be a PCollection whose elements are
>>> nodes in the file hierarchy. If a zip file where to have the following
>>> structure
>>>
>>> - Top Level Zip File
>>> `-> Email 001
>>>   `-> Attachment A
>>>     `-> Embedded Document A01
>>>     `-> Embedded Document A02
>>>  `-> Attachment B
>>>    `-> Embedded Document B01
>>> `-> Email 002
>>>   `-> Attachment C
>>>     `-> Embedded Document C01
>>>
>>> We'd expect an output PCollection whose elements are:
>>>
>>> Email001, Attachment A, Embedded Document A01, Embedded Document A02,
>>> Attachment B, Embedded Document B01, Email 002, Attachment C, Embedded
>>> Document C01.
>>>
>>> We'd then perform further PTransforms on this "flattened" PCollection.
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Jan 19, 2021 at 5:14 PM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>>
>>>> What is exactly an output PCollection in your example? Is it just a
>>>> PCollection of pairs (email and attachment) or it’s like
>>>> PCollection<Record>, where Record can be either email or attachment? Or it
>>>> is something else?
>>>>
>>>> Could you add a simple example with expected input/output of your
>>>> pipeline?
>>>>
>>>> > On 18 Jan 2021, at 12:26, Tucker Barbour <tu...@gmail.com>
>>>> wrote:
>>>> >
>>>> > I have a use-case where I'm extracting embedded items from archive
>>>> file formats which themselves have embedded items. For example a zip file
>>>> with emails with attachments. The goal in this example would be to create a
>>>> PCollection where each email is an element as well as each attachment being
>>>> an element. (No need to create a tree structure here.) There are certain
>>>> criteria which would prevent continuing embedded item extraction, such as
>>>> an item SHA being present in a "rejection" list. The pipeline will perform
>>>> a series of transformations on the items and then continue to extract
>>>> embedded items. This type of problem lends itself to be solved with an
>>>> iterative algorithm. My understanding is that BEAM does not support
>>>> iterative algorithms to the same extent Spark does. In BEAM I would have to
>>>> persist the results of each iteration and instantiate a new pipeline for
>>>> each iteration. This _works_ though isn't ideal. The "rejection" list is a
>>>> PCollection of a few million elements. Re-reading this "rejection" list on
>>>> each iteration isn't ideal.
>>>> >
>>>> > Is there a way to write such an iterative algorithm in BEAM without
>>>> having to create a new pipeline on each iteration? Further, would something
>>>> like SplitableDoFn be a potential solution? Looking through the user's
>>>> guide and some existing implementations of SplitableDoFns I'm thinking not
>>>> but I'm still trying to understand SplitableDoFns.
>>>>
>>>>
>>>
>>

Re: Iterative algorithm in BEAM

Posted by Tucker Barbour <tu...@gmail.com>.
Getting away from BEAM a bit, but there _could_ be a way to enable parallel
reads of a single zip by first reading just the central directory and
generating elements in an output PCollection containing a triple of byte
offset of an entry, the compressed size of the entry, and the type of
compression used for the entry. This triple (offset, compressedSize,
compressionType) could enable parallel reads of a single zip file. However,
I'm not sure this would work with SplittableDoFn -- haven't figured that
out yet.
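
To make the central directory idea concrete, here is a minimal sketch in plain Kotlin (no Beam involved). It assumes a single-disk, non-ZIP64 archive that fits in memory; EntryLocation, readCentralDirectory and sampleZip are names made up for illustration. The field positions are those of the standard ZIP end-of-central-directory and central directory file header records.

```kotlin
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.util.zip.ZipEntry
import java.util.zip.ZipOutputStream

// Hypothetical record for the (offset, compressedSize, compressionType) triple.
data class EntryLocation(
    val name: String,
    val localHeaderOffset: Long,
    val compressedSize: Long,
    val compressionMethod: Int // 0 = stored, 8 = deflate
)

private fun ByteBuffer.u16(pos: Int): Int = getShort(pos).toInt() and 0xFFFF
private fun ByteBuffer.u32(pos: Int): Long = getInt(pos).toLong() and 0xFFFFFFFFL

// Reads only the end-of-central-directory (EOCD) record and the central
// directory; no entry data is decompressed.
fun readCentralDirectory(zip: ByteArray): List<EntryLocation> {
    val buf = ByteBuffer.wrap(zip).order(ByteOrder.LITTLE_ENDIAN)
    // The EOCD record is at least 22 bytes long; scan backwards for its signature.
    var eocd = zip.size - 22
    while (eocd >= 0 && buf.getInt(eocd) != 0x06054b50) eocd--
    require(eocd >= 0) { "end-of-central-directory record not found" }

    val entryCount = buf.u16(eocd + 10)
    var pos = buf.u32(eocd + 16).toInt() // start of the central directory
    val entries = ArrayList<EntryLocation>(entryCount)
    repeat(entryCount) {
        require(buf.getInt(pos) == 0x02014b50) { "bad central directory header" }
        val nameLen = buf.u16(pos + 28)
        val extraLen = buf.u16(pos + 30)
        val commentLen = buf.u16(pos + 32)
        entries += EntryLocation(
            name = String(zip, pos + 46, nameLen, Charsets.UTF_8),
            localHeaderOffset = buf.u32(pos + 42),
            compressedSize = buf.u32(pos + 20),
            compressionMethod = buf.u16(pos + 10)
        )
        pos += 46 + nameLen + extraLen + commentLen
    }
    return entries
}

// Builds a small archive in memory so the sketch can be tried without files.
fun sampleZip(): ByteArray {
    val out = ByteArrayOutputStream()
    ZipOutputStream(out).use { zos ->
        for (name in listOf("email-001.eml", "email-002.eml")) {
            zos.putNextEntry(ZipEntry(name))
            zos.write("body of $name".toByteArray())
            zos.closeEntry()
        }
    }
    return out.toByteArray()
}
```

Calling readCentralDirectory(sampleZip()) yields one EntryLocation per entry. Each of those could become an element in a PCollection, so that a downstream step seeks to localHeaderOffset and decompresses only that entry.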

In terms of the do/while loop, that seems to be the only way to do this
type of looping with BEAM. Otherwise, I'd have to consider a different
algorithm to avoid a loop altogether.

On Thu, Jan 21, 2021 at 5:07 PM Alexey Romanenko <ar...@gmail.com>
wrote:

> I think the main issue here is that the Zip format is not splittable. So
> if it's one huge zip file, you still won't be able to parallelise the read.
> If you could unzip the file in advance and point your pipeline at the path
> containing the extracted files, they should be read in parallel
> automatically. Otherwise, a single zip file (regardless of its size) can be
> processed by only one worker (one instance of the DoFn).

Re: Iterative algorithm in BEAM

Posted by Alexey Romanenko <ar...@gmail.com>.
I think the main issue here is that the Zip format is not splittable. So if it's one huge zip file, you still won't be able to parallelise the read. If you could unzip the file in advance and point your pipeline at the path containing the extracted files, they should be read in parallel automatically. Otherwise, a single zip file (regardless of its size) can be processed by only one worker (one instance of the DoFn).

> On 21 Jan 2021, at 12:44, Tucker Barbour <tu...@gmail.com> wrote:
> 
> There isn't a problem with the output being a flattened PCollection. I think that's actually what I'm intending to accomplish. A naive implementation would have a DoFn take a zip file as input (probably as FileIO.ReadableFile), recursively extract elements from the zip, i.e. extract an email from the zip and then immediately extract the attachments from an email and so on. For each item where an item is either an email or an attachment we output each item as an individual record to the output PCollection. This would create a flattened PCollection for all items in the file hierarchy. However, this becomes problematic when the zip file is large, several million files. To avoid doing all the work in a single DoFn, I've implemented this iterative solution with a simple do/while:
> 
> fun main(args: Array<String>) {
>   do {
>     val (pipeline, options) = createPipelineAndOptions(args)
> 
>     val data = pipeline.apply(readFilesToExtract(options))
>     val exclusionSet = pipeline.apply(readExclusionSet(options))
> 
>     // Could use side input or group by key
>     val inclusionSet = filter(data, exclusionSet)
> 
>     val extractedItems = inclusionSet.apply(ParDo.of(ExtractEmbeddedItems()))
>     
>     extractedItems.apply(WriteExtractedItemsToDatabase())
>   } while (hasExtractedItems(pipeline))
> }
> 
> Each call to ExtractEmbeddedItems only extracts a single level in the file hierarchy. For example, the first iteration would extract all emails in a zip but _not_ extract attachments. The second iteration would extract all the attachments from the emails found in iteration 1. And so on. This _works_ but isn't ideal. Especially figuring out whether to continue with the do/while loop or terminate. It is better than extracting all elements in a single DoFn since we gain parallelism in subsequent iterations.
> 
> After reading more about SplittableDoFn, it seems like a SplittableDoFn might be a better option for solving this problem, but I'm still trying to figure that out. A DoFn can create element/restriction pairs where the element is a FileIO.ReadableFile and the restriction is a set of items in the Zip. I'm not sure OffsetRange works here because of how the Zip format works, so I may need to explore writing a custom restriction and tracker.


Re: Iterative algorithm in BEAM

Posted by Tucker Barbour <tu...@gmail.com>.
There isn't a problem with the output being a flattened PCollection. I
think that's actually what I'm intending to accomplish. A naive
implementation would have a DoFn take a zip file as input (probably as
FileIO.ReadableFile), recursively extract elements from the zip, i.e.
extract an email from the zip and then immediately extract the attachments
from an email and so on. For each item where an item is either an email or
an attachment we output each item as an individual record to the output
PCollection. This would create a flattened PCollection for all items in the
file hierarchy. However, this becomes problematic when the zip file is
large, several million files. To avoid doing all the work in a single DoFn,
I've implemented this iterative solution with a simple do/while:

fun main(args: Array<String>) {
  do {
    val (pipeline, options) = createPipelineAndOptions(args)

    val data = pipeline.apply(readFilesToExtract(options))
    val exclusionSet = pipeline.apply(readExclusionSet(options))

    // Could use side input or group by key
    val inclusionSet = filter(data, exclusionSet)

    val extractedItems = inclusionSet.apply(ParDo.of(ExtractEmbeddedItems()))

    extractedItems.apply(WriteExtractedItemsToDatabase())
  } while (hasExtractedItems(pipeline))
}

Each call to ExtractEmbeddedItems only extracts a single level in the file
hierarchy. For example, the first iteration would extract all emails in a
zip but _not_ extract attachments. The second iteration would extract all
the attachments from the emails found in iteration 1. And so on. This
_works_ but isn't ideal. Especially figuring out whether to continue with
the do/while loop or terminate. It is better than extracting all elements
in a single DoFn since we gain parallelism in subsequent iterations.
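
The level-by-level behaviour can be seen in a plain Kotlin, in-memory analogue (no Beam; Item, Record, extractByLevel and sampleArchive are hypothetical names, and the SHA values are placeholders). Each pass of the loop stands in for one pipeline run: filter against the exclusion set, extract exactly one level, and stop when a pass extracts nothing. In this sketch an excluded item has already been emitted by the previous pass; it is simply not expanded further.

```kotlin
// Hypothetical in-memory stand-in for an archive, email or attachment.
data class Item(val id: String, val sha: String, val children: List<Item> = emptyList())

// Flattened output record; parentId keeps the link to the containing item.
data class Record(val id: String, val parentId: String)

// Extracts the hierarchy one level per pass, mirroring one pipeline run per
// iteration of the do/while loop.
fun extractByLevel(archives: List<Item>, exclusionSet: Set<String>): List<Record> {
    val output = mutableListOf<Record>()
    var toExtract = archives
    do {
        // Items on the exclusion list are not opened.
        val included = toExtract.filter { it.sha !in exclusionSet }
        // Extract a single level: direct children only.
        val extracted = included.flatMap { parent ->
            parent.children.map { child -> parent to child }
        }
        extracted.forEach { (parent, child) -> output += Record(child.id, parent.id) }
        // What was just extracted is the input of the next pass.
        toExtract = extracted.map { it.second }
    } while (extracted.isNotEmpty()) // terminate once a pass finds nothing new
    return output
}

// The example hierarchy used earlier in this thread.
fun sampleArchive(): Item =
    Item("Top Level Zip File", "sha-zip", listOf(
        Item("Email 001", "sha-e1", listOf(
            Item("Attachment A", "sha-a", listOf(
                Item("Embedded Document A01", "sha-a01"),
                Item("Embedded Document A02", "sha-a02")
            )),
            Item("Attachment B", "sha-b", listOf(
                Item("Embedded Document B01", "sha-b01")
            ))
        )),
        Item("Email 002", "sha-e2", listOf(
            Item("Attachment C", "sha-c", listOf(
                Item("Embedded Document C01", "sha-c01")
            ))
        ))
    ))
```

With an empty exclusion set this produces the nine records of the example, in breadth-first rather than depth-first order. Adding "sha-b" to the exclusion set leaves Attachment B in the output but drops Embedded Document B01.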

After reading more about SplittableDoFn, it seems like a SplittableDoFn might
be a better option for solving this problem, but I'm still trying to figure
that out. A DoFn can create element/restriction pairs where the element is
a FileIO.ReadableFile and the restriction is a set of items in the Zip. I'm
not sure OffsetRange works here because of how the Zip format works, so I
may need to explore writing a custom restriction and tracker.
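
As a rough illustration of what such a custom restriction might look like, here is a plain Kotlin sketch in which the restriction is a range of entry indices in the central directory rather than a byte range. EntryRange and EntryRangeTracker are hypothetical and only loosely follow the claim/split contract of a restriction tracker; they are not Beam's RestrictionTracker API.

```kotlin
// Hypothetical restriction: a half-open range [from, to) of entry indices
// in one zip's central directory.
data class EntryRange(val from: Int, val to: Int)

// Hypothetical tracker. Work is claimed entry by entry, and the unclaimed
// tail can be split off so another worker can process it.
class EntryRangeTracker(private var range: EntryRange) {
    private var lastClaimed: Int? = null

    // Claims entry i; returns false once i falls outside the current range.
    fun tryClaim(i: Int): Boolean {
        val last = lastClaimed
        require(i >= range.from && (last == null || i > last)) { "claims must increase" }
        if (i >= range.to) return false
        lastClaimed = i
        return true
    }

    // Gives away the second half of the unclaimed entries. Returns the
    // residual range, or null when fewer than two entries remain.
    fun trySplit(): EntryRange? {
        val next = (lastClaimed ?: (range.from - 1)) + 1
        val remaining = range.to - next
        if (remaining < 2) return null
        val splitAt = next + remaining / 2
        val residual = EntryRange(splitAt, range.to)
        range = EntryRange(range.from, splitAt)
        return residual
    }

    fun currentRestriction(): EntryRange = range
}
```

For example, a tracker over EntryRange(0, 10) that has claimed entry 0 would split into a primary EntryRange(0, 5) and a residual EntryRange(5, 10); the worker holding the primary can then no longer claim entry 5.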

On Wed, Jan 20, 2021 at 6:22 PM Alexey Romanenko <ar...@gmail.com>
wrote:

> Thank you for the details! I'll try to share some of my thoughts on this.
>
> Well, maybe I'm missing something, but what is the problem with having a
> "flattened" PCollection as the output of the ParDo that reads the input zip
> file(s)? For example, if an input element in a file is an email, then your
> DoFn can emit several outputs depending on the structure of that email.
> Of course, the order of elements in the output PCollection won't be
> guaranteed, but all the information needed later can be saved in the
> structure of the output record (like a POJO or AvroRecord).
>
> Also, if I understand correctly, on every processing step you need to
> reject some records depending on whether their SHA is in an already known
> rejection list. So, if it's possible to calculate this SHA in the "Read"
> step for every record, then you can use either a SideInput or a GroupByKey
> transform (where the key is the SHA) to filter the records.
>
> Please, let me know if I missed something.

Re: Iterative algorithm in BEAM

Posted by Alexey Romanenko <ar...@gmail.com>.
Thank you for the details! I'll try to share some of my thoughts on this.

Well, maybe I'm missing something, but what is the problem with having a "flattened" PCollection as the output of the ParDo that reads the input zip file(s)? For example, if an input element in a file is an email, then your DoFn can emit several outputs depending on the structure of that email. Of course, the order of elements in the output PCollection won't be guaranteed, but all the information needed later can be saved in the structure of the output record (like a POJO or AvroRecord).

Also, if I understand correctly, on every processing step you need to reject some records depending on whether their SHA is in an already known rejection list. So, if it's possible to calculate this SHA in the "Read" step for every record, then you can use either a SideInput or a GroupByKey transform (where the key is the SHA) to filter the records.
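
To illustrate the filtering step on its own, here is a small plain Kotlin sketch (no Beam). It assumes SHA-256, since the thread does not say which SHA variant is used, and sha256Hex and filterRejected are hypothetical helper names. In a pipeline the rejection set would arrive as a side input; here it is just an in-memory Set.

```kotlin
import java.security.MessageDigest

// Hex-encoded SHA-256 digest of an item's bytes (assumed hash function).
fun sha256Hex(bytes: ByteArray): String =
    MessageDigest.getInstance("SHA-256").digest(bytes)
        .joinToString("") { "%02x".format(it) }

// Keeps only the items whose digest is absent from the rejection set.
// bytesOf tells the filter how to obtain the content to hash for each item.
fun <T> filterRejected(
    items: List<T>,
    rejected: Set<String>,
    bytesOf: (T) -> ByteArray
): List<T> = items.filter { sha256Hex(bytesOf(it)) !in rejected }
```

A set lookup like this suits a rejection list small enough to hold in memory on each worker; for a list of a few million entries, the GroupByKey variant keyed on the SHA avoids materialising the whole list per worker.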

Please, let me know if I missed something.

> On 19 Jan 2021, at 18:43, Tucker Barbour <tu...@gmail.com> wrote:
> 
> My initial thought is the latter -- the output PCollection would be a PCollection<Record> where Record can be either an email or attachment. A Record would still need to have an attribute referencing its "parent". For example an email Record would have a unique identifier, e.g. ID, and any attachment Record would have a reference to it's parent email, e.g. parentID. However, outputting pairs might also work and may be a better option considering the need to maintain the relationship between a parent and child. We're basically building a tree. An additional wrinkle is that attachments may themselves have embedded items which would also need to be represented in the output PCollection as Records. For example, an email with an attachment which itself is a zip of Word documents. The structure of this file hierarchy is not known ahead of time.
> 
> The input is expected to be a PCollection of one or more (though usually on the order of tens, not anything like millions) zip files or other archive file formats. The output is expected to be a PCollection whose elements are nodes in the file hierarchy. If a zip file were to have the following structure:
> 
> - Top Level Zip File
> `-> Email 001
>   `-> Attachment A
>     `-> Embedded Document A01
>     `-> Embedded Document A02
>   `-> Attachment B
>     `-> Embedded Document B01
> `-> Email 002
>   `-> Attachment C
>     `-> Embedded Document C01
> 
> We'd expect an output PCollection whose elements are:
> 
> Email 001, Attachment A, Embedded Document A01, Embedded Document A02, Attachment B, Embedded Document B01, Email 002, Attachment C, Embedded Document C01.
> 
> We'd then perform further PTransforms on this "flattened" PCollection. 


Re: Iterative algorithm in BEAM

Posted by Tucker Barbour <tu...@gmail.com>.
My initial thought is the latter -- the output PCollection would be a
PCollection<Record> where Record can be either an email or attachment. A
Record would still need to have an attribute referencing its "parent". For
example an email Record would have a unique identifier, e.g. ID, and any
attachment Record would have a reference to its parent email, e.g.
parentID. However, outputting pairs might also work and may be a better
option considering the need to maintain the relationship between a parent
and child. We're basically building a tree. An additional wrinkle is that
attachments may themselves have embedded items which would also need to be
represented in the output PCollection as Records. For example, an email
with an attachment which itself is a zip of Word documents. The structure
of this file hierarchy is not known ahead of time.

The input is expected to be a PCollection of one or more (though usually on
the order of tens, not anything like millions) zip files or other archive
file formats. The output is expected to be a PCollection whose elements are
nodes in the file hierarchy. If a zip file were to have the following
structure:

- Top Level Zip File
`-> Email 001
  `-> Attachment A
    `-> Embedded Document A01
    `-> Embedded Document A02
  `-> Attachment B
    `-> Embedded Document B01
`-> Email 002
  `-> Attachment C
    `-> Embedded Document C01

We'd expect an output PCollection whose elements are:

Email 001, Attachment A, Embedded Document A01, Embedded Document A02,
Attachment B, Embedded Document B01, Email 002, Attachment C, Embedded
Document C01.

We'd then perform further PTransforms on this "flattened" PCollection.
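
As a rough illustration of the flattening described above, here is a plain-Kotlin sketch that walks a hierarchy of unknown depth with an explicit stack and emits one flat record per node, each carrying a reference to its parent. Nothing here is Beam API: `Node`, `Record` and `flatten` are hypothetical names, item names double as IDs, and the SHA strings are placeholders. It also assumes that an item on the rejection list is still emitted but not expanded further, which the thread does not spell out.

```kotlin
// Hypothetical in-memory stand-in for an archive item and its embedded items.
data class Node(val name: String, val sha: String, val children: List<Node> = emptyList())

// Flat output element: parentId is null for top-level items.
data class Record(val id: String, val parentId: String?)

// Depth-first walk with an explicit stack, so the nesting depth need not be known up front.
fun flatten(roots: List<Node>, rejected: Set<String> = emptySet()): List<Record> {
    val out = mutableListOf<Record>()
    val stack = ArrayDeque<Pair<Node, String?>>()
    roots.asReversed().forEach { stack.addLast(it to null) }
    while (stack.isNotEmpty()) {
        val (node, parentId) = stack.removeLast()
        out += Record(node.name, parentId)
        // Assumption: a rejected item is emitted, but its embedded items are not extracted.
        if (node.sha in rejected) continue
        node.children.asReversed().forEach { stack.addLast(it to node.name) }
    }
    return out
}

fun main() {
    val zipContents = listOf(
        Node("Email 001", "s1", listOf(
            Node("Attachment A", "s2", listOf(
                Node("Embedded Document A01", "s3"),
                Node("Embedded Document A02", "s4")
            )),
            Node("Attachment B", "s5", listOf(
                Node("Embedded Document B01", "s6")
            ))
        )),
        Node("Email 002", "s7", listOf(
            Node("Attachment C", "s8", listOf(
                Node("Embedded Document C01", "s9")
            ))
        ))
    )
    flatten(zipContents).forEach { println("${it.id} (parent: ${it.parentId})") }
}
```

Run locally, this prints the nine items in the order listed above. Inside a pipeline the same loop could live in a single DoFn that outputs each record as it is found, in which case the ordering of the resulting PCollection would not be guaranteed and the parent reference is what preserves the relationship.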





On Tue, Jan 19, 2021 at 5:14 PM Alexey Romanenko <ar...@gmail.com>
wrote:

> What exactly is the output PCollection in your example? Is it just a
> PCollection of pairs (email and attachment), or is it something like
> PCollection<Record>, where Record can be either an email or an attachment?
> Or is it something else?
>
> Could you add a simple example with expected input/output of your pipeline?

Re: Iterative algorithm in BEAM

Posted by Alexey Romanenko <ar...@gmail.com>.
What exactly is the output PCollection in your example? Is it just a PCollection of pairs (email and attachment), or is it something like PCollection<Record>, where Record can be either an email or an attachment? Or is it something else?

Could you add a simple example with expected input/output of your pipeline?

> On 18 Jan 2021, at 12:26, Tucker Barbour <tu...@gmail.com> wrote:
> 
> I have a use-case where I'm extracting embedded items from archive file formats which themselves have embedded items. For example a zip file with emails with attachments. The goal in this example would be to create a PCollection where each email is an element as well as each attachment being an element. (No need to create a tree structure here.) There are certain criteria which would prevent continuing embedded item extraction, such as an item SHA being present in a "rejection" list. The pipeline will perform a series of transformations on the items and then continue to extract embedded items. This type of problem lends itself to be solved with an iterative algorithm. My understanding is that BEAM does not support iterative algorithms to the same extent Spark does. In BEAM I would have to persist the results of each iteration and instantiate a new pipeline for each iteration. This _works_ though isn't ideal. The "rejection" list is a PCollection of a few million elements. Re-reading this "rejection" list on each iteration isn't ideal.
> 
> Is there a way to write such an iterative algorithm in BEAM without having to create a new pipeline on each iteration? Further, would something like SplitableDoFn be a potential solution? Looking through the user's guide and some existing implementations of SplitableDoFns I'm thinking not but I'm still trying to understand SplitableDoFns.