You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Ray Ruvinskiy <ra...@arcticwolf.com> on 2016/12/21 17:05:11 UTC

One-to-many mapping between unbounded input source and pipelines with session windows

Hello,

I am trying to figure out if Apache Beam is the right framework for my use case. I have an unbounded stream, and there are a number of questions I would like to ask regarding the records in the stream:

- For example, one question may be trying to find a record with attribute A followed within no more than a minute by a record with attribute B followed within no more than 5 minutes by a record with attribute C.
- Another question may be trying to find a sequence of at least N records with attribute X within 5 hours of each other, followed by an record with attribute Y no more than an hour later.
- A third question would be seeing if there exist a record with attribute K *not* followed by a record with attribute L in the next 10 minutes.

Every time I encounter the pattern of records I’m looking for, I would like to perform an action. If I understand the Beam model correctly, each question would correspond to a separate pipeline I would create, and it also sounds like I’m looking for session windows. However, I’m assuming I would need to tee the input source to all the separate pipelines? I have tried to look for documentation and/or examples on whether Apache Beam can be used to express such a setup and how to do it if so, but I haven’t been able to find anything concrete. Any help would be appreciated.

Thanks!

Ray



Re: One-to-many mapping between unbounded input source and pipelines with session windows

Posted by Lukasz Cwik <lc...@google.com>.
Yes, the second suggestion is more appropriate if either type A or B or is
common. The first suggestion is only good if type A and type B are rare
allowing for the session windows to close regularly.


On Thu, Dec 22, 2016 at 9:17 AM, Ray Ruvinskiy <ray.ruvinskiy@arcticwolf.com
> wrote:

> Interesting. Thanks, Lukasz!
>
> Suppose type A is common but type B is rate. Would your second suggestion
> be more appropriate in this case?
>
> Ray
>
> From: Lukasz Cwik <lc...@google.com>
> Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
> Date: Wednesday, December 21, 2016 at 8:14 PM
> To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
> Subject: Re: One-to-many mapping between unbounded input source and
> pipelines with session windows
>
> Globally will not perform as well because you reduce the inherent level of
> parallelism.
>
> Lets tackle this one problem and you can see if you can apply the same
> principles to the other problems:
> - Any time we see a record (call this type A) with record[“id”] == 1 &&
> record[“field_6”] == “some_value” *not* followed by a record (call this
> type B) with record[“id”] == 2 && record[“field_7”] == “other_value” in the
> subsequent 10 minutes.
>
> One idea:
> If type A and type B records are rare. You can use session windows with a
> gap duration of 10 minutes. Whenever you see a record of type A or type B,
> you convert them to a KV<common key, record data>. You pass this through a
> GBK which will produce a KV<common key, iterable<record data>> that is
> guaranteed to have all the records which of type A and type B that are
> within 10 minutes of each other. Then you scan through the iterable and
> output all records of type A that are not followed by a record of type B
> within 10 minutes. The reasoning why they need to be rare is that you don't
> want the session to continue forever.
>
> Another idea:
> Convert all type A and type B records to be a KV<common key, record> using
> a sliding window of size 20 mins being output every 10 mins. pass these
> through GBK, and similarly as above scan through the iterale output all
> records of type A that are not followed by a record of type B within 10
> minutes.
>
>
>
>
> On Wed, Dec 21, 2016 at 2:56 PM, Ray Ruvinskiy <
> ray.ruvinskiy@arcticwolf.com> wrote:
> The records have a property value in common, yes. For example,
> record[“record_kind”] == “foo” or record[“record_kind”] == “bar.” However,
> I’d be curious if the answer changes if I wanted to do this globally for
> the whole stream.
>
> Thanks!
>
> Ray
>
> From: Lukasz Cwik <lc...@google.com>
> Reply-To: "user@beam.incubator.apache.org" <user@beam.incubator.apache.org
> >
> Date: Wednesday, December 21, 2016 at 5:52 PM
> To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
> Subject: Re: One-to-many mapping between unbounded input source and
> pipelines with session windows
>
> My first question was about how do you know two or more records are
> related or is this global for the entire stream?
>
> The reason I was asking about whether you can map the qualifiers onto a
> fixed set of states is because I was wondering if there was a way to either
> use the State API (WIP https://issues.apache.org/jira/browse/BEAM-25) and
> timers API (WIP https://issues.apache.org/jira/browse/BEAM-27) and just
> transition between a fixed number of states or create composite session
> keys based upon the "id" plus some small set of qualifiers and do a GBK to
> do a join.
>
> In this example, how do you know the two records are related to each other
> (do the share a common attribute or can a common attribute be computed)?
> - Any time we see a record with record[“id”] == 1 && record[“field_6”] ==
> “some_value” *not* followed by a record with record[“id”] == 2 &&
> record[“field_7”] == “other_value” in the subsequent 10 minutes.
>
>
>
> On Wed, Dec 21, 2016 at 2:14 PM, Ray Ruvinskiy <
> ray.ruvinskiy@arcticwolf.com> wrote:
> I’m unsure about your first question. Are you asking whether there’s an
> attribute that all the records have in common?
>
> I think I’m looking for more flexibility than a fixed set of values but
> perhaps I’m overlooking something. To flesh out the example, let’s say the
> records are JSON documents, with fields. So, to express my examples again,
> I want to know:
> - Any time we see record_1[“type”] == “type1” && record_1[“field1”] ==
> “value1”, followed within no more than a minute by record_2[“type”] ==
> “type1” && record_2[“field2”].contains(“some_substring”), followed within
> no more than 5 minutes by record_3[“type”] == “type2” && record_3[“field3”]
> == “value3”
> - Any time we see N records where record[“id”] == 123 within 5 hours of
> each other, followed by another record with record[“id”] == 456 no more
> than an hour later than the group of N records
> - Any time we see a record with record[“id”] == 1 && record[“field_6”] ==
> “some_value” *not* followed by a record with record[“id”] == 2 &&
> record[“field_7”] == “other_value” in the subsequent 10 minutes.
>
> If data is late, *ideally* it’s taken into account, but we don’t need to
> account for data being late for an arbitrary amount of time. We can say
> that if a data is, for instance, less than an hour later it should be taken
> into account, but if it’s more than an hour late we can ignore it.
>
> Thanks!
>
> Ray
>
> From: Lukasz Cwik <lc...@google.com>
> Reply-To: "user@beam.incubator.apache.org" <user@beam.incubator.apache.org
> >
> Date: Wednesday, December 21, 2016 at 4:47 PM
> To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
> Subject: Re: One-to-many mapping between unbounded input source and
> pipelines with session windows
>
> Do the records have another attribute Z which joins them all together?
> Are the set of attributes A, B, C, X, Y, K, L are from a fixed set of
> values like enums or can be mapped onto a certain number of states (like an
> attribute A > 50 can be mapped onto a state "exceeds threshold")?
> For your examples, what should occur when there is late data in your three
> scenarios?
>
>
> On Wed, Dec 21, 2016 at 9:05 AM, Ray Ruvinskiy <
> ray.ruvinskiy@arcticwolf.com> wrote:
> Hello,
>
> I am trying to figure out if Apache Beam is the right framework for my use
> case. I have an unbounded stream, and there are a number of questions I
> would like to ask regarding the records in the stream:
>
> - For example, one question may be trying to find a record with attribute
> A followed within no more than a minute by a record with attribute B
> followed within no more than 5 minutes by a record with attribute C.
> - Another question may be trying to find a sequence of at least N records
> with attribute X within 5 hours of each other, followed by an record with
> attribute Y no more than an hour later.
> - A third question would be seeing if there exist a record with attribute
> K *not* followed by a record with attribute L in the next 10 minutes.
>
> Every time I encounter the pattern of records I’m looking for, I would
> like to perform an action. If I understand the Beam model correctly, each
> question would correspond to a separate pipeline I would create, and it
> also sounds like I’m looking for session windows. However, I’m assuming I
> would need to tee the input source to all the separate pipelines? I have
> tried to look for documentation and/or examples on whether Apache Beam can
> be used to express such a setup and how to do it if so, but I haven’t been
> able to find anything concrete. Any help would be appreciated.
>
> Thanks!
>
> Ray
>
>
>
>
>
>
>
>
>

Re: One-to-many mapping between unbounded input source and pipelines with session windows

Posted by Ray Ruvinskiy <ra...@arcticwolf.com>.
Interesting. Thanks, Lukasz!

Suppose type A is common but type B is rate. Would your second suggestion be more appropriate in this case?

Ray

From: Lukasz Cwik <lc...@google.com>
Reply-To: "user@beam.apache.org" <us...@beam.apache.org>
Date: Wednesday, December 21, 2016 at 8:14 PM
To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Subject: Re: One-to-many mapping between unbounded input source and pipelines with session windows

Globally will not perform as well because you reduce the inherent level of parallelism. 

Lets tackle this one problem and you can see if you can apply the same principles to the other problems:
- Any time we see a record (call this type A) with record[“id”] == 1 && record[“field_6”] == “some_value” *not* followed by a record (call this type B) with record[“id”] == 2 && record[“field_7”] == “other_value” in the subsequent 10 minutes.

One idea:
If type A and type B records are rare. You can use session windows with a gap duration of 10 minutes. Whenever you see a record of type A or type B, you convert them to a KV<common key, record data>. You pass this through a GBK which will produce a KV<common key, iterable<record data>> that is guaranteed to have all the records which of type A and type B that are within 10 minutes of each other. Then you scan through the iterable and output all records of type A that are not followed by a record of type B within 10 minutes. The reasoning why they need to be rare is that you don't want the session to continue forever.

Another idea:
Convert all type A and type B records to be a KV<common key, record> using a sliding window of size 20 mins being output every 10 mins. pass these through GBK, and similarly as above scan through the iterale output all records of type A that are not followed by a record of type B within 10 minutes.




On Wed, Dec 21, 2016 at 2:56 PM, Ray Ruvinskiy <ra...@arcticwolf.com> wrote:
The records have a property value in common, yes. For example, record[“record_kind”] == “foo” or record[“record_kind”] == “bar.” However, I’d be curious if the answer changes if I wanted to do this globally for the whole stream.

Thanks!

Ray

From: Lukasz Cwik <lc...@google.com>
Reply-To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Date: Wednesday, December 21, 2016 at 5:52 PM
To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Subject: Re: One-to-many mapping between unbounded input source and pipelines with session windows

My first question was about how do you know two or more records are related or is this global for the entire stream?

The reason I was asking about whether you can map the qualifiers onto a fixed set of states is because I was wondering if there was a way to either use the State API (WIP https://issues.apache.org/jira/browse/BEAM-25) and timers API (WIP https://issues.apache.org/jira/browse/BEAM-27) and just transition between a fixed number of states or create composite session keys based upon the "id" plus some small set of qualifiers and do a GBK to do a join.

In this example, how do you know the two records are related to each other (do the share a common attribute or can a common attribute be computed)?
- Any time we see a record with record[“id”] == 1 && record[“field_6”] == “some_value” *not* followed by a record with record[“id”] == 2 && record[“field_7”] == “other_value” in the subsequent 10 minutes.



On Wed, Dec 21, 2016 at 2:14 PM, Ray Ruvinskiy <ra...@arcticwolf.com> wrote:
I’m unsure about your first question. Are you asking whether there’s an attribute that all the records have in common?

I think I’m looking for more flexibility than a fixed set of values but perhaps I’m overlooking something. To flesh out the example, let’s say the records are JSON documents, with fields. So, to express my examples again, I want to know:
- Any time we see record_1[“type”] == “type1” && record_1[“field1”] == “value1”, followed within no more than a minute by record_2[“type”] == “type1” && record_2[“field2”].contains(“some_substring”), followed within no more than 5 minutes by record_3[“type”] == “type2” && record_3[“field3”] == “value3”
- Any time we see N records where record[“id”] == 123 within 5 hours of each other, followed by another record with record[“id”] == 456 no more than an hour later than the group of N records
- Any time we see a record with record[“id”] == 1 && record[“field_6”] == “some_value” *not* followed by a record with record[“id”] == 2 && record[“field_7”] == “other_value” in the subsequent 10 minutes.

If data is late, *ideally* it’s taken into account, but we don’t need to account for data being late for an arbitrary amount of time. We can say that if a data is, for instance, less than an hour later it should be taken into account, but if it’s more than an hour late we can ignore it.

Thanks!

Ray

From: Lukasz Cwik <lc...@google.com>
Reply-To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Date: Wednesday, December 21, 2016 at 4:47 PM
To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Subject: Re: One-to-many mapping between unbounded input source and pipelines with session windows

Do the records have another attribute Z which joins them all together?
Are the set of attributes A, B, C, X, Y, K, L are from a fixed set of values like enums or can be mapped onto a certain number of states (like an attribute A > 50 can be mapped onto a state "exceeds threshold")?
For your examples, what should occur when there is late data in your three scenarios?


On Wed, Dec 21, 2016 at 9:05 AM, Ray Ruvinskiy <ra...@arcticwolf.com> wrote:
Hello,

I am trying to figure out if Apache Beam is the right framework for my use case. I have an unbounded stream, and there are a number of questions I would like to ask regarding the records in the stream:

- For example, one question may be trying to find a record with attribute A followed within no more than a minute by a record with attribute B followed within no more than 5 minutes by a record with attribute C.
- Another question may be trying to find a sequence of at least N records with attribute X within 5 hours of each other, followed by an record with attribute Y no more than an hour later.
- A third question would be seeing if there exist a record with attribute K *not* followed by a record with attribute L in the next 10 minutes.

Every time I encounter the pattern of records I’m looking for, I would like to perform an action. If I understand the Beam model correctly, each question would correspond to a separate pipeline I would create, and it also sounds like I’m looking for session windows. However, I’m assuming I would need to tee the input source to all the separate pipelines? I have tried to look for documentation and/or examples on whether Apache Beam can be used to express such a setup and how to do it if so, but I haven’t been able to find anything concrete. Any help would be appreciated.

Thanks!

Ray









Re: One-to-many mapping between unbounded input source and pipelines with session windows

Posted by Lukasz Cwik <lc...@google.com>.
Globally will not perform as well because you reduce the inherent level of
parallelism.

Lets tackle this one problem and you can see if you can apply the same
principles to the other problems:
- Any time we see a record (call this type A) with record[“id”] == 1 &&
record[“field_6”] == “some_value” *not* followed by a record (call this
type B) with record[“id”] == 2 && record[“field_7”] == “other_value” in the
subsequent 10 minutes.

One idea:
If type A and type B records are rare. You can use session windows with a
gap duration of 10 minutes. Whenever you see a record of type A or type B,
you convert them to a KV<common key, record data>. You pass this through a
GBK which will produce a KV<common key, iterable<record data>> that is
guaranteed to have all the records which of type A and type B that are
within 10 minutes of each other. Then you scan through the iterable and
output all records of type A that are not followed by a record of type B
within 10 minutes. The reasoning why they need to be rare is that you don't
want the session to continue forever.

Another idea:
Convert all type A and type B records to be a KV<common key, record> using
a sliding window of size 20 mins being output every 10 mins. pass these
through GBK, and similarly as above scan through the iterale output all
records of type A that are not followed by a record of type B within 10
minutes.




On Wed, Dec 21, 2016 at 2:56 PM, Ray Ruvinskiy <ray.ruvinskiy@arcticwolf.com
> wrote:

> The records have a property value in common, yes. For example,
> record[“record_kind”] == “foo” or record[“record_kind”] == “bar.” However,
> I’d be curious if the answer changes if I wanted to do this globally for
> the whole stream.
>
> Thanks!
>
> Ray
>
> From: Lukasz Cwik <lc...@google.com>
> Reply-To: "user@beam.incubator.apache.org" <user@beam.incubator.apache.org
> >
> Date: Wednesday, December 21, 2016 at 5:52 PM
> To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
> Subject: Re: One-to-many mapping between unbounded input source and
> pipelines with session windows
>
> My first question was about how do you know two or more records are
> related or is this global for the entire stream?
>
> The reason I was asking about whether you can map the qualifiers onto a
> fixed set of states is because I was wondering if there was a way to either
> use the State API (WIP https://issues.apache.org/jira/browse/BEAM-25) and
> timers API (WIP https://issues.apache.org/jira/browse/BEAM-27) and just
> transition between a fixed number of states or create composite session
> keys based upon the "id" plus some small set of qualifiers and do a GBK to
> do a join.
>
> In this example, how do you know the two records are related to each other
> (do the share a common attribute or can a common attribute be computed)?
> - Any time we see a record with record[“id”] == 1 && record[“field_6”] ==
> “some_value” *not* followed by a record with record[“id”] == 2 &&
> record[“field_7”] == “other_value” in the subsequent 10 minutes.
>
>
>
> On Wed, Dec 21, 2016 at 2:14 PM, Ray Ruvinskiy <
> ray.ruvinskiy@arcticwolf.com> wrote:
> I’m unsure about your first question. Are you asking whether there’s an
> attribute that all the records have in common?
>
> I think I’m looking for more flexibility than a fixed set of values but
> perhaps I’m overlooking something. To flesh out the example, let’s say the
> records are JSON documents, with fields. So, to express my examples again,
> I want to know:
> - Any time we see record_1[“type”] == “type1” && record_1[“field1”] ==
> “value1”, followed within no more than a minute by record_2[“type”] ==
> “type1” && record_2[“field2”].contains(“some_substring”), followed within
> no more than 5 minutes by record_3[“type”] == “type2” && record_3[“field3”]
> == “value3”
> - Any time we see N records where record[“id”] == 123 within 5 hours of
> each other, followed by another record with record[“id”] == 456 no more
> than an hour later than the group of N records
> - Any time we see a record with record[“id”] == 1 && record[“field_6”] ==
> “some_value” *not* followed by a record with record[“id”] == 2 &&
> record[“field_7”] == “other_value” in the subsequent 10 minutes.
>
> If data is late, *ideally* it’s taken into account, but we don’t need to
> account for data being late for an arbitrary amount of time. We can say
> that if a data is, for instance, less than an hour later it should be taken
> into account, but if it’s more than an hour late we can ignore it.
>
> Thanks!
>
> Ray
>
> From: Lukasz Cwik <lc...@google.com>
> Reply-To: "user@beam.incubator.apache.org" <user@beam.incubator.apache.org
> >
> Date: Wednesday, December 21, 2016 at 4:47 PM
> To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
> Subject: Re: One-to-many mapping between unbounded input source and
> pipelines with session windows
>
> Do the records have another attribute Z which joins them all together?
> Are the set of attributes A, B, C, X, Y, K, L are from a fixed set of
> values like enums or can be mapped onto a certain number of states (like an
> attribute A > 50 can be mapped onto a state "exceeds threshold")?
> For your examples, what should occur when there is late data in your three
> scenarios?
>
>
> On Wed, Dec 21, 2016 at 9:05 AM, Ray Ruvinskiy <
> ray.ruvinskiy@arcticwolf.com> wrote:
> Hello,
>
> I am trying to figure out if Apache Beam is the right framework for my use
> case. I have an unbounded stream, and there are a number of questions I
> would like to ask regarding the records in the stream:
>
> - For example, one question may be trying to find a record with attribute
> A followed within no more than a minute by a record with attribute B
> followed within no more than 5 minutes by a record with attribute C.
> - Another question may be trying to find a sequence of at least N records
> with attribute X within 5 hours of each other, followed by an record with
> attribute Y no more than an hour later.
> - A third question would be seeing if there exist a record with attribute
> K *not* followed by a record with attribute L in the next 10 minutes.
>
> Every time I encounter the pattern of records I’m looking for, I would
> like to perform an action. If I understand the Beam model correctly, each
> question would correspond to a separate pipeline I would create, and it
> also sounds like I’m looking for session windows. However, I’m assuming I
> would need to tee the input source to all the separate pipelines? I have
> tried to look for documentation and/or examples on whether Apache Beam can
> be used to express such a setup and how to do it if so, but I haven’t been
> able to find anything concrete. Any help would be appreciated.
>
> Thanks!
>
> Ray
>
>
>
>
>
>
>

Re: One-to-many mapping between unbounded input source and pipelines with session windows

Posted by Ray Ruvinskiy <ra...@arcticwolf.com>.
The records have a property value in common, yes. For example, record[“record_kind”] == “foo” or record[“record_kind”] == “bar.” However, I’d be curious if the answer changes if I wanted to do this globally for the whole stream.

Thanks!

Ray

From: Lukasz Cwik <lc...@google.com>
Reply-To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Date: Wednesday, December 21, 2016 at 5:52 PM
To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Subject: Re: One-to-many mapping between unbounded input source and pipelines with session windows

My first question was about how do you know two or more records are related or is this global for the entire stream? 

The reason I was asking about whether you can map the qualifiers onto a fixed set of states is because I was wondering if there was a way to either use the State API (WIP https://issues.apache.org/jira/browse/BEAM-25) and timers API (WIP https://issues.apache.org/jira/browse/BEAM-27) and just transition between a fixed number of states or create composite session keys based upon the "id" plus some small set of qualifiers and do a GBK to do a join. 

In this example, how do you know the two records are related to each other (do the share a common attribute or can a common attribute be computed)?
- Any time we see a record with record[“id”] == 1 && record[“field_6”] == “some_value” *not* followed by a record with record[“id”] == 2 && record[“field_7”] == “other_value” in the subsequent 10 minutes.



On Wed, Dec 21, 2016 at 2:14 PM, Ray Ruvinskiy <ra...@arcticwolf.com> wrote:
I’m unsure about your first question. Are you asking whether there’s an attribute that all the records have in common?

I think I’m looking for more flexibility than a fixed set of values but perhaps I’m overlooking something. To flesh out the example, let’s say the records are JSON documents, with fields. So, to express my examples again, I want to know:
- Any time we see record_1[“type”] == “type1” && record_1[“field1”] == “value1”, followed within no more than a minute by record_2[“type”] == “type1” && record_2[“field2”].contains(“some_substring”), followed within no more than 5 minutes by record_3[“type”] == “type2” && record_3[“field3”] == “value3”
- Any time we see N records where record[“id”] == 123 within 5 hours of each other, followed by another record with record[“id”] == 456 no more than an hour later than the group of N records
- Any time we see a record with record[“id”] == 1 && record[“field_6”] == “some_value” *not* followed by a record with record[“id”] == 2 && record[“field_7”] == “other_value” in the subsequent 10 minutes.

If data is late, *ideally* it’s taken into account, but we don’t need to account for data being late for an arbitrary amount of time. We can say that if a data is, for instance, less than an hour later it should be taken into account, but if it’s more than an hour late we can ignore it.

Thanks!

Ray

From: Lukasz Cwik <lc...@google.com>
Reply-To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Date: Wednesday, December 21, 2016 at 4:47 PM
To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Subject: Re: One-to-many mapping between unbounded input source and pipelines with session windows

Do the records have another attribute Z which joins them all together?
Are the set of attributes A, B, C, X, Y, K, L are from a fixed set of values like enums or can be mapped onto a certain number of states (like an attribute A > 50 can be mapped onto a state "exceeds threshold")?
For your examples, what should occur when there is late data in your three scenarios?


On Wed, Dec 21, 2016 at 9:05 AM, Ray Ruvinskiy <ra...@arcticwolf.com> wrote:
Hello,

I am trying to figure out if Apache Beam is the right framework for my use case. I have an unbounded stream, and there are a number of questions I would like to ask regarding the records in the stream:

- For example, one question may be trying to find a record with attribute A followed within no more than a minute by a record with attribute B followed within no more than 5 minutes by a record with attribute C.
- Another question may be trying to find a sequence of at least N records with attribute X within 5 hours of each other, followed by an record with attribute Y no more than an hour later.
- A third question would be seeing if there exist a record with attribute K *not* followed by a record with attribute L in the next 10 minutes.

Every time I encounter the pattern of records I’m looking for, I would like to perform an action. If I understand the Beam model correctly, each question would correspond to a separate pipeline I would create, and it also sounds like I’m looking for session windows. However, I’m assuming I would need to tee the input source to all the separate pipelines? I have tried to look for documentation and/or examples on whether Apache Beam can be used to express such a setup and how to do it if so, but I haven’t been able to find anything concrete. Any help would be appreciated.

Thanks!

Ray







Re: One-to-many mapping between unbounded input source and pipelines with session windows

Posted by Lukasz Cwik <lc...@google.com>.
My first question was about how do you know two or more records are related
or is this global for the entire stream?

The reason I was asking about whether you can map the qualifiers onto a
fixed set of states is because I was wondering if there was a way to either
use the State API (WIP https://issues.apache.org/jira/browse/BEAM-25) and
timers API (WIP https://issues.apache.org/jira/browse/BEAM-27) and just
transition between a fixed number of states or create composite session
keys based upon the "id" plus some small set of qualifiers and do a GBK to
do a join.

In this example, how do you know the two records are related to each other
(do the share a common attribute or can a common attribute be computed)?
- Any time we see a record with record[“id”] == 1 && record[“field_6”] ==
“some_value” *not* followed by a record with record[“id”] == 2 &&
record[“field_7”] == “other_value” in the subsequent 10 minutes.



On Wed, Dec 21, 2016 at 2:14 PM, Ray Ruvinskiy <ray.ruvinskiy@arcticwolf.com
> wrote:

> I’m unsure about your first question. Are you asking whether there’s an
> attribute that all the records have in common?
>
> I think I’m looking for more flexibility than a fixed set of values but
> perhaps I’m overlooking something. To flesh out the example, let’s say the
> records are JSON documents, with fields. So, to express my examples again,
> I want to know:
> - Any time we see record_1[“type”] == “type1” && record_1[“field1”] ==
> “value1”, followed within no more than a minute by record_2[“type”] ==
> “type1” && record_2[“field2”].contains(“some_substring”), followed within
> no more than 5 minutes by record_3[“type”] == “type2” && record_3[“field3”]
> == “value3”
> - Any time we see N records where record[“id”] == 123 within 5 hours of
> each other, followed by another record with record[“id”] == 456 no more
> than an hour later than the group of N records
> - Any time we see a record with record[“id”] == 1 && record[“field_6”] ==
> “some_value” *not* followed by a record with record[“id”] == 2 &&
> record[“field_7”] == “other_value” in the subsequent 10 minutes.
>
> If data is late, *ideally* it’s taken into account, but we don’t need to
> account for data being late for an arbitrary amount of time. We can say
> that if a data is, for instance, less than an hour later it should be taken
> into account, but if it’s more than an hour late we can ignore it.
>
> Thanks!
>
> Ray
>
> From: Lukasz Cwik <lc...@google.com>
> Reply-To: "user@beam.incubator.apache.org" <user@beam.incubator.apache.org
> >
> Date: Wednesday, December 21, 2016 at 4:47 PM
> To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
> Subject: Re: One-to-many mapping between unbounded input source and
> pipelines with session windows
>
> Do the records have another attribute Z which joins them all together?
> Are the set of attributes A, B, C, X, Y, K, L are from a fixed set of
> values like enums or can be mapped onto a certain number of states (like an
> attribute A > 50 can be mapped onto a state "exceeds threshold")?
> For your examples, what should occur when there is late data in your three
> scenarios?
>
>
> On Wed, Dec 21, 2016 at 9:05 AM, Ray Ruvinskiy <
> ray.ruvinskiy@arcticwolf.com> wrote:
> Hello,
>
> I am trying to figure out if Apache Beam is the right framework for my use
> case. I have an unbounded stream, and there are a number of questions I
> would like to ask regarding the records in the stream:
>
> - For example, one question may be trying to find a record with attribute
> A followed within no more than a minute by a record with attribute B
> followed within no more than 5 minutes by a record with attribute C.
> - Another question may be trying to find a sequence of at least N records
> with attribute X within 5 hours of each other, followed by an record with
> attribute Y no more than an hour later.
> - A third question would be seeing if there exist a record with attribute
> K *not* followed by a record with attribute L in the next 10 minutes.
>
> Every time I encounter the pattern of records I’m looking for, I would
> like to perform an action. If I understand the Beam model correctly, each
> question would correspond to a separate pipeline I would create, and it
> also sounds like I’m looking for session windows. However, I’m assuming I
> would need to tee the input source to all the separate pipelines? I have
> tried to look for documentation and/or examples on whether Apache Beam can
> be used to express such a setup and how to do it if so, but I haven’t been
> able to find anything concrete. Any help would be appreciated.
>
> Thanks!
>
> Ray
>
>
>
>
>

Re: One-to-many mapping between unbounded input source and pipelines with session windows

Posted by Ray Ruvinskiy <ra...@arcticwolf.com>.
I’m unsure about your first question. Are you asking whether there’s an attribute that all the records have in common?

I think I’m looking for more flexibility than a fixed set of values but perhaps I’m overlooking something. To flesh out the example, let’s say the records are JSON documents, with fields. So, to express my examples again, I want to know:
- Any time we see record_1[“type”] == “type1” && record_1[“field1”] == “value1”, followed within no more than a minute by record_2[“type”] == “type1” && record_2[“field2”].contains(“some_substring”), followed within no more than 5 minutes by record_3[“type”] == “type2” && record_3[“field3”] == “value3”
- Any time we see N records where record[“id”] == 123 within 5 hours of each other, followed by another record with record[“id”] == 456 no more than an hour later than the group of N records
- Any time we see a record with record[“id”] == 1 && record[“field_6”] == “some_value” *not* followed by a record with record[“id”] == 2 && record[“field_7”] == “other_value” in the subsequent 10 minutes.

If data is late, *ideally* it’s taken into account, but we don’t need to account for data being late for an arbitrary amount of time. We can say that if a data is, for instance, less than an hour later it should be taken into account, but if it’s more than an hour late we can ignore it.

Thanks!

Ray

From: Lukasz Cwik <lc...@google.com>
Reply-To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Date: Wednesday, December 21, 2016 at 4:47 PM
To: "user@beam.incubator.apache.org" <us...@beam.incubator.apache.org>
Subject: Re: One-to-many mapping between unbounded input source and pipelines with session windows

Do the records have another attribute Z which joins them all together?
Are the set of attributes A, B, C, X, Y, K, L are from a fixed set of values like enums or can be mapped onto a certain number of states (like an attribute A > 50 can be mapped onto a state "exceeds threshold")?
For your examples, what should occur when there is late data in your three scenarios?


On Wed, Dec 21, 2016 at 9:05 AM, Ray Ruvinskiy <ra...@arcticwolf.com> wrote:
Hello,

I am trying to figure out if Apache Beam is the right framework for my use case. I have an unbounded stream, and there are a number of questions I would like to ask regarding the records in the stream:

- For example, one question may be trying to find a record with attribute A followed within no more than a minute by a record with attribute B followed within no more than 5 minutes by a record with attribute C.
- Another question may be trying to find a sequence of at least N records with attribute X within 5 hours of each other, followed by an record with attribute Y no more than an hour later.
- A third question would be seeing if there exist a record with attribute K *not* followed by a record with attribute L in the next 10 minutes.

Every time I encounter the pattern of records I’m looking for, I would like to perform an action. If I understand the Beam model correctly, each question would correspond to a separate pipeline I would create, and it also sounds like I’m looking for session windows. However, I’m assuming I would need to tee the input source to all the separate pipelines? I have tried to look for documentation and/or examples on whether Apache Beam can be used to express such a setup and how to do it if so, but I haven’t been able to find anything concrete. Any help would be appreciated.

Thanks!

Ray





Re: One-to-many mapping between unbounded input source and pipelines with session windows

Posted by Lukasz Cwik <lc...@google.com>.
Do the records have another attribute Z which joins them all together?
Are the set of attributes A, B, C, X, Y, K, L are from a fixed set of
values like enums or can be mapped onto a certain number of states (like an
attribute A > 50 can be mapped onto a state "exceeds threshold")?
For your examples, what should occur when there is late data in your three
scenarios?


On Wed, Dec 21, 2016 at 9:05 AM, Ray Ruvinskiy <ray.ruvinskiy@arcticwolf.com
> wrote:

> Hello,
>
> I am trying to figure out if Apache Beam is the right framework for my use
> case. I have an unbounded stream, and there are a number of questions I
> would like to ask regarding the records in the stream:
>
> - For example, one question may be trying to find a record with attribute
> A followed within no more than a minute by a record with attribute B
> followed within no more than 5 minutes by a record with attribute C.
> - Another question may be trying to find a sequence of at least N records
> with attribute X within 5 hours of each other, followed by an record with
> attribute Y no more than an hour later.
> - A third question would be seeing if there exist a record with attribute
> K *not* followed by a record with attribute L in the next 10 minutes.
>
> Every time I encounter the pattern of records I’m looking for, I would
> like to perform an action. If I understand the Beam model correctly, each
> question would correspond to a separate pipeline I would create, and it
> also sounds like I’m looking for session windows. However, I’m assuming I
> would need to tee the input source to all the separate pipelines? I have
> tried to look for documentation and/or examples on whether Apache Beam can
> be used to express such a setup and how to do it if so, but I haven’t been
> able to find anything concrete. Any help would be appreciated.
>
> Thanks!
>
> Ray
>
>
>