Posted to user@flink.apache.org by "A. V." <aa...@live.nl> on 2019/10/23 07:32:36 UTC
Problem creating tumbling windows based on number of rows
Hi,
I am trying to create a tumbling window of 2 rows each in Flink (Java). The window must be based on the dateTime (TIMESTAMP(3) data type) or unixDateTime (BIGINT data type) column. Below are two versions of the code; the error message each one produces is in the comment above it.
When I print the schema of the Table object I see this:
|-- mID: INT
|-- dateTime: TIMESTAMP(3) *ROWTIME*
|-- mValue: DOUBLE
|-- unixDateTime: BIGINT
|-- mType: STRING
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv);
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
TupleTypeInfo<Tuple5<Integer, Timestamp, Double, Long, String>> tupleType = new TupleTypeInfo<>(
Types.INT(),
Types.SQL_TIMESTAMP(),
Types.DOUBLE(),
Types.LONG(),
Types.STRING());
DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple =
tableEnv.toAppendStream(HTable, tupleType);
//When I run below code I get this error: Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, mValue, unixDateTime, mType");
DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
stream.print();
//When I run below code I get this error: Exception in thread "main" java.lang.UnsupportedOperationException: Event-time grouping windows on row intervals are currently not supported.
Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, mValue, unixDateTime, mType")
.window(Tumble.over("2.rows")
.on("dateTime")
.as("a"))
.groupBy("a")
.select("AVG(mValue)");
DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
stream.print();
Re: Problem creating tumbling windows based on number of rows
Posted by Manoj Kumar <ma...@bigzetta.com>.
Hi A.V.,
> //When I run below code I get this error: Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
You need to assign timestamps and watermarks to the DataStream before converting it to a Table.
Sample DataStream code:
DataStream<Tuple2<String, Timestamp>> lineitem = bsEnv
    .socketTextStream("localhost", 12347)
    .flatMap(new Splitter2())
    .assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple2<String, Timestamp>>() {
            @Override
            public long extractAscendingTimestamp(Tuple2<String, Timestamp> inputrowtuple) {
                // f1 is the Timestamp field; a Tuple2 only has f0 and f1
                return inputrowtuple.f1.getTime();
            }
        });
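To make the idea behind the extractor concrete, here is a small plain-Java sketch with no Flink dependency. The class AscendingWatermarkSketch and its methods are hypothetical names made up for this illustration; it only mimics, as far as I understand it, what an ascending-timestamp assigner does: each record gets its event time as timestamp, and the watermark trails the highest timestamp seen so far by one millisecond. It is not Flink's implementation.

```java
public class AscendingWatermarkSketch {
    // Highest event timestamp seen so far (milliseconds since epoch).
    private long currentTimestamp = Long.MIN_VALUE;

    // Returns the timestamp to attach to the record and remembers the maximum.
    public long extract(long eventTimeMillis) {
        if (eventTimeMillis >= currentTimestamp) {
            currentTimestamp = eventTimeMillis;
        }
        return eventTimeMillis;
    }

    // Assumption: the watermark is "max timestamp seen minus 1 ms".
    public long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentTimestamp - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch assigner = new AscendingWatermarkSketch();
        assigner.extract(1000L);
        assigner.extract(2000L);
        assigner.extract(1500L); // out of order; does not move the watermark back
        System.out.println(assigner.currentWatermark()); // prints 1999
    }
}
```

Without such a step the records carry no timestamp at all, which is what the "Rowtime timestamp is null" message complains about.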
> //When I run below code I get this error: Exception in thread "main" java.lang.UnsupportedOperationException: Event-time grouping windows on row intervals are currently not supported.
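Row-count windows are currently only supported on processing time. Use a .proctime attribute in the schema instead of .rowtime and define the window on it. As far as I know, a processing-time attribute has to be an additional field appended at the end of the schema (for example proctime.proctime), not a replacement for an existing column.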
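For reference, this is roughly what a tumbling window of 2 rows with AVG(mValue) computes, written as a plain-Java sketch without Flink. The class CountTumbleSketch and the sample values are made up for illustration. It assumes rows are grouped in arrival order and that a trailing group with fewer than 2 rows produces no result, since a count window only fires once it is full.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CountTumbleSketch {
    // Averages each consecutive, non-overlapping group of `size` values.
    // An incomplete trailing group is not emitted (assumption, see above).
    public static List<Double> tumblingAverages(List<Double> values, int size) {
        List<Double> out = new ArrayList<>();
        double sum = 0.0;
        int count = 0;
        for (double v : values) {
            sum += v;
            count++;
            if (count == size) {
                out.add(sum / size);
                sum = 0.0;
                count = 0;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical mValue column, in arrival order.
        List<Double> mValues = Arrays.asList(1.0, 3.0, 10.0, 20.0, 7.0);
        System.out.println(tumblingAverages(mValues, 2)); // prints [2.0, 15.0]
    }
}
```

Because the grouping depends only on arrival order, this matches a processing-time row-count window; the event-time column plays no role in which rows end up together.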
--
Regards,
Manoj Kumar