Posted to user@flink.apache.org by enrico canzonieri <ec...@gmail.com> on 2017/05/24 02:15:22 UTC

Tumbling window expects a time attribute for grouping in a stream environment

Hi,
I'm trying to window and groupBy a stream using the Table API, but I get a
ValidationException in the windowing function.
Here is the relevant code:

tableEnv.registerTableSource(schema.getName, src)
val table = tableEnv.scan(schema.getName)
val t = table
  .window(Tumble over 1.minutes on 'time as 'w)
  .groupBy('host, 'w)
  .select('host)

"time" is defined as Long in my schema. The error I get is:
Exception in thread "main" org.apache.flink.table.api.ValidationException:
TumblingGroupWindow('w, 'time, 60000.millis) is invalid: Tumbling window
expects a time attribute for grouping in a stream environment.

I also tried to define a window that uses processing time, but what is
described in the documentation, "Tumble over 1.minutes as 'w", doesn't seem
to work anymore. Specifically, it seems that a window now always expects the
"on" call.

Has anybody encountered this issue? I'm using Flink 1.3-SNAPSHOT.

thanks

Re: Tumbling window expects a time attribute for grouping in a stream environment

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Enrico,

that's the right approach (for now). I agree that the KafkaTableSource
should implement both DefinedXTimeAttribute interfaces (rowtime and proctime).

Best, Fabian

2017-05-25 3:20 GMT+02:00 enrico canzonieri <ec...@gmail.com>:

> I solved this by implementing a new Kafka09TableSource in my application.
> The class I implemented extends both DefinedRowTimeAttribute and
> DefinedProcTimeAttribute, and it exposes the consumer so that I can assign
> the timestamp extractor.
>
> I'm not sure if this is the right approach, but if it is, I wonder
> whether we could make those changes in KafkaTableSource to make it more
> generic.

Re: Tumbling window expects a time attribute for grouping in a stream environment

Posted by enrico canzonieri <ec...@gmail.com>.
I solved this by implementing a new Kafka09TableSource in my application.
The class I implemented extends both DefinedRowTimeAttribute and
DefinedProcTimeAttribute, and it exposes the consumer so that I can assign
the timestamp extractor.

I'm not sure if this is the right approach, but if it is, I wonder whether
we could make those changes in KafkaTableSource to make it more generic.

On Wed, May 24, 2017 at 12:23 PM, enrico canzonieri <ec...@gmail.com>
wrote:

> Hi Timo, thanks for your help!
>
> I tried to follow the examples in the tests but I still have the same
> issue.
> I changed my schema and added an additional field "rowtime". My schema now
> is:
> root
>  |-- rowtime: org.apache.flink.table.expressions.RowtimeAttribute(expr: GenericType<org.apache.flink.table.expressions.Expression>)
>  |-- time: Long
>  |-- host: String
>
> If I run the code:
> table.select('rowtime).toDataStream[Row].print()
> I get:
> RowtimeAttribute(1495580133000)
> RowtimeAttribute(1495580143000)
> RowtimeAttribute(1495580153000)
>
> But if I run:
> table
>   .window(Tumble over 1.minutes on 'rowtime as 'w)
>   .groupBy('host, 'w)
>   .select('host)
> I still get the previous error:
> TumblingGroupWindow('w, 'rowtime, 60000.millis) is invalid: Tumbling
> window expects a time attribute for grouping in a stream environment.
>
> I'm using a Kafka09TableSource as the data source, but it doesn't allow me
> to specify the timestamp assigner. I think the actual consumer is not
> exposed to the user, so I cannot really call assignTimestampsAndWatermarks.
> Could that be the problem? Should we expose that function so that we can
> assign timestamps and watermarks to a TableSource?
>
> The time characteristic in the execution environment is set to EventTime
> in my code.
>
> Cheers,
> Enrico

Re: Tumbling window expects a time attribute for grouping in a stream environment

Posted by enrico canzonieri <ec...@gmail.com>.
Hi Timo, thanks for your help!

I tried to follow the examples in the tests but I still have the same
issue.
I changed my schema and added an additional field "rowtime". My schema now
is:
root
 |-- rowtime: org.apache.flink.table.expressions.RowtimeAttribute(expr: GenericType<org.apache.flink.table.expressions.Expression>)
 |-- time: Long
 |-- host: String

If I run the code:
table.select('rowtime).toDataStream[Row].print()
I get:
RowtimeAttribute(1495580133000)
RowtimeAttribute(1495580143000)
RowtimeAttribute(1495580153000)

But if I run:
table
  .window(Tumble over 1.minutes on 'rowtime as 'w)
  .groupBy('host, 'w)
  .select('host)
I still get the previous error:
TumblingGroupWindow('w, 'rowtime, 60000.millis) is invalid: Tumbling window
expects a time attribute for grouping in a stream environment.

I'm using a Kafka09TableSource as the data source, but it doesn't allow me to
specify the timestamp assigner. I think the actual consumer is not exposed
to the user, so I cannot really call assignTimestampsAndWatermarks. Could
that be the problem? Should we expose that function so that we can assign
timestamps and watermarks to a TableSource?

The time characteristic in the execution environment is set to EventTime in
my code.
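The missing piece in this message is the assignment of timestamps and watermarks. The following is a simplified plain-Scala model of a bounded-out-of-orderness assigner. It is not Flink's API. It takes the event timestamp from the `time: Long` field and keeps the watermark a fixed bound behind the largest timestamp seen so far. This loosely follows the idea behind Flink's `BoundedOutOfOrdernessTimestampExtractor`, which would be handed to `assignTimestampsAndWatermarks` on a DataStream. The names `WatermarkSketch`, `BoundedOutOfOrdernessModel` and `canFire` are hypothetical, and so is the firing rule. They are for illustration only.

```scala
// Simplified model (hypothetical names, not Flink's API) of timestamp/watermark assignment.
object WatermarkSketch {
  // Hypothetical record type: host name plus event time in epoch milliseconds.
  final case class Event(host: String, time: Long)

  // The watermark trails the largest timestamp seen so far by a fixed bound.
  final class BoundedOutOfOrdernessModel(maxOutOfOrdernessMs: Long) {
    // Start low enough that the initial watermark is exactly Long.MinValue (no underflow).
    private var maxSeen: Long = Long.MinValue + maxOutOfOrdernessMs

    // Use the Long 'time' field as the event timestamp and track the maximum seen.
    def extractTimestamp(e: Event): Long = {
      maxSeen = math.max(maxSeen, e.time)
      e.time
    }

    // Events with timestamps below this value are treated as late in this model.
    def currentWatermark: Long = maxSeen - maxOutOfOrdernessMs
  }

  // In this model, a window [start, start + size) may fire once the
  // watermark reaches the window's last millisecond.
  def canFire(windowStart: Long, sizeMs: Long, watermark: Long): Boolean =
    watermark >= windowStart + sizeMs - 1
}
```

Take a 5-second bound (an arbitrary choice for illustration). After the events at 1495580133000 and 1495580153000, the watermark is 1495580148000, so the window [1495580100000, 1495580160000) cannot fire yet. An out-of-order event at 1495580143000 does not move the watermark backwards. An event at 1495580165000 advances the watermark to 1495580160000, and the window can fire.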

Cheers,
Enrico

On Wed, May 24, 2017 at 2:08 AM, Timo Walther <tw...@apache.org> wrote:

> Hi Enrico,
>
> the docs of the 1.3-SNAPSHOT are a bit out of sync right now, but they
> will be updated over the next few days to 1-2 weeks.
>
> We recently introduced so-called "time indicators". These are attributes
> that correspond to Flink's time and watermarks. You declare a logical field
> that represents Flink's internal time in a table program.
>
> In your example you need to append a "time.rowtime" or "time.proctime" to
> your table schema definition.
>
> You can find some examples here:
> https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
>
> If you have further questions, feel free to ask them. It helps us to
> improve the documentation.
>
> Regards,
> Timo

Re: Tumbling window expects a time attribute for grouping in a stream environment

Posted by Timo Walther <tw...@apache.org>.
Hi Enrico,

the docs of the 1.3-SNAPSHOT are a bit out of sync right now, but they
will be updated over the next few days to 1-2 weeks.

We recently introduced so-called "time indicators". These are attributes 
that correspond to Flink's time and watermarks. You declare a logical 
field that represents Flink's internal time in a table program.

In your example you need to append a "time.rowtime" or "time.proctime" 
to your table schema definition.
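The following plain-Scala sketch makes the idea of a time attribute concrete. It is not Flink's Table API. It shows what a 1-minute tumbling event-time window grouped by host computes once every row carries a usable timestamp. `Event`, `windowStart` and `countPerHostAndWindow` are hypothetical names. The sketch assumes non-negative epoch-millisecond timestamps, like the `time: Long` field in this thread, and windows aligned to the epoch with no offset.

```scala
// Plain-Scala model (no Flink dependency) of 1-minute tumbling event-time windows.
object TumblingWindowSketch {
  // Hypothetical record type mirroring the thread's schema: host plus time in epoch ms.
  final case class Event(host: String, time: Long)

  // "Tumble over 1.minutes" corresponds to 60000 ms.
  val WindowSizeMs: Long = 60000L

  // Start of the tumbling window containing ts (assumes ts >= 0 and no window offset).
  def windowStart(ts: Long, size: Long = WindowSizeMs): Long = ts - (ts % size)

  // Group by (host, window start), analogous to groupBy('host, 'w), and count rows per group.
  def countPerHostAndWindow(events: Seq[Event]): Map[(String, Long), Int] =
    events
      .groupBy(e => (e.host, windowStart(e.time)))
      .map { case (key, group) => key -> group.size }
}
```

Take the sample timestamps from this thread (1495580133000, 1495580143000 and 1495580153000). All three fall into the window starting at 1495580100000, and a row at 1495580160000 would open the next window. In Flink itself, this assignment only applies when the windowed field is declared as a time attribute, for example with `.rowtime` as described above. A plain `Long` field is rejected at validation time.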

You can find some examples here:
https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala

If you have further questions, feel free to ask them. It helps us to
improve the documentation.

Regards,
Timo



On 24.05.17 at 04:15, enrico canzonieri wrote:
> Hi,
> I'm trying to window and groupBy a stream using the Table API, but I
> get a ValidationException in the windowing function.
> Here is the relevant code:
>
> tableEnv.registerTableSource(schema.getName, src)
> val table = tableEnv.scan(schema.getName)
> val t = table
>   .window(Tumble over 1.minutes on 'time as 'w)
>   .groupBy('host, 'w)
>   .select('host)
>
> "time" is defined as Long in my schema. The error I get is:
> Exception in thread "main" 
> org.apache.flink.table.api.ValidationException: 
> TumblingGroupWindow('w, 'time, 60000.millis) is invalid: Tumbling 
> window expects a time attribute for grouping in a stream environment.
>
> I also tried to define a window that uses processing time, but what is
> described in the documentation, "Tumble over 1.minutes as 'w", doesn't
> seem to work anymore. Specifically, it seems that a window now always
> expects the "on" call.
>
> Has anybody encountered this issue? I'm using Flink 1.3-SNAPSHOT.
>
> thanks