You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2018/05/01 16:00:18 UTC
Windows aligned with EDT ( with day light saving )
How do I align a Window with EDT with day light saving correction ? The
offset takes a hardcoded value. I need 6 hour windows aligned to 00, 12 ,
18 and so on but on EDT.
Re: Windows aligned with EDT ( with day light saving )
Posted by Vishal Santoshi <vi...@gmail.com>.
Hey Fabian, it seems as simple as this ?
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.Collections;
/**
* A {@link WindowAssigner} that windows elements into windows based
on the timestamp of the
* elements and the date zone the offset it computed off. Windows
cannot overlap.
*
* <p>For example, in order to window into windows of 1 minute:
* <pre> {@code
* DataStream<Tuple2<String, Integer>> in = ...;
* KeyedStream<Tuple2<String, Integer>, String> keyed = in.keyBy(...);
* WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
* keyed.window(TimeZoneAwareTumblingEventTimeWindows.of(Time.minutes(1),
ZoneId.of(AMERICA_NEW_YORK_ZONE)));
* } </pre>
*/
public class TimeZoneAwareTumblingEventTimeWindows extends
WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long size;
private final ZoneId zoneID;
protected TimeZoneAwareTumblingEventTimeWindows(long size, ZoneId zoneID) {
this.size = size;
this.zoneID = zoneID;
}
/**
* Creates a new {@code TumblingEventTimeWindows} {@link
WindowAssigner} that assigns
* elements to time windows based on the element timestamp.
*
* @param size The size of the generated windows.
* @return The time policy.
*/
public static TimeZoneAwareTumblingEventTimeWindows of(Time size,
ZoneId zoneID) {
return new
TimeZoneAwareTumblingEventTimeWindows(size.toMilliseconds(), zoneID);
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long
timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
ZonedDateTime dateTime =
Instant.ofEpochMilli(timestamp).atZone(zoneID);
long start =
TimeWindow.getWindowStartWithOffset(timestamp,
dateTime.getOffset().getTotalSeconds()*1000l, size);
return Collections.singletonList(new TimeWindow(start,
start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE
timestamp (= no timestamp marker). " +
"Is the time characteristic set to
'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
@Override
public Trigger<Object, TimeWindow>
getDefaultTrigger(StreamExecutionEnvironment env) {
return EventTimeTrigger.create();
}
@Override
public String toString() {
return "TumblingEventTimeWindows(" + size + ")";
}
@Override
public TypeSerializer<TimeWindow>
getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
}
@Override
public boolean isEventTime() {
return true;
}
}
On Wed, May 2, 2018 at 9:31 AM, Vishal Santoshi <vi...@gmail.com>
wrote:
> True that. Thanks. Wanted to be sure before I go down that path.
>
>
> On Wed, May 2, 2018 at 9:19 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> AFAIK it is not possible with Flink's default time windows.
>> However, it should be possible to implement a custom WindowAssigner for
>> your use case.
>> I'd have a look at the TumblingEventTimeWindows class and copy/modify it
>> to your needs.
>>
>> Best, Fabian
>>
>> 2018-05-02 15:12 GMT+02:00 Vishal Santoshi <vi...@gmail.com>:
>>
>>> This does not seem possible but need some confirmation. Anyone ?
>>>
>>> On Tue, May 1, 2018 at 12:00 PM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> How do I align a Window with EDT with day light saving correction ?
>>>> The offset takes a hardcoded value. I need 6 hour windows aligned to 00, 12
>>>> , 18 and so on but on EDT.
>>>>
>>>
>>>
>>
>
Re: Windows aligned with EDT ( with day light saving )
Posted by Vishal Santoshi <vi...@gmail.com>.
True that. Thanks. Wanted to be sure before I go down that path.
On Wed, May 2, 2018 at 9:19 AM, Fabian Hueske <fh...@gmail.com> wrote:
> Hi Vishal,
>
> AFAIK it is not possible with Flink's default time windows.
> However, it should be possible to implement a custom WindowAssigner for
> your use case.
> I'd have a look at the TumblingEventTimeWindows class and copy/modify it
> to your needs.
>
> Best, Fabian
>
> 2018-05-02 15:12 GMT+02:00 Vishal Santoshi <vi...@gmail.com>:
>
>> This does not seem possible but need some confirmation. Anyone ?
>>
>> On Tue, May 1, 2018 at 12:00 PM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> How do I align a Window with EDT with day light saving correction ? The
>>> offset takes a hardcoded value. I need 6 hour windows aligned to 00, 12 ,
>>> 18 and so on but on EDT.
>>>
>>
>>
>
Re: Windows aligned with EDT ( with day light saving )
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Vishal,
AFAIK it is not possible with Flink's default time windows.
However, it should be possible to implement a custom WindowAssigner for
your use case.
I'd have a look at the TumblingEventTimeWindows class and copy/modify it to
your needs.
Best, Fabian
2018-05-02 15:12 GMT+02:00 Vishal Santoshi <vi...@gmail.com>:
> This does not seem possible but need some confirmation. Anyone ?
>
> On Tue, May 1, 2018 at 12:00 PM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> How do I align a Window with EDT with day light saving correction ? The
>> offset takes a hardcoded value. I need 6 hour windows aligned to 00, 12 ,
>> 18 and so on but on EDT.
>>
>
>
Re: Windows aligned with EDT ( with day light saving )
Posted by Vishal Santoshi <vi...@gmail.com>.
This does not seem possible but need some confirmation. Anyone ?
On Tue, May 1, 2018 at 12:00 PM, Vishal Santoshi <vi...@gmail.com>
wrote:
> How do I align a Window with EDT with day light saving correction ? The
> offset takes a hardcoded value. I need 6 hour windows aligned to 00, 12 ,
> 18 and so on but on EDT.
>