Posted to dev@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2021/03/19 09:59:00 UTC

[jira] [Created] (FLINK-21871) Support watermark for Hive and Filesystem streaming source

Jark Wu created FLINK-21871:
-------------------------------

             Summary: Support watermark for Hive and Filesystem streaming source
                 Key: FLINK-21871
                 URL: https://issues.apache.org/jira/browse/FLINK-21871
             Project: Flink
          Issue Type: New Feature
          Components: Connectors / FileSystem, Connectors / Hive, Table SQL / API
            Reporter: Jark Wu
            Assignee: Jark Wu
             Fix For: 1.13.0


Hive and Filesystem already support streaming sources. However, they don't support watermarks on the source. That means users can't leverage the streaming source for Flink's powerful streaming analysis, e.g. window aggregates, interval joins, and so on.

In order to let more Hive users leverage Flink for streaming analysis, and to cooperate with the new optimized window TVF operations (FLIP-145), we need to support watermarks for Hive and Filesystem.

### How to emit watermark in Hive and Filesystem

Fact data in Hive is usually partitioned by date and time, e.g. {{pt_day=2021-03-19, pt_hour=10}}. In this case, once the data of partition {{pt_day=2021-03-19, pt_hour=10}} has been emitted,
we know that all data before {{2021-03-19 11:00:00}} has arrived, so we can emit a watermark value of {{2021-03-19 11:00:00}}. We call this the partition watermark.

The partition watermark is much better than the record watermark (a watermark extracted from records, e.g. {{ts - INTERVAL '1' MINUTE}}).
In the above example, with the partition watermark the window of [10:00, 11:00) is triggered as soon as pt_hour=10 is finished.
With the record watermark, however, the window of [10:00, 11:00) is only triggered once pt_hour=11 arrives, which delays the pipeline by one more partition.

Therefore, we first focus on supporting partition watermarks for Hive and Filesystem.

### Example

In order to support such watermarks, we propose the following DDL to define a Hive table with a watermark:

{code:sql}
-- using hive dialect
CREATE TABLE hive_table (
  x int, 
  y string,
  z int,
  rowtime timestamp,
  WATERMARK FOR rowtime AS SOURCE_WATERMARK
) PARTITIONED BY (pt_day string, pt_hour string) 
TBLPROPERTIES (
  'streaming-source.enable'='true',
  'streaming-source.monitor-interval'='1s',
  'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
  'streaming-source.partition-interval'='1h'
);

-- window aggregate on the hive table
SELECT window_start, window_end, COUNT(*), MAX(y), SUM(z)
FROM TABLE(
   TUMBLE(TABLE hive_table, DESCRIPTOR(rowtime), INTERVAL '1' HOUR))
GROUP BY window_start, window_end;
{code}

For the filesystem connector, the DDL can be:

{code:sql}
CREATE TABLE fs_table (
    x int,
    y string,
    z int,
    ts TIMESTAMP(3),
    pt_day string,
    pt_hour string,
    WATERMARK FOR ts AS SOURCE_WATERMARK
) PARTITIONED BY (pt_day, pt_hour)
  WITH (
    'connector' = 'filesystem',
    'path' = '/path/to/file',
    'format' = 'parquet',

    'streaming-source.enable'='true',
    'streaming-source.monitor-interval'='1s',
    'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
    'streaming-source.partition-interval'='1h'
);
{code}

I will explain the new function and configuration options below.


### SOURCE_WATERMARK built-in function

FLIP-66 [1] proposed a {{SYSTEM_WATERMARK}} function for watermarks preserved in the underlying source system.
However, the SYSTEM prefix sounds like a value generated by the Flink system, while it is actually generated by the SOURCE system.
So I propose to use {{SOURCE_WATERMARK}} instead; this also keeps the concept aligned with the API of {{org.apache.flink.table.descriptors.Rowtime#watermarksFromSource}}.


### Table Options for Watermark

- {{partition.time-extractor.timestamp-pattern}}: this option already exists. It is used to extract/convert a partition value into a timestamp value.
- {{streaming-source.partition-interval}}: this is a new option. It indicates the minimal time interval of the partitions and is used to calculate the correct watermark when a partition is finished: watermark = partition-timestamp + partition-interval (see the sketch below).
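
To make the calculation concrete, here is a minimal Java sketch under the assumptions of the running example (timestamp pattern {{$pt_day $pt_hour:00:00}} and a partition interval of {{1h}}). The helper method and class are hypothetical illustrations, not part of any Flink API.

{code:java}
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class PartitionWatermarkExample {

    // Mirrors 'partition.time-extractor.timestamp-pattern' = '$pt_day $pt_hour:00:00'
    // (hypothetical helper, not a Flink API).
    static LocalDateTime extractPartitionTime(String ptDay, String ptHour) {
        String text = ptDay + " " + ptHour + ":00:00";
        return LocalDateTime.parse(text, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
    }

    public static void main(String[] args) {
        // Partition pt_day=2021-03-19, pt_hour=10 is finished.
        LocalDateTime partitionTime = extractPartitionTime("2021-03-19", "10");

        // 'streaming-source.partition-interval' = '1h'
        Duration partitionInterval = Duration.ofHours(1);

        // watermark = partition-timestamp + partition-interval
        LocalDateTime watermark = partitionTime.plus(partitionInterval);

        // Prints 2021-03-19T11:00 -- all data before this time has arrived.
        System.out.println("Emit partition watermark: " + watermark);
    }
}
{code}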

### How to support watermark for existing Hive tables

We all know that we can't create a new table for an existing Hive table, so we should support altering an existing Hive table to add the watermark information.
This can be supported by the new ALTER TABLE syntax proposed in FLINK-21634. Because watermarks, computed columns, and table options are all encoded in Hive table parameters,
other systems (e.g. Hive MR, Spark) can still read the Hive table as usual.

{code:sql}
ALTER TABLE hive_table ADD (
  WATERMARK FOR ts AS SOURCE_WATERMARK
);

ALTER TABLE hive_table SET (
  'streaming-source.enable'='true',
  'streaming-source.monitor-interval'='1s',
  'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',
  'streaming-source.partition-interval'='1h'
);
{code}

### Implementation Details

1. SplitEnumerator: monitors new partitions through the {{PartitionMonitor}}, sorts partitions by partition name, adds the splits of new partitions to the {{SplitAssigner}}, and tags the last split of each partition.
2. SourceReader: requests a split from the SplitEnumerator when it starts up or has read out a split.
3. SplitEnumerator: gets a split from the SplitAssigner and assigns it to the requesting reader. If the split is the last one of its partition, it also broadcasts a watermark event to all readers.
4. SourceReader receives a split: starts reading the data of the assigned split.
5. SourceReader receives a watermark: if it has assigned splits, it outputs the received watermark once those splits are read out; if it has no assigned splits, it outputs the received watermark immediately.

Note: the SplitAssigner should assign splits in FIFO order.

The above implementation doesn't require any new interface or method on the FLIP-27 source; everything can be implemented in the Hive/Filesystem connector modules. A simplified sketch of this protocol follows.
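
To illustrate the protocol, here is a minimal, self-contained Java sketch of the enumerator/reader interaction described above. It deliberately does not use the real FLIP-27 interfaces; all class and method names ({{SplitAssigner}}, {{Enumerator}}, {{Reader}}, ...) are illustrative assumptions, not actual connector code.

{code:java}
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// A minimal model of the enumerator/reader protocol described above. It does NOT use
// the real FLIP-27 interfaces; all class and method names are illustrative only.
public class PartitionWatermarkProtocolSketch {

    // A file split; the last split of a partition carries the partition watermark to broadcast.
    record Split(String partition, String path, Long partitionWatermark) {}

    // FIFO split assigner; splits are enqueued partition by partition, sorted by partition name.
    static class SplitAssigner {
        private final Queue<Split> queue = new ArrayDeque<>();
        void addSplits(List<Split> splits) { queue.addAll(splits); }
        Split getNext() { return queue.poll(); }
    }

    // Reader side: holds a broadcast watermark until all previously assigned splits are read out.
    static class Reader {
        private int pendingSplits;
        private Long heldWatermark;

        void receiveSplit(Split split) { pendingSplits++; }

        void finishSplit() {
            pendingSplits--;
            if (pendingSplits == 0 && heldWatermark != null) {
                emitWatermark(heldWatermark);
                heldWatermark = null;
            }
        }

        void receiveWatermark(long watermark) {
            if (pendingSplits == 0) {
                emitWatermark(watermark);  // no assigned splits: emit immediately
            } else {
                heldWatermark = watermark; // otherwise emit after the assigned splits are read out
            }
        }

        private void emitWatermark(long watermark) {
            System.out.println("emit watermark " + watermark);
        }
    }

    // Enumerator side: assigns splits in FIFO order and broadcasts the watermark of a finished partition.
    static class Enumerator {
        private final SplitAssigner assigner;
        private final List<Reader> allReaders;

        Enumerator(SplitAssigner assigner, List<Reader> allReaders) {
            this.assigner = assigner;
            this.allReaders = allReaders;
        }

        void handleSplitRequest(Reader requester) {
            Split split = assigner.getNext();
            if (split == null) {
                return; // no split available yet
            }
            requester.receiveSplit(split);
            if (split.partitionWatermark() != null) {
                // last split of the partition: broadcast the partition watermark to all readers
                allReaders.forEach(r -> r.receiveWatermark(split.partitionWatermark()));
            }
        }
    }

    public static void main(String[] args) {
        SplitAssigner assigner = new SplitAssigner();
        // Partition pt_hour=10 has two splits; the last one is tagged with the partition watermark.
        assigner.addSplits(List.of(
                new Split("pt_hour=10", "part-0", null),
                new Split("pt_hour=10", "part-1", 1616151600000L /* 2021-03-19 11:00:00 UTC */)));

        Reader reader = new Reader();
        Enumerator enumerator = new Enumerator(assigner, List.of(reader));

        enumerator.handleSplitRequest(reader); // assigns part-0
        reader.finishSplit();
        enumerator.handleSplitRequest(reader); // assigns part-1 and broadcasts the watermark
        reader.finishSplit();                  // the reader emits the held watermark here
    }
}
{code}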


[1]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-66%3A+Support+Time+Attribute+in+SQL+DDL



--
This message was sent by Atlassian Jira
(v8.3.4#803005)