Posted to user@beam.apache.org by Jainik Vora <ja...@gmail.com> on 2020/02/13 03:30:24 UTC

Implementing custom session with max event/element count

Hi Everyone,

I am trying to implement sessions on events with three criteria:
1. Gap duration - e.g. 10 mins
2. Max duration - e.g. 1 hour
3. Max events - e.g. 500 events

I'm able to implement 1 and 2 by implementing a custom BoundedWindow that
keeps track of window size and max duration. But I'm having difficulty
implementing the 3rd criterion, which is that a session should have a
maximum number of events. I'm trying to implement this by tracking the
number of events in a window, but while testing I noticed that mergeWindows
is called every 3 seconds and, after mergeWindows is executed, the windows
in that merge are lost, and so is the metadata about the number of events
seen in those windows. Any examples or pointers on how to implement a
session with a max element/event count would be helpful. Below is the
custom WindowFn I implemented:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserSessions extends WindowFn<KV<String, Event>, IntervalWindow> {
  private static final Logger LOG = LoggerFactory.getLogger(UserSessions.class);
  private static final Duration DEFAULT_SIZE_DURATION = Duration.standardHours(12L);

  private final Duration gapDuration;
  private final Duration maxSize;

  public UserSessions(Duration gapDuration, Duration sizeDuration) {
    this.gapDuration = gapDuration;
    this.maxSize = sizeDuration;
  }

  public static UserSessions withGapDuration(Duration gapDuration) {
    return new UserSessions(gapDuration, DEFAULT_SIZE_DURATION);
  }

  /** Returns a copy with a different max duration, keeping the WindowFn immutable. */
  public UserSessions withMaxSize(Duration maxSize) {
    return new UserSessions(gapDuration, maxSize);
  }

  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext assignContext) throws Exception {
    // Each element starts in its own window [timestamp, timestamp + gap).
    return Arrays.asList(new IntervalWindow(assignContext.timestamp(), gapDuration));
  }

  private Duration windowSize(IntervalWindow window) {
    return window == null
        ? new Duration(0)
        : new Duration(window.start(), window.end());
  }

  @Override
  public void mergeWindows(MergeContext mergeContext) throws Exception {
    List<IntervalWindow> sortedWindows = new ArrayList<>();
    for (IntervalWindow window : mergeContext.windows()) {
      sortedWindows.add(window);
    }
    Collections.sort(sortedWindows);
    List<MergeCandidate> merges = new ArrayList<>();
    MergeCandidate current = new MergeCandidate();
    for (IntervalWindow window : sortedWindows) {
      MergeCandidate next = new MergeCandidate(window);
      if (current.intersects(window)) {
        current.add(window);
        Duration currentWindow = windowSize(current.union);
        // Keep extending only while both bounds hold; if either is exceeded,
        // fall through and flush. Note: size() counts merged windows, not events.
        if (!currentWindow.isLongerThan(maxSize) && current.size() < 10) {
          continue;
        }
        // Current window exceeds a bound, so flush it and move to the next.
        LOG.info("Session exceeds max duration or max window count; flushing.");
        next = new MergeCandidate();
      }
      merges.add(current);
      current = next;
    }
    merges.add(current);
    for (MergeCandidate merge : merges) {
      merge.apply(mergeContext);
    }
  }

  @Override
  public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new UnsupportedOperationException("Sessions is not allowed in side inputs");
  }

  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return false;
  }

  @Override
  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  private static class MergeCandidate {
    @Nullable
    private IntervalWindow union;
    private final List<IntervalWindow> parts;

    public MergeCandidate() {
      union = null;
      parts = new ArrayList<>();
    }

    public MergeCandidate(IntervalWindow window) {
      union = window;
      parts = new ArrayList<>(Arrays.asList(window));
    }

    public boolean intersects(IntervalWindow window) {
      return union == null || union.intersects(window);
    }

    public void add(IntervalWindow window) {
      union = union == null ? window : union.span(window);
      union.incrementWindowEventCountBy(window.getWindowEventCount() + 1);
      parts.add(window);
    }

    public void apply(WindowFn<?, IntervalWindow>.MergeContext c) throws Exception {
      if (this.parts.size() > 1) {
        c.merge(parts, union);
      }
    }

    public int size() {
      return this.parts.size();
    }

    @Override
    public String toString() {
      return "MergeCandidate[union=" + union + ", parts=" + parts + "]";
    }
  }
}

Thanks & Regards,

*Jainik Vora*
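For criterion 3, the merged window has to carry the summed event counts of its parts, and merging should stop as soon as either cap would be exceeded. The sketch below models only that bookkeeping, in plain Java outside Beam. CountedWindow and BoundedSessionMerger are hypothetical names, times are millisecond longs, and it does not address how a runner persists or re-encodes windows between mergeWindows calls.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical window model: [start, end) in millis plus the events it represents. */
final class CountedWindow implements Comparable<CountedWindow> {
  final long start;
  final long end;
  final int eventCount;

  CountedWindow(long start, long end, int eventCount) {
    this.start = start;
    this.end = end;
    this.eventCount = eventCount;
  }

  /** Half-open intervals overlap when each starts before the other ends. */
  boolean intersects(CountedWindow other) {
    return start < other.end && other.start < end;
  }

  /** Smallest window covering both; event counts are summed, not reset. */
  CountedWindow span(CountedWindow other) {
    return new CountedWindow(
        Math.min(start, other.start), Math.max(end, other.end), eventCount + other.eventCount);
  }

  @Override
  public int compareTo(CountedWindow o) {
    return start != o.start ? Long.compare(start, o.start) : Long.compare(end, o.end);
  }
}

/** Hypothetical helper that groups overlapping windows into sessions under two caps. */
final class BoundedSessionMerger {
  static List<CountedWindow> merge(List<CountedWindow> windows, long maxSizeMillis, int maxEvents) {
    List<CountedWindow> sorted = new ArrayList<>(windows);
    Collections.sort(sorted);
    List<CountedWindow> sessions = new ArrayList<>();
    CountedWindow current = null;
    for (CountedWindow window : sorted) {
      if (current != null && current.intersects(window)) {
        CountedWindow candidate = current.span(window);
        // Extend the session only if BOTH caps still hold after adding this window.
        if (candidate.end - candidate.start <= maxSizeMillis
            && candidate.eventCount <= maxEvents) {
          current = candidate;
          continue;
        }
      }
      // No overlap, or a cap would be exceeded: close the current session.
      if (current != null) {
        sessions.add(current);
      }
      current = window;
    }
    if (current != null) {
      sessions.add(current);
    }
    return sessions;
  }
}
```

Sessions produced this way can overlap in time, since a later window may start before the previous capped session ends; the sketch does not show how a runner handles that.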

Re: Implementing custom session with max event/element count

Posted by Jainik Vora <ja...@gmail.com>.
Hi Kenn,

Let me know if I'm missing something in the email below. I was going through
the stateful processing blog posts, but that wouldn't solve the issue, since a
window cannot be split by operations done after the WindowFn. I'm wondering
what I could leverage to enforce a maximum number of events in a window that
has a gap duration plus a max duration.

Appreciate the help on this,
Jainik
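A workaround that leaves the WindowFn alone (not proposed in this thread, and only a sketch) is to keep sessions bounded by gap and duration and then, in a DoFn after the GroupByKey, split each session's events into chunks of at most N events. As the message says, the Beam window itself is not split, so this only helps if downstream steps can work on the chunks. The code below models just the splitting step in plain Java; TimedEvent and SessionSplitter are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical event: an event-time timestamp in millis plus a payload. */
final class TimedEvent {
  final long timestampMillis;
  final String payload;

  TimedEvent(long timestampMillis, String payload) {
    this.timestampMillis = timestampMillis;
    this.payload = payload;
  }
}

/** Hypothetical helper that caps the number of events per sub-session. */
final class SessionSplitter {
  /** Sorts one grouped session by timestamp and cuts it into chunks of at most maxEvents. */
  static List<List<TimedEvent>> split(List<TimedEvent> sessionEvents, int maxEvents) {
    if (maxEvents <= 0) {
      throw new IllegalArgumentException("maxEvents must be positive");
    }
    List<TimedEvent> sorted = new ArrayList<>(sessionEvents);
    sorted.sort(Comparator.comparingLong(e -> e.timestampMillis));
    List<List<TimedEvent>> chunks = new ArrayList<>();
    for (int i = 0; i < sorted.size(); i += maxEvents) {
      // Copy each slice so chunks do not share the backing list.
      chunks.add(new ArrayList<>(sorted.subList(i, Math.min(i + maxEvents, sorted.size()))));
    }
    return chunks;
  }
}
```

With maxEvents = 500, a session of 1,200 events would come out as chunks of 500, 500 and 200.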

On Wed, Feb 12, 2020 at 8:22 PM Jainik Vora <ja...@gmail.com> wrote:



Re: Implementing custom session with max event/element count

Posted by Jainik Vora <ja...@gmail.com>.
Thanks for noticing, Kenn. I did try a custom implementation of
IntervalWindow that stores windowEventCount in addition to start and end, but
it didn't work as expected. I forgot to update the class name in the code
sample I pasted. Below are the custom IntervalWindow class and a custom coder
for it.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.DurationCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;

public class SessionIntervalWindow extends BoundedWindow
    implements Comparable<SessionIntervalWindow> {
  /** Start of the interval, inclusive. */
  private final Instant start;

  /** End of the interval, exclusive. */
  private final Instant end;

  /** Number of events seen in this window; defaults to 1. */
  private int windowEventCount = 1;

  /** Creates a new SessionIntervalWindow that represents the half-open time interval [start, end). */
  public SessionIntervalWindow(Instant start, Instant end) {
    this.start = start;
    this.end = end;
  }

  public SessionIntervalWindow(Instant start, Instant end, int windowEventCount) {
    this.start = start;
    this.end = end;
    this.windowEventCount = windowEventCount;
  }

  public SessionIntervalWindow(Instant start, ReadableDuration size) {
    this.start = start;
    this.end = start.plus(size);
  }

  /** Returns the start of this window, inclusive. */
  public Instant start() {
    return start;
  }

  /** Returns the end of this window, exclusive. */
  public Instant end() {
    return end;
  }

  public int getWindowEventCount() {
    return this.windowEventCount;
  }

  public void incrementWindowEventCountBy(int increment) {
    this.windowEventCount += increment;
  }

  /** Returns the largest timestamp that can be included in this window. */
  @Override
  public Instant maxTimestamp() {
    // end not inclusive
    return end.minus(1);
  }

  /** Returns whether this window contains the given window. */
  public boolean contains(SessionIntervalWindow other) {
    return !this.start.isAfter(other.start) && !this.end.isBefore(other.end);
  }

  /** Returns whether this window is disjoint from the given window. */
  public boolean isDisjoint(SessionIntervalWindow other) {
    return !this.end.isAfter(other.start) || !other.end.isAfter(this.start);
  }

  /** Returns whether this window intersects the given window. */
  public boolean intersects(SessionIntervalWindow other) {
    return !isDisjoint(other);
  }

  /**
   * Returns the minimal window that includes both this window and the given window.
   * Note: the two-argument constructor leaves windowEventCount at its default of 1,
   * so the counts of this window and {@code other} are not carried over.
   */
  public SessionIntervalWindow span(SessionIntervalWindow other) {
    return new SessionIntervalWindow(
        new Instant(Math.min(start.getMillis(), other.start.getMillis())),
        new Instant(Math.max(end.getMillis(), other.end.getMillis())));
  }

  @Override
  public boolean equals(Object o) {
    return (o instanceof SessionIntervalWindow)
        && ((SessionIntervalWindow) o).end.isEqual(end)
        && ((SessionIntervalWindow) o).start.isEqual(start);
  }

  @Override
  public int hashCode() {
    // The end values are themselves likely to be an arithmetic sequence, which
    // is a poor distribution to use for a hashtable, so we
    // add a highly non-linear transformation.
    return (int) (start.getMillis() + modInverse((int) (end.getMillis() << 1) + 1));
  }

  /** Compute the inverse of (odd) x mod 2^32. */
  private int modInverse(int x) {
    // Cube gives inverse mod 2^4, as x^4 == 1 (mod 2^4) for all odd x.
    int inverse = x * x * x;
    // Newton iteration doubles correct bits at each step.
    inverse *= 2 - x * inverse;
    inverse *= 2 - x * inverse;
    inverse *= 2 - x * inverse;
    return inverse;
  }

  @Override
  public String toString() {
    return "[" + start + ".." + end + "), count=" + windowEventCount;
  }

  @Override
  public int compareTo(SessionIntervalWindow o) {
    if (start.isEqual(o.start)) {
      return end.compareTo(o.end);
    }
    return start.compareTo(o.start);
  }

  /** Returns a {@link Coder} suitable for {@link SessionIntervalWindow}. */
  public static Coder<SessionIntervalWindow> getCoder() {
    return SessionIntervalWindow.IntervalWindowCoder.of();
  }

  /** Encodes a {@link SessionIntervalWindow} as its upper bound, its duration and its event count. */
  public static class IntervalWindowCoder extends StructuredCoder<SessionIntervalWindow> {

    private static final SessionIntervalWindow.IntervalWindowCoder INSTANCE =
        new SessionIntervalWindow.IntervalWindowCoder();

    private static final Coder<Instant> instantCoder = InstantCoder.of();
    private static final Coder<ReadableDuration> durationCoder = DurationCoder.of();
    private static final Coder<Integer> integerCoder = VarIntCoder.of();

    public static SessionIntervalWindow.IntervalWindowCoder of() {
      return INSTANCE;
    }

    @Override
    public void encode(SessionIntervalWindow window, OutputStream outStream)
        throws IOException, CoderException {
      instantCoder.encode(window.end, outStream);
      durationCoder.encode(new Duration(window.start, window.end), outStream);
      integerCoder.encode(window.windowEventCount, outStream);
    }

    @Override
    public SessionIntervalWindow decode(InputStream inStream) throws IOException, CoderException {
      Instant end = instantCoder.decode(inStream);
      ReadableDuration duration = durationCoder.decode(inStream);
      int windowEventCount = integerCoder.decode(inStream);
      return new SessionIntervalWindow(end.minus(duration), end, windowEventCount);
    }

    @Override
    public void verifyDeterministic() throws NonDeterministicException {
      // Note: equals() ignores windowEventCount, so two equal windows can still
      // encode to different bytes.
      instantCoder.verifyDeterministic();
      durationCoder.verifyDeterministic();
      integerCoder.verifyDeterministic();
    }

    @Override
    public boolean consistentWithEquals() {
      return instantCoder.consistentWithEquals() && durationCoder.consistentWithEquals();
    }

    @Override
    public List<? extends Coder<?>> getCoderArguments() {
      return Collections.emptyList();
    }
  }
}
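The hashCode() above relies on modInverse() returning the multiplicative inverse of an odd int modulo 2^32. x^3 is already correct to 4 bits because x^4 ≡ 1 (mod 16) for odd x, and each Newton step `inverse *= 2 - x * inverse` doubles the number of correct low bits (4, 8, 16, 32). Java int arithmetic wraps modulo 2^32, so the property can be checked directly; ModInverseCheck is a hypothetical standalone copy of the method. The argument `(end << 1) + 1` is always odd, so it is a valid input.

```java
/** Hypothetical standalone copy of SessionIntervalWindow.modInverse for checking. */
final class ModInverseCheck {
  /** Inverse of an odd x modulo 2^32, via Newton iteration. */
  static int modInverse(int x) {
    int inverse = x * x * x;    // correct modulo 2^4
    inverse *= 2 - x * inverse; // correct modulo 2^8
    inverse *= 2 - x * inverse; // correct modulo 2^16
    inverse *= 2 - x * inverse; // correct modulo 2^32
    return inverse;
  }

  /** True when x * modInverse(x) wraps around to exactly 1. */
  static boolean isInverse(int x) {
    return x * modInverse(x) == 1;
  }
}
```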


On Wed, Feb 12, 2020 at 8:11 PM Kenneth Knowles <ke...@apache.org> wrote:


-- 


Thanks & Regards,

*Jainikkumar Vora*

Software Engineer 2

Data Fabric | Intuit

408-854-0311 | LinkedIn <https://www.linkedin.com/in/jainikvora>

Re: Implementing custom session with max event/element count

Posted by Kenneth Knowles <ke...@apache.org>.
I notice that you use the name "IntervalWindow" but you are calling methods
that IntervalWindow does not have. Do you have a custom implementation of
this class? Do you have a custom coder for your version of IntervalWindow?

Kenn
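Kenn's coder question points at a real constraint. The IntervalWindowCoder posted in this thread writes windowEventCount, while SessionIntervalWindow.equals() compares only start and end. Beam's documentation for Coder.verifyDeterministic() asks that values which are equal under equals() share one encoding, and a runner may identify per-window state by the encoded window, so equal windows with different counts can cause trouble. The sketch below reproduces the mismatch without Beam: CountingWindow is a hypothetical stand-in, and DataOutputStream replaces InstantCoder, DurationCoder and VarIntCoder, so only the property carries over, not the byte layout.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical stand-in for SessionIntervalWindow: equals() ignores the count. */
final class CountingWindow {
  final long startMillis;
  final long endMillis;
  final int eventCount;

  CountingWindow(long startMillis, long endMillis, int eventCount) {
    this.startMillis = startMillis;
    this.endMillis = endMillis;
    this.eventCount = eventCount;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof CountingWindow)) {
      return false;
    }
    CountingWindow other = (CountingWindow) o;
    return other.startMillis == startMillis && other.endMillis == endMillis;
  }

  @Override
  public int hashCode() {
    return 31 * Long.hashCode(startMillis) + Long.hashCode(endMillis);
  }

  /** Same field order as the posted coder: end, duration, then the event count. */
  byte[] encode() {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeLong(endMillis);
      out.writeLong(endMillis - startMillis);
      out.writeInt(eventCount);
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** True when both windows produce identical bytes. */
  boolean sameEncoding(CountingWindow other) {
    return Arrays.equals(encode(), other.encode());
  }
}
```

One way to make the two agree is to compare windowEventCount in equals() and hashCode() as well, or to leave it out of the encoding; the sketch does not show which of those behaves better inside a merging WindowFn.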

On Wed, Feb 12, 2020 at 7:30 PM Jainik Vora <ja...@gmail.com> wrote:
