You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by "A. V." <aa...@live.nl> on 2019/10/23 07:32:36 UTC

Problem creating tumbling windows based on number of rows

 Hi,

I try to create a tumbling time window of 2 rows each in Flink Java. This must based on the dateTime (TimeStamp3 datatype) or unixDateTime(BIGINT datatype) column. I've added below the code of two different code versions. The error messages I get I placed above the code.

When I print the datatypes of the Table object I see this:  |-- mID: INT |-- dateTime: TIMESTAMP(3) *ROWTIME* |-- mValue: DOUBLE |-- unixDateTime: BIGINT |-- mType: STRING

StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv);
        fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    TupleTypeInfo<Tuple5<Integer, Timestamp, Double, Long, String>> tupleType = new TupleTypeInfo<>(
        Types.INT(),
        Types.SQL_TIMESTAMP(),
        Types.DOUBLE(),
        Types.LONG(),
        Types.STRING());
        DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple =
                tableEnv.toAppendStream(HTable, tupleType);

//When I run below code I get this error: Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.

Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, mValue, unixDateTime, mType");
   DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
stream.print();

//When I run below code I get this error: Exception in thread "main" java.lang.UnsupportedOperationException: Event-time grouping windows on row intervals are currently not supported.

   Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, measurementValue, unixDateTime, measurementType")
.window(Tumble.over("2.rows")
.on("dateTime")
.as("a"))
.groupBy("a")
.select("AVG(mValue)");
    DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
    stream.print();


Re: Problem creating tumbling windows based on number of rows

Posted by Manoj Kumar <ma...@bigzetta.com>.
Hi A.V.,


*//When I run below code I get this error: Caused by:
java.lang.RuntimeException: Rowtime timestamp is null. //Please make sure
that a proper TimestampAssigner is defined and the stream environment uses
the EventTime time characteristic.*

You need to assign Timestamp and watermarks to datastream

sample data stream code

   DataStream<Tuple17<String, Timestamp>>
        lineitem = bsEnv.socketTextStream("localhost", 12347).flatMap(new
Splitter2()).assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<Tuple2<String, Timestamp>>() {
          @Override public long extractAscendingTimestamp(
              Tuple2<String,Timestamp> inputrowtuple) {
            return inputrowtuple.f2.getTime();
          }
        });


*//When I run below code I get this error: Exception in thread "main"
java.lang.UnsupportedOperationException: Event-time grouping windows on row
intervals are currently not supported.*

Currently this feature is only supported for Process time, change .rowtime
to .proctime in schema

On Wed, Oct 23, 2019 at 1:11 PM A. V. <aa...@live.nl> wrote:

>  Hi,
>
> I try to create a tumbling time window of 2 rows each in Flink Java. This
> must based on the dateTime (TimeStamp3 datatype) or unixDateTime(BIGINT
> datatype) column. I've added below the code of two different code versions.
> The error messages I get I placed above the code.
>
> When I print the datatypes of the Table object I see this:  |-- mID: INT
> |-- dateTime: TIMESTAMP(3) *ROWTIME* |-- mValue: DOUBLE |-- unixDateTime:
> BIGINT |-- mType: STRING
>
> StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(fsEnv);
>         fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>     TupleTypeInfo<Tuple5<Integer, Timestamp, Double, Long, String>> tupleType = new TupleTypeInfo<>(
>         Types.INT(),
>         Types.SQL_TIMESTAMP(),
>         Types.DOUBLE(),
>         Types.LONG(),
>         Types.STRING());
>         DataStream<Tuple5<Integer, Timestamp, Double, Long, String>> dsTuple =
>                 tableEnv.toAppendStream(HTable, tupleType);
> //When I run below code I get this error: Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
> Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, mValue, unixDateTime, mType");
>    DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
> stream.print();
> //When I run below code I get this error: Exception in thread "main" java.lang.UnsupportedOperationException: Event-time grouping windows on row intervals are currently not supported.
>
>    Table table = tableEnv.fromDataStream(dsTuple, "mID, dateTime.rowtime, measurementValue, unixDateTime, measurementType").window(Tumble.over("2.rows").on("dateTime").as("a")).groupBy("a").select("AVG(mValue)");
>     DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
>     stream.print();
>
>
>

-- 
Regards,
Manoj  Kumar