Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2018/05/01 16:00:18 UTC

Windows aligned with EDT ( with day light saving )

How do I align a window with EDT (US Eastern time), including the daylight
saving correction? The window offset takes a hardcoded value. I need 6-hour
windows aligned to 00, 06, 12, 18 and so on, but in Eastern local time.
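
To illustrate why a hardcoded offset falls short, here is a minimal sketch using only java.time. The class and method names (OffsetChangesWithDst, offsetAt) are made up for this example and are not part of Flink. It shows that the UTC offset of America/New_York differs between winter and summer, so a single constant offset can only be right for part of the year.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical helper class, for illustration only.
final class OffsetChangesWithDst {

    /** Returns the UTC offset that applies in the given zone at the given instant. */
    static ZoneOffset offsetAt(String isoInstant, ZoneId zone) {
        return zone.getRules().getOffset(Instant.parse(isoInstant));
    }

    public static void main(String[] args) {
        ZoneId newYork = ZoneId.of("America/New_York");
        // Standard time (EST) applies in January.
        System.out.println(offsetAt("2018-01-15T12:00:00Z", newYork)); // prints -05:00
        // Daylight saving time (EDT) applies in July.
        System.out.println(offsetAt("2018-07-15T12:00:00Z", newYork)); // prints -04:00
    }
}
```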

Re: Windows aligned with EDT ( with day light saving )

Posted by Vishal Santoshi <vi...@gmail.com>.
Hey Fabian, it seems as simple as this?



import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;

/**
 * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
 * elements and on the time zone from which the window offset is computed. Windows cannot overlap.
 *
 * <p>For example, in order to window into windows of 1 minute:
 * <pre> {@code
 * DataStream<Tuple2<String, Integer>> in = ...;
 * KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
 * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
 *   keyed.window(TimeZoneAwareTumblingEventTimeWindows.of(Time.minutes(1), ZoneId.of("America/New_York")));
 * } </pre>
 */
public class TimeZoneAwareTumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {


    private static final long serialVersionUID = 1L;


    private final long size;

    private final ZoneId zoneID;


    protected TimeZoneAwareTumblingEventTimeWindows(long size, ZoneId zoneID) {
        this.size = size;
        this.zoneID = zoneID;
    }

    /**
     * Creates a new {@code TimeZoneAwareTumblingEventTimeWindows} {@link WindowAssigner} that assigns
     * elements to time windows based on the element timestamp and the given time zone.
     *
     * @param size The size of the generated windows.
     * @param zoneID The time zone whose current UTC offset is used to align the windows.
     * @return The time policy.
     */
    public static TimeZoneAwareTumblingEventTimeWindows of(Time size, ZoneId zoneID) {
        return new TimeZoneAwareTumblingEventTimeWindows(size.toMilliseconds(), zoneID);
    }

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            // Long.MIN_VALUE is currently assigned when no timestamp is present
            ZonedDateTime dateTime = Instant.ofEpochMilli(timestamp).atZone(zoneID);
            // Flink's window offset is the negated zone offset: window starts are
            // congruent to the offset modulo the size, so local midnight at UTC-4
            // needs +4h (the Flink docs use Time.hours(-8) for UTC+8).
            long offset = -dateTime.getOffset().getTotalSeconds() * 1000L;
            long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        } else {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                    "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                    "'DataStream.assignTimestampsAndWatermarks(...)'?");
        }
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "TimeZoneAwareTumblingEventTimeWindows(" + size + ", " + zoneID + ")";
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }

}
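
One way to sanity-check the alignment without a Flink job is the sketch below, which uses only java.time. It rests on two assumptions: windowStartWithOffset is a local copy of the arithmetic that Flink 1.x uses in TimeWindow.getWindowStartWithOffset (worth verifying against the Flink version in use), and the class and method names (WindowAlignmentCheck, zoneAlignedStart) are hypothetical. The point it illustrates is the sign of the offset: window starts are congruent to the offset modulo the window size, so a zone at UTC-4 needs an offset of +4h for windows to start at local midnight.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Hypothetical helper class, for illustration only.
final class WindowAlignmentCheck {

    /** Local copy of the formula assumed to match Flink 1.x TimeWindow.getWindowStartWithOffset. */
    static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Start of the window containing the timestamp, aligned to the wall clock of the zone. */
    static long zoneAlignedStart(long timestamp, ZoneId zone, long windowSize) {
        long zoneOffsetMillis =
                Instant.ofEpochMilli(timestamp).atZone(zone).getOffset().getTotalSeconds() * 1000L;
        // Negate the zone offset: UTC-4 needs a window offset of +4h.
        return windowStartWithOffset(timestamp, -zoneOffsetMillis, windowSize);
    }

    public static void main(String[] args) {
        ZoneId newYork = ZoneId.of("America/New_York");
        long sixHours = 6 * 60 * 60 * 1000L;

        // 13:30 local time on a summer day (EDT, UTC-4).
        long ts = ZonedDateTime.of(2018, 5, 1, 13, 30, 0, 0, newYork).toInstant().toEpochMilli();
        long start = zoneAlignedStart(ts, newYork, sixHours);

        // The local hour of the window start should be one of 0, 6, 12, 18.
        System.out.println(Instant.ofEpochMilli(start).atZone(newYork).getHour());
    }
}
```

With the negated offset, the 13:30 element above lands in the window starting at 12:00 local time; passing the zone offset without negating it puts the start at 10:00 local time instead. Note also that the offset is evaluated per element, so the window grid shifts by one hour across a DST transition, and windows assigned just before and just after the change may overlap on that day.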



On Wed, May 2, 2018 at 9:31 AM, Vishal Santoshi <vi...@gmail.com>
wrote:

> True that. Thanks. Wanted to be sure before I go down that path.
>
>
> On Wed, May 2, 2018 at 9:19 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> AFAIK it is not possible with Flink's default time windows.
>> However, it should be possible to implement a custom WindowAssigner for
>> your use case.
>> I'd have a look at the TumblingEventTimeWindows class and copy/modify it
>> to your needs.
>>
>> Best, Fabian
>>
>> 2018-05-02 15:12 GMT+02:00 Vishal Santoshi <vi...@gmail.com>:
>>
>>> This does not seem possible but need some confirmation. Anyone ?
>>>
>>> On Tue, May 1, 2018 at 12:00 PM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> How do I align a Window with EDT with day light saving correction ?
>>>> The offset takes a hardcoded value. I need 6 hour windows aligned to 00, 12
>>>> , 18 and so on but on EDT.
>>>>
>>>
>>>
>>
>

Re: Windows aligned with EDT ( with day light saving )

Posted by Vishal Santoshi <vi...@gmail.com>.
True that. Thanks. Wanted to be sure before I go down that path.


On Wed, May 2, 2018 at 9:19 AM, Fabian Hueske <fh...@gmail.com> wrote:

> Hi Vishal,
>
> AFAIK it is not possible with Flink's default time windows.
> However, it should be possible to implement a custom WindowAssigner for
> your use case.
> I'd have a look at the TumblingEventTimeWindows class and copy/modify it
> to your needs.
>
> Best, Fabian
>
> 2018-05-02 15:12 GMT+02:00 Vishal Santoshi <vi...@gmail.com>:
>
>> This does not seem possible but need some confirmation. Anyone ?
>>
>> On Tue, May 1, 2018 at 12:00 PM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> How do I align a Window with EDT with day light saving correction ?  The
>>> offset takes a hardcoded value. I need 6 hour windows aligned to 00, 12 ,
>>> 18 and so on but on EDT.
>>>
>>
>>
>

Re: Windows aligned with EDT ( with day light saving )

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vishal,

AFAIK it is not possible with Flink's default time windows.
However, it should be possible to implement a custom WindowAssigner for
your use case.
I'd have a look at the TumblingEventTimeWindows class and copy/modify it to
your needs.

Best, Fabian

2018-05-02 15:12 GMT+02:00 Vishal Santoshi <vi...@gmail.com>:

> This does not seem possible but need some confirmation. Anyone ?
>
> On Tue, May 1, 2018 at 12:00 PM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> How do I align a Window with EDT with day light saving correction ?  The
>> offset takes a hardcoded value. I need 6 hour windows aligned to 00, 12 ,
>> 18 and so on but on EDT.
>>
>
>

Re: Windows aligned with EDT ( with day light saving )

Posted by Vishal Santoshi <vi...@gmail.com>.
This does not seem possible, but I need some confirmation. Anyone?

On Tue, May 1, 2018 at 12:00 PM, Vishal Santoshi <vi...@gmail.com>
wrote:

> How do I align a Window with EDT with day light saving correction ?  The
> offset takes a hardcoded value. I need 6 hour windows aligned to 00, 12 ,
> 18 and so on but on EDT.
>