Posted to user@flink.apache.org by Marco Villalobos <mv...@kineteque.com> on 2020/06/17 02:09:51 UTC

what is the "Flink" recommended way of assigning a backfill to an average on an event time keyed windowed stream?

I need to compute averages on time series data over a 15 minute tumbling event time window, with backfill.

The time series data is a Tuple3 of name: String, value: double, event_time: Timestamp (Instant).

I need to compute the average value of each name's time series on a tumbling window of 15 minutes, with backfills, such that

given the input:

a,10,2020-06-23T00:01:30.0000000Z
a,15,2020-06-23T00:02:30.0000000Z
a,20,2020-06-23T00:03:30.0000000Z
b,25,2020-06-23T00:03:30.0000000Z
b,30,2020-06-23T00:02:30.0000000Z
b,35,2020-06-23T00:01:30.0000000Z
b,35,2020-06-23T00:16:30.0000000Z

it yields the following averages with backfill:

a,15,2020-06-23 00:00:00.0
b,30,2020-06-23 00:00:00.0
a,15,2020-06-23 00:15:00.0
b,35,2020-06-23 00:15:00.0

Notice that although no value arrived for "a" in the second quarter-hour window, the previous average was upserted.
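(For the first window, a averages (10 + 15 + 20) / 3 = 15 and b averages (25 + 30 + 35) / 3 = 30; in the second window, b's only value is 35, while a carries forward its previous average of 15.)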

I only got as far as computing the average, but I have not figured out a recommended strategy for upserting the backfill.

I made a GitHub project to share my approach:

https://github.com/minmay/flink-patterns

And the following code demonstrates my approach thus far.

Can somebody please provide me guidance on the "Flink" recommended way of assigning a backfill to an average on a keyed windowed stream?

package mvillalobos.flink.patterns.timeseries.average;

import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.Arrays;

public class TimeSeriesAverageApp {

    private final static Logger logger = LoggerFactory.getLogger(TimeSeriesAverageApp.class);

    public void stream(String inputFilePath) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


        // GIVEN a SOURCE with a CSV input file
        // in which each line has a: String, double, Instant
        // THEN the MAP operator
        // transforms the line into a Tuple7
        // f0: name: String
        // f1: window_size: int
        // f2: value: double
        // f3: event_timestamp: Instant
        // f4: aggregate_sum: double
        // f5: aggregate_count: int
        // f6: is_backfill: boolean
        // WHEN the map operation finishes
        // THEN the event time is assigned using field f3
        final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream = env.readTextFile(inputFilePath)
                .map(line -> {
                    final String[] split = line.split(",");
                    final String name = split[0];
                    final double value = Double.parseDouble(split[1]);
                    final Instant timestamp = Instant.parse(split[2]);
                    return Tuple7.of(name, 1, value, timestamp, value, 1, false);
                }).returns(Types.TUPLE(Types.STRING, Types.INT, Types.DOUBLE, TypeInformation.of(Instant.class), Types.DOUBLE, Types.INT, Types.BOOLEAN))
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<>() {
                            @Override
                            public long extractAscendingTimestamp(Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element) {
                                return element.f3.toEpochMilli();
                            }
                        }
                );

        final JDBCUpsertTableSink jdbcUpsertTableSink = buildJdbcUpsertTableSink();

        upsertToJDBC(jdbcUpsertTableSink, timeSeriesStream);

        // GIVEN a data stream with Tuple7
        // f0: name: String
        // f1: window_size: int
        // f2: value: double
        // f3: event_timestamp: Instant
        // f4: aggregate_sum: double
        // f5: aggregate_count: int
        // f6: is_backfill: boolean
        // THEN the stream is KEYED BY: f0: name:String, f1: window_size: int
        // THEN the stream is WINDOWED into a tumbling event time window of 15 minutes
        // THEN the window is configured to allow elements late by 1 hour
        // THEN a low-level process function is applied to the window that
        //      aggregates the time series by assigning the following tuple fields:
        //      f1: window_size = 15 minutes in milliseconds
        //      f2: value = average value in this 15 minute window
        //      f3: event_timestamp = the first epoch millisecond in this 15 minute window
        //      f4: aggregate_sum = sum of f2 values in this 15 minute window
        //      f5: aggregate_count = number of values in this 15 minute window
        final SingleOutputStreamOperator<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>
                aggregateTimeSeriesStream = timeSeriesStream.keyBy(0, 1)
                .window(TumblingEventTimeWindows.of(Time.minutes(15)))
                .allowedLateness(Time.hours(1))
                .process(new ProcessWindowFunction<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple, TimeWindow>() {
                    @Override
                    public void process(
                            Tuple tuple,
                            Context context,
                            Iterable<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> elements,
                            Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
                    ) throws Exception {

                        final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> aggregation = new Tuple7<>();

                        boolean is_window_initialized = false;
                        for (Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element : ImmutableList.copyOf(elements).reverse()) {

                            if (!is_window_initialized) {

                                final Instant timestamp = Instant.ofEpochMilli(context.window().getStart());

                                aggregation.f0 = element.f0;
                                aggregation.f1 = (int) Time.minutes(15).toMilliseconds();
                                aggregation.f2 = element.f2;
                                aggregation.f3 = timestamp;
                                aggregation.f4 = 0D;
                                aggregation.f5 = 0;
                                aggregation.f6 = false;
                                is_window_initialized = true;
                            }

                            aggregation.f4 += element.f2;
                            aggregation.f5++;
                            aggregation.f2 = aggregation.f4 / aggregation.f5;
                        }

                        out.collect(aggregation);
                    }
                });

        upsertToJDBC(jdbcUpsertTableSink, aggregateTimeSeriesStream);

        env.execute("time series");
    }

    private JDBCUpsertTableSink buildJdbcUpsertTableSink() {
        final JDBCUpsertTableSink jdbcUpsertTableSink = JDBCUpsertTableSink.builder()
                .setOptions(JDBCOptions.builder()
                        .setDBUrl("jdbc:derby:memory:flink")
                        .setTableName("time_series")
                        .build())
                .setTableSchema(TableSchema.builder()
                        .field("name", DataTypes.VARCHAR(50).notNull())
                        .field("window_size", DataTypes.INT().notNull())
                        .field("value", DataTypes.DOUBLE().notNull())
                        .field("event_timestamp", DataTypes.TIMESTAMP().notNull())
                        .field("aggregate_sum", DataTypes.DOUBLE().notNull())
                        .field("aggregate_count", DataTypes.INT().notNull())
                        .field("is_backfill", DataTypes.BOOLEAN().notNull())
                        .primaryKey("name", "window_size", "event_timestamp")
                        .build())
                .build();
        jdbcUpsertTableSink.setKeyFields(new String[]{"name", "window_size", "event_timestamp"});
        return jdbcUpsertTableSink;
    }

    private void upsertToJDBC(JDBCUpsertTableSink jdbcUpsertTableSink, DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream) {
        jdbcUpsertTableSink.consumeDataStream(timeSeriesStream.map(t -> {
            final Row row = new Row(7);
            row.setField(0, t.f0);
            row.setField(1, t.f1);
            row.setField(2, t.f2);
            row.setField(3, Timestamp.from(t.f3));
            row.setField(4, t.f4);
            row.setField(5, t.f5);
            row.setField(6, t.f6);
            return new Tuple2<>(true, row);
        }).returns(new TypeHint<Tuple2<Boolean, Row>>() {
        }));
    }

    public static void main(String[] args) throws Exception {
        logger.info("Command line arguments: {}", Arrays.toString(args));
        final String inputFilePath = args[0];
        logger.info("Reading input file: {}", inputFilePath);

        final String databaseURL = "jdbc:derby:memory:flink;create=true";
        try (final Connection con = DriverManager.getConnection(databaseURL)) {
            try (final Statement stmt = con.createStatement();) {
                stmt.execute("CREATE TABLE time_series (\n" +
                        "    id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n" +
                        "    name VARCHAR(50) NOT NULL,\n" +
                        "    window_size INTEGER NOT NULL DEFAULT 1,\n" +
                        "    event_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
                        "    value DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
                        "    aggregate_sum DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
                        "    aggregate_count INTEGER NOT NULL DEFAULT 1,\n" +
                        "    is_backfill BOOLEAN NOT NULL DEFAULT false,\n" +
                        "    version INTEGER NOT NULL DEFAULT 1,\n" +
                        "    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
                        "    modify_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
                        "    UNIQUE (name, window_size, event_timestamp)\n" +
                        ")");
            }

            TimeSeriesAverageApp app = new TimeSeriesAverageApp();
            app.stream(inputFilePath);

            try (final Statement stmt = con.createStatement()) {
                final ResultSet rs = stmt.executeQuery("SELECT id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time FROM time_series");
                while (rs.next()) {
                    final long id = rs.getLong(1);
                    final String name = rs.getString(2);
                    final int window_size = rs.getInt(3);
                    final Timestamp event_timestamp = rs.getTimestamp(4);
                    final double value = rs.getDouble(5);
                    final double aggregate_sum = rs.getDouble(6);
                    final int aggregate_count = rs.getInt(7);
                    final boolean is_backfill = rs.getBoolean(8);
                    final int version = rs.getInt(9);
                    final Timestamp create_time = rs.getTimestamp(10);
                    final Timestamp modify_time = rs.getTimestamp(11);
                    logger.info(
                            "id: {}, name: \"{}\", window_size: {}, event_timestamp: \"{}\", value: {}, aggregate_sum: {}, aggregate_count: {}, is_backfill: {} version: {} create_time: \"{}\" modify_time: \"{}\"",
                            id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time
                    );
                }
            }
        }
    }
}



Re: what is the "Flink" recommended way of assigning a backfill to an average on an event time keyed windowed stream?

Posted by Arvid Heise <ar...@ververica.com>.
Hi Marco,

That's a lot of code to digest, so I'm sorry if I got something wrong.

From your example, it looks like you want to compute the average within a tumbling window. If no record for a particular key arrived in that time, you want to repeat the last value.

I'd use a dummy record to force the windows to be triggered, and then ignore it on aggregation. Here is a sketch:

aggregateTimeSeriesStream.keyBy(1)
  .process(<store all keys in a state, on 15 min timer, emit dummy record for all keys>)
  .window(<tumble 15 min>)
  .reduce(<average, ignore dummy record, create dummy record if empty>)
  .process(<store last record per key, replace dummy record with last value>)

This approach needs more functions, but they can mostly be chained. However, each piece of work is smaller and should be easier to maintain and test. You especially save the trouble of writing the window logic yourself.
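
For example, the reduce step could look roughly like this. This is an untested sketch: it assumes the dummy record is flagged in f6 and that f4/f5 carry the running sum and count (it also needs an import of org.apache.flink.api.common.functions.ReduceFunction):

ReduceFunction<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> average =
        (left, right) -> {
            // skip dummy records; a window that only contains the dummy forwards it unchanged,
            // so the downstream process function can replace it with the last real value
            if (left.f6) {
                return right;
            }
            if (right.f6) {
                return left;
            }
            final double sum = left.f4 + right.f4;
            final int count = left.f5 + right.f5;
            return Tuple7.of(left.f0, left.f1, sum / count, left.f3, sum, count, false);
        };

You would pass this to the reduce(...) step in the pipeline above.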

On Thu, Jun 18, 2020 at 7:11 PM Marco Villalobos <mv...@kineteque.com>
wrote:

> I came up with a solution for backfills. However, at this moment, I am not
> happy with my solution. I think there might be other facilities within Flink
> which allow me to implement a better, more efficient, or more scalable solution.
>
> [...]


-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: what is the "Flink" recommended way of assigning a backfill to an average on an event time keyed windowed stream?

Posted by Marco Villalobos <mv...@kineteque.com>.
I came up with a solution for backfills. However, at this moment, I am not happy with my solution.
I think there might be other facilities within Flink which allow me to implement a better, more efficient, or more scalable solution.

In another post, rmetzger@apache.org suggested that I use a process function and a timer. He was right in that I should use that approach. I want to thank him.

The averages are computed by a ProcessWindowFunction that keys by the name and window size and uses a tumbling event time window.

However, after that average is complete, I then use a KeyedProcessFunction that is keyed by window size. I then use a somewhat brute force approach with ValueState<Set<String>> to track names that need a value and a MAP STATE to determine which values exist and which ones are backfilled.
It also cleans up stale values.

I committed my code to a branch https://github.com/minmay/flink-patterns/tree/feature/backfill , and I also created a pull request https://github.com/minmay/flink-patterns/pull/1/files to share my experience.

I am open to critical comments on my approach, my understanding of Flink, and the algorithms and data structures used. Please refrain from comments on my code style, though.

I'll also copy and paste my solution below.

package mvillalobos.flink.patterns.timeseries.average;

import com.google.common.collect.ImmutableList;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCOptions;
import org.apache.flink.api.java.io.jdbc.JDBCUpsertTableSink;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine;

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

@CommandLine.Command(name = "Time Series Average", mixinStandardHelpOptions = true,
        description = "Compute the average of the time series with a 15 minute tumbling event time window and upsert the results into an Apache Derby database.")
public class TimeSeriesAverageApp implements Callable<Integer> {

    private final static Logger logger = LoggerFactory.getLogger(TimeSeriesAverageApp.class);

    @CommandLine.Option(names = {"-f", "--input-file"}, description = "The CSV input file of time series data. Each line must be in the format: String, double, Instant.")
    private File inputFile;

    @Override
    public Integer call() throws Exception {
        stream(inputFile.toString());
        return 0;
    }

    public void stream(String inputFilePath) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);


        // GIVEN a SOURCE with a CSV input file
        // in which each line has a: String, double, Instant
        // THEN the MAP operator
        // transforms the line into a Tuple7
        // f0: name: String
        // f1: window_size: int
        // f2: value: double
        // f3: event_timestamp: Instant
        // f4: aggregate_sum: double
        // f5: aggregate_count: int
        // f6: is_backfill: boolean
        // WHEN the map operation finishes
        // THEN the event time is assigned using field f3
        final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream = env.readTextFile(inputFilePath)
                .map(line -> {
                    final String[] split = line.split(",");
                    final String name = split[0];
                    final double value = Double.parseDouble(split[1]);
                    final Instant timestamp = Instant.parse(split[2]);
                    return Tuple7.of(name, 1, value, timestamp, value, 1, false);
                }).returns(Types.TUPLE(Types.STRING, Types.INT, Types.DOUBLE, TypeInformation.of(Instant.class), Types.DOUBLE, Types.INT, Types.BOOLEAN))
                .name("time series stream")
                .assignTimestampsAndWatermarks(
                        new AscendingTimestampExtractor<>() {
                            @Override
                            public long extractAscendingTimestamp(Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element) {
                                return element.f3.toEpochMilli();
                            }
                        }
                );

        final JDBCUpsertTableSink jdbcUpsertTableSink = buildJdbcUpsertTableSink();

        upsertToJDBC(jdbcUpsertTableSink, timeSeriesStream);

        // GIVEN a data stream with Tuple7
        // f0: name: String
        // f1: window_size: int
        // f2: value: double
        // f3: event_timestamp: Instant
        // f4: aggregate_sum: double
        // f5: aggregate_count: int
        // f6: is_backfill: boolean
        // THEN the stream is KEYED BY: f0: name:String, f1: window_size: int
        // THEN the stream is WINDOWED into a tumbling event time window of 15 minutes
        // THEN the window is configured to allow elements late by 1 hour
        // THEN a low-level process window function is applied to the window that
        //      aggregates the time series by assigning the following tuple fields:
        //      f1: window_size = 15 minutes in milliseconds
        //      f2: value = average value in this 15 minute window
        //      f3: event_timestamp = the first epoch millisecond in this 15 minute window
        //      f4: aggregate_sum = sum of f2 values in this 15 minute window
        //      f5: aggregate_count = number of values in this 15 minute window
        final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>
                aggregateTimeSeriesStream = timeSeriesStream.keyBy(0, 1)
                .window(TumblingEventTimeWindows.of(Time.minutes(15)))
                .allowedLateness(Time.hours(1))
                .process(new ProcessWindowFunction<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>, Tuple, TimeWindow>() {
                    @Override
                    public void process(
                            Tuple tuple,
                            Context context,
                            Iterable<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> elements,
                            Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
                    ) throws Exception {

                        final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> aggregation = new Tuple7<>();

                        boolean is_window_initialized = false;
                        for (Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> element : ImmutableList.copyOf(elements).reverse()) {

                            if (!is_window_initialized) {

                                final Instant timestamp = Instant.ofEpochMilli(context.window().getStart());

                                aggregation.f0 = element.f0;
                                aggregation.f1 = (int) Time.minutes(15).toMilliseconds();
                                aggregation.f2 = element.f2;
                                aggregation.f3 = timestamp;
                                aggregation.f4 = 0D;
                                aggregation.f5 = 0;
                                aggregation.f6 = false;
                                is_window_initialized = true;
                            }

                            aggregation.f4 += element.f2;
                            aggregation.f5++;
                            aggregation.f2 = aggregation.f4 / aggregation.f5;
                        }

                        logger.info("Added aggregation: {}", aggregation);
                        out.collect(aggregation);
                    }
                }).name("averaged keyed tumbling window event time stream");


        // GIVEN a data-stream of tuple7
        // f0: name: String
        // f1: window_size: int
        // f2: value: double
        // f3: event_timestamp: Instant
        // f4: aggregate_sum: double
        // f5: aggregate_count: int
        // f6: is_backfill: boolean
        // THAT was aggregated to compute the average on f2: value: double
        // WITH a grouping of: f0: name:String, f1: window_size: int
        // WITH a tumbling event time window of 15 minutes
        // THEN the stream is KEYED BY: f1: window_size: int
        // THEN a low-level keyed process function is applied to the window that
        //      WHEN the keyed process function opens it
        //          initializes a VALUE STATE of TreeSet<String> called "nameSet"
        //          initializes a MAP STATE of
        //            KEY of Tuple2: f0: String, f1: Instant
        //            VALUE of Tuple7:
        //              f0: String
        //              f1: int
        //              f2: double
        //              f3: Instant
        //              f4: double
        //              f5: int
        //              f6: boolean
        //            called "backfillState"
        //      WHEN the keyed process function processes an element it
        //          adds each f0: name: String into the VALUE STATE "nameSet"
        //          adds each
        //              KEY of Tuple2: f0: name: String, f3: event_timestamp: Instant
        //              VALUE of Tuple7:
        //                  f0: name: String
        //                  f1: window_size: int
        //                  f2: value: double
        //                  f3: event_timestamp: Instant
        //                  f4: aggregate_sum: double
        //                  f5: aggregate_count: int
        //                  f6: is_backfill: boolean
        //              to the MAP STATE "backfillState"
        //          fires a timer to occur at f3: event_timestamp: Instant + 15 minutes (at the end of a window)
        //      WHEN the keyed process functions coalesced timers are handled it
        //          calculates the current "event_time" to handle, which is the timestamp minus 15 minutes
        //          iterates over each time series name in the "nameSet" for each "name":
        //              IF MAP STATE "backfillState" contains a KEY of Tuple2: "name", "event_time" THEN
        //                  collect the VALUE as an OUT result because it is not a back fill
        //              ELSE
        //                  the KEY of Tuple2: "name", "event_time" requires a back fill
        //                  iterate over the MAP STATE "backfillState"
        //                      filter by "name" = Tuple2.f0
        //                      filter by timestamp "event_time" > Tuple2.f1
        //                      sort by key timestamp Tuple.f1 in ascending order
        //                      collect into a List named "backfills"
        //                  IF "backfills" is empty THEN there is no backfill
        //                  ELSE
        //                      the back fill is the last value in the list
        //                      remove the other values in the list from MAP STATE "backfillState" as they are no longer needed
        final DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfilledAggregateTimeSeriesStream =
                aggregateTimeSeriesStream.keyBy(1)
                        .process(
                                new KeyedProcessFunction<>() {

                                    private ValueState<Set<String>> namesState;

                                    private MapState<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfillState;

                                    @Override
                                    public void open(Configuration parameters) {
                                        MapStateDescriptor<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> backfillDescriptor =
                                                new MapStateDescriptor<>(
                                                        "backfill-state",
                                                        TypeInformation.of(new TypeHint<>() {}),
                                                        TypeInformation.of(new TypeHint<>() {})
                                                );

                                        backfillState = getRuntimeContext().getMapState(backfillDescriptor);

                                        ValueStateDescriptor<Set<String>> namesDescriptor =
                                                new ValueStateDescriptor<>("names-value-state", TypeInformation.of(new TypeHint<>() {}));

                                        namesState = getRuntimeContext().getState(namesDescriptor);
                                    }

                                    @Override
                                    public void processElement(
                                            Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value,
                                            Context ctx,
                                            Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
                                    ) throws Exception {

                                        // read, mutate, and write back the set so the change is persisted with any state backend
                                        final Set<String> names = namesState.value() == null ? new TreeSet<>() : namesState.value();
                                        names.add(value.f0);
                                        namesState.update(names);

                                        final Instant eventTime = value.f3;
                                        final long timer = eventTime.toEpochMilli() + Time.minutes(15).toMilliseconds();

                                        logger.info(
                                                "processElement with key: {}, value: {}.  registering timer: {}",
                                                ctx.getCurrentKey(),
                                                value,
                                                Instant.ofEpochMilli(timer)
                                        );
                                        ctx.timerService().registerEventTimeTimer(timer);

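                                        // keep this window's aggregate keyed by (name, event time);
                                        // onTimer() either emits it as-is or carries it forward as a backfill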
                                        final Tuple2<String, Instant> currentKey = new Tuple2<>(value.f0, value.f3);
                                        backfillState.put(currentKey, value);
                                    }

                                    @Override
                                    public void onTimer(
                                            long timestamp,
                                            OnTimerContext ctx,
                                            Collector<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> out
                                    ) throws Exception {

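                                        // the timer was registered 15 minutes after the window's event time,
                                        // so subtract one window length to recover the window it belongs to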
                                        final Instant event_time = Instant.ofEpochMilli(timestamp).minus(15, ChronoUnit.MINUTES);

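                                        // for every name seen so far, emit the real aggregate for this window
                                        // if one arrived, otherwise synthesize a backfill from the most recent
                                        // earlier aggregate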
                                        for (String name : namesState.value()) {
                                            Tuple2<String, Instant> key = new Tuple2<>(name, event_time);
                                            if (backfillState.contains(key)) {
                                                final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value = backfillState.get(key);
                                                logger.info(
                                                        "onTimer with key: {} timestamp: {}, event_time: {}, has value: {}",
                                                        ctx.getCurrentKey(),
                                                        Instant.ofEpochMilli(timestamp),
                                                        event_time,
                                                        value
                                                );
                                                out.collect(value);
                                            } else {
                                                final List<Map.Entry<Tuple2<String, Instant>, Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>>> backfills
                                                        = StreamSupport.stream(backfillState.entries().spliterator(), false)
                                                        .filter(entry -> name.equals(entry.getKey().f0))
                                                        .filter(entry -> event_time.isAfter(entry.getKey().f1))
                                                        .sorted(Comparator.comparing(entry -> entry.getKey().f1))
                                                        .collect(Collectors.toList());

                                                if (!backfills.isEmpty()) {
                                                    final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> value = backfills.get(backfills.size() - 1).getValue();
                                                    final Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean> backfill = new Tuple7<>(
                                                            value.f0, value.f1, value.f2, event_time, value.f4, value.f5, true
                                                    );
                                                    out.collect(backfill);

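                                                    // prune older entries for this name; only the most recent
                                                    // value is needed for future backfills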
                                                    for (int i = 0; i < backfills.size() - 1; i++) {
                                                        backfillState.remove(backfills.get(i).getKey());
                                                    }
                                                    logger.info("onTimer with key: {} timestamp: {}, step: {}, has backfill: {}", ctx.getCurrentKey(), Instant.ofEpochMilli(timestamp), event_time, backfill);
                                                }
                                            }
                                        }
                                        logger.info("*****************");
                                    }
                                });

        upsertToJDBC(jdbcUpsertTableSink, backfilledAggregateTimeSeriesStream);

        env.execute("time series");
    }

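    /**
     * Builds a JDBC upsert sink keyed on (name, window_size, event_timestamp) so that
     * backfills and re-emitted windows overwrite previously written rows.
     */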
    private JDBCUpsertTableSink buildJdbcUpsertTableSink() {
        final JDBCUpsertTableSink jdbcUpsertTableSink = JDBCUpsertTableSink.builder()
                .setOptions(JDBCOptions.builder()
                        .setDBUrl("jdbc:derby:memory:flink")
                        .setTableName("time_series")
                        .build())
                .setTableSchema(TableSchema.builder()
                        .field("name", DataTypes.VARCHAR(50).notNull())
                        .field("window_size", DataTypes.INT().notNull())
                        .field("value", DataTypes.DOUBLE().notNull())
                        .field("event_timestamp", DataTypes.TIMESTAMP().notNull())
                        .field("aggregate_sum", DataTypes.DOUBLE().notNull())
                        .field("aggregate_count", DataTypes.INT().notNull())
                        .field("is_backfill", DataTypes.BOOLEAN().notNull())
                        .primaryKey("name", "window_size", "event_timestamp")
                        .build())
                .build();
        jdbcUpsertTableSink.setKeyFields(new String[]{"name", "window_size", "event_timestamp"});
        return jdbcUpsertTableSink;
    }

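    /**
     * Wraps each Tuple7 in a (true, Row) upsert record and hands the stream to the
     * JDBC upsert sink.
     */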
    private void upsertToJDBC(JDBCUpsertTableSink jdbcUpsertTableSink, DataStream<Tuple7<String, Integer, Double, Instant, Double, Integer, Boolean>> timeSeriesStream) {
        jdbcUpsertTableSink.consumeDataStream(timeSeriesStream.map(t -> {
            final Row row = new Row(7);
            row.setField(0, t.f0);
            row.setField(1, t.f1);
            row.setField(2, t.f2);
            row.setField(3, Timestamp.from(t.f3));
            row.setField(4, t.f4);
            row.setField(5, t.f5);
            row.setField(6, t.f6);
            return new Tuple2<>(true, row);
        }).returns(new TypeHint<Tuple2<Boolean, Row>>() {
        })).name("upsert to JDBC");
    }

    public static void main(String[] args) throws Exception {

        final String databaseURL = "jdbc:derby:memory:flink;create=true";
        int exitCode;
        try (final Connection con = DriverManager.getConnection(databaseURL)) {
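            // create the target table in the in-memory Derby database before running the job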
            try (final Statement stmt = con.createStatement()) {
                stmt.execute("CREATE TABLE time_series (\n" +
                        "    id INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),\n" +
                        "    name VARCHAR(50) NOT NULL,\n" +
                        "    window_size INTEGER NOT NULL DEFAULT 1,\n" +
                        "    event_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
                        "    value DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
                        "    aggregate_sum DOUBLE PRECISION NOT NULL DEFAULT 0,\n" +
                        "    aggregate_count INTEGER NOT NULL DEFAULT 1,\n" +
                        "    is_backfill BOOLEAN NOT NULL DEFAULT false,\n" +
                        "    version INTEGER NOT NULL DEFAULT 1,\n" +
                        "    create_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
                        "    modify_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,\n" +
                        "    UNIQUE (name, window_size, event_timestamp)\n" +
                        ")");
            }

            exitCode = new CommandLine(new TimeSeriesAverageApp()).execute(args);

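            // after the job finishes, read the table back and log every row so the
            // averages and backfills can be inspected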
            try (final Statement stmt = con.createStatement()) {
                final ResultSet rs = stmt.executeQuery("SELECT id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time FROM time_series ORDER BY window_size, event_timestamp, name");
                while (rs.next()) {
                    final long id = rs.getLong(1);
                    final String name = rs.getString(2);
                    final int window_size = rs.getInt(3);
                    final Timestamp event_timestamp = rs.getTimestamp(4);
                    final double value = rs.getDouble(5);
                    final double aggregate_sum = rs.getDouble(6);
                    final int aggregate_count = rs.getInt(7);
                    final boolean is_backfill = rs.getBoolean(8);
                    final int version = rs.getInt(9);
                    final Timestamp create_time = rs.getTimestamp(10);
                    final Timestamp modify_time = rs.getTimestamp(11);
                    logger.info(
                            "id: {}, name: \"{}\", window_size: {}, event_timestamp: \"{}\", value: {}, aggregate_sum: {}, aggregate_count: {}, is_backfill: {} version: {} create_time: \"{}\" modify_time: \"{}\"",
                            id, name, window_size, event_timestamp, value, aggregate_sum, aggregate_count, is_backfill, version, create_time, modify_time
                    );
                }
            }
        }

        System.exit(exitCode);
    }
}