Posted to dev@beam.apache.org by Aaron Dixon <at...@gmail.com> on 2020/01/07 19:16:58 UTC

Custom window invariants and

I get an IllegalStateException "<window> is in more than one state address
window set" (stacktrace below).

What does this mean? What invariant of custom window implementation
& merging am I violating?

Thank you for any advice.

```
java.lang.IllegalStateException:
{[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more
than one state address window set
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
(Preconditions.java:588)
at
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants
(MergingActiveWindowSet.java:334)
at
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist
(MergingActiveWindowSet.java:88)
at
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist
(ReduceFnRunner.java:380)
at
org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement
(StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
...
```

Re: Custom window invariants and

Posted by Aaron Dixon <at...@gmail.com>.
For future reference (for anyone searching the mailing list for this or a
similar issue), the ticket Kenneth pointed to (
https://issues.apache.org/jira/browse/BEAM-654) is precisely the use case I
have: "session windows with a terminal/stop event". For now I've opted to
use the State API (
https://beam.apache.org/blog/2017/02/13/stateful-processing.html) to
achieve my rather special windowing needs, as it yields a
straightforward implementation that I can prove correct (and for my use
cases, distributing the windowing logic for a key is not a performance win).
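
As a rough illustration of the per-key logic described above, here is a
minimal plain-Java sketch. It does not use the Beam State API itself: the
class name StopEventSessions, the process() method, and the in-memory map
standing in for per-key state are all assumptions made for illustration, and
gap timeouts, timers, and out-of-order data are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical per-key session buffer that closes a session on a terminal event. */
final class StopEventSessions {
  // Stands in for per-key state; in a stateful DoFn the runner would manage this.
  private final Map<String, List<String>> openSessions = new HashMap<>();

  /**
   * Buffers the value for the key. Returns the completed session (including
   * the terminal value) when a terminal event arrives, otherwise empty.
   */
  Optional<List<String>> process(String key, String value, boolean terminal) {
    List<String> buffer = openSessions.computeIfAbsent(key, k -> new ArrayList<>());
    buffer.add(value);
    if (!terminal) {
      return Optional.empty();
    }
    // The terminal event ends the session: clear the state and emit the buffer.
    openSessions.remove(key);
    return Optional.of(buffer);
  }
}
```

Because all events for a key pass through one piece of state in order, there
is no merging step whose result could depend on the order of merges.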

However, I'm very interested in seeing how the windowing merge semantics get
nailed down and evolve, and in exploring what kind of innovative things could
ultimately be done with them.

On Fri, Jan 10, 2020 at 4:38 PM Aaron Dixon <at...@gmail.com> wrote:

> Once again this is a great help. Thank you, Kenneth.
>
> On Wed, Jan 8, 2020 at 3:03 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> Hmm. I've seen this manifest in some other tweaked versions of Sessions.
>> Your invariants are right. In fact, the Nexmark queries have auctions that
>> truncate in a similar way. This prompted
>> https://issues.apache.org/jira/browse/BEAM-654.  I think we have not
>> really nailed down the right spec for merging, and we certainly aren't
>> enforcing it. To be robust, your merging should be associative and
>> commutative, which means that you can't have an "end of session" event that
>> contradicts a merge that occurred. OTOH I also know that Tyler has hacked
>> window functions that split... it is mostly unexplored, semantically.
>>
>> About the error, this may help debug: The "state address windows" for a
>> given merged window are all the windows that contribute to it. This means
>> that when windows A and B merge to become a window AB, we can leave the
>> accumulated state stored with A and B and just note that when we read from
>> AB we actually have to read from both A and B*. So suppose windows A and B
>> are about to merge. Before merge, the state address window map is:
>>
>> A -> [A]
>> B -> [B]
>>
>> After merge, there is a new window AB and the "window to state address
>> window" mapping is:
>>
>> AB -> [A, B]
>>
>> The error means that there is more than one merged window that will read
>> data from a pre-merged window. So there is a situation like
>>
>> AB -> [A, B]
>> BC -> [B, C]
>>
>> This is not intended to happen. It would be the consequence of B merging
>> into two different new windows. Hence it is an internal error, most likely
>> a bug or a mismatch in assumptions. Note that this code/logic is
>> shared by all runners. I do think you can write a WindowFn that induces it.
>>
>> Kenn
>>
>> *This was intended to be a performance optimization, but eagerly copying
>> the data turned out to be faster, so now it is a legacy compatibility thing
>> that we could remove, I think, but changing this code is tricky.
>>
>> On Tue, Jan 7, 2020 at 3:27 PM Aaron Dixon <at...@gmail.com> wrote:
>>
>>> What I'm attempting is a variation on Session windows in which there may
>>> exist a "terminal" element in the stream that immediately stops the session
>>> (or perhaps after some configured delay.)
>>>
>>> My implementation behaves just like Sessions until any such "terminal"
>>> element is encountered in which case I mark the window as "terminal" and
>>> all windows "merge down" such that any terminal windows get to dictate the
>>> Interval.end()/Window.maxTimestamp().
>>>
>>> So, trivial example, if I have windows W1 [0, 100) and W2 [50, 75,
>>> terminal = true] then the merged result will be W3 [0, 75).
>>>
>>> I've been successful doing this so far but I've been inferring some
>>> invariants about windows that I'm not sure are official or documented
>>> anywhere.
>>>
>>> The invariants that I've inferred go like this:
>>>
>>> (I) Definition. An element is "in" window W if it originated in W or in
>>> a window that was merged into W (recursively).
>>>
>>> (II) Invariant. Any element, e, in window W MUST have e.timestamp <=
>>> W.maxTimestamp().
>>>
>>> So far, I think this is obvious and true stuff (I hope). (It would
>>> actually be great if there were a way for (II) not to have to hold,
>>> but that is a separate discussion, I think.)
>>>
>>> The main invariant I'm trying to formalize is one that allows me to
>>> "merge down" -- i.e., to merge in such a way that the merged window's
>>> (mergedResult's) maxTimestamp *is less than* one of the source's
>>> (toBeMerged's) windows' maxTimestamp.
>>>
>>> The (undocumented?) invariant I've been working from goes something like
>>> this:
>>>
>>> (III) Corollary. Windows W1 and W2 can merge such that either window's
>>> maxTimestamp() is regressed (moved backward in time, aka "merge down") in
>>> the merged window; however, they cannot merge such that (II) is ever
>>> violated.
>>>
>>> Is this correct?
>>>
>>> (If this can be confirmed, I'll go back and ensure I'm not
>>> violating the merge() precondition and these invariants, and post some code
>>> if needed.) Thank you for your assistance here!
>>>
>>>
>>> On Tue, Jan 7, 2020 at 4:21 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Have you used Dataflow's update feature on this pipeline? Also, do
>>>> you have the code for your WindowFn?
>>>>
>>>> On Tue, Jan 7, 2020 at 12:05 PM Aaron Dixon <at...@gmail.com> wrote:
>>>>
>>>>> Dataflow. (See stacktrace)
>>>>>
>>>>> On Tue, Jan 7, 2020 at 1:50 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Which runner are you using?
>>>>>>
>>>>>> On Tue, Jan 7, 2020, 11:17 AM Aaron Dixon <at...@gmail.com> wrote:
>>>>>>
>>>>>>> I get an IllegalStateException "<window> is in more than one state
>>>>>>> address window set" (stacktrace below).
>>>>>>>
>>>>>>> What does this mean? What invariant of custom window implementation
>>>>>>> & merging am I violating?
>>>>>>>
>>>>>>> Thank you for any advice.
>>>>>>>
>>>>>>> ```
>>>>>>> java.lang.IllegalStateException:
>>>>>>> {[2019-12-05T01:36:48.870Z..2019-12-05T01:36:48.871Z),terminal} is in more
>>>>>>> than one state address window set
>>>>>>> at
>>>>>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState
>>>>>>> (Preconditions.java:588)
>>>>>>> at
>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.checkInvariants
>>>>>>> (MergingActiveWindowSet.java:334)
>>>>>>> at
>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.MergingActiveWindowSet.persist
>>>>>>> (MergingActiveWindowSet.java:88)
>>>>>>> at
>>>>>>> org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.ReduceFnRunner.persist
>>>>>>> (ReduceFnRunner.java:380)
>>>>>>> at
>>>>>>> org.apache.beam.runners.dataflow.worker.StreamingGroupAlsoByWindowViaWindowSetFn.processElement
>>>>>>> (StreamingGroupAlsoByWindowViaWindowSetFn.java:96)
>>>>>>> ...
>>>>>>> ```
>>>>>>>
>>>>>>

Re: Custom window invariants and

Posted by Aaron Dixon <at...@gmail.com>.
Once again this is a great help. Thank you, Kenneth.


Re: Custom window invariants and

Posted by Kenneth Knowles <ke...@apache.org>.
Hmm. I've seen this manifest in some other tweaked versions of Sessions.
Your invariants are right. In fact, the Nexmark queries have auctions that
truncate in a similar way. This prompted
https://issues.apache.org/jira/browse/BEAM-654.  I think we have not really
nailed down the right spec for merging, and we certainly aren't enforcing
it. To be robust, your merging should be associative and commutative, which
means that you can't have an "end of session" event that contradicts a
merge that occurred. OTOH I also know that Tyler has hacked window
functions that split... it is mostly unexplored, semantically.
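
To make the associativity/commutativity point concrete, here is a small
plain-Java sketch (no Beam dependency). The TermWindow class and the merge
rule, where a terminal window caps the end and everything else behaves like
Sessions, are assumptions made for illustration and are not the WindowFn
discussed in this thread. It shows the same three windows producing different
results depending on whether the terminal window arrives before or after a
later overlapping window.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical window: half-open interval [start, end) plus a terminal flag. */
final class TermWindow {
  final long start;
  final long end;
  final boolean terminal;

  TermWindow(long start, long end, boolean terminal) {
    this.start = start;
    this.end = end;
    this.terminal = terminal;
  }

  boolean overlaps(TermWindow other) {
    return start < other.end && other.start < end;
  }

  @Override
  public String toString() {
    return "[" + start + ", " + end + (terminal ? ", terminal)" : ")");
  }
}

final class MergeOrderDemo {
  /** Assumed rule: terminal windows cap the end; otherwise merge like Sessions. */
  static TermWindow merge(TermWindow a, TermWindow b) {
    long start = Math.min(a.start, b.start);
    if (!a.terminal && !b.terminal) {
      return new TermWindow(start, Math.max(a.end, b.end), false);
    }
    long end = Long.MAX_VALUE;
    if (a.terminal) end = Math.min(end, a.end);
    if (b.terminal) end = Math.min(end, b.end);
    return new TermWindow(start, end, true);
  }

  /** Adds windows one at a time, merging each with any overlapping active window. */
  static List<String> mergeInOrder(List<TermWindow> arrivals) {
    List<TermWindow> active = new ArrayList<>();
    for (TermWindow incoming : arrivals) {
      TermWindow merged = incoming;
      boolean changed = true;
      while (changed) {
        changed = false;
        for (int i = 0; i < active.size(); i++) {
          if (active.get(i).overlaps(merged)) {
            merged = merge(active.remove(i), merged);
            changed = true;
            break;
          }
        }
      }
      active.add(merged);
    }
    List<String> result = new ArrayList<>();
    for (TermWindow w : active) result.add(w.toString());
    Collections.sort(result);
    return result;
  }
}
```

With windows [0, 100), [50, 75, terminal) and [80, 180), processing the
terminal window second leaves two windows, [0, 75, terminal) and [80, 180),
while processing it last leaves only [0, 75, terminal). A merge rule with
this property is not associative and commutative.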

About the error, this may help debug: The "state address windows" for a
given merged window are all the windows that contribute to it. This means
that when windows A and B merge to become a window AB, we can leave the
accumulated state stored with A and B and just note that when we read from
AB we actually have to read from both A and B*. So suppose windows A and B
are about to merge. Before merge, the state address window map is:

A -> [A]
B -> [B]

After merge, there is a new window AB and the "window to state address
window" mapping is:

AB -> [A, B]

The error means that there is more than one merged window that will read
data from a pre-merged window. So there is a situation like

AB -> [A, B]
BC -> [B, C]

This is not intended to happen. It would be the consequence of B merging
into two different new windows. Hence it is an internal error, most likely
a bug or a mismatch in assumptions. Note that this code/logic is
shared by all runners. I do think you can write a WindowFn that induces it.
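
The condition behind the error can be sketched in plain Java as a check over
the "window to state address window" mapping. This is not the actual
MergingActiveWindowSet.checkInvariants code; the class name StateAddressCheck,
the method findShared, and the use of strings as window names are assumptions
for illustration only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class StateAddressCheck {
  /**
   * Returns a state address window that is listed under more than one active
   * window, or empty if every state address window has a single owner.
   */
  static Optional<String> findShared(Map<String, List<String>> activeToStateAddress) {
    Map<String, String> owner = new HashMap<>();
    for (Map.Entry<String, List<String>> entry : activeToStateAddress.entrySet()) {
      for (String stateAddress : entry.getValue()) {
        String previous = owner.put(stateAddress, entry.getKey());
        if (previous != null && !previous.equals(entry.getKey())) {
          // Same pre-merge window would be read by two different merged windows.
          return Optional.of(stateAddress);
        }
      }
    }
    return Optional.empty();
  }
}
```

For the mapping AB -> [A, B] alone the check finds nothing; once
BC -> [B, C] is also present it reports B, which corresponds to the
"is in more than one state address window set" condition.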

Kenn

*This was intended to be a performance optimization, but eagerly copying
the data turned out to be faster, so now it is a legacy compatibility thing
that we could remove, I think, but changing this code is tricky.


Re: Custom window invariants and

Posted by Aaron Dixon <at...@gmail.com>.
What I'm attempting is a variation on Session windows in which there may
exist a "terminal" element in the stream that immediately stops the session
(or perhaps after some configured delay.)

My implementation behaves just like Sessions until any such "terminal"
element is encountered in which case I mark the window as "terminal" and
all windows "merge down" such that any terminal windows get to dictate the
Interval.end()/Window.maxTimestamp().

So, trivial example, if I have windows W1 [0, 100) and W2 [50, 75, terminal
= true] then the merged result will be W3 [0, 75).

I've been successful doing this so far but I've been inferring some
invariants about windows that I'm not sure are official or documented
anywhere.

The invariants that I've inferred go like this:

(I) Definition. An element is "in" window W if it originated in W or in a
window that was merged into W (recursively).

(II) Invariant. Any element, e, in window W MUST have e.timestamp <=
W.maxTimestamp().

So far, I think this is obvious and true stuff (I hope). (It would actually
be great if there were a way for (II) not to have to hold, but that
is a separate discussion, I think.)

The main invariant I'm trying to formalize is one that allows me to "merge
down" -- i.e., to merge in such a way that the merged window's
(mergedResult's) maxTimestamp *is less than* one of the source's
(toBeMerged's) windows' maxTimestamp.

The (undocumented?) invariant I've been working from goes something like
this:

(III) Corollary. Windows W1 and W2 can merge such that either window's
maxTimestamp() is regressed (moved backward in time, aka "merge down") in
the merged window; however, they cannot merge such that (II) is ever
violated.

Is this correct?
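
For reference, the condition that (II) and (III) place on a "merge down" can
be written as a small plain-Java check. This is only a sketch for reasoning
and testing: MergeDownCheck and canMergeDown are hypothetical names, windows
are taken to be half-open [start, end) with maxTimestamp = end - 1 as for an
interval window, and the element timestamps are passed in explicitly, which a
WindowFn's merge step would not normally have access to.

```java
import java.util.List;

final class MergeDownCheck {
  /** maxTimestamp of a half-open window [start, end): the last instant inside it. */
  static long maxTimestamp(long windowEnd) {
    return windowEnd - 1;
  }

  /** Invariant (II): every element timestamp is <= the window's maxTimestamp. */
  static boolean satisfiesInvariantII(long windowEnd, List<Long> elementTimestamps) {
    for (long timestamp : elementTimestamps) {
      if (timestamp > maxTimestamp(windowEnd)) {
        return false;
      }
    }
    return true;
  }

  /**
   * Corollary (III): merging down to candidateEnd is allowed only if the
   * elements of both source windows still satisfy (II) in the merged window.
   */
  static boolean canMergeDown(long candidateEnd, List<Long> elementsInW1, List<Long> elementsInW2) {
    return satisfiesInvariantII(candidateEnd, elementsInW1)
        && satisfiesInvariantII(candidateEnd, elementsInW2);
  }
}
```

In the trivial example, merging W1 [0, 100) and W2 [50, 75, terminal = true]
down to [0, 75) passes this check only if no element that originated in W1 has
a timestamp of 75 or later.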

(If this can be confirmed, I'll go back and ensure I'm not
violating the merge() precondition and these invariants, and post some code
if needed.) Thank you for your assistance here!



Re: Custom window invariants and

Posted by Reuven Lax <re...@google.com>.
Have you used Dataflow's update feature on this pipeline? Also, do you have
the code for your WindowFn?


Re: Custom window invariants and

Posted by Aaron Dixon <at...@gmail.com>.
Dataflow. (See stacktrace)


Re: Custom window invariants and

Posted by Reuven Lax <re...@google.com>.
Which runner are you using?
